Python极客

介绍用于Python的下一代HTTP客户端。支持同步&异步支持HTTP/1.1和HTTP/2支持HTTP&HTTPS代理还不支持SOCKS代理【aiohttp支持】需要Python3.6+安装pip安装$pipinstallhttpxHTTP/2支持pipinstallhttpx[http2]同步跟requests基本一样r=httpx.get('https://httpbin.org/get')r>>><Response[200OK]>同步客户端(会话)与使用顶级API相比,这可以带来显着的性能提升,包括:减少请求之间的延迟(无握手)。减少CPU使用率和往返次数。减少网络拥塞。还支持顶级API中不可用的功能,例如:跨请求的Cookie持久性。跨所有传出请求应用配置。通过HTTP代理发送请求。使用HTTP/2。importhttpx#上下文自动关闭会话withhttpx.Client()asclient:res=client.get(url='https://bigdataboy.cn')print(res)>>><Response[200OK]>#手动关闭会话client=httpx.Client()try:...finally:client.close()异步异步需要httpx.AsyncClient()importhttpximportasyncio#上下文方式asyncdefmain():asyncwithhttpx.AsyncClient()asclient:r=awaitclient.get('https://bigdataboy.cn/')print(r)#普通方式asyncdefmain():client=httpx.AsyncClient()r=awaitclient.get('https://bigdataboy.cn/')print(r)awaitclient.aclose()asyncio.run(main())>>><Response[200OK]>高级使用同步普通写法,同步上下文写法,异步普通写法,异步上下文写法都是一样的用法,不一样的地方会标注请求httpx.get('https://bigdataboy.cn/',params=params)#data:str&表单数据json:json数据content:字节数据httpx.post('https://bigdataboy.cn/',data='',json={},content=b'')httpx.put('https://bigdataboy.cn/',data={'key':'value'})httpx.delete('https://bigdataboy.cn/')httpx.head('https://bigdataboy.cn/')httpx.options('https://bigdataboy.cn/')响应#请求urlr.url#文本显示r.text#Json响应r.json()#解码方式r.encoding#设置解码方式r.encoding='utf-8'#字节响应r.content#cookiesr.cookiesr.cookies[xxx]#响应状态码r.status_code#响应headersr.headers#检查http版本r.http_version流式响应#二进制流式响应【同步模式】withhttpx.stream("GET","https://www.example.com")asr:fordatainr.iter_bytes():print(data)#文本流式响应【同步模式】withhttpx.stream("GET","https://www.example.com")asr:forlineinr.iter_lines():print(line)#二进制流式响应【异步模式】client=httpx.AsyncClient()asyncwithclient.stream('GET','https://www.example.com/')asresponse:asyncforchunkinresponse.aiter_bytes():#response.aiter_text()文本流式响应#response.aiter_raw()用于流式传输原始响应字节,而不应用内容解码。Cookiecookies={"k":"v"}httpx.get('https://httpbin.org/cookies',cookies=cookies)#或者cookies=httpx.Cookies()cookies.set('cookie_on_domain','hello,there!',domain='httpbin.org')cookies.set('cookie_off_domain','nope.',domain='example.org')httpx.get('http://httpbin.org/cookies',cookies=cookies)代理还不支持SOCKS代理#str型withhttpx.Client(proxies="http://localhost:8030")asclient:...#字典型proxies={"http://":"http://localhost:8030","https://":"http://localhost:8031",}withhttpx.Client(proxies=proxies)asclient:...重定向httpx默认不会进行重定向跳转#不会重定向跳转r=httpx.get('https://www.bigdataboy.cn/')r.history#[]r.next_request#<Request('GET','https://bigdataboy.cn/')>#开启重定向跳转r=httpx.get('https://www.bigdataboy.cn/',follow_redirects=True)r.history#[<Response[301MovedPermanently]>]r.history[0]#<Response[301MovedPermanently]>r.history[0].url#https://www.bigdataboy.cn/r.next_request#None启用HTTP/2需要服务端支持HTTP/2才有用client=httpx.AsyncClient(http2=True)事件挂钩httpx.AsyncClient()需要异步钩子函数目前支持两种事件:request:在请求完全准备好之后,但在它被发送到网络之前调用。response:在从网络获取响应之后但在返回给调用者之前调用。deflog_request(request):print(f"Requesteventhook:{request.method}{request.url}-Waitingforresponse")deflog_response(response):request=response.requestprint(f"Responseeventhook:{request.method}{request.url}-Status{response.status_code}")client=httpx.Client(event_hooks={'request':[log_request],'response':[log_response]})事件允许对request&response进修改defadd_timestamp(request):request.headers['x-request-timestamp']=datetime.now(tz=datetime.utc).isoformat()client=httpx.Client(event_hooks={'request':[add_timestamp]})

Python极客

主要特点支持异步客户端和异步服务端支持开箱即用的WebSocket服务端和WebSocket客服端服务端还支持中间件(Middlewares)和信号(Signals)初始安装pipinstallaiohttp客户端importasyncioimportaiohttpasyncdefmain():asyncwithaiohttp.ClientSession()assession:res=awaitsession.get(url='https://bigdataboy.cn')print(res)if__name__=='__main__':loop=asyncio.get_event_loop()loop.run_until_complete(main())服务端本文不会介绍fromaiohttpimportwebasyncdefhandle(request):name=request.match_info.get('name',"bigdataboy")text="Hello,"+namereturnweb.Response(text=text)app=web.Application()app.add_routes([web.get('/',handle),web.get('/{name}',handle)])if__name__=='__main__':web.run_app(app=app,host='127.0.0.1',port=8080)快速上手发出请求importasyncio,aiohttpasyncdefmain():asyncwithaiohttp.ClientSession()assession:#会话上下文asyncwithsession.get('http://httpbin.org/get')asresp:#请求上下文print(resp.status)print(awaitresp.text())asyncio.run(main())其他请求方式session.post('http://httpbin.org/post',data=b'data')session.put('http://httpbin.org/put',data='data')session.delete('http://httpbin.org/delete')session.head('http://httpbin.org/get')session.options('http://httpbin.org/get')session.patch('http://httpbin.org/patch',data='data')为了请求同一个网站更方便asyncwithaiohttp.ClientSession(base_url='http://httpbin.org')assession:asyncwithsession.get(url='/get')asresp:print(resp.status)#或调用其他函数asyncwithsession.post(url='/post',data='data')asresp:print(resp.status)asyncwithsession.put(url='/put',data='data')asresp:print(resp.status)提示&技巧一个站点的使用一个会话,==不要==为一个请求创建一个会话会话内部包含一个连接池。连接重用和保持活动(默认情况下都打开)可以提高整体性能。不使用上下文形式需要收到关闭会话session=aiohttp.ClientSession()asyncwithsession.get('...'):#...awaitsession.close()参数传递#GETparams={'k':'v'}params=[('k','v'),('k1','v1')]asyncwithsession.get(url='/get',params=params)asresp:pass#POSTdata='str'asyncwithsession.post(url='/post',data=data)asresp:passjson={'k':'v'}asyncwithsession.post(url='/post',json=json)asresp:pass响应内容asyncwithsession.get('https://bigdataboy.cn')asresp:#状态码resp.status#文本解码awaitresp.text(encoding='utf-8')#指定解码方式#json解码awaitresp.json()#二进制解码awaitresp.read()流式响应内容当响应文件过于庞大,使用read()、json()、text()会把内容全部加载到内存,这是愚蠢的做法,应该使用流式响应。asyncwithsession.get('https://api.github.com/events')asresp:withopen('./xx','wb')asfd:chunk_size=10asyncforchunkinresp.content.iter_chunked(chunk_size):fd.write(chunk)网络套接字(WebSocket)#使用小蝌蚪聊天室测试asyncwithsession.ws_connect('ws://kedou.workerman.net:8280/')asws:asyncformsginws:print(msg)awaitws.send_str('1122')#发送数据#awaitws.send_json()#awaitws.send_json()超时#单位秒默认超时300秒timeout=aiohttp.ClientTimeout(total=60)asyncwithaiohttp.ClientSession(timeout=timeout)assession:...#会覆盖session设置的超时asyncwithsession.get(url,timeout=timeout)asresp:...自定义请求头#会话请求头headers={'User-Agent':'Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/96.0.4664.110Safari/537.36'}asyncwithaiohttp.ClientSession(headers=headers)assession:...#单独设置会合并会话请求头asyncwithsession.get(url='http://httpbin.org/headers',headers=headers)asresp:...自定义Cookieurl='http://httpbin.org/cookies'cookies={'cookies_are':'working'}asyncwithaiohttp.ClientSession(cookies=cookies)assession:#添加Cookiessession.cookie_jar.update_cookies(cookies={'k1':'v1'})#单独设置会合并到会话Cookiesasyncwithsession.get(url,cookies={'k':'v'})asresp:res=awaitresp.json()print(res)重定向禁止重定向allow_redirects=Falseasyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresp:print(resp.history)#重定向历史元祖print(resp.history[0])print(resp.history[0].url)代理aiohttp还不能很好的支持https代理#HTTP代理asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url=url,proxy='http://127.0.0.1:7890')asresp:print(resp.status)#授权代理asyncwithaiohttp.ClientSession()assession:proxy_auth=aiohttp.BasicAuth('user','pass')asyncwithsession.get("http://python.org",proxy="http://127.0.0.1:7890",proxy_auth=proxy_auth)asresp:print(resp.status)#socks代理pipinstallaiohttp_socksfromaiohttp_socksimportProxyConnectorconn=ProxyConnector.from_url('socks5://127.0.0.1:7890')asyncwithaiohttp.ClientSession(connector=conn,headers=headers)assession:...小技巧推荐写法importaiohttpimportasyncioasyncdeffetch(session,url):asyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncdefmain():asyncwithaiohttp.ClientSession()assession:#此步html=awaitfetch(session,'https://bigdataboy.cn')print(html)loop=asyncio.get_event_loop()loop.run_until_complete(main())

Python极客

什么是协程协程不是计算机提供的,是人为创造的(多线程、多进程就是计算机提供)协程(Coroutine),可以认为是微线程,是一种用户态内的上下文切换技术,简单理解就是,遇到IO耗时操作时,切换到其他代码块继续执行的技术。Py协程实现的方式greenlet,早期的第三方模块yield关键字asyncio装饰器(@asyncio.coroutine)【py3.4】async、await关键字【py3.5】【推荐】异步编程事件循环可以理解成一个死循环,去检查并执行某些代码。#伪代码任务列表=[任务1,任务2,任务3]whileTrue:#将'可执行'和'已完成'的任务返回可执行的任务列表,已完成的任务列表=去任务列表中检查所有的任务for就绪任务in可执行的任务列表:'执行'已就绪的任务for已完成的任务in已完成的任务列表:在任务列表中'移除'已完成的任务如果任务列表中的任务'都已完成',则'终止'循环importasyncio#生成或获取一个事件循环loop=asyncio.get_event_loop()#把任务放入事件循环loop.run_until_complete(task_list)快速上手协程函数:定义函数时asyncdef函数名(不是普通的函数了)importasyncioasyncdeffun():passresult=fun()#事件对象(Py3.7以前)【两种写法都有应用场景】#loop=asyncio.get_event_loop()#loop.run_until_complete(result)#事件对象asyncio.run(result)#Py3.7以后支持await关键字await后面只可以跟(协程对象、Task对象、Future对象)也可以理解为等待耗时操作,在这个等待的时间可以去执行其他任务。importasynciofromloguruimportloggerasyncdeffun():logger.info(f'开始')awaitasyncio.sleep(2)logger.info(f'结束')#两个协程对象任务tasks=[fun(),fun(),]asyncio.run(asyncio.wait(tasks))常规执行两次fun()需要四秒,使用协程只需要两秒Task对象理解:可以向事件循环里添加任务的对象。Task用于并发调度协程,使用asyncio.create_task(协程对象,...)的方式创建Task对象,这样就可以加入到事件循环等待调度。还能使用更低一级的loop.create_task()或者ensure_future()创建,不建议手动实例化Task函数。importasynciofromloguruimportloggerasyncdeffun():logger.info(f'开始')awaitasyncio.sleep(2)logger.info(f'结束')#执行两次`fun()`用时两秒asyncdefmain():task1=asyncio.create_task(fun())task2=asyncio.create_task(fun())awaittask1awaittask2asyncio.run(main())Task的返回值与nameimportasynciofromloguruimportloggerasyncdefio():logger.debug('io执行中...')awaitasyncio.sleep(2)logger.debug('io操作完成...')asyncdeffun():awaitio()returnTrueasyncdefmain():tasks=[asyncio.create_task(fun(),name='0'),asyncio.create_task(fun(),name='1'),]done,padding=awaitasyncio.wait(tasks,timeout=None)logger.info(f'done={done}')logger.info(f'padding={padding}')if__name__=='__main__':asyncio.run(main())Future可等待对象Task继承Future,Task对象内部await结果的处理基于Future对象而来。使用loop.create_future()来创建Future对象。Future的特性:awaitfuture等待future结果,future没有结果则一直等待importasynciofromloguruimportloggerasyncdeffun(fut):#设置fut值fut.set_result('xxx')asyncdefmain():#获取当前时间循环下面的runloop=asyncio.get_event_loop()#创建future对象fut=loop.create_future()#创建Task对象,通过'fun()'给fut赋值awaitasyncio.create_task(fun(fut))#注释掉fut一直等待#等待fut结果,fut没有结果则一直等待data=awaitfutlogger.info(data)asyncio.run(main())异步迭代器异步迭代器:实现了__aiter__()和__anext__()方法的对象,必须返回一个awaitable对象。asyncfor支持处理异步迭代器的__anext__()方法返回的可等待对象,直到引发一个stopAsyncIteration异常异步可迭代对象:可在asyncfor语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器)importasyncioclassReader(object):def__init__(self):self.count=0#返回自己def__aiter__(self):returnself#迭代asyncdef__anext__(self):self.count+=1ifself.count==5:raiseStopAsyncIteration#迭代完成returnself.countasyncdefmain():reader=Reader()asyncforiteminreader:print(item)if__name__=='__main__':asyncio.run(main())异步上下文管理器上下文管理器:withopen操作,实现了\_\_enter__(),\_\_exit__()。异步上下文管理器:通过定义__aenter__()和__aexit__()方法来对asyncwith语句中的环境进行控制的对象。importasynciofromloguruimportloggerclassAsyncContextManager(object):asyncdefdo(self):logger.debug(f'操作数据库')asyncdef__aenter__(self):logger.debug(f'连接数据库')returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):logger.debug(f'关闭数据库')asyncdefmain():asyncwithAsyncContextManager()asacm:awaitacm.do()if__name__=='__main__':asyncio.run(main())

Python极客

解决痛点常规定义类的参数,如果参数很多(几十个),这样写,就很不方便classTestInfo():#定义类参数name:str=Noneage:int=None#构造器初始化def__init__(self,name:str,age:int):self.name=nameself.age=age#为了方便打印重写__repr__def__repr__(self):returnf'{self.__class__.__name__}(name={self.name},age={self.age})'if__name__=='__main__':tinfo=TestInfo(name='boy',age=16)print(tinfo)attr写法使用attr的写法,可以少写__init、__repr,并且打印也比较方便fromattrimportattrs,attr,fields_dict@attrsclassInfo():#定义类参数name=attr(type=str,default=None)age=attr(type=int,default=18)if__name__=='__main__':info=Info(name='boy')print(info)print(fields_dict(Info))attr属性用法参数解释type类型,比如int、str等各种类型,默认为Nonedefault属性的默认值,如果没有传入初始化数据,那么就会使用默认值,如果没有默认值定义,那么就是NOTHING,即没有默认值validator验证器,检查传入的参数是否合法,三个参数(实例,属性,值)init是否参与初始化,如果为False,那么这个参数不能当做类的初始化参数,默认是True。metadata元数据,只读性的附加数据converter转换器,进行一些值的处理和转换器,增加容错性kw_only是否为强制关键字参数,默认为Falsefromattrimportattrs,attrdefage_va(instance,attribute,value):"instance:实例attribute:参数value:值"print(instance.name)#boyifvalue>18:raiseValueError(f'age大于18')@attrsclassInfo():#定义类参数name=attr(type=str,default=None)age=attr(type=int,default=17,validator=age_va)if__name__=='__main__':info=Info(name='boy',age=9)print(info)

Python极客

说明使用Anaconda安装,是方便控制虚拟环境Python的版本,不于本机的全局相冲突。比如:全局==>Python3.8/虚拟环境==>Python3.6PyTorch0.4/虚拟环境==>Python3.7PyTorch1.7安装参考:https://bigdataboy.cn/post-342.html安装CUDA正常安装过程就行CUDA(ComputeUnifiedDeviceArchitecture),是显卡厂商NVIDIA推出的运算平台。CUDA是一种由NVIDIA推出的通用并行计算架构,该架构使GPU能够解决复杂的计算问题。下载网站:https://developer.nvidia.com/cuda-downloads历史版本下载:https://developer.nvidia.com/cuda-toolkit-archive检测CUDAcmd输入:nvcc-V安装cuDNNNVIDIAcuDNN是用于深度神经网络的GPU加速库。它强调性能、易用性和低内存开销。NVIDIAcuDNN可以集成到更高级别的机器学习框架中,如谷歌的Tensorflow、加州大学伯克利分校的流行caffe软件。简单的插入式设计可以让开发人员专注于设计和实现神经网络模型,而不是简单调整性能,同时还可以在GPU上实现高性能现代并行计算。cuDNN的下载需要登录,但是nvidia登录时的验证码经常被强(各种办法尝试都不行),登陆难度极大不用登陆小技巧:进入历史版本选择好,右键复制链接(就是资源实际地址),然后使用迅雷等工具下载下载网站:https://developer.nvidia.com/rdp/cudnn-download历史版本:https://developer.nvidia.com/rdp/cudnn-archive解压&移动解压下载的cuDNN把解压的cuDNN里的文件,移动到[路径]\NVIDIAGPUComputingToolkit\CUDA\v11.4\下相应的文件下cnDNN[bin]->CUDN[bin]cnDNN[include]->CUDN[include]cnDNN[lib\x64]->CUDN[lib\x64]添加环境变量把下面两个路径添加进环境变量C:\ProgramFiles\NVIDIAGPUComputingToolkit\CUDA\v11.4\lib\x64C:\ProgramFiles\NVIDIAGPUComputingToolkit\CUDA\v11.4\检测结果cmd运行这个exe程序创建虚拟环境condacreate-npytorchpython=3.7虚拟环境名字:pytorch虚拟环境PY版本:3.7安装PyTorch切换环境condaactivatepytorch查看本机CUDA版本生成安装命令&安装PyTorch有点大,安装时耐心等待PyTorch官网:https://pytorch.org/命令:condainstallpytorchtorchvisiontorchaudiocudatoolkit=11.1-cpytorch-cconda-forge如果下载是在太慢,可以先用其他工具下载好,再本地安装PyTorch国内镜像:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorchcondainstall--use-local~/Downloads/a.tar.bz2验证安装进入pytthonshell进行验证importtorchprint(torch.__version__)print(torch.cuda.is_available())Pycharm使用该环境创建新项目,使用conda管理虚拟环境选择这个环境验证

Python极客

安装AnacondaAnaconda历史版本:https://repo.anaconda.com/archive/(本文使用:Anaconda3-5.2.0-Windows-x86_64.exe)正常安装过程同意->下一步->下一步->…->跳过(Skip)不安装VS->完成校验安装是否成功打开AnacondaPrompt配置环境变量添加{路径}Anaconda3\Scripts到Path里cmd输入:conda--version常用操作修改下载源可参考清华源官方文:https://mirror.tuna.tsinghua.edu.cn/help/anaconda/打开C:\Users\[电脑用户名]\.condarcchannels:-defaultsshow_channel_urls:truedefault_channels:-https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main-https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r-https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2custom_channels:conda-forge:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloudmsys2:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloudbioconda:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloudmenpo:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloudpytorch:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloudsimpleitk:https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud查看修改后的源命令:condainfo常用命令建议在AnacondaPrompt里使用,而不是在cmd更新全部工具包condaupgrade--all创建指定Py版本虚拟环境condacreate-n[名字]python=[版本号]condacreate-nvenvpython=3.6进入虚拟环境condaactivate[名字]模块安装(进入环境执行)condainstall[模块名]或者pipinstall[模块名]卸载模块(进入环境执行)condaremove[模块名]或者pipuninstall[模块名]卸载环境condaremove-n[环境名]--all查看环境列表condaenvlist

2021-8-25 771 0
Python极客

日志级别级别说明级别何时使用DEBUG细节信息,仅当诊断问题时适用。INFO确认程序按预期运行WARNING表明有已经或即将发生的意外(例如:磁盘空间不足)。程序仍按预期进行ERROR由于严重的问题,程序的某些功能已经不能正常执行CRITICAL严重的错误,表明程序已不能继续执行级别等级默认等级是WARNING,这意味着仅仅这个等级及以上的才会反馈信息,除非logging模块被用来做其它事情。级别数字值CRITICAL50ERROR40WARNING30INFO20DEBUG10NOTSET0中间处理器(Handler)是配置日志是打印在控制台,还是输出到文件等等,多个处理器可以共用logging.FileHandler该类会把日志写入磁盘文件importloggingfromloggingimportFileHandler#返回一个指定记录器名称logger=logging.getLogger(__name__)#该中间程序器会把日志写入磁盘文件handler=FileHandler(filename="error.log",#日志文件名称mode='a',#写入模式encoding="utf-8"#编码)#该中间程序处理器的日志级别handler.setLevel(logging.ERROR)#设置该中间处理器的日志输出格式handler.setFormatter(logging.Formatter('%(asctime)-15s%(levelname)s%(filename)s%(lineno)d%(process)d%(message)s'))#添加中间处理器logger.addHandler(handler)#使用logger.error(msg=f'文件输出')logging.StreamHandler控制台输出importloggingfromloggingimportStreamHandler#返回一个指定记录器名称logger=logging.getLogger(__name__)#该中间程序器会把日志写入到流中handler=StreamHandler()#该中间程序处理器的日志级别handler.setLevel(logging.ERROR)#设置该中间处理器的日志输出格式handler.setFormatter(logging.Formatter('%(asctime)-15s%(levelname)s%(filename)s%(lineno)d%(process)d%(message)s'))#添加中间处理器logger.addHandler(handler)#使用logger.error(msg=f'文件输出')日志输入格式配置(Formatter)规定日志输出的内容的格式格式描述%(levelno)s打印日志级别的数值%(levelname)s日志级别%(pathname)s当前执行程序的路径%(filename)s当前执行程序名称%(funcName)s日志的当前函数%(lineno)d日志的当前行号%(asctime)s日志的时间%(thread)d线程id%(threadName)s线程名称%(process)d进程ID%(message)s日志信息logging.Formatter('%(asctime)-15s%(levelname)s%(filename)s%(lineno)d%(process)d%(message)s')----2020-08-1017:20:50,687ERRORtest.py2233480文件输出常见配置单个文件使用默认输出到控制台importlogginglogging.basicConfig(level=logging.INFO,format='%(asctime)s-%(name)s-%(levelname)s-%(message)s')logger=logging.getLogger(__name__)logger.info(msg=f"大数据男孩")输出结果2020-08-1017:48:30,772-__main__-INFO-大数据男孩多个文件应用importloggingimportsysfromosimportmakedirsfromos.pathimportdirname,existsloggers={}LOG_ENABLED=True#是否开启日志LOG_TO_CONSOLE=True#是否输出到控制台LOG_TO_FILE=True#是否输出到文件LOG_TO_ES=True#是否输出到ElasticsearchLOG_PATH='./runtime.log'#日志文件路径LOG_LEVEL='DEBUG'#日志级别LOG_FORMAT='%(levelname)s-%(asctime)s-process:%(process)d-%(filename)s-%(name)s-%(lineno)d-%(module)s-%(message)s'#每条日志输出格式defget_logger(name=None):"""getloggerbyname:paramname:nameoflogger:return:logger"""globalloggersifnotname:name=__name__ifloggers.get(name):returnloggers.get(name)logger=logging.getLogger(name)logger.setLevel(LOG_LEVEL)#输出到控制台ifLOG_ENABLEDandLOG_TO_CONSOLE:stream_handler=logging.StreamHandler(sys.stdout)stream_handler.setLevel(level=LOG_LEVEL)formatter=logging.Formatter(LOG_FORMAT)stream_handler.setFormatter(formatter)logger.addHandler(stream_handler)#输出到文件ifLOG_ENABLEDandLOG_TO_FILE:#如果路径不存在,创建日志文件文件夹log_dir=dirname(LOG_PATH)ifnotexists(log_dir):makedirs(log_dir)#添加FileHandlerfile_handler=logging.FileHandler(LOG_PATH,encoding='utf-8')file_handler.setLevel(level=LOG_LEVEL)formatter=logging.Formatter(LOG_FORMAT)file_handler.setFormatter(formatter)logger.addHandler(file_handler)#保存到全局loggersloggers[name]=loggerreturnloggerif__name__=='__main__':logger=get_logger()logger.debug('thisisamessage')输出结果DEBUG-2020-08-1017:46:12,213-process:17884-demo.py-__main__-59-demo-thisisamessage

Python极客

说明现在网上关于FastAPI的项目结构的资料比较少,以下仅供参考,如有更好的想法,可私聊博主交流项目下载:https://pan.bigdataboy.cn/#/s/oWsb结构说明:项目结构的静态文件使用Jinja2渲染项目├──modules(功能模块)│├──│└──init.py├──routers(路由控制)│├──index.py│├──items.py│├──users.py│└──init.py├──static(静态文件)│├──css│└──js├──templates(网页文件)│└──index.html├──init.py├──app.py(主文件)├──public.py(公共方法)└──config.py(配置文件)结构说明routers(路由控制)控制路由的响应(静态文件,还是Json,…)响应静态文件html.pyfromfastapiimportAPIRouter,HTTPExceptionfromstarlette.templatingimportJinja2Templatesfromstarlette.requestsimportRequestrouter=APIRouter()#绑定模板路径template=Jinja2Templates(directory="templates")@router.get("/")asyncdefindex(request:Request):#request为必传,后面的就跟Jinja2一样returntemplate.TemplateResponse('index.html',{'request':request,"user":"啦啦啦啦"})接口响应路由users.py。/users/路径在本文件定义fromfastapiimportAPIRouterrouter=APIRouter()@router.get("/users/",tags=["users"])asyncdefread_users():return[{"username":"Foo"},{"username":"Bar"}]@router.get("/users/me",tags=["users"])asyncdefread_user_me():return{"username":"fakecurrentuser"}@router.get("/users/{username}",tags=["users"])asyncdefread_user(username:str):return{"username":username}接口响应路由items.py。/items/路径在app.py定义fromfastapiimportAPIRouter,HTTPExceptionrouter=APIRouter()@router.get("/")asyncdefuse_api_munber():return[{"name":"ItemFoo"},{"name":"itemBar"}]@router.get("/{item_id}")asyncdefread_item(item_id:str):return{"name":"FakeSpecificItem","item_id":item_id}@router.put("/{item_id}",tags=["custom"],responses={403:{"description":"Operationforbidden"}},)asyncdefupdate_item(item_id:str):ifitem_id!="foo":raiseHTTPException(status_code=403,detail="Youcanonlyupdatetheitem:foo")return{"item_id":item_id,"name":"TheFighters"}app.py(主文件)fromfastapiimportDepends,FastAPIfromFastAPI_Project.routersimporthtml,items,usersfrompublicimportcount_usefromfastapi.staticfilesimportStaticFilesimportconfigimportuvicornapp=FastAPI(debug=True,version=config.VERSION,)#静态文件绑定app.mount('/static',StaticFiles(directory='static'),name='static')#静态文件路由app.include_router(html.router)#API路由app.include_router(items.router,prefix="/items",#添加路由路径tags=["items"],#标签路由分组作用#注入依赖请求这个路由之前,这个依赖的方法可以用于验证统计等dependencies=[Depends(count_use)],)app.include_router(users.router,dependencies=[Depends(count_use)],responses={404:{"description":"Notfound"}},)if__name__=='__main__':uvicorn.run(app=app,host=config.HOST,port=config.PORT,)config.py配置文件配置参数,链接数据什么的importpymongo#通用配置classConfig:#MongoDB连接地址MONGODB=pymongo.MongoClient("mongodb://localhost:27017/")DATABASES="bigdataboy"pass#运行地址HOST="127.0.0.1"#运行端口PORT=8686#版本VERSION="1.0.1"public.py(公共方法)fromfastapi.loggerimportloggerfromfastapi.requestsimportRequestfromtimeimporttimefromFastAPI_Project.configimportConfig"""记录接口使用信息"""asyncdefcount_use(r:Request):#获取api的路径path=r.url.path.strip("/").split("/")[0]#获取客户端ipip=r.client.host#获取客户端UAuser_agent=r.headers.getlist("user-agent")[-1]logger.info(msg=f"{path}{ip}{user_agent}")#插入mongodbConfig.MONGODB[Config.DATABASES]['count_use_api'].insert({"path":path,#接口路径"ip":ip,#客户端ip"user_agent":user_agent,#客户端UA"timestamp":time()#时间戳})

Python极客

说明在软件使用中,大多数需要使用多线程来实现,并且需要线程长久执行,所以就需要用到死循环,哪如何停止该死循环线程呢?监控文件是否修改完整代码:点击查看主界面代码importsysimportosfromPyQt5.QtWidgetsimport(QWidget,QApplication,QTextEdit,QPushButton,QBoxLayout)fromPyQt5.QtCoreimport(QThread,pyqtSignal)classWindow(QWidget):def__init__(self,parent=None,*args,**kwargs):super().__init__(parent,*args,**kwargs)self.initUI()definitUI(self):self.setWindowTitle('停止多线程死循环')self.resize(550,450)#创建多行文本框qtext_edit=QTextEdit()qtext_edit.resize(530,430)qtext_edit.setObjectName("edit")#创建三个按钮pushButton=QPushButton()pushButton.setText("导入文件")pushButton.clicked.connect(self.open_file)#导入文件qpush_button_start=QPushButton()qpush_button_start.setText("开始监控")qpush_button_start.clicked.connect(self.start_monitor)#连接开始监控槽函数qpush_button_stop=QPushButton()qpush_button_stop.setText("停止监控")qpush_button_stop.clicked.connect(self.stop_monitor)#连接停止监控槽函数#添加到垂直布局到父控件v_layout=QBoxLayout(QBoxLayout.TopToBottom)self.setLayout(v_layout)#添加水平布局控件lrlayout=QBoxLayout(QBoxLayout.LeftToRight)lrlayout.addWidget(pushButton)lrlayout.addWidget(qpush_button_start)lrlayout.addWidget(qpush_button_stop)v_layout.addWidget(qtext_edit)v_layout.addLayout(lrlayout)defopen_file(self):print("导入文件")#开始监控defstart_monitor(self):print("点击开始")#停止监控defstop_monitor(self):print("点击停止")if__name__=='__main__':app=QApplication(sys.argv)window=Window()window.show()sys.exit(app.exec_())监控文件是否更新线程监控文件需要开启一个线程,不然主界面就会出现假死classThread(QThread):#自定义修改时间信号file_time=pyqtSignal(float)def__init__(self,path:str,parent=None):super().__init__(parent)#文件路径self.path=path#死循环状态self.status=True#默认文件最后修改时间self.endTime=0.0def__del__(self):self.wait()defrun(self)->None:whileself.status:#判断文件修改时间ifself.endTime!=os.path.getmtime(self.path):self.endTime=os.path.getmtime(self.path)#发送文件修改信号self.file_time.emit(self.endTime)self.sleep(1)导入文件&更新文本框内容defopen_file(self):print("导入文件")#选择文件self.file_Name_Type=QFileDialog.getOpenFileName(self,"选取文件")ifself.file_Name_Type=="":print("\n取消选择")returnself.qtext_edit.append(f"导入文件路径{self.file_Name_Type[0]}")#添加文本框内容defset_text(self,text:str):self.qtext_edit.append(f"文件最后修改时间{text}")开始&停止监控事件点击开始监控开启监控线程,点击停止监控,修改死循环条件#开始监控defstart_monitor(self):self.thread=Thread(self.file_Name_Type[0])#接收文件修改时间信号self.thread.file_time.connect(self.set_text)self.thread.start()#停止监控defstop_monitor(self):self.thread.status=False效果

Python极客

安装&加载pip3installxlrd-ihttps://mirrors.aliyun.com/pypi/simple/importxlrd打开Excel文件path='data/在线表格2.0.xlsx'data=xlrd.open_workbook(path)获取sheet名称#返回所有的sheet名称names=data.sheet_names()#返回类型为是一个列表得到表格对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)行操作得到有效行#获取该表格中的有效行数nrows=table.nrows得到行对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):v=table.row(r)print(v)获得具体数据这是通过行对象获取数据#通过索引获取操作的表格对象table=data.sheet_by_index(0)#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):o,t,s=table.row(r)[0].value,table.row(r)[1].value,table.row(r)[2].valueprint(o,t,s)直接获取一行的数据,然后组成list#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):o=table.row_values(r)print(o)列操作获取有效列#获取该表格中的有效行数cols=table.ncolsprint(cols)得到列对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col(c,start_rowx=0,end_rowx=None)print(t)获得具体数据这是通过列对象获取数据#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col(c)[0].valueprint(t)直接获取一行的数据,然后组成list#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col_values(c)print(t)常用函数合计path='test.xlsx'#读取文件data=xlrd.open_workbook(path)data.sheet_names()#返回所有的sheet名称的listtable=data.sheet_by_index(0)#通过索引获取操作的表格对象table=data.sheet_by_name(sheet_name)#通过名称获取操作的表格对象行操作nrows=table.nrows#获取该sheet中的有效行数table.row(rowx)#返回由该行中所有的单元格对象组成的列表table.row_types(rowx,[start_colx=0],[end_colx=None])#返回由该行中所有单元格的数据类型组成的列表table.row_values(rowx,[start_colx=0],[end_colx=None])#返回由该行中所有单元格的数据组成的列表table.row_len(rowx)#返回该列的有效单元格长度列操作ncols=table.ncols#获取列表的有效列数table.col(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有的单元格对象组成的列表table.col_types(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有单元格的数据类型组成的列表table.col_values(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有单元格的数据组成的列表

Python极客

安装&加载pip3installpymongo-ihttps://mirrors.aliyun.com/pypi/simple/importpymongo连接MongoDB服务client=pymongo.MongoClient("mongodb://localhost:27017/")print(client)--------------#Mongo服务MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True)获取所有数据库dblist=client.list_database_names()print(dblist)-------------#所有数据库名称['admin','config','local']创建数据库&获取数据库对象如果没有该数据库则创建db=client["db"]print(db)---------#db数据库对象Database(MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True),'db')创建集合&获取集合如果没有该集合则创建#创建集合collection=db["collection"]print(sets)-----------#sets集合对象Collection(Database(MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True),'db'),'sets')#获取所有集合sets=db.list_collection_names()print(sets)--------------#该数据库所有集合['collection']插入数据#插入一条数据data={"name":"bigdataboy","age":"18"}x=collection.insert_one(data)print(x.inserted_id)--------#数据的_id5ed666e1bca2037c30662e97#插入多条数据datas=[{"name":"大数据男孩","age":"18"},{"name":"bigdataboy","age":"16","addr":"China"},{"name":"bigdataboy","age":"18"}]x=collection.insert_many(datas)print(x.inserted_ids)---------------------#插入数据的_id[ObjectId('5ed667e044fb69d445e510d8'),ObjectId('5ed667e044fb69d445e510d9'),ObjectId('5ed667e044fb69d445e510da')]自定义_id#链式写法collection=pymongo.MongoClient("mongodb://localhost:27017/")["db"]["collection"]#数据自行固定_iddatas=[{"_id":"1","name":"大数据男孩","age":"18"},{"_id":"2","name":"bigdataboy","age":"16","addr":"China"},{"_id":"3","name":"bigdataboy","age":"18"}]i=collection.insert_many(datas)print(i.inserted_ids)---------------------#插入数据的_id['1','2','3']查看所有数据collection=pymongo.MongoClient("mongodb://localhost:27017/")["db"]["collection"]forxincollection.find():print(x)------------#所有数据{'_id':'1','name':'大数据男孩','age':'18'}{'_id':'2','name':'bigdataboy','age':'16','addr':'China'}{'_id':'3','name':'bigdataboy','age':'18'}