Python Geek

New blog: https://blog.bigdataboy.cn/article/455.html

Overview

Previous post: https://blog.bigdataboy.cn/article/454.html

Continuing from the previous post: with that patch, state can only be set once, when the job is added. It cannot be changed dynamically while the job is running, so the patch needs another round of changes.

Approach

The change is simple. First fetch the Job by job_id and update its state attribute, then call the matching pause(), resume(), or similar function. That makes the pause and resume functions the place to hook in.

Reading the source

    job = scheduler.get_job(job_id=job_id)
    job.pause()

Looking inside pause(), it simply calls the pause_job() method of the BaseScheduler class:

    def pause(self):
        """
        Temporarily suspend the execution of this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`
        :return Job: this job instance

        """
        self._scheduler.pause_job(self.id, self._jobstore_alias)
        return self

Looking inside pause_job(), it in turn calls the modify_job() method of the BaseScheduler class:

    def pause_job(self, job_id, jobstore=None):
        """
        Causes the given job not to be executed until it is explicitly resumed.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job: the relevant job instance

        """
        return self.modify_job(job_id, jobstore, next_run_time=None)

Looking inside modify_job(), it only updates the job's attributes, which is exactly the modification we need:

    def modify_job(self, job_id, jobstore=None, **changes):
        """
        Modifies the properties of a single job.

        Modifications are passed to this method as extra keyword arguments.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job: the relevant job instance

        """
        with self._jobstores_lock:
            job, jobstore = self._lookup_job(job_id, jobstore)
            job._modify(**changes)
            if jobstore:
                self._lookup_jobstore(jobstore).update_job(job)

        self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))

        # Wake up the scheduler since the job's next run time may have been changed
        if self.state == STATE_RUNNING:
            self.wakeup()

        return job

Summary

The only thing that has to change is the arguments passed down the call chain: carry state all the way to modify_job() so that it is updated in the same call.

The patch

apscheduler/job.py

    def pause(self):
        """
        Temporarily suspend the execution of this job.

        .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`
        :return Job: this job instance

        """
        # self._scheduler.pause_job(self.id, self._jobstore_alias)  # original
        self._scheduler.pause_job(self.id, self.state, self._jobstore_alias)
        return self

apscheduler/schedulers/base.py

    def pause_job(self, job_id, state, jobstore=None):  # added the state parameter
        """
        Causes the given job not to be executed until it is explicitly resumed.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job: the relevant job instance

        """
        return self.modify_job(job_id, jobstore, state=state, next_run_time=None)  # pass state through

Nothing else needs to change, because modify_job() takes its changes as variable keyword arguments. Resume is patched in the same way.

Verification

Pause the job, then fetch it again by job_id
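To make the call chain above concrete, here is a minimal stand-alone sketch of how `state` can ride along with `next_run_time` through `**changes`. `MiniJob` and `MiniScheduler` are hypothetical stand-ins written for this illustration; they are not APScheduler classes and leave out job stores, locking, and events.

```python
class MiniJob:
    """Hypothetical stand-in for a scheduler job (not apscheduler's Job)."""

    def __init__(self, job_id, state=None):
        self.id = job_id
        self.state = dict(state or {})
        self.next_run_time = "2024-01-01 00:00:00"  # placeholder value

    def _modify(self, **changes):
        # Validate `state`, then apply every keyword that was forwarded.
        if "state" in changes and not isinstance(changes["state"], dict):
            raise TypeError("state must be a dict")
        for key, value in changes.items():
            setattr(self, key, value)


class MiniScheduler:
    """Hypothetical stand-in that keeps only the pause -> modify call chain."""

    def __init__(self):
        self._jobs = {}

    def add_job(self, job):
        self._jobs[job.id] = job
        return job

    def modify_job(self, job_id, **changes):
        job = self._jobs[job_id]
        job._modify(**changes)
        return job

    def pause_job(self, job_id, state):
        # Pausing means clearing next_run_time; state travels in the same call.
        return self.modify_job(job_id, state=state, next_run_time=None)


scheduler = MiniScheduler()
scheduler.add_job(MiniJob("job-1", state={"status": "running"}))
paused = scheduler.pause_job("job-1", state={"status": "error"})
print(paused.state, paused.next_run_time)
```

Because `modify_job` forwards whatever keywords it receives, no extra parameter is needed at that level; only the functions that build the keyword set have to know about `state`.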

Python Geek

PS: it has been a while since my last post, so here is a quiet one.

New blog: https://blog.bigdataboy.cn/article/454.html

Overview

While using apscheduler recently, I found it awkward to keep track of the status of each Job, so I tried patching the library.

Goal

Add a dict attribute to the Job class for storing user-defined data. For example, when a job fails during execution, its status can be set to error and the job paused.

The patch

Place 1: apscheduler/job.py

    # add the state attribute
    __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
                 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
                 'next_run_time', '__weakref__', 'state')

Place 2

    # add state to the returned dict
    def __getstate__(self):
        ...
        return {
            'version': 1,
            'id': self.id,
            'func': self.func_ref,
            'trigger': self.trigger,
            'executor': self.executor,
            'args': args,
            'kwargs': self.kwargs,
            'name': self.name,
            'misfire_grace_time': self.misfire_grace_time,
            'coalesce': self.coalesce,
            'max_instances': self.max_instances,
            'next_run_time': self.next_run_time,
            'state': self.state  ####
        }

Place 3

    # restore state when the job attributes are set
    def __setstate__(self, state):
        if state.get('version', 1) > 1:
            raise ValueError('Job has version %s, but only version 1 can be handled' %
                             state['version'])

        self.id = state['id']
        self.func_ref = state['func']
        self.func = ref_to_obj(self.func_ref)
        self.trigger = state['trigger']
        self.executor = state['executor']
        self.args = state['args']
        self.kwargs = state['kwargs']
        self.name = state['name']
        self.misfire_grace_time = state['misfire_grace_time']
        self.coalesce = state['coalesce']
        self.max_instances = state['max_instances']
        self.next_run_time = state['next_run_time']
        self.state = state['state']  ####

Place 4: apscheduler/schedulers/base.py

    # add the state parameter to add_job
    # (None instead of {} as the default, so jobs never share one mutable dict)
    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, state=None,  ####
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        ...
        # add state to the job_kwargs dict
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time,
            'state': dict(state) if state is not None else {}  ####
        }

Place 5: apscheduler/job.py

    # accept state in the attribute-modifying function and validate it
    def _modify(self, **changes):
        ...
        if 'state' in changes:
            state = changes.pop('state')
            if not isinstance(state, dict):
                raise TypeError('state must be a dict')
            approved['state'] = state

Verification

With these five changes in place, every job gets its own storage for user-defined data.

Adding a job

The job picks up state as expected.

Fetching a job

state is still present when the job is fetched.

When a job raises an exception

Following the approach in the official documentation, monitoring job status requires adding a listener.

    # scheduler listener
    import logging

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED, \
        EVENT_JOB_ADDED, EVENT_JOB_REMOVED, EVENT_JOB_EXECUTED, EVENT_JOB_MODIFIED

    logger = logging.getLogger(__name__)
    scheduler = BackgroundScheduler(timezone='Asia/Shanghai')

    def apscheduler_listener(event):
        if event.code == EVENT_JOB_ERROR:  # a job raised an error
            job = scheduler.get_job(event.job_id)
            job.state['state'] = 'error'
            job.pause()
            logger.error(f'jobid: {event.job_id} traceback: {event.traceback}')

    scheduler.add_listener(apscheduler_listener,
                           EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED |
                           EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
    scheduler.start()

When a job fails, its status is changed and the job is paused.
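The reason the new attribute has to appear in `__slots__`, `__getstate__`, and `__setstate__` at the same time can be seen in a small pickle round trip. `SlotJob` below is a hypothetical miniature written for this illustration, not APScheduler's Job class; it assumes only that persistent job stores serialize jobs through these two hooks, as the patch above suggests.

```python
import pickle


class SlotJob:
    """Hypothetical miniature of a slotted job class (not apscheduler's)."""

    __slots__ = ("id", "name", "state")  # 1) the slot must exist

    def __init__(self, job_id, name, state=None):
        self.id = job_id
        self.name = name
        self.state = dict(state) if state is not None else {}

    def __getstate__(self):
        # 2) the attribute must be written into the serialized form
        return {"version": 1, "id": self.id, "name": self.name, "state": self.state}

    def __setstate__(self, data):
        # 3) and read back when the job is loaded again
        self.id = data["id"]
        self.name = data["name"]
        self.state = data["state"]


original = SlotJob("job-1", "demo", state={"status": "error"})
restored = pickle.loads(pickle.dumps(original))
print(restored.id, restored.state)
```

Leaving `state` out of any one of the three places would either make the assignment fail (no slot) or silently lose the value across a save and reload.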

Programming Notes

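Overview

Idea: use Miniconda to manage several Python versions, and when poetry creates a virtual environment, point it at the Python interpreter inside a Miniconda environment.

Before you start

Install Miniconda

Link: https://docs.conda.io/en/latest/miniconda.html

Install poetry

Link: https://bigdataboy.cn/post-399.html

Creating the environments

Use Miniconda to create one environment per Python version (repeat with a different name and version as needed, for example Py3.8 with python=3.8).

    conda create -n Py3.9 python=3.9

List the environments

    conda env list

Usage

Step 1

Initialize poetry with the command poetry init. If this is not a new project, skip to step 2.

There is a pitfall here. When giving poetry the Python version, it has to be written as ~3.8 (which allows >=3.8.0, <3.9.0); otherwise poetry reports a version mismatch. The reason is that the interpreter conda downloaded is 3.8.x, and the version range declared in the project must cover the interpreter's version.

    F:\Tools\pyCode\test>poetry init

    This command will guide you through creating your pyproject.toml config.

    Package name [test]:
    Version [0.1.0]:
    Description []:
    Author [‘zbigdataboy‘ <876545500@qq.com>, n to skip]:  bigdataboy
    License []:
    Compatible Python versions [^3.9]:  ~3.8

    Would you like to define your main dependencies interactively? (yes/no) [yes] no
    Would you like to define your development dependencies interactively? (yes/no) [yes] no
    Generated file

    [tool.poetry]
    name = "test"
    version = "0.1.0"
    description = ""
    authors = ["bigdataboy"]

    [tool.poetry.dependencies]
    python = "~3.8"

    [tool.poetry.dev-dependencies]

    [build-system]
    requires = ["poetry-core>=1.0.0"]
    build-backend = "poetry.core.masonry.api"

    Do you confirm generation? (yes/no) [yes] yes

Step 2

Tell poetry which interpreter to use. If it reports a version mismatch, see the pitfall in step 1.

    poetry env use D:\ProgramData\miniconda3\envs\Py3.8\python.exe

Step 3

Install the dependencies

    poetry install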
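The pitfall in step 1 comes down to a range check: `~3.8` means "at least 3.8.0 and below 3.9.0", whereas the prompt's default `^3.9` means "at least 3.9.0 and below 4.0.0", which a 3.8.x interpreter cannot satisfy. The sketch below spells out the tilde case only; `parse_version` and `satisfies_tilde` are hypothetical helpers written for this illustration and are not part of poetry.

```python
def parse_version(text):
    """Turn '3.8.12' into (3, 8, 12)."""
    return tuple(int(part) for part in text.split("."))


def satisfies_tilde(constraint, version):
    """Check `version` against a '~MAJOR.MINOR' constraint (sketch only)."""
    if not constraint.startswith("~"):
        raise ValueError("this sketch only handles '~MAJOR.MINOR'")
    major, minor = parse_version(constraint[1:])[:2]
    lower = (major, minor, 0)        # inclusive lower bound, e.g. 3.8.0
    upper = (major, minor + 1, 0)    # exclusive upper bound, e.g. 3.9.0
    candidate = (parse_version(version) + (0, 0))[:3]  # pad '3.8' to (3, 8, 0)
    return lower <= candidate < upper


for interpreter in ("3.8.12", "3.9.0", "3.7.9"):
    print(interpreter, satisfies_tilde("~3.8", interpreter))
```

Only the 3.8.x interpreter passes, which is why the conda environment and the constraint in pyproject.toml have to agree.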

Python Geek

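Overview

poetry is a tool for managing Python virtual environments and dependencies. It is similar to pipenv, and additionally provides packaging and publishing.

Installation

If installation fails, it is usually because the download site is blocked on your network.

macOS & Linux

    curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -

Windows

Run this command in PowerShell:

    (Invoke-WebRequest -Uri https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py -UseBasicParsing).Content | python -

poetry is added to the PATH automatically. Reopen PowerShell to check whether the installation succeeded.

Configuration

Changing these settings before first use makes poetry more pleasant to work with.

Show the current configuration

    poetry config --list
    -------------------
    PS C:\Users\ASUS> poetry config --list
    cache-dir = "D:\\Python\\peotry_env"
    experimental.new-installer = true
    installer.parallel = true
    virtualenvs.create = true
    virtualenvs.in-project = true
    virtualenvs.path = "{cache-dir}\\virtualenvs"  # D:\Python\peotry_env\virtualenvs

Change the configuration

    poetry config cache-dir D:\\Python\\peotry_env
    poetry config virtualenvs.in-project true

Creating a project

This command creates a directory skeleton:

    poetry new demo

Layout

    demo
    ├── pyproject.toml
    ├── README.rst
    ├── demo
    │   └── __init__.py
    └── tests
        ├── __init__.py
        └── test_demo.py

The pyproject.toml file coordinates the project and its dependencies:

    [tool.poetry]
    name = "demo"
    version = "0.1.0"
    description = ""
    authors = ["‘zbigdataboy‘ <876545500@qq.com>"]

    [tool.poetry.dependencies]
    python = "^3.9"

    [tool.poetry.dev-dependencies]
    pytest = "^5.2"

    [build-system]
    requires = ["poetry-core>=1.0.0"]
    build-backend = "poetry.core.masonry.api"

Initializing a project

Use this when the project already exists, to create a pyproject.toml that manages it.

    poetry init

Adding dependencies & libraries

If the project has no virtual environment yet, one is created. poetry.lock is created as well, to record the exact versions of the project's dependencies.

    poetry add requests

Removing a dependency

    poetry remove requests

Installing dependencies

Used when running the project elsewhere, so that the dependencies match those used during development.

    poetry install

Changing the package source

Append this to the end of the project's pyproject.toml:

    [[tool.poetry.source]]
    name = "tsinghua"
    url = "https://pypi.tuna.tsinghua.edu.cn/simple"

Running the project in the virtual environment

poetry automatically looks for the project's environment in the corresponding directory.

    poetry run python app.py

Explicit activation

    poetry shell

Other commands

    # show information about the current environment
    poetry env info
    # list the environments associated with the project (a project may need several dependency versions for testing)
    poetry env list
    # list the current configuration
    poetry config --list

Exporting requirements.txt

For now only the requirements.txt format is supported.

    # export requirements.txt; -f is the export format, --output the file name
    poetry export -f requirements.txt --output requirements.txt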

Python Geek

ORM overview

SQLAlchemy is a Python framework for working with databases through an ORM. Put simply, it frees us from tedious SQL: instead of writing raw SQL statements, we manipulate objects with Python syntax and they are mapped to SQL automatically.

Install SQLAlchemy: pip3 install SQLAlchemy

Usage

This post uses SQLite as the example. SQLAlchemy also supports other databases: MySQL, PostgreSQL, Oracle, MSSQL...

Engine

Every SQLAlchemy application starts with an Engine object. It acts as the central source of connections to a particular database and provides a pool of those connections, known as the connection pool.

    """Database connection settings: database.py"""
    from sqlalchemy import create_engine

    DATABASE_URL = 'sqlite:///orm.db'  # sqlite:///<database path>

    engine = create_engine(  # create the engine
        DATABASE_URL,
        encoding='utf-8',  # accepted by SQLAlchemy 1.x; removed in 2.0
        echo=True,  # log every statement and the repr() of its parameter list
        # SQLite only: lets the connection be used from threads other than the one that created it
        connect_args={'check_same_thread': False},
    )

Declaring mappings

A Python class corresponds to a table, and each attribute of the class corresponds to a column. Every mapped class must inherit from the base returned by declarative_base().

    """Database connection settings: database.py"""
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

Create two tables. They do not need to exist in the database beforehand.

    """ORM model classes: models.py"""
    from sqlalchemy import Column, Integer, String, Enum, DateTime, func, ForeignKey
    from database import Base, engine

    class ClassRoom(Base):
        __tablename__ = 'orm_classroom'  # table name

        id = Column(Integer, primary_key=True, index=True, autoincrement=True)
        classroom_num = Column(Integer, unique=True, nullable=False, comment='class number')
        teacher = Column(String(100), unique=False, nullable=False, comment='head teacher')
        student_num = Column(Integer, unique=False, nullable=False, comment='number of students')

        create_at = Column(DateTime, server_default=func.now(), comment='created at')
        update_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='updated at')

        def __repr__(self):  # for readable output
            return f'id={self.id} classroom_num={self.classroom_num} teacher={self.teacher} student_num={self.student_num}'

    class Student(Base):
        __tablename__ = 'orm_student'  # table name

        id = Column(Integer, primary_key=True, index=True, autoincrement=True)  # auto-increment primary key
        name = Column(String(100), unique=False, nullable=False, comment='name')  # string, not unique, not null, with comment
        sex = Column(Enum('男', '女'), comment='sex')
        classroom_num = Column(Integer, ForeignKey('orm_classroom.classroom_num'), comment='class')

        create_at = Column(DateTime, server_default=func.now(), comment='created at')
        update_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='updated at')

        def __repr__(self):  # for readable output
            return f'id={self.id} name={self.name} sex={self.sex} classroom_num={self.classroom_num}'

    Base.metadata.create_all(engine)  # create the tables

Run models.py to create the tables.

Creating a session

For an explanation of the autoflush and autocommit parameters, this post is recommended: https://zhuanlan.zhihu.com/p/48994990

    """Database connection settings: database.py"""
    from sqlalchemy.orm import sessionmaker

    # In SQLAlchemy all CRUD goes through a session, so a session has to be created first
    SessionLocal = sessionmaker(
        bind=engine,
        autoflush=False,  # do not call flush() automatically; flush() sends statements to the database without committing them
        autocommit=False,  # do not call commit() automatically; commit() ends the transaction and persists the changes
        expire_on_commit=True
    )

Wrapping the CRUD operations

    """Create, read, update, delete: crud.py"""
    from sqlalchemy.orm import Session
    from storages import models, schemas

    # create
    def create_student(db: Session, student: schemas.CreateStudent):
        """Takes a BaseModel instance"""
        db_student = models.Student(**student.dict())
        db.add(db_student)
        db.commit()  # commit the transaction
        db.refresh(db_student)  # reload from the database
        return db_student

    # create
    def create_class_room(db: Session, classroom: schemas.CreateClassRoom):
        """Takes a BaseModel instance"""
        db_classroom = models.ClassRoom(**classroom.dict())  # ClassRoom, not Student
        db.add(db_classroom)
        db.commit()  # commit the transaction
        db.refresh(db_classroom)  # reload from the database
        return db_classroom

    # read
    def get_student_by_id(db: Session, student_id: int):
        """Look up a student by id"""
        return db.query(models.Student).filter(models.Student.id == student_id).first()

    # read
    def get_student_by_name(db: Session, name: str):
        """Look up students by name (names are not unique, so return a list)"""
        return db.query(models.Student).filter(models.Student.name == name).all()

    # delete
    def del_student_by_id(db: Session, student_id: int):
        student = get_student_by_id(db, student_id)
        db.delete(student)  # query first, then delete
        db.commit()
        return student

    # update
    def update_student_name_by_id(db: Session, student_id: int, name: str):
        student = get_student_by_id(db, student_id)
        student.name = name  # modify the queried object, then commit
        db.commit()
        return student

Using it

    """Entry point: main.py"""
    from storages.database import SessionLocal, Base, engine
    from storages.crud import create_student, get_student_by_id, del_student_by_id, update_student_name_by_id
    from storages.schemas import CreateStudent

    Base.metadata.create_all(engine)  # create the tables
    db = SessionLocal()

    student = CreateStudent(name='Bob', sex='男', classroom_num=1)
    create_student(db=db, student=student)

    print('query result', get_student_by_id(db=db, student_id=1))
    # print('delete result', del_student_by_id(db=db, student_id=2))
    # print('update result', update_student_name_by_id(db=db, student_id=3, name='Aoa'))

Querying data

Using query()

query() can be thought of as SQL's SELECT.

    db.query(models.Student).filter()         filter
    db.query(models.Student).filter_by()      filter by keyword
    db.query(models.Student).all()            return everything (a list)
    db.query(models.Student).first()          return the first element
    db.query(models.Student).one()            succeeds only when there is exactly one element
    db.query(models.Student).one_or_none()    like one(), but does not raise when no result is found
    db.query(models.Student).scalar()         calls one() and on success returns the first column of the row
    db.query(models.Student).count()          count
    db.query(models.Student).order_by()       sort

Filtering with filter() and filter_by()

Using filter()

    # Student below is models.Student

    # equal
    db.query(models.Student).filter(Student.name == 'Bob')
    # not equal
    db.query(models.Student).filter(Student.name != 'Bob')
    # like and ilike
    db.query(models.Student).filter(Student.name.like('%Bob%'))
    db.query(models.Student).filter(Student.name.ilike('%Bob%'))  # case-insensitive
    # is
    db.query(models.Student).filter(Student.name == None)
    # is not
    db.query(models.Student).filter(Student.name != None)
    # and
    from sqlalchemy import and_
    db.query(models.Student).filter(and_(Student.name == 'Bob', Student.id == 2))
    db.query(models.Student).filter(Student.name == 'Bob', Student.id == 2)
    db.query(models.Student).filter(Student.name == 'Bob').filter(Student.id == 2)
    # or
    from sqlalchemy import or_
    db.query(models.Student).filter(or_(Student.name == 'Bob', Student.name == 'Aoa'))
    # match
    db.query(models.Student).filter(Student.name.match('Bob'))

Using filter_by()

    # id is a field of the model
    db.query(models.Student).filter_by(id=student_id, name='Bob').first()

Joins with join()

    db.query(models.Student).join(models.ClassRoom).filter(
        models.Student.classroom_num == models.ClassRoom.classroom_num
    ).all()
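Since query() is described above as SQL's SELECT, it can help to see the plain SQL that a few of the filter() examples roughly correspond to. The sketch below deliberately swaps SQLAlchemy for the standard-library `sqlite3` module and uses a simplified, hypothetical version of the orm_student table (name is nullable here so that the IS NULL case has something to match); the exact SQL SQLAlchemy emits may differ in quoting and parameter style.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orm_student (id INTEGER PRIMARY KEY, name TEXT, sex TEXT)")
conn.executemany(
    "INSERT INTO orm_student (name, sex) VALUES (?, ?)",
    [("Bob", "男"), ("Aoa", "女"), (None, "男")],
)


def names(sql, params=()):
    """Run a SELECT and return the name column of every row."""
    return [row[0] for row in conn.execute(sql, params)]


# filter(Student.name == 'Bob')
equal = names("SELECT name FROM orm_student WHERE name = ?", ("Bob",))
# filter(Student.name.like('%o%'))
like = names("SELECT name FROM orm_student WHERE name LIKE ? ORDER BY id", ("%o%",))
# filter(Student.name == None)
is_null = names("SELECT name FROM orm_student WHERE name IS NULL")
# filter(or_(Student.name == 'Bob', Student.name == 'Aoa'))
either = names(
    "SELECT name FROM orm_student WHERE name = ? OR name = ? ORDER BY id",
    ("Bob", "Aoa"),
)
print(equal, like, is_null, either)
```

With `echo=True` on the engine, as configured earlier, SQLAlchemy logs the statements it actually sends, which is a convenient way to compare them against hand-written SQL like this.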

Python Geek

Overview

Official documentation: https://tenacity.readthedocs.io/en/latest/

Tenacity is an Apache 2.0 licensed general-purpose retrying library, written in Python, that simplifies adding retry behavior to just about anything. It originated as a fork of retrying, which is sadly no longer maintained. Tenacity is not compatible with retrying, but it adds significant new functionality and fixes a number of long-standing bugs.

Features

    Generic decorator API
    Stop conditions (for example, limiting the number of attempts)
    Wait conditions (for example, exponential backoff sleeping between attempts)
    Customized retrying on expected return values
    Any combination of stop, wait, and retry conditions
    Retrying coroutines
    Retrying code blocks with a context manager

Install: pip install tenacity

Example

With no conditions at all, the call is retried until it succeeds.

    import random
    from tenacity import retry

    @retry
    def fun():
        r = random.randint(1, 3)
        print(f'current {r}')
        assert r == 2

    if __name__ == '__main__':
        fun()
    -----------------------------------
    current 1
    current 3
    current 3
    current 2

    Process finished with exit code 0

Stop conditions

stop_after_attempt is the most common one; there are many other stop conditions.

    from tenacity import retry
    from tenacity.stop import stop_after_attempt

    # stop after 2 attempts
    @retry(stop=stop_after_attempt(2))
    def fun():
        pass

Wait conditions

    from tenacity import retry

    # wait 2 seconds between retries
    from tenacity.wait import wait_fixed

    @retry(wait=wait_fixed(2))
    def fun():
        pass

    # wait a random 1 to 2 seconds between retries
    from tenacity.wait import wait_random

    @retry(wait=wait_random(min=1, max=2))
    def fun():
        pass

    # when retrying distributed services and other remote endpoints, combine a
    # fixed wait with jitter (to help avoid thundering herds)
    # fixed base wait of 3 seconds plus a random 0 to 2 seconds
    from tenacity.wait import wait_fixed, wait_random

    @retry(wait=wait_fixed(3) + wait_random(0, 2))
    def fun():
        pass

    # build a chain of waits (a list of wait times), equivalent to [3, 3, 3, 7, 7, 9]
    from tenacity.wait import wait_chain, wait_fixed

    @retry(wait=wait_chain(*[wait_fixed(3) for i in range(3)] +
                            [wait_fixed(7) for i in range(2)] +
                            [wait_fixed(9)]))
    def fun():
        pass

wait_exponential

When retrying distributed services and other remote endpoints, the wait time grows exponentially.

    multiplier: factor the exponential term is multiplied by (default 1)
    min: minimum wait time (default 0)
    max: maximum wait time (default sys.maxsize / 2)

    import random
    from loguru import logger
    from tenacity import retry
    from tenacity.wait import wait_exponential

    @retry(wait=wait_exponential(multiplier=1, min=4, max=10))
    def fun():
        r = random.randint(1, 5)
        logger.info(f'current {r}')
        assert r == 6  # never true, so the waits can be observed

    if __name__ == '__main__':
        fun()
    -----------------------------------------------
    2022-02-28 10:52:37.918 | INFO | __main__:fun:10 - current 2
    2022-02-28 10:52:41.930 | INFO | __main__:fun:10 - current 2
    2022-02-28 10:52:45.938 | INFO | __main__:fun:10 - current 1
    2022-02-28 10:52:49.949 | INFO | __main__:fun:10 - current 4
    2022-02-28 10:52:57.952 | INFO | __main__:fun:10 - current 3
    2022-02-28 10:53:07.965 | INFO | __main__:fun:10 - current 1
    2022-02-28 10:53:17.975 | INFO | __main__:fun:10 - current 4
    2022-02-28 10:53:27.977 | INFO | __main__:fun:10 - current 3

stop_after_delay

Once the time spent exceeds the limit in seconds, retrying stops and a RetryError is raised.

    import random
    import time
    from tenacity import retry
    from tenacity.stop import stop_after_delay

    @retry(stop=stop_after_delay(2))
    def fun():
        r = random.randint(1, 5)
        print(f'current {r}')
        time.sleep(2)
        assert r == 2
        print('correct')

    fun()
    -----------------------------------
    tenacity.RetryError: RetryError[<Future at 0x260529a62b0 state=finished raised AssertionError>]

Whether to retry

    # retry when AssertionError is raised
    from tenacity.retry import retry_if_exception_type

    @retry(retry=retry_if_exception_type(AssertionError))
    def fun():
        pass

Deciding with a function whether to retry

    import random
    from loguru import logger
    from tenacity import retry
    from tenacity.retry import retry_if_result

    @retry(retry=retry_if_result(lambda x: x != 2))
    def fun():
        r = random.randint(1, 5)
        logger.info(f'current {r}')
        return r

    if __name__ == '__main__':
        fun()
    --------------------------------------------
    2022-02-28 11:26:33.588 | INFO | __main__:fun:10 - current 5
    2022-02-28 11:26:33.588 | INFO | __main__:fun:10 - current 1
    2022-02-28 11:26:33.588 | INFO | __main__:fun:10 - current 3
    2022-02-28 11:26:33.588 | INFO | __main__:fun:10 - current 2

Combining retry conditions

    import random
    from loguru import logger
    from tenacity import retry
    from tenacity.retry import retry_if_result, retry_if_exception_type

    """
    Retry when the result is not 2, or when AssertionError is raised (r == 3),
    so the call can only ever return 2.
    """
    # Combine conditions with | (or &). A plain `and` between two condition
    # objects evaluates to the second one only and silently drops the first.
    @retry(retry=(retry_if_result(lambda x: x != 2) | retry_if_exception_type(AssertionError)))
    def fun():
        r = random.randint(1, 5)
        logger.info(f'current {r}')
        if r == 3:
            raise AssertionError('r is 3')
        return r

    if __name__ == '__main__':
        logger.info(f'fun {fun()}')

Retrying coroutines

    import random
    import asyncio
    from loguru import logger
    from tenacity import retry
    from tenacity.wait import wait_fixed

    @retry(wait=wait_fixed(2))
    async def fun():
        r = random.randint(1, 3)
        logger.info(f'current {r}')
        assert r == 2

    if __name__ == '__main__':
        asyncio.run(fun())
    -----------------------------------------
    2022-02-28 12:36:07.801 | INFO | __main__:fun:11 - current 3
    2022-02-28 12:36:09.805 | INFO | __main__:fun:11 - current 1
    2022-02-28 12:36:11.807 | INFO | __main__:fun:11 - current 3
    2022-02-28 12:36:13.814 | INFO | __main__:fun:11 - current 2

Statistics

fun.retry.statistics reports the number of attempts, the start time, and the total time spent retrying.

    import random
    from loguru import logger
    from tenacity import retry
    from tenacity.retry import retry_if_result

    @retry(reraise=True, retry=retry_if_result(lambda x: x != 2))
    def fun():
        r = random.randint(1, 5)
        logger.info(f'current {r}')
        return r

    if __name__ == '__main__':
        logger.info(f'fun {fun()}')
        logger.info(f'fun {fun.retry.statistics}')
    ----------------------------------------------------
    2022-02-28 11:43:33.558 | INFO | __main__:fun:10 - current 1
    2022-02-28 11:43:33.558 | INFO | __main__:fun:10 - current 1
    2022-02-28 11:43:33.558 | INFO | __main__:fun:10 - current 1
    2022-02-28 11:43:33.559 | INFO | __main__:fun:10 - current 3
    2022-02-28 11:43:33.559 | INFO | __main__:fun:10 - current 5
    2022-02-28 11:43:33.559 | INFO | __main__:fun:10 - current 4
    2022-02-28 11:43:33.559 | INFO | __main__:fun:10 - current 2
    2022-02-28 11:43:33.559 | INFO | __main__:<module>:15 - fun 2
    2022-02-28 11:43:33.559 | INFO | __main__:<module>:16 - fun {'start_time': 6972.546, 'attempt_number': 7, 'idle_for': 0, 'delay_since_first_attempt': 0.0}

Retrying code blocks

The logic to retry is wrapped in a for loop and then in a context manager, so it no longer has to live in its own function.

    import random
    from loguru import logger
    from tenacity import Retrying, RetryError, stop_after_attempt

    try:
        # up to 3 attempts
        for attempt in Retrying(stop=stop_after_attempt(3)):  # retry, wait, and stop conditions go here
            with attempt:  # the logic to retry lives inside the context manager
                r = random.randint(1, 3)
                logger.info(f'r -> {r}')
                assert r == 2
    except RetryError:
        logger.error('retry limit exceeded')
        # handle running out of attempts here
        pass
    ----------------------------------------------------------
    2022-02-28 13:08:13.491 | INFO | __main__:<module>:10 - r -> 3
    2022-02-28 13:08:13.492 | INFO | __main__:<module>:10 - r -> 3
    2022-02-28 13:08:13.492 | INFO | __main__:<module>:10 - r -> 3
    2022-02-28 13:08:13.492 | ERROR | __main__:<module>:13 - retry limit exceeded

Retrying code blocks asynchronously

    import random
    import asyncio
    from loguru import logger
    from tenacity import AsyncRetrying, RetryError, stop_after_attempt

    async def fun():
        try:
            async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):  # retry, wait, and stop conditions go here
                # this `with` must not be `async with`
                with attempt:  # the logic to retry lives inside the context manager
                    r = random.randint(1, 3)
                    logger.debug(f'r -> {r}')
                    assert r == 2
        except RetryError:
            logger.error('retry limit reached')
            pass

    if __name__ == '__main__':
        asyncio.run(fun())
    ------------------------------------------------------------------
    2022-02-28 13:09:17.247 | DEBUG | __main__:fun:13 - r -> 3
    2022-02-28 13:09:17.247 | DEBUG | __main__:fun:13 - r -> 1
    2022-02-28 13:09:17.247 | DEBUG | __main__:fun:13 - r -> 3
    2022-02-28 13:09:17.247 | ERROR | __main__:fun:16 - retry limit reached
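The gaps between the timestamps in the wait_exponential log earlier in this post (4, 4, 4, 8, 10, 10, 10 seconds) follow from clamping an exponentially growing value between min and max. The function below is a stand-alone sketch that reproduces that schedule; `exponential_wait` is a hypothetical helper written for this illustration, and the formula is inferred from the log rather than taken from tenacity's source.

```python
def exponential_wait(attempt, multiplier=1, min_wait=4, max_wait=10, base=2):
    """Seconds to sleep after the given 1-based attempt (illustrative only)."""
    raw = multiplier * base ** (attempt - 1)   # 1, 2, 4, 8, 16, ...
    return max(min_wait, min(raw, max_wait))   # clamp into [min_wait, max_wait]


schedule = [exponential_wait(n) for n in range(1, 8)]
print(schedule)
```

The first three waits are held up at the minimum of 4 seconds, the fourth is the raw value 8, and everything after that is capped at the maximum of 10.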

Python Geek

Overview

pydantic is a Python library for defining and validating data interfaces and for settings management. It enforces type hints at runtime and gives friendly errors when data is invalid.

Install: pip install pydantic

(The examples below use the pydantic v1 API.)

BaseModel basics

    from pydantic import BaseModel

    class Info(BaseModel):
        id: int
        name: str

    if __name__ == '__main__':
        # ways to instantiate and use the model
        info = {'id': 1, 'name': 'Bob'}
        print(Info(**info))
        print(Info(id='1', name='Bob'))
        print(Info(id=1, name='Bob').id)
        print(Info(id=1, name='Bob').name)
        print(Info(id=1, name='Bob').json())
        print(Info(id=1, name='Bob').dict())
        print(Info(id=1, name='Bob').copy())  # shallow copy
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    id=1 name='Bob'
    id=1 name='Bob'
    1
    Bob
    {"id": 1, "name": "Bob"}
    {'id': 1, 'name': 'Bob'}
    id=1 name='Bob'

BaseModel error messages

The error messages are very detailed.

    from pydantic import BaseModel, ValidationError

    class Info(BaseModel):
        id: int
        name: str

    if __name__ == '__main__':
        try:
            print(Info(id=1, name=[12, 34]))
        except ValidationError as e:
            print(e.json())
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # the message is very detailed
    [
      {
        "loc": ["name"],
        "msg": "str type expected",
        "type": "type_error.str"
      }
    ]

BaseModel built-in validation types

Other types: https://pydantic-docs.helpmanual.io/usage/types/

Constraints

    conlist
        item_type: Type[T]: type of the list items
        min_items: int = None: minimum number of items in the list
        max_items: int = None: maximum number of items in the list
    conset
        item_type: Type[T]: type of the set items
        min_items: int = None: minimum number of items in the set
        max_items: int = None: maximum number of items in the set
    conint
        strict: bool = False: controls type coercion
        gt: int = None: the integer must be greater than this value
        ge: int = None: the integer must be greater than or equal to this value
        lt: int = None: the integer must be less than this value
        le: int = None: the integer must be less than or equal to this value
        multiple_of: int = None: the integer must be a multiple of this value
    confloat
        strict: bool = False: controls type coercion
        gt: float = None: the float must be greater than this value
        ge: float = None: the float must be greater than or equal to this value
        lt: float = None: the float must be less than this value
        le: float = None: the float must be less than or equal to this value
        multiple_of: float = None: the float must be a multiple of this value
    constr
        strip_whitespace: bool = False: strip leading and trailing whitespace
        to_lower: bool = False: convert all characters to lower case
        strict: bool = False: controls type coercion
        min_length: int = None: minimum length of the string
        max_length: int = None: maximum length of the string
        regex: str = None: regular expression to validate the string against

    from pydantic import BaseModel, constr, conint
    from typing import List
    from datetime import date

    class Info(BaseModel):
        id: int  # integer
        name: str  # string
        age: conint(gt=0, le=100)  # gt >, ge >=, lt <, le <=
        time: date
        is_boy: bool  # boolean
        friend: List[str] = None  # has a default value (this field is optional); custom combination
        hobby: List[constr(max_length=255)]  # limits the str length

    if __name__ == '__main__':
        print(Info(id=1, name='Bob', age=12, time="2001-12-23", is_boy=True,
                   friend=['A', 'B'], hobby=['CC', 'DD']).json())
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    {"id": 1, "name": "Bob", "age": 12, "time": "2001-12-23", "is_boy": true, "friend": ["A", "B"], "hobby": ["CC", "DD"]}

Nesting BaseModel subclasses

    from pydantic import BaseModel, constr, conint
    from typing import Optional

    class City(BaseModel):
        city_name: str
        description: constr(max_length=255)

    class Info(BaseModel):
        id: int  # integer
        name: str  # string
        age: Optional[conint(gt=0, le=100)]  # optional; gt >, ge >=, lt <, le <=
        city: City

    if __name__ == '__main__':
        print(Info(id=1, name='Bob', age=12,
                   city={'city_name': '成都', 'description': '好地方'}).dict())  # .json() would escape the Chinese characters
    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # {"id": 1, "name": "Bob", "age": 12, "city": {"city_name": "\u6210\u90fd", "description": "\u597d\u5730\u65b9"}}
    {'id': 1, 'name': 'Bob', 'age': 12, 'city': {'city_name': '成都', 'description': '好地方'}}

Creating a BaseModel instance from an ORM object

The ORM model here uses SQLAlchemy. Install: pip3 install SQLAlchemy

Create the ORM model

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class ORMInfo(Base):  # create the ORM mapping
        __tablename__ = 'orm_info'
        id = Column(Integer, primary_key=True, index=True, autoincrement=True)
        name = Column(String(100), unique=True, nullable=False)
        city = Column(String(100), unique=True, nullable=False)

    from pydantic import BaseModel, constr

    class ModelInfo(BaseModel):  # the BaseModel class matching the ORM model
        id: int
        name: constr(max_length=100)
        city: constr(max_length=100)

        class Config:
            orm_mode = True  # default False; must be True to build instances from ORM objects

    orm_info = ORMInfo(id=1, name='Bob', city='zx')
    print(orm_info)

    model_info = ModelInfo.from_orm(orm_info)  # build the model from the ORM object
    print(model_info)
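The bound keywords listed for conint earlier in this post (gt, ge, lt, le, multiple_of) can be read as ordinary comparisons. The sketch below spells them out with the standard library only; `check_int_bounds` is a hypothetical helper written for this illustration and is not how pydantic implements its constrained types.

```python
def check_int_bounds(value, gt=None, ge=None, lt=None, le=None, multiple_of=None):
    """Return `value` if it satisfies every given bound, else raise ValueError."""
    if gt is not None and not value > gt:
        raise ValueError(f"{value} must be greater than {gt}")
    if ge is not None and not value >= ge:
        raise ValueError(f"{value} must be greater than or equal to {ge}")
    if lt is not None and not value < lt:
        raise ValueError(f"{value} must be less than {lt}")
    if le is not None and not value <= le:
        raise ValueError(f"{value} must be less than or equal to {le}")
    if multiple_of is not None and value % multiple_of != 0:
        raise ValueError(f"{value} must be a multiple of {multiple_of}")
    return value


# Same bounds as `age: conint(gt=0, le=100)` in the Info model
print(check_int_bounds(12, gt=0, le=100))
try:
    check_int_bounds(0, gt=0, le=100)
except ValueError as error:
    print(error)
```

Note the asymmetry in the age example: gt=0 excludes 0 itself, while le=100 still accepts 100.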

Python Geek

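Introduction

A next-generation HTTP client for Python.

    Supports sync & async
    Supports HTTP/1.1 and HTTP/2
    Supports HTTP & HTTPS proxies
    No SOCKS proxy support yet at the time of writing (aiohttp has it)
    Requires Python 3.6+

Installation

Install with pip

    $ pip install httpx

HTTP/2 support

    pip install httpx[http2]

Synchronous

Almost the same as requests.

    r = httpx.get('https://httpbin.org/get')
    r
    >>> <Response [200 OK]>

Synchronous client (session)

Compared with the top-level API, a client can bring significant performance improvements, including:

    Reduced latency across requests (no handshaking).
    Reduced CPU usage and round trips.
    Reduced network congestion.

It also supports features that are not available in the top-level API, such as:

    Cookie persistence across requests.
    Applying configuration across all outgoing requests.
    Sending requests through HTTP proxies.
    Using HTTP/2.

    import httpx

    # the context manager closes the session automatically
    with httpx.Client() as client:
        res = client.get(url='https://bigdataboy.cn')
        print(res)
    >>> <Response [200 OK]>

    # closing the session manually
    client = httpx.Client()
    try:
        ...
    finally:
        client.close()

Asynchronous

Async requires httpx.AsyncClient().

    import httpx
    import asyncio

    # context-manager style
    async def main():
        async with httpx.AsyncClient() as client:
            r = await client.get('https://bigdataboy.cn/')
            print(r)

    # plain style
    async def main():
        client = httpx.AsyncClient()
        r = await client.get('https://bigdataboy.cn/')
        print(r)
        await client.aclose()

    asyncio.run(main())
    >>> <Response [200 OK]>

Advanced usage

The plain sync, sync context-manager, plain async, and async context-manager styles are all used the same way; differences are pointed out where they exist.

Requests

    httpx.get('https://bigdataboy.cn/', params=params)
    # data: str & form data; json: JSON data; content: bytes
    httpx.post('https://bigdataboy.cn/', data='', json={}, content=b'')
    httpx.put('https://bigdataboy.cn/', data={'key': 'value'})
    httpx.delete('https://bigdataboy.cn/')
    httpx.head('https://bigdataboy.cn/')
    httpx.options('https://bigdataboy.cn/')

Responses

    # request URL
    r.url
    # body as text
    r.text
    # JSON response
    r.json()
    # encoding used for decoding
    r.encoding
    # set the encoding
    r.encoding = 'utf-8'
    # body as bytes
    r.content
    # cookies
    r.cookies
    r.cookies[xxx]
    # status code
    r.status_code
    # response headers
    r.headers
    # HTTP version
    r.http_version

Streaming responses

    # binary streaming (sync)
    with httpx.stream("GET", "https://www.example.com") as r:
        for data in r.iter_bytes():
            print(data)

    # text streaming (sync)
    with httpx.stream("GET", "https://www.example.com") as r:
        for line in r.iter_lines():
            print(line)

    # binary streaming (async); must run inside a coroutine
    async def main():
        client = httpx.AsyncClient()
        async with client.stream('GET', 'https://www.example.com/') as response:
            async for chunk in response.aiter_bytes():
                print(chunk)
            # response.aiter_text() streams text
            # response.aiter_raw() streams the raw response bytes without content decoding
        await client.aclose()

Cookies

    cookies = {"k": "v"}
    httpx.get('https://httpbin.org/cookies', cookies=cookies)

    # or
    cookies = httpx.Cookies()
    cookies.set('cookie_on_domain', 'hello, there!', domain='httpbin.org')
    cookies.set('cookie_off_domain', 'nope.', domain='example.org')
    httpx.get('http://httpbin.org/cookies', cookies=cookies)

Proxies

No SOCKS proxy support yet at the time of writing.

    # as a string
    with httpx.Client(proxies="http://localhost:8030") as client:
        ...

    # as a dict
    proxies = {
        "http://": "http://localhost:8030",
        "https://": "http://localhost:8031",
    }
    with httpx.Client(proxies=proxies) as client:
        ...

Redirects

httpx does not follow redirects by default.

    # redirects are not followed
    r = httpx.get('https://www.bigdataboy.cn/')
    r.history  # []
    r.next_request  # <Request('GET', 'https://bigdataboy.cn/')>

    # follow redirects
    r = httpx.get('https://www.bigdataboy.cn/', follow_redirects=True)
    r.history  # [<Response [301 Moved Permanently]>]
    r.history[0]  # <Response [301 Moved Permanently]>
    r.history[0].url  # https://www.bigdataboy.cn/
    r.next_request  # None

Enabling HTTP/2

This only has an effect when the server supports HTTP/2.

    client = httpx.AsyncClient(http2=True)

Event hooks

httpx.AsyncClient() requires async hook functions.

Two events are currently supported:

    request: called after the request is fully prepared, but before it is sent to the network.
    response: called after the response has been fetched from the network, but before it is returned to the caller.

    def log_request(request):
        print(f"Request event hook: {request.method} {request.url} - Waiting for response")

    def log_response(response):
        request = response.request
        print(f"Response event hook: {request.method} {request.url} - Status {response.status_code}")

    client = httpx.Client(event_hooks={'request': [log_request], 'response': [log_response]})

Hooks are also allowed to modify the request & response.

    from datetime import datetime, timezone

    def add_timestamp(request):
        request.headers['x-request-timestamp'] = datetime.now(tz=timezone.utc).isoformat()

    client = httpx.Client(event_hooks={'request': [add_timestamp]})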

Python Geek

Key features

    Supports both an async client and an async server
    Supports WebSocket servers and WebSocket clients out of the box
    The server also supports middlewares and signals

Getting started

Install

    pip install aiohttp

Client

    import asyncio
    import aiohttp

    async def main():
        async with aiohttp.ClientSession() as session:
            res = await session.get(url='https://bigdataboy.cn')
            print(res)

    if __name__ == '__main__':
        asyncio.run(main())

Server (not covered further in this post)

    from aiohttp import web

    async def handle(request):
        name = request.match_info.get('name', "bigdataboy")
        text = "Hello, " + name
        return web.Response(text=text)

    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])

    if __name__ == '__main__':
        web.run_app(app=app, host='127.0.0.1', port=8080)

Quick start

Making a request

    import asyncio, aiohttp

    async def main():
        async with aiohttp.ClientSession() as session:  # session context
            async with session.get('http://httpbin.org/get') as resp:  # request context
                print(resp.status)
                print(await resp.text())

    asyncio.run(main())

Other request methods

    session.post('http://httpbin.org/post', data=b'data')
    session.put('http://httpbin.org/put', data='data')
    session.delete('http://httpbin.org/delete')
    session.head('http://httpbin.org/get')
    session.options('http://httpbin.org/get')
    session.patch('http://httpbin.org/patch', data='data')

To make repeated requests to the same site more convenient:

    async with aiohttp.ClientSession(base_url='http://httpbin.org') as session:
        async with session.get(url='/get') as resp:
            print(resp.status)

        # or call other methods
        async with session.post(url='/post', data='data') as resp:
            print(resp.status)

        async with session.put(url='/put', data='data') as resp:
            print(resp.status)

Tips & tricks

Use one session per site; do not create a session for every request.

A session contains a connection pool. Connection reuse and keep-alive (both on by default) can improve overall performance.

Without the context-manager form, the session has to be closed manually:

    session = aiohttp.ClientSession()
    async with session.get('...'):
        # ...
    await session.close()

Passing parameters

    # GET
    params = {'k': 'v'}
    params = [('k', 'v'), ('k1', 'v1')]
    async with session.get(url='/get', params=params) as resp:
        pass

    # POST
    data = 'str'
    async with session.post(url='/post', data=data) as resp:
        pass

    json = {'k': 'v'}
    async with session.post(url='/post', json=json) as resp:
        pass

Response content

    async with session.get('https://bigdataboy.cn') as resp:
        # status code
        resp.status
        # decode as text
        await resp.text(encoding='utf-8')  # with an explicit encoding
        # decode as JSON
        await resp.json()
        # raw bytes
        await resp.read()

Streaming response content

When the response body is very large, read(), json(), and text() load all of it into memory, which is wasteful. Use a streaming response instead.

    async with session.get('https://api.github.com/events') as resp:
        with open('./xx', 'wb') as fd:
            chunk_size = 10
            async for chunk in resp.content.iter_chunked(chunk_size):
                fd.write(chunk)

WebSocket

    # tested against the "little tadpole" chat room
    async with session.ws_connect('ws://kedou.workerman.net:8280/') as ws:
        async for msg in ws:
            print(msg)
            await ws.send_str('1122')  # send data
            # await ws.send_json()

Timeouts

    # in seconds; the default total timeout is 300 seconds
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        ...
        # overrides the timeout set on the session
        async with session.get(url, timeout=timeout) as resp:
            ...

Custom request headers

    # session-level headers
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        ...
        # per-request headers are merged with the session headers
        async with session.get(url='http://httpbin.org/headers', headers=headers) as resp:
            ...

Custom cookies

    url = 'http://httpbin.org/cookies'
    cookies = {'cookies_are': 'working'}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        # add cookies
        session.cookie_jar.update_cookies(cookies={'k1': 'v1'})
        # per-request cookies are merged into the session cookies
        async with session.get(url, cookies={'k': 'v'}) as resp:
            res = await resp.json()
            print(res)

Redirects

To disable redirects, pass allow_redirects=False.

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.history)  # tuple of the redirect history
            print(resp.history[0])
            print(resp.history[0].url)

Proxies

aiohttp does not yet support HTTPS proxies very well.

    # HTTP proxy
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url, proxy='http://127.0.0.1:7890') as resp:
            print(resp.status)

    # proxy with authentication
    async with aiohttp.ClientSession() as session:
        proxy_auth = aiohttp.BasicAuth('user', 'pass')
        async with session.get("http://python.org", proxy="http://127.0.0.1:7890", proxy_auth=proxy_auth) as resp:
            print(resp.status)

    # SOCKS proxy: pip install aiohttp_socks
    from aiohttp_socks import ProxyConnector

    conn = ProxyConnector.from_url('socks5://127.0.0.1:7890')
    async with aiohttp.ClientSession(connector=conn, headers=headers) as session:
        ...

Tip

Recommended structure

    import aiohttp
    import asyncio

    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()

    async def main():
        async with aiohttp.ClientSession() as session:
            # this step
            html = await fetch(session, 'https://bigdataboy.cn')
            print(html)

    asyncio.run(main())

Python Geek

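What is a coroutine

Coroutines are not provided by the computer; they are a programming construct (threads and processes, by contrast, are provided by the operating system).

A coroutine can be thought of as a micro-thread. It is a context-switching technique that lives entirely in user space. Put simply: when code hits a slow I/O operation, execution switches to another block of code and carries on there.

Ways to implement coroutines in Python

    greenlet, an early third-party module
    the yield keyword
    the asyncio decorator (@asyncio.coroutine) [Python 3.4]
    the async and await keywords [Python 3.5] [recommended]

Asynchronous programming

The event loop

It can be thought of as an endless loop that checks for and runs pieces of code.

    # pseudocode
    task_list = [task1, task2, task3]
    while True:
        # return the tasks that are 'runnable' and the tasks that are 'done'
        runnable_tasks, done_tasks = check every task in task_list
        for ready_task in runnable_tasks:
            'run' the ready task
        for done_task in done_tasks:
            'remove' the done task from task_list
        if every task in task_list is 'done', 'stop' the loop

    import asyncio

    # create or get an event loop
    loop = asyncio.get_event_loop()
    # put the tasks into the event loop
    loop.run_until_complete(task_list)

Quick start

Coroutine function: a function defined with async def (it is no longer an ordinary function).

    import asyncio

    async def fun():
        pass

    result = fun()  # a coroutine object; nothing has run yet

    # event loop (before Python 3.7); both styles have their uses
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(result)

    # event loop
    asyncio.run(result)  # supported from Python 3.7

The await keyword

await may only be followed by a coroutine object, a Task object, or a Future object. It can also be read as "wait for a slow operation", during which other tasks get to run.

    import asyncio
    from loguru import logger

    async def fun():
        logger.info('start')
        await asyncio.sleep(2)
        logger.info('end')

    async def main():
        # two coroutine objects, run concurrently
        # (asyncio.wait() no longer accepts bare coroutines on recent Python versions)
        await asyncio.gather(fun(), fun())

    asyncio.run(main())

Running fun() twice the ordinary way takes four seconds; with coroutines it takes only two.

Task objects

How to think about them: objects that add work to the event loop.

Tasks schedule coroutines concurrently. Create a Task with asyncio.create_task(coroutine object, ...), which puts the coroutine on the event loop to wait for scheduling. The lower-level loop.create_task() and ensure_future() also work. Instantiating Task objects by hand is not recommended.

    import asyncio
    from loguru import logger

    async def fun():
        logger.info('start')
        await asyncio.sleep(2)
        logger.info('end')

    # running `fun()` twice takes two seconds
    async def main():
        task1 = asyncio.create_task(fun())
        task2 = asyncio.create_task(fun())

        await task1
        await task2

    asyncio.run(main())

Task return values and names

    import asyncio
    from loguru import logger

    async def io():
        logger.debug('io running...')
        await asyncio.sleep(2)
        logger.debug('io finished...')

    async def fun():
        await io()
        return True

    async def main():
        tasks = [
            asyncio.create_task(fun(), name='0'),
            asyncio.create_task(fun(), name='1'),
        ]
        done, pending = await asyncio.wait(tasks, timeout=None)
        logger.info(f'done = {done}')
        logger.info(f'pending = {pending}')

    if __name__ == '__main__':
        asyncio.run(main())

Future awaitables

Task inherits from Future; the way a Task handles the result of an await is built on Future.

Use loop.create_future() to create a Future object.

What a Future does: await future waits for the future's result, and keeps waiting for as long as the future has none.

    import asyncio
    from loguru import logger

    async def fun(fut):
        # set the value of fut
        fut.set_result('xxx')

    async def main():
        # get the event loop that is currently running
        loop = asyncio.get_running_loop()
        # create a future object
        fut = loop.create_future()
        # create a Task that sets fut through 'fun()'
        await asyncio.create_task(fun(fut))  # comment this line out and fut waits forever
        # wait for fut's result; waits forever if it never gets one
        data = await fut
        logger.info(data)

    asyncio.run(main())

Asynchronous iterators

Asynchronous iterator: an object that implements __aiter__() and __anext__(); __anext__() must return an awaitable. async for resolves the awaitables returned by the iterator's __anext__() method until it raises a StopAsyncIteration exception.

Asynchronous iterable: an object that can be used in an async for statement; its __aiter__() method must return an asynchronous iterator.

    import asyncio

    class Reader(object):

        def __init__(self):
            self.count = 0

        # return self
        def __aiter__(self):
            return self

        # iterate
        async def __anext__(self):
            self.count += 1
            if self.count == 5:
                raise StopAsyncIteration  # iteration finished
            return self.count

    async def main():
        reader = Reader()
        async for item in reader:
            print(item)

    if __name__ == '__main__':
        asyncio.run(main())

Asynchronous context managers

Context manager: what `with open(...)` relies on; it implements __enter__() and __exit__().

Asynchronous context manager: an object that controls the environment of an async with statement by defining __aenter__() and __aexit__().

    import asyncio
    from loguru import logger

    class AsyncContextManager(object):

        async def do(self):
            logger.debug('working with the database')

        async def __aenter__(self):
            logger.debug('connecting to the database')
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            logger.debug('closing the database')

    async def main():
        async with AsyncContextManager() as acm:
            await acm.do()

    if __name__ == '__main__':
        asyncio.run(main())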
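The event-loop pseudocode near the start of this post can be made concrete with plain generators. This is a minimal sketch of the idea only: `worker` and `run_until_complete` are hypothetical names written for this illustration, each `yield` stands in for "waiting on I/O", and the real asyncio loop is considerably more involved (it tracks I/O readiness, timers, and callbacks).

```python
from collections import deque


def worker(name, steps, log):
    """A generator-based task: each `yield` hands control back to the loop."""
    for step in range(1, steps + 1):
        log.append(f"{name} step {step}")
        yield  # pretend this is where the task would wait on I/O


def run_until_complete(tasks):
    """Round-robin over the tasks until every one of them has finished."""
    ready = deque(tasks)
    while ready:                  # the loop stops once no task is left
        task = ready.popleft()
        try:
            next(task)            # run the task up to its next yield
        except StopIteration:
            continue              # finished: drop it from the task list
        ready.append(task)        # not finished: schedule it again


log = []
run_until_complete([worker("A", 2, log), worker("B", 3, log)])
print(log)
```

The interleaved log shows the switching described above: neither task runs to completion before the other gets a turn, yet everything happens on a single thread.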

Python Geek

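The pain point

Defining class attributes the usual way becomes very inconvenient when there are many of them (dozens).

    class TestInfo():
        # declare the class attributes
        name: str = None
        age: int = None

        # initialize in the constructor
        def __init__(self, name: str, age: int):
            self.name = name
            self.age = age

        # override __repr__ for readable printing
        def __repr__(self):
            return f'{self.__class__.__name__}(name={self.name}, age={self.age})'

    if __name__ == '__main__':
        tinfo = TestInfo(name='boy', age=16)
        print(tinfo)

The attr way

With attr there is no need to write __init__ and __repr__ by hand, and printing is convenient too.

    from attr import attrs, attr, fields_dict

    @attrs
    class Info():
        # declare the class attributes
        name = attr(type=str, default=None)
        age = attr(type=int, default=18)

    if __name__ == '__main__':
        info = Info(name='boy')
        print(info)
        print(fields_dict(Info))

Using attr attributes

    Parameter   Explanation
    type        The type, such as int or str; defaults to None
    default     The attribute's default value, used when no value is passed at initialization; if no default is defined it is NOTHING, meaning there is no default
    validator   A validator that checks whether the passed value is valid; takes three arguments (instance, attribute, value)
    init        Whether the attribute takes part in initialization; if False it cannot be passed as an initialization argument; defaults to True
    metadata    Metadata: read-only additional data
    converter   A converter that processes and converts the value, making the class more tolerant of input
    kw_only     Whether the attribute is keyword-only; defaults to False

    from attr import attrs, attr

    def age_va(instance, attribute, value):
        """
        instance: the instance
        attribute: the attribute
        value: the value
        """
        print(instance.name)  # boy
        if value > 18:
            raise ValueError('age is greater than 18')

    @attrs
    class Info():
        # declare the class attributes
        name = attr(type=str, default=None)
        age = attr(type=int, default=17, validator=age_va)

    if __name__ == '__main__':
        info = Info(name='boy', age=9)
        print(info)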