新博客:https://blog.bigdataboy.cn/article/455.html说明上篇:https://blog.bigdataboy.cn/article/454.html接上篇:只能在添加job时一次性修改,而无法实现任务中途动态修改state的值,所以继续改造思路其实改造也很简单,首先通过job_id获取到Job,更新Job的state属性,然后执行相应的停止(pause)、恢复(resume)等函数,所以我们就在停止恢复这些函数上动手脚分析源码job=scheduler.get_job(job_id=job_id)job.pause()继续看pause()内部逻辑,本质调用的是BaseScheduler类下的pause_job()方法defpause(self):"""Temporarilysuspendtheexecutionofthisjob...seealso:::meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`:returnJob:thisjobinstance"""self._scheduler.pause_job(self.id,self._jobstore_alias)returnself继续看pause_job()方法内部逻辑,发现本质调用的是BaseScheduler类下的modify_job()方法defpause_job(self,job_id,state,jobstore=None):"""Causesthegivenjobnottobeexecuteduntilitisexplicitlyresumed.:paramstr|unicodejob_id:theidentifierofthejob:paramstr|unicodejobstore:aliasofthejobstorethatcontainsthejob:returnJob:therelevantjobinstance"""returnself.modify_job(job_id,jobstore,next_run_time=None)继续看modify_job()方法内部逻辑,发现就更新了job的状态,其实就是修改defmodify_job(self,job_id,jobstore=None,**changes):"""Modifiesthepropertiesofasinglejob.Modificationsarepassedtothismethodasextrakeywordarguments.:paramstr|unicodejob_id:theidentifierofthejob:paramstr|unicodejobstore:aliasofthejobstorethatcontainsthejob:returnJob:therelevantjobinstance"""withself._jobstores_lock:job,jobstore=self._lookup_job(job_id,jobstore)job._modify(**changes)ifjobstore:self._lookup_jobstore(jobstore).update_job(job)self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED,job_id,jobstore))#Wakeuptheschedulersincethejob'snextruntimemayhavebeenchangedifself.state==STATE_RUNNING:self.wakeup()returnjob总结其实需要改动的就在传参上,到最后给modify_job()函数,一并修改就可以了改造apscheduler/job.pydefpause(self):"""Temporarilysuspendtheexecutionofthisjob...seealso:::meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`:returnJob:thisjobinstance"""#self._scheduler.pause_job(self.id,self._jobstore_alias)#原self._scheduler.pause_job(self.id,self.state,self._jobstore_alias)returnselfapscheduler/schedulers/base.pydefpause_job(self,job_id,state,jobstore=None):#添加state参数"""Causesthegivenjobnottobeexecuteduntilitisexplicitlyresumed.:paramstr|unicodejob_id:theidentifierofthejob:paramstr|unicodejobstore:aliasofthejobstorethatcontainsthejob:returnJob:therelevantjobinstance"""returnself.modify_job(job_id,jobstore,state=state,next_run_time=None)#传入state其他就不用修改了,modify_job()函数内部是不定长获取的,所以恢复(resume)也是一样验证停止任务,然后再使用job_id获取
ps:好久没发文章,悄悄翻一个新博客:https://blog.bigdataboy.cn/article/454.html说明最近在使用apscheduler过程中,为了控制每个Job的状态,发现有点困难,所以就尝试改造一下目标为Job类添加一个dict类型的属性,用来保存用户自定义的数据,例如当job在执行过程发生错误,就可以把状态改成error并停止job改造地方一apscheduler/job.py#增加state属性__slots__=('_scheduler','_jobstore_alias','id','trigger','executor','func','func_ref','args','kwargs','name','misfire_grace_time','coalesce','max_instances','next_run_time','__weakref__','state')地方二#在返回值中增加statedef__getstate__(self):...return{'version':1,'id':self.id,'func':self.func_ref,'trigger':self.trigger,'executor':self.executor,'args':args,'kwargs':self.kwargs,'name':self.name,'misfire_grace_time':self.misfire_grace_time,'coalesce':self.coalesce,'max_instances':self.max_instances,'next_run_time':self.next_run_time,'state':self.state####}地方三#在设置job属性时,增加statedef__setstate__(self,state):ifstate.get('version',1)>1:raiseValueError('Jobhasversion%s,butonlyversion1canbehandled'%state['version'])self.id=state['id']self.func_ref=state['func']self.func=ref_to_obj(self.func_ref)self.trigger=state['trigger']self.executor=state['executor']self.args=state['args']self.kwargs=state['kwargs']self.name=state['name']self.misfire_grace_time=state['misfire_grace_time']self.coalesce=state['coalesce']self.max_instances=state['max_instances']self.next_run_time=state['next_run_time']self.state=state['state']####地方四#在add_job函数上,添加上state参数defadd_job(self,func,trigger=None,args=None,kwargs=None,id=None,name=None,state={},####misfire_grace_time=undefined,coalesce=undefined,max_instances=undefined,next_run_time=undefined,jobstore='default',executor='default',replace_existing=False,**trigger_args):...#把state参数添加到job_kwargs字典job_kwargs={'trigger':self._create_trigger(trigger,trigger_args),'executor':executor,'func':func,'args':tuple(args)ifargsisnotNoneelse(),'kwargs':dict(kwargs)ifkwargsisnotNoneelse{},'id':id,'name':name,'misfire_grace_time':misfire_grace_time,'coalesce':coalesce,'max_instances':max_instances,'next_run_time':next_run_time,'state':state####}地方五#在参数修复函数里,添加上state,并进行检测def_modify(self,**changes):...if'state'inchanges:state=changes.pop('state')ifnotisinstance(state,dict):raiseTypeError('statemustbeadict')approved['state']=state验证在以上五个地方添加上,就可以实现为每个job开辟一个用户自定义属性的存储添加任务可以看到job能够正常获取到state获取任务获取任务时state也是存在的当任务发生异常时按照官网的方案,监控任务状态需要添加监听#调度监听fromapscheduler.schedulers.backgroundimportBackgroundSchedulerfromapscheduler.eventsimportEVENT_JOB_ERROR,EVENT_JOB_MISSED,\EVENT_JOB_ADDED,EVENT_JOB_REMOVED,EVENT_JOB_EXECUTED,EVENT_JOB_MODIFIEDscheduler=BackgroundScheduler(timezone='Asia/Shanghai')defapscheduler_listener(event):ifevent.code==EVENT_JOB_ERROR:#任务发送错误时job=scheduler.get_job(event.job_id)job.state['state']='error'job.pause()logger.error(f'jobid:{event.job_id}traceback:{event.traceback}')scheduler.add_listener(apscheduler_listener,EVENT_JOB_ADDED|EVENT_JOB_REMOVED|EVENT_JOB_MODIFIED|EVENT_JOB_EXECUTED|EVENT_JOB_ERROR|EVENT_JOB_MISSED)scheduler.start()可以看到当任务发生错误时,就可以改变任务状态,并停止该任务
说明poetry是一个Python虚拟环境和依赖管理的工具,oetry和pipenv类似,另外还提供了打包和发布的功能。安装安装不成功,大多数是网站被墙了macOS&Linuxcurl-sSLhttps://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py|python-windows在powershell使用该命令(Invoke-WebRequest-Urihttps://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py-UseBasicParsing).Content|python-会自动添加到环境变量,需要重新打开powershell,查看是否安装成功配置使用前修改这些配置后,更好用查看当前配置poetryconfig--list-------------------PSC:\Users\ASUS>poetryconfig--listcache-dir="D:\\Python\\peotry_env"experimental.new-installer=trueinstaller.parallel=truevirtualenvs.create=truevirtualenvs.in-project=truevirtualenvs.path="{cache-dir}\\virtualenvs"#D:\Python\peotry_env\virtualenvs修改配置poetryconfigcache-dirD:\\Python\\peotry_envpoetryconfigvirtualenvs.in-projecttrue创建项目该命令,会建立一个目录结构poetrynewdemo结构demo├──pyproject.toml├──README.rst├──demo│└──__init__.py└──tests├──__init__.py└──test_demo.py其中pyproject.toml文件将协调项目及其依赖项[tool.poetry]name="demo"version="0.1.0"description=""authors=["‘zbigdataboy‘<876545500@qq.com>"][tool.poetry.dependencies]python="^3.9"[tool.poetry.dev-dependencies]pytest="^5.2"[build-system]requires=["poetry-core>=1.0.0"]build-backend="poetry.core.masonry.api"初始化项目这是在项目已经存在的情况下使用,为了创建pyproject.toml管理项目poetryinit添加依赖&库如果项目不存在虚拟环境,将会创建,同时还会创建poetry.lock,用以记录项目依赖的版本poetryaddrequests卸载依赖poetryremoverequests安装依赖项用以使用项目时,依赖于开发时一样poetryinstall修改依赖安装源项目的pyproject.toml文末追加[[tool.poetry.source]]name="tsinghua"url="https://pypi.tuna.tsinghua.edu.cn/simple"虚拟环境执行项目poetry会自动在相应目录寻找该项目的环境poetryrunpythonapp.py显示的激活poetryshell其他命令#显示当前环境信息poetryenvinfo#列出与项目关联的环境(一个项目可能需要不同版本依赖做测试)poetryenvlist#列出当前配置poetryconfig--list导出requirements.txt文件暂时只支持requirements.txt格式#导出requirements.txt文件-f导出格式--output导出文件名poetryexport-frequirements.txt--outputrequirements.txt
ORM说明SQLAlchemy是Python中,通过ORM操作数据库的框架。简单点来说,就是帮助我们从烦冗的SQL语句中解脱出来,从而不需要再去写原生的SQL语句,只需要用Python的语法来操作对象,就能被自动映射为SQL语句。安装SQLAlchemy:pip3installSQLAlchemy使用本文以sqlite数据为例,当然sqlalchemy也支持其他数据,MySQL、PostgreSQL、Oracle、MSSQL...连接引擎任何SQLAlchemy应用程序的开始都是一个Engine对象,此对象充当连接到特定数据库的中心源,提供被称为connectionpool的对于这些数据库连接。"""配置连接数据库database.py"""fromsqlalchemyimportcreate_engineDATABASE_URL='sqlite:///orm.db'#sqlite://数据库路径engine=create_engine(#创建引擎DATABASE_URL,encoding='utf-8',echo=True,#引擎将用repr()函数记录所有语句及其参数列表到日志connect_args={'check_same_thread':False},#【使用sqlite数据库才配置】sqlalchemy是多线程的,'check_same_thread':False来让建立的任意线程都可以使用)声明映射Python中的一个类,对应一个表,类的每个属性对应表的字段名。每个映射类都需要继承declarative_base()"""配置连接数据库database.py"""fromsqlalchemy.ormimportdeclarative_baseBase=declarative_base()创建两张表,我们不需要事先在数据库创建表"""ORM操作的模型类models.py"""fromsqlalchemyimportColumn,Integer,String,Enum,DateTime,func,ForeignKeyfromdatabaseimportBase,engineclassClassRoom(Base):__tablename__='orm_classroom'#表名id=Column(Integer,primary_key=True,index=True,autoincrement=True)classroom_num=Column(Integer,unique=True,nullable=False,comment='几班')teacher=Column(String(100),unique=False,nullable=False,comment='班主任')student_num=Column(Integer,unique=False,nullable=False,comment='班级人数')create_at=Column(DateTime,server_default=func.now(),comment='创建时间')update_at=Column(DateTime,server_default=func.now(),onupdate=func.now(),comment='更新时间')def__repr__(self):#方便显示returnf'id={self.id}classroom_num={self.classroom_num}teacher={self.teacher}student_num={self.student_num}'classStudent(Base):__tablename__='orm_student'#表名id=Column(Integer,primary_key=True,index=True,autoincrement=True)#主键自增name=Column(String(100),unique=False,nullable=False,comment='姓名')#字符串不唯一不为空备注sex=Column(Enum('男','女'),comment='性别')classroom_num=Column(Integer,ForeignKey('orm_classroom.classroom_num'),comment='所属班级')create_at=Column(DateTime,server_default=func.now(),comment='创建时间')update_at=Column(DateTime,server_default=func.now(),onupdate=func.now(),comment='更新时间')def__repr__(self):#方便显示returnf'id={self.id}name={self.name}sex={self.sex}classroom_num={self.classroom_num}'Base.metadata.create_all(engine)#创建表运行models.py文件创建会话参数autoflush、autocommit的说明,推荐博客:https://zhuanlan.zhihu.com/p/48994990"""配置连接数据库database.py"""#在SQLAlchemy中,CRUD都是通过会话(session)进行的,所以我们必须要先创建会话SessionLocal=sessionmaker(bind=engine,autoflush=False,#关闭flush()函数的调用,flush()作用是发送语句到数据库,但数据库不一定执行写入磁盘操作autocommit=False,#关闭commit()函数调用,commit()作用是提交事务,将变更保存到数据库文件expire_on_commit=True)进行crud操作封装"""对数据库增删改查crud.py""""""对数据库增删改查crud.py"""fromsqlalchemy.ormimportSessionfromstoragesimportmodels,schemas#增defcreate_student(db:Session,student:schemas.CreateStudent):"""传入BaseMode实例"""db_student=models.Student(**student.dict())db.add(db_student)db.commit()#提交事务db.refresh(db_student)#刷新returndb_student#增defcreate_class_room(db:Session,classroom:schemas.CreateClassRoom):"""传入BaseMode实例"""db_classroom=models.Student(**classroom.dict())db.add(db_classroom)db.commit()#提交事务db.refresh(db_classroom)#刷新returndb_classroom#查defget_student_by_id(db:Session,student_id:int):"""通过id查询学生表"""returndb.query(models.Student).filter(models.Student.id==student_id).first()#查defget_student_by_name(db:Session,name:str):"""通过name查询学生表"""returndb.query(models.Student).filter(models.Student.name==name)#删defdel_student_by_id(db:Session,student_id:int):student=get_student_by_id(db,student_id)db.delete(student)#先查询再删除db.commit()returnstudent#改defupdate_student_name_by_id(db:Session,student_id:int,name:str):student=get_student_by_id(db,student_id)student.name=name#查询结果修改提交事务db.commit()returnstudent使用"""主文件main.py"""fromstorages.databaseimportSessionLocal,Base,enginefromstorages.crudimportcreate_student,get_student_by_id,del_student_by_id,update_student_name_by_idfromstorages.schemasimportCreateStudentBase.metadata.create_all(engine)#创建表db=SessionLocal()student=CreateStudent(name='Bob',sex='男',classroom_num=1)create_student(db=db,student=student)print('查询结果',get_student_by_id(db=db,student_id=1))#print('删除结果',del_student_by_id(db=db,student_id=2))#print('修改结果',update_student_name_by_id(db=db,student_id=3,name='Aoa'))查询数据query()的使用query()可以理解为SQL的selectdb.query(models.Student).filter()过滤db.query(models.Student).filter_by()根据关键字过滤db.query(models.Student).all()返回全部【列表】db.query(models.Student).first()返回第一个元素db.query(models.Student).one()有且只有一个元素时才正确返回db.query(models.Student).one_or_none(),类似one,但如果没有找到结果,则不会引发错误db.query(models.Student).scalar(),调用one方法,并在成功时返回行的第一列db.query(models.Student).count()计数db.query(models.Student).order_by()排序filter()与filter_by()过滤filter()使用#等于db.query(models.Student).filter(Student.name=='Bob')#不等于db.query(models.Student).filter(Student.name!='Bob')#like和ilikedb.query(models.Student).filter(Student.name.like('%Bob%'))db.query(models.Student).filter(Student.name.ilike('%Bob%'))#不区分大小写#isdb.query(models.Student).filter(Student.name==None)#isnotdb.query(models.Student).filter(Student.name!=None)#andfromsqlalchemyimportand_db.query(models.Student).filter(and_(Student.name=='Bob',Student.id==2))db.query(models.Student).filter(Student.name=='Bob',Student.fullname==2)db.query(models.Student).filter(Student.name=='Bob').filter(Student.fullname==2)#orfromsqlalchemyimportor_db.query(models.Student).filter(or_(Student.name=='Bob',Student.name=='Aoa'))#matchdb.query(models.Student).filter(Student.name.match('Bob'))filter_by()使用#id是模型的字段db.query(models.Student).filter_by(id=student_id,name='Bob').first()jion()连接查询db.query(models.Student).join(models.ClassRoom).\filter(\models.Student.classroom_num==models.ClassRoom.classroom_num\).all()
说明官方文档:https://tenacity.readthedocs.io/en/latest/Tenacity是一个Apache2.0许可的通用重试库,用Python编写,用于简化将重试行为添加到几乎任何内容的任务。它起源于retrying的一个分支,遗憾的是retrying不再维护。Tenacity与retrying不兼容,但添加了重要的新功能并修复了许多长期存在的错误。特征通用装饰器API指定停止条件(即限制尝试次数)指定等待条件(即尝试之间的指数退避休眠)自定义对预期返回结果的重试停止、等待、重试条件任意组合协程重试使用上下文管理器重试代码块安装:pipinstalltenacity例子不加任何条件,一直重试到正确结果,才停止重试importrandomfromtenacityimportretry@retrydeffun():r=random.randint(1,3)print(f'当前{r}')assertr==2if__name__=='__main__':fun()-----------------------------------当前1当前3当前3当前2Processfinishedwithexitcode0停止条件stop_after_attempt比较常用,停止条件还有很多fromtenacityimportretryfromtenacity.stopimportstop_after_attempt#重试2次,停止@retry(stop=stop_after_attempt(2))deffun():pass等待条件fromtenacityimportretry#每次重试等待2秒fromtenacity.waitimportwait_fixed@retry(wait=wait_fixed(2))deffun():#每次重试随机等待1~2秒fromtenacity.waitimportwait_random@retry(wait=wait_random(min=1,max=2))deffun():#在重试分布式服务和其他远程端点时,结合固定等待和抖动(以帮助避免雷鸣般的群体)#固定基础等待3秒,随机增加0~2秒fromtenacity.waitimportwait_fixed,wait_random@retry(wait=wait_fixed(3)+wait_random(0,2))deffun():#生成一个等待链(等待时间的列表)等待时间相对于:[3,3,3,7,7,9]fromtenacity.waitimportwait_chain,wait_fixed@retry(wait=wait_chain(*[wait_fixed(3)foriinrange(3)]+[wait_fixed(7)foriinrange(2)]+[wait_fixed(9)]))deffun():wait_exponential重试分布式服务和其他远程端点时,等待时间呈指数增长multiplier:增长的指数【默认值1】min:最小等待时间【默认值0】max:最大等待时间【默认值sys.maxsize/2】importrandomfromloguruimportloggerfromtenacityimportretryfromtenacity.waitimportwait_exponential@retry(wait=wait_exponential(multiplier=1,min=4,max=10))deffun():r=random.randint(1,5)logger.info(f'当前{r}')assertr==6if__name__=='__main__':fun()-----------------------------------------------2022-02-2810:52:37.918|INFO|__main__:fun:10-当前22022-02-2810:52:41.930|INFO|__main__:fun:10-当前22022-02-2810:52:45.938|INFO|__main__:fun:10-当前12022-02-2810:52:49.949|INFO|__main__:fun:10-当前42022-02-2810:52:57.952|INFO|__main__:fun:10-当前32022-02-2810:53:07.965|INFO|__main__:fun:10-当前12022-02-2810:53:17.975|INFO|__main__:fun:10-当前42022-02-2810:53:27.977|INFO|__main__:fun:10-当前3stop_after_delay函数运行超过限制秒数,引发重试异常importrandomimporttimefromtenacityimportretryfromtenacity.waitimportstop_after_delay@retry(stop=stop_after_delay(2))deffun():r=random.randint(1,5)print(f'当前{r}')time.sleep(2)assertr==2print(f'对了')-----------------------------------tenacity.RetryError:RetryError[<Futureat0x260529a62b0state=finishedraisedAssertionError>]是否重试#AssertionError异常,触发重试fromtenacity.retryimportretry_if_exception_type@retry(retry=retry_if_exception_type(AssertionError))deffun():使用函数判断,是否重试importrandomfromloguruimportloggerfromtenacityimportretryfromtenacity.retryimportretry_if_result@retry(retry=retry_if_result(lambdax:x!=2))deffun():r=random.randint(1,5)logger.info(f'当前{r}')returnrif__name__=='__main__':fun()--------------------------------------------2022-02-2811:26:33.588|INFO|__main__:fun:10-当前52022-02-2811:26:33.588|INFO|__main__:fun:10-当前12022-02-2811:26:33.588|INFO|__main__:fun:10-当前32022-02-2811:26:33.588|INFO|__main__:fun:10-当前2组合重试条件importrandomfromloguruimportloggerfromtenacityimportretryfromtenacity.retryimportretry_if_result,retry_if_exception_type"""函数结果不可能为2和3-2重试-3触发AssertionError异常,重试"""@retry(retry=(retry_if_result(lambdax:x!=2)andretry_if_exception_type(AssertionError)))deffun():r=random.randint(1,5)logger.info(f'当前{r}')ifr==3:raiseAssertionError(f'ris3')returnrif__name__=='__main__':logger.info(f'fun{fun()}')协程重试importrandomimportasynciofromloguruimportloggerfromtenacityimportretryfromtenacity.waitimportwait_fixed@retry(wait=wait_fixed(2))asyncdeffun():r=random.randint(1,3)logger.info(f'当前{r}')assertr==2if__name__=='__main__':asyncio.run(fun())-----------------------------------------2022-02-2812:36:07.801|INFO|__main__:fun:11-当前32022-02-2812:36:09.805|INFO|__main__:fun:11-当前12022-02-2812:36:11.807|INFO|__main__:fun:11-当前32022-02-2812:36:13.814|INFO|__main__:fun:11-当前2统计数据函数.retry.statistics重试次数开始运行时间整个重试的耗时importrandomfromloguruimportloggerfromtenacityimportretryfromtenacity.retryimportretry_if_result@retry(reraise=True,retry=retry_if_result(lambdax:x!=2))deffun():r=random.randint(1,5)logger.info(f'当前{r}')returnrif__name__=='__main__':logger.info(f'fun{fun()}')logger.info(f'fun{fun.retry.statistics}')----------------------------------------------------2022-02-2811:43:33.558|INFO|__main__:fun:10-当前12022-02-2811:43:33.558|INFO|__main__:fun:10-当前12022-02-2811:43:33.558|INFO|__main__:fun:10-当前12022-02-2811:43:33.559|INFO|__main__:fun:10-当前32022-02-2811:43:33.559|INFO|__main__:fun:10-当前52022-02-2811:43:33.559|INFO|__main__:fun:10-当前42022-02-2811:43:33.559|INFO|__main__:fun:10-当前22022-02-2811:43:33.559|INFO|__main__:<module>:15-fun22022-02-2811:43:33.559|INFO|__main__:<module>:16-fun{'start_time':6972.546,'attempt_number':7,'idle_for':0,'delay_since_first_attempt':0.0}重试代码块重试代码块的逻辑包含的一个for循环中,然后包含一个上下文管理器,使需要重试的逻辑脱离函数importrandomfromloguruimportloggerfromtenacityimportRetrying,RetryError,stop_after_attempttry:#重试3次forattemptinRetrying(stop=stop_after_attempt(3)):#重试、等待、停止条件设置withattempt:#重试逻辑包含在上下文管理器r=random.randint(1,3)logger.info(f'r->{r}')assertr==2exceptRetryError:logger.error(f'超出重试次数')#超出重试次数处理逻辑pass----------------------------------------------------------2022-02-2813:08:13.491|INFO|__main__:<module>:10-r->32022-02-2813:08:13.492|INFO|__main__:<module>:10-r->32022-02-2813:08:13.492|INFO|__main__:<module>:10-r->32022-02-2813:08:13.492|ERROR|__main__:<module>:13-超出重试次数异步重试代码块异步使用importrandomimportasynciofromloguruimportloggerfromtenacityimportAsyncRetrying,RetryError,stop_after_attemptasyncdeffun():try:asyncforattemptinAsyncRetrying(stop=stop_after_attempt(3)):#重试、等待、停止条件设置#这个with前不能带asyncwithattempt:#重试逻辑包含在上下文管理器r=random.randint(1,3)logger.debug(f'r->{r}')assertr==2exceptRetryError:logger.error('达到重试上限')passif__name__=='__main__':asyncio.run(fun())------------------------------------------------------------------2022-02-2813:09:17.247|DEBUG|__main__:fun:13-r->32022-02-2813:09:17.247|DEBUG|__main__:fun:13-r->12022-02-2813:09:17.247|DEBUG|__main__:fun:13-r->32022-02-2813:09:17.247|ERROR|__main__:fun:16-达到重试上限
使用Jinja2Templates模板引擎推荐文章:【FastAPI】Static静态文件配置安装命令:pip3installjinja2路径:\routes\index.pyfromfastapiimportFastAPI,Requestfromfastapi.responsesimportHTMLResponsefromfastapi.staticfilesimportStaticFilesfromfastapi.templatingimportJinja2Templatesapp=FastAPI()app.mount("/static",StaticFiles(directory="static"),name="static")#jscss等静态资源存放的目录templates=Jinja2Templates(directory="templates")#模板html存放的目录@app.get("/index",response_class=HTMLResponse)asyncdefread_item(request:Request):returntemplates.TemplateResponse(name='index.html',context={'request':request,#必要参数'data':{'title':'模板'},#渲染给前端的数据})main.js文件路径\static\main.jsconsole.log('加载成功')index.html模板路径:\templates\index.html<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><scriptsrc="{{url_for('static',path='main.js')}}"></script><title>Title</title></head><body><h1>{{data.title}}</h1></body></html>程序主文件fromfastapiimportFastAPIfromfastapi.staticfilesimportStaticFilesfromroutes.indeximportapplicationimportuvicornapp=FastAPI()#注意是在FastAPI()实例上挂载静态文件路径app.mount(path='/static',#网页的路径app=StaticFiles(directory='./static'),#静态文件目录的路径name='static')app.include_router(application,prefix='',tags=['首页'])if__name__=='__main__':uvicorn.run(app)
说明测试用例在项目中是不可缺失的一部分FastAPI的测试用例需要安装pipinstallpytest包含测试用例的文件名格式一般为:test_xx.py使用编写两个接口importuvicornfromrandomimportrandintfromtypingimportOptionalfromfastapiimportFastAPI,Headerapp=FastAPI()@app.get('/random')asyncdefget_random():"""获取一个随机数"""returnrandint(1,10)@app.get('/ua')asyncdefget_ua(user_agent:Optional[str]=Header(None,convert_underscores=True)):returnuser_agentif__name__=='__main__':uvicorn.run(app)编写测试用例文件需要导入FastAPI()实例化的对象fromfastapi.testclientimportTestClientfromrunimportappclient=TestClient(app=app)deftest_app_random():res=client.get('/random')assertres.status_code==200asserttype(res)==intdeftest_app_ua():res=client.get('/ua')assertres.status_code==200直接再命令行使用pytest启动测试
说明可以先返回响应,然后服务器再执行任务FastAPI的BackgroundTasks只能实现简单的后台任务,如需要开启进程等,就需要更强大的工具Celery使用后台任务importuvicornfromfastapiimportFastAPI,BackgroundTasksapp=FastAPI()#异步&同步函数红豆可以asyncdefwrite_notification(email:str,message):withopen('./log.txt',mode='w')asfile:file.write(f'notificationfor{email}:{message}')@app.post("/send-notification/{email}")asyncdefsend_notification(email:str,background_tasks:BackgroundTasks):#会自动识别为后台任务background_tasks.add_task(write_notification,email,message="somenotification")return{"message":"在后台发送通知"}if__name__=='__main__':uvicorn.run(app)依赖使用后台任务importuvicornfromfastapiimportFastAPI,BackgroundTasks,Dependsapp=FastAPI()asyncdefwrite_log(email:str):message="somenotification"withopen('./log.txt',mode='w')asfile:file.write(f'notificationfor{email}:{message}')#异步&同步函数红豆可以asyncdefadd_task(email:str,background_tasks:BackgroundTasks):background_tasks.add_task(write_log,email)@app.post("/send-notification/{email}")asyncdefsend_notification(q:str=Depends(add_task)):#使用依赖添加后台任务return{"message":"在后台发送通知"}if__name__=='__main__':uvicorn.run(app)
什么是跨域资源共享是浏览器资源安全的一个方案,使开发的接口,不会被其他网站滥用,需要满足你指定的条件的网站才能使用条件可以指定:域,请求方法,头部信息域的格式:协议://域名:端口(https://bigdataboy.cn、https://bigdataboy.cn:433)请求方法:GET、POST、PUT…头部信息:指请求的头部信息使用importuvicornfromfastapiimportFastAPIfromfastapi.middleware.corsimportCORSMiddlewareapp=FastAPI()app.add_middleware(CORSMiddleware,allow_origins=[#允许访问的域域-->协议:域名:端口'http://127.0.0.1','http://127.0.0.1:8000',],allow_methods=['*','GET'],#*可以通配allow_headers=['*'],#头部allow_credentials=False,#HTTPS证书allow_origin_regex=None,#正则表达式匹配'https://.*\.example\.orgexpose_headers=[],#指明可以被浏览器访问的响应头max_age=600#设定浏览器缓存CORS响应的最长时间,单位是秒。默认为600)@app.get("/")asyncdefmain():return{"message":"HelloWorld"}if__name__=='__main__':uvicorn.run(app)
什么是中间件中间件是一个函数请求达到响应逻辑之前会经过一层或多层中间件,响应结果返回客户端也会经过一层或多层中间件FastAPI中间件开发自定义中间件importtimeimportuvicornfromfastapiimportFastAPIfromfastapi.requestsimportRequestfromfastapi.responsesimportResponseapp=FastAPI()@app.middleware('http')#中间件类型现在只支持http类型asyncdefadd_process_time_header(request:Request,call_next):"""计算请求响应时间:paramrequest:请求:paramcall_next:请求处理回调:return:"""start_time=time.time()res:Response=awaitcall_next(request)process_time=time.time()-start_timeres.headers['X-Process-Time']=str(process_time)returnres@app.get('/middleware')defmiddleware():return'没有处理逻辑,响应成功'if__name__=='__main__':uvicorn.run(app)提供的中间件CORSMiddleware跨域资源共享中间件fromfastapiimportFastAPIfromfastapi.middleware.corsimportCORSMiddlewareapp=FastAPI()app.add_middleware(CORSMiddleware,allow_origins=[#允许访问的域域-->协议:域名:端口'http://127.0.0.1','http://127.0.0.1:8000',],allow_methods=['*','GET'],#*可以通配allow_headers=['*'],#头部allow_credentials=False,#HTTPS证书allow_origin_regex=None,#正则表达式匹配'https://.*\.example\.orgexpose_headers=[],#指明可以被浏览器访问的响应头max_age=600#设定浏览器缓存CORS响应的最长时间,单位是秒。默认为600)@app.get("/")asyncdefmain():return{"message":"HelloWorld"}HTTPSRedirectMiddleware强制所有传入请求必须是https或wss如果不是https则会自动跳转到https,如果网站没有配置https,则报错h11._util.RemoteProtocolError:illegalrequestlineWARNING:InvalidHTTPrequestreceived.fromfastapiimportFastAPIfromfastapi.middleware.httpsredirectimportHTTPSRedirectMiddlewareapp=FastAPI()app.add_middleware(HTTPSRedirectMiddleware)@app.get("/")asyncdefmain():return{"message":"HelloWorld"}TrustedHostMiddleware强制所有传入请求都具有正确设置的Host标头,以防止HTTP主机标头攻击。如果不包含,则响应400状态码,并返回InvalidhostheaderfromfastapiimportFastAPIfromfastapi.middleware.trustedhostimportTrustedHostMiddlewareapp=FastAPI()app.add_middleware(TrustedHostMiddleware,allowed_hosts=["bigdataboy.cn","*.bigdataboy.cn"]#允许的hosts列表)@app.get("/")asyncdefmain():return{"message":"HelloWorld"}GZipMiddleware处理包含”gzip”在Accept-Encoding标头中的任何请求的GZip响应。fromfastapiimportFastAPIfromfastapi.middleware.gzipimportGZipMiddlewareapp=FastAPI()app.add_middleware(GZipMiddleware,minimum_size=1000)#不要GZip响应小于此最小字节大小。默认为500.@app.get("/")asyncdefmain():return{"message":"HelloWorld"}带yield关键字依赖,依赖中的退出代码将在执行中间件后执行defget_db():db=dbSession()try:yielddbfinally:db.close()
什么是JsonWebTokens(JWT)请求流程首先浏览器向服务器发送请求得到token然后浏览器每次请求都自动在头部带上tokentoken的组成token是一些信息(用户名,过期时间…)的加密结果,加密的秘钥保存在服务器,因此只要秘钥不被泄漏,就认为是安全的。FastAPI的JWT哈希密码把用户的明文密码,加密保存在数据库,即使密码泄漏,也不知道用户真正的密码。推荐的算法是「Bcrypt」:pip3installpasslib[bcrypt]frompasslib.contextimportCryptContext#创建哈希上下文pwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")#校验密文明文defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)#加密明文defget_password_hash(password):returnpwd_context.hash(password)if__name__=='__main__':password_hash=get_password_hash('123456')#加密结果是动态的每次不一样,但是验证是一样的print(password_hash)print(verify_password('123456',password_hash))token生成&校验逻辑说明需要加密使用的秘钥加密算法token过期时间#opensslrand-hex32SECRET_KEY="12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30token生成与校验安装模块:pipinstallpython-jose[cryptography]fromtypingimportOptionalfromjoseimportJWTError,jwtfromdatetimeimportdatetime,timedelta#opensslrand-hex32SECRET_KEY="12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30defcreate_access_token(user:dict,expires_delta:Optional[timedelta]=None):to_encode=user.copy()#浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用ifexpires_delta:#能通过参数指定过期时间expire=datetime.utcnow()+expires_deltaelse:expire=datetime.utcnow()+timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp":expire})#加入过期时间encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)#加密信息得到tokenreturnencoded_jwtif__name__=='__main__':access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)#生成时间格式0:30:00token=create_access_token(#生成Token{'username':'Bob'},expires_delta=access_token_expires)print(f'user-->token:{token}')to_decode=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])#解密tokenprint(f'token-->user{to_decode}')案例此案例结合上篇的基于PasswordBearerToken的OAuth2认证获取TokenimportuvicornfromtypingimportOptionalfrompydanticimportBaseModelfromdatetimeimportdatetime,timedeltafromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjoseimportJWTError,jwt#token使用frompasslib.contextimportCryptContext#哈希密码#opensslrand-hex32SECRET_KEY="12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_user_db={#模拟用户数据库'Bob':{'username':'Bob','hash_password':'$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'},#明文123456'Mary':{'username':'Mary','hash_password':'$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'},#明文123456}classUser(BaseModel):username:strhash_password:strclassToken(BaseModel):access_token:strtoken_type:str='bearer'pwd_context=CryptContext(schemes=["bcrypt"],deprecated="auto")#实例化OAuth2PasswordBearer类,指明请求token接口地址oauth2_scheme=OAuth2PasswordBearer(tokenUrl='/jwt/token')app=FastAPI()#验证密码defverify_password(plain_password:str,hash_password)->bool:returnpwd_context.verify(plain_password,hash_password)defget_password_hash(password:str):returnpwd_context.hash(password)defget_user(db,username:str)->User:ifusernameindb:user_dict=db[username]returnUser(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hash_password):returnFalsereturnuserdefcreate_access_token(user:dict,expires_delta:Optional[timedelta]=None):to_encode=user.copy()#浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用ifexpires_delta:#能通过参数指定过期时间expire=datetime.utcnow()+expires_deltaelse:expire=datetime.utcnow()+timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)to_encode.update({"exp":expire})#加入过期时间encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)#加密信息得到tokenreturnencoded_jwt@app.post('/jwt/token')#获取token的接口具体方法asyncdefget_token(form_data:OAuth2PasswordRequestForm=Depends()):user=authenticate_user(#获取用户对象fake_db=fake_user_db,username=form_data.username,password=form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail='Incorrectusernameorpassword',headers={'WWW-Authenticate':'Bearer'})#规范access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(#生成Tokenuser=user.dict(),expires_delta=access_token_expires)returnToken(access_token=access_token,token_type='bearer')if__name__=='__main__':uvicorn.run(app)获取当前用户信息asyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Couldnotvalidatecredentials",headers={"WWW-Authenticate":"Bearer"},)try:#解密token获取用户payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get('username')ifusernameisNone:raisecredentials_exceptiontoken_data=usernameexceptJWTError:returncredentials_exceptionuser=get_user(fake_user_db,token_data)ifuserisNone:raisecredentials_exceptionreturnuser@app.post('/jwt/me')asyncdefread_users_me(current_user:User=Depends(get_current_user)):returncurrent_user获取Token获取当前用户信息