Python极客

说明在软件使用中,大多数需要使用多线程来实现,并且需要线程长久执行,所以就需要用到死循环,哪如何停止该死循环线程呢?监控文件是否修改完整代码:点击查看主界面代码importsysimportosfromPyQt5.QtWidgetsimport(QWidget,QApplication,QTextEdit,QPushButton,QBoxLayout)fromPyQt5.QtCoreimport(QThread,pyqtSignal)classWindow(QWidget):def__init__(self,parent=None,*args,**kwargs):super().__init__(parent,*args,**kwargs)self.initUI()definitUI(self):self.setWindowTitle('停止多线程死循环')self.resize(550,450)#创建多行文本框qtext_edit=QTextEdit()qtext_edit.resize(530,430)qtext_edit.setObjectName("edit")#创建三个按钮pushButton=QPushButton()pushButton.setText("导入文件")pushButton.clicked.connect(self.open_file)#导入文件qpush_button_start=QPushButton()qpush_button_start.setText("开始监控")qpush_button_start.clicked.connect(self.start_monitor)#连接开始监控槽函数qpush_button_stop=QPushButton()qpush_button_stop.setText("停止监控")qpush_button_stop.clicked.connect(self.stop_monitor)#连接停止监控槽函数#添加到垂直布局到父控件v_layout=QBoxLayout(QBoxLayout.TopToBottom)self.setLayout(v_layout)#添加水平布局控件lrlayout=QBoxLayout(QBoxLayout.LeftToRight)lrlayout.addWidget(pushButton)lrlayout.addWidget(qpush_button_start)lrlayout.addWidget(qpush_button_stop)v_layout.addWidget(qtext_edit)v_layout.addLayout(lrlayout)defopen_file(self):print("导入文件")#开始监控defstart_monitor(self):print("点击开始")#停止监控defstop_monitor(self):print("点击停止")if__name__=='__main__':app=QApplication(sys.argv)window=Window()window.show()sys.exit(app.exec_())监控文件是否更新线程监控文件需要开启一个线程,不然主界面就会出现假死classThread(QThread):#自定义修改时间信号file_time=pyqtSignal(float)def__init__(self,path:str,parent=None):super().__init__(parent)#文件路径self.path=path#死循环状态self.status=True#默认文件最后修改时间self.endTime=0.0def__del__(self):self.wait()defrun(self)->None:whileself.status:#判断文件修改时间ifself.endTime!=os.path.getmtime(self.path):self.endTime=os.path.getmtime(self.path)#发送文件修改信号self.file_time.emit(self.endTime)self.sleep(1)导入文件&更新文本框内容defopen_file(self):print("导入文件")#选择文件self.file_Name_Type=QFileDialog.getOpenFileName(self,"选取文件")ifself.file_Name_Type=="":print("\n取消选择")returnself.qtext_edit.append(f"导入文件路径{self.file_Name_Type[0]}")#添加文本框内容defset_text(self,text:str):self.qtext_edit.append(f"文件最后修改时间{text}")开始&停止监控事件点击开始监控开启监控线程,点击停止监控,修改死循环条件#开始监控defstart_monitor(self):self.thread=Thread(self.file_Name_Type[0])#接收文件修改时间信号self.thread.file_time.connect(self.set_text)self.thread.start()#停止监控defstop_monitor(self):self.thread.status=False效果

Python极客

安装&加载pip3installxlrd-ihttps://mirrors.aliyun.com/pypi/simple/importxlrd打开Excel文件path='data/在线表格2.0.xlsx'data=xlrd.open_workbook(path)获取sheet名称#返回所有的sheet名称names=data.sheet_names()#返回类型为是一个列表得到表格对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)行操作得到有效行#获取该表格中的有效行数nrows=table.nrows得到行对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):v=table.row(r)print(v)获得具体数据这是通过行对象获取数据#通过索引获取操作的表格对象table=data.sheet_by_index(0)#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):o,t,s=table.row(r)[0].value,table.row(r)[1].value,table.row(r)[2].valueprint(o,t,s)直接获取一行的数据,然后组成list#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效行数rows=table.nrows#使用有效行去遍历得到行对象forrinrange(rows):o=table.row_values(r)print(o)列操作获取有效列#获取该表格中的有效行数cols=table.ncolsprint(cols)得到列对象#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col(c,start_rowx=0,end_rowx=None)print(t)获得具体数据这是通过列对象获取数据#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col(c)[0].valueprint(t)直接获取一行的数据,然后组成list#通过索引获取操作的表格对象table=data.sheet_by_index(0)#通过索引顺序获取#获取该把表格中的有效列数cols=table.ncols#使用有效行去遍历得到列对象forcinrange(cols):t=table.col_values(c)print(t)常用函数合计path='test.xlsx'#读取文件data=xlrd.open_workbook(path)data.sheet_names()#返回所有的sheet名称的listtable=data.sheet_by_index(0)#通过索引获取操作的表格对象table=data.sheet_by_name(sheet_name)#通过名称获取操作的表格对象行操作nrows=table.nrows#获取该sheet中的有效行数table.row(rowx)#返回由该行中所有的单元格对象组成的列表table.row_types(rowx,[start_colx=0],[end_colx=None])#返回由该行中所有单元格的数据类型组成的列表table.row_values(rowx,[start_colx=0],[end_colx=None])#返回由该行中所有单元格的数据组成的列表table.row_len(rowx)#返回该列的有效单元格长度列操作ncols=table.ncols#获取列表的有效列数table.col(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有的单元格对象组成的列表table.col_types(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有单元格的数据类型组成的列表table.col_values(colx,[start_rowx=0],[end_rowx=None])#返回由该列中所有单元格的数据组成的列表

Python极客

安装&加载pip3installpymongo-ihttps://mirrors.aliyun.com/pypi/simple/importpymongo连接MongoDB服务client=pymongo.MongoClient("mongodb://localhost:27017/")print(client)--------------#Mongo服务MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True)获取所有数据库dblist=client.list_database_names()print(dblist)-------------#所有数据库名称['admin','config','local']创建数据库&获取数据库对象如果没有该数据库则创建db=client["db"]print(db)---------#db数据库对象Database(MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True),'db')创建集合&获取集合如果没有该集合则创建#创建集合collection=db["collection"]print(sets)-----------#sets集合对象Collection(Database(MongoClient(host=['localhost:27017'],document_class=dict,tz_aware=False,connect=True),'db'),'sets')#获取所有集合sets=db.list_collection_names()print(sets)--------------#该数据库所有集合['collection']插入数据#插入一条数据data={"name":"bigdataboy","age":"18"}x=collection.insert_one(data)print(x.inserted_id)--------#数据的_id5ed666e1bca2037c30662e97#插入多条数据datas=[{"name":"大数据男孩","age":"18"},{"name":"bigdataboy","age":"16","addr":"China"},{"name":"bigdataboy","age":"18"}]x=collection.insert_many(datas)print(x.inserted_ids)---------------------#插入数据的_id[ObjectId('5ed667e044fb69d445e510d8'),ObjectId('5ed667e044fb69d445e510d9'),ObjectId('5ed667e044fb69d445e510da')]自定义_id#链式写法collection=pymongo.MongoClient("mongodb://localhost:27017/")["db"]["collection"]#数据自行固定_iddatas=[{"_id":"1","name":"大数据男孩","age":"18"},{"_id":"2","name":"bigdataboy","age":"16","addr":"China"},{"_id":"3","name":"bigdataboy","age":"18"}]i=collection.insert_many(datas)print(i.inserted_ids)---------------------#插入数据的_id['1','2','3']查看所有数据collection=pymongo.MongoClient("mongodb://localhost:27017/")["db"]["collection"]forxincollection.find():print(x)------------#所有数据{'_id':'1','name':'大数据男孩','age':'18'}{'_id':'2','name':'bigdataboy','age':'16','addr':'China'}{'_id':'3','name':'bigdataboy','age':'18'}

2020-6-3 36 0
Python极客

说明在许多情况下,您需要将错误通知给正在使用API​​的客户端。FastAPI使用Python的HTTPException异常进行处理,所以raise它。实例fromfastapiimportFastAPI,HTTPExceptionimportuvicornapp=FastAPI()@app.get("/user")asyncdefread_item(age:int,name:str):ifage<18:#返回客户端500状态码及错误原因raiseHTTPException(status_code=500,detail="age小于18")return{"name":name,"age":age}if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)添加自定义Headers@app.get("/user")asyncdefread_item(age:int,name:str):ifage<18:#返回客户端500状态码及错误原因raiseHTTPException(status_code=500,#detail可以是list、dict、strdetail={"msg":"age小于18"},#自定义headersheaders={"X-Error":"Theregoesmyerror"})return{"name":name,"age":age}自定义异常处理程序fromfastapiimportFastAPI,Requestfromfastapi.responsesimportJSONResponseimportuvicornapp=FastAPI()#异常参数类classUnicornException(Exception):def__init__(self,status_code:int,msg:str):#错误状态码self.status_code=status_code#错误内容self.msg=msg#异常响应@app.exception_handler(UnicornException)asyncdefunicorn_exception_handler(request:Request,exc:UnicornException):returnJSONResponse(#错误响应状态码status_code=exc.status_code,#错误返回内容格式content={"msg":exc.msg},)@app.get("/user/{name}")asyncdefread_item(name:str):ifname=="bigdata":#错误提示raiseUnicornException(status_code=404,msg="用户名错误,没有该用户")return{"name":name}if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)测试URL:http://127.0.0.1/user/bigdata?name=bigdataboy

Python极客

实例代码fromfastapiimportFastAPIimportuvicornapp=FastAPI()@app.post("/user/{name}")#路径参数asyncdefread_item(name:str):return{"name":name}if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)导入验证fromfastapiimportFastAPI,Query可选参数name可传可不传#正常写法name为可选参数@app.get("/user/")asyncdefread_item(age:int,name:str=None):return{"name":name,"age":age}#验证写法name为可选参数@app.get("/user/")asyncdefread_item(age:int,name:str=Query(None)):return{"name":name,"age":age}必选参数name必须传#正常写法name为必选参数asyncdefread_item(age:int,name:str):return{"name":name,"age":age}#验证写法name为必选参数asyncdefread_item(age:int,name:str=Query(...)):return{"name":name,"age":age}默认参数值#正常写法name默认值asyncdefread_item(age:int,name:str="bigdataboy"):return{"name":name,"age":age}#验证写法name默认值asyncdefread_item(age:int,name:str=Query("bigdataboy")):return{"name":name,"age":age}Query验证Query主要支持一下验证长度验证:max_length、min_length正则验证:regex@app.get("/user")#路径参数asyncdefread_item(age:int,name:str=Query(...,#该参数必选min_length=1,#最小长度为1max_length=50,#最大长度为50#regex=re.compile(".+bigdataboy.+").pattern#使用编译的正则表达式需要导入re库regex=".+bigdataboy.+"#正则表达式匹配验证)):return{"name":name,"age":age}其他参数这些参数主要是对接口进行描述@app.get("/user")#路径参数asyncdefread_item(age:int,name:str=Query(...,#该参数必选title="name",#标题description="用户的新名字",#参数作用的描述alias="new-name",#接口参数别名,URL参数就使用该别名deprecated=True,#代表该参数,即将弃用)):return{"name":name,"age":age}

2020-5-27 76 0
Python极客

路径参数比如访问:127.0.0.1/user/123/fromfastapiimportFastAPIimportuvicornapp=FastAPI()@app.get("/user/{id}/")asyncdefmain(id:int):return{"id":id}if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)API文档地址:http://127.0.0.1/docs固定的路径参数@app.get("/user/id/")asyncdefmain():return{"id":"thecurrentuser"}@app.get("/user/{id}/")asyncdefmain(id:int):return{"id":id}预留路径值传入的路径参数只能在预留路径里,不然就报错fromfastapiimportFastAPI#导入枚举类fromenumimportEnumimportuvicorn#预留路径类继承str,文档将能够知道这些值的类型classModelName(str,Enum):#预留的路径alexnet="alexnet"resnet="resnet"lenet="lenet"app=FastAPI()@app.get("/model/{model_name}")asyncdefget_model(model_name:ModelName):#参数类型是预留路径类#第一种判定写法ifmodel_name==ModelName.alexnet:return{"model_name":model_name,"message":"DeepLearningFTW!"}#第二种判定写法ifmodel_name.value=="lenet":return{"model_name":model_name,"message":"LeCNNalltheimages"}#默认返回return{"model_name":model_name,"message":"Havesomeresiduals"}if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)路径转换器访问/files/bigdataboy/a.txt,file_path将能得到bigdataboy/a.txt#添加上:path即可@app.get("/files/{file_path:path}")asyncdefread_file(file_path:str):return{"file_path":file_path}查询参数查询参数是指?后面的&分割的键值对http://127.0.0.1:8000/items/?skip=0&limit=2fromfastapiimportFastAPIimportuvicornapp=FastAPI()fake_items_db=[{"item_name":"Foo"},{"item_name":"Bar"},{"item_name":"Baz"}]@app.get("/items/")#设置的默认值,当默认值是None时,表示该值为可选asyncdefread_item(skip:int=0,limit:int=2,default:str=None):#一个列表切片returnfake_items_db[skip:skip+limit]if__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)注意如果声明可选时出现以下类似错误#声明可选limit:int=None#出现以下类似错误Incompatibletypesinassignment(expressionhastype"None",variablehastype"int")这是你需要这样做fromtypingimportOptionallimit:Optional[int]=None完整实例fromfastapiimportFastAPIfromtypingimportOptionalimportuvicornapp=FastAPI()@app.get("/items/")asyncdefread_item(skip:str,limit:Optional[int]=None):item={"skip":skip,"limit":limit}returnitemif__name__=='__main__':uvicorn.run(app=app,host="127.0.0.1",port=80,)

2020-5-26 76 0
2020-5-21 47 0
Python极客

安装&加载pip3installrequests-ihttps://mirrors.aliyun.com/pypi/simple/importrequestsGET请求#普通请求r=requests.get('https://bigdataboy.cn/')#带Query参数,等价于https://bigdataboy.cn/?key1=value1&key2=value2payload={'key1':'value1','key2':'value2'}r=requests.get('https://bigdataboy.cn/',params=params)#带Headersheaders={'user-agent':'anoyi-app/0.0.1'}r=requests.get('https://bigdataboy.cn/',headers=headers)#带BasicAuthenticationr=requests.get('https://bigdataboy.cn/',auth=('user','pass'))POST请求POST请求-表单提交r=requests.post('https://bigdataboy.cn/',data={'key':'value'})POST请求-x-www-form-urlencodedheaders={'content-type':'application/x-www-form-urlencoded;charset=UTF-8'}r=requests.post('https://bigdataboy.cn/',headers=headers,data='key=value')POST请求-application/jsonpayload={'some':'data'}r=requests.post('https://bigdataboy.cn/',json=payload)其他请求#PUTr=requests.put('https://bigdataboy.cn/',data={'key':'value'})#DELETEr=requests.delete('https://bigdataboy.cn/')#HEADr=requests.head('https://bigdataboy.cn/')#OPTIONSr=requests.options('https://bigdataboy.cn/')网络响应-Reponse基本信息#状态码r.status_code#响应头r.headers#响应Cookier.cookies返回结果#文本内容r.text#二进制r.content#JSONr.json()#流r=requests.get('https://bigdataboy.cn/',stream=True)r.raw.read(10)常用方法URL编码fromrequests.utilsimportquotequote('ab')->'a%20b'URL解码fromrequests.utilsimportunquoteunquote('a%20b')->'ab'自动推断响应编码r.encoding=r.apparent_encoding下载文件r=requests.get('https://bigdataboy.cn/')open('bigdataboy.html','wb').write(r.content)上传文件files={'file':open('report.xls','rb')}r=requests.post(url,files=files)超时设置#单位:秒requests.get('https://bigdataboy.cn/',timeout=0.001)

Python极客

说明selenium是一个自动化测试工具,而爬虫中使用它主要是为了解决requests无法执行javaScript代码的问题。优点通过驱动浏览器,完全模拟浏览器的操作,比如跳转、输入、点击、下拉等…进而拿到网页渲染之后的结果,可支持多种浏览器,真正做到可见及可爬。缺点使用selenium本质上是驱动浏览器对目标站点发送请求,那浏览器在访问目标站点的时候,需要把静态资源都加载完毕,比如html、css、js这些文件,加载完成后,等待浏览器执行,最后呈现在页面。所以用它的坏处就是效率极低!所以我们一般用它来做登录验证,Js特别复杂,没有必要去解密的场景等。使用下载驱动以谷歌浏览器为例驱动下载地址:http://chromedriver.storage.googleapis.com/index.html找到本机安装的谷歌浏览器,与之对应的版本。(虽然驱动是32位,但是支持64位浏览器的)基本使用打开大数据男孩网站fromseleniumimportwebdriver#驱动的路径bro=webdriver.Chrome(executable_path="./chromedriver_win32/chromedriver.exe")#打开这个网址bro.get(url="https://bigdataboy.cn/")找到搜索按钮,并点击fromseleniumimportwebdriver#驱动的地址bro=webdriver.Chrome(executable_path="./chromedriver_win32/chromedriver.exe")bro.get(url="https://bigdataboy.cn/")#找到搜索按钮search_tab=bro.find_element_by_xpath('//span[@class="icon-search"]')#点击搜索按钮search_tab.click()输入搜索内容在输入内容之前,需要判断内容是否加载完成,给出的判断方案是:定时去某个标签,如果该标签存在了,则认为页面加载完成。fromseleniumimportwebdriver#驱动的地址bro=webdriver.Chrome(executable_path="./chromedriver_win32/chromedriver.exe")bro.get(url="https://bigdataboy.cn/")#找到搜索按钮search_tab=bro.find_element_by_xpath('//span[@class="icon-search"]')#点击搜索按钮search_tab.click()fromselenium.webdriver.common.byimportByfromselenium.webdriver.supportimportexpected_conditionsasECfromselenium.webdriver.support.uiimportWebDriverWait"""判断搜索页面的记载情况1.默认0.5秒判断一次。Wait()里poll_frequency参数设置2.判断标签的方法有:ID、NAME、CLASS_NAME、XPATH、TAG_NAME、LINK_TEXT、PARTIAL_LINK_TEXT、CSS_SELECTOR"""#60秒没有检测到id为keyword的标签表示,加载失败WebDriverWait(bro,60).until(EC.presence_of_element_located((By.ID,"keyword")))#找到输入框input_tab=bro.find_element_by_xpath('//input[@id="keyword"]')#输入值input_tab.send_keys("大数据男孩")#关闭浏览器#bro.close()

2020-4-20 58 0
Python极客

高阶函数使用—-函数式编程Python函数的参数可以是:字符串,整数…普通的变量,但Python函数的参数还能是一个函数,返回值也能是一个函数。函数作为参数函数fun()接受一个函数f作为参数首先定义一个主函数函数的第一个参数需要传入一个函数deffun(f,a:int,b:int):returnf(a)+f(b)定义参数函数,只是简单的返回参数deff(num):returnnum使用主函数print(fun(f,1,2))-------------------3函数作为返回值定义返回值函数f()deff(num):returnnum**num定义主函数fun(),直接返回f()函数deffun():#不能加括号,带括号就是调用returnf使用主函数第一个括号调用fun()函数,第二个括号调用返回的函数foo=fun()print(foo)#一个函数print(foo(2))#合并写法print(fun()(2))---------------<functionfat0x000001FD81A62268>44内置的高阶函数sort()排序普通排序使用list=[1,5,6,8,7,6,5,5,2]list.sort(reverse=True)#降序print(list)---------------------------[8,7,6,6,5,5,5,2,1]高级使用—按照指定规则排序按照十位数的大小排序#作为参数的函数deffun(num):returnstr(num)[0]list=[11,51,61,82,72,64,52,512,21]list.sort(key=fun,reverse=True)print(list)------------------------------------[82,72,61,64,51,52,512,21,11]#使用匿名函数合并list=[11,51,61,82,72,64,52,512,21]list.sort(key=lambdanum:str(num)[0],reverse=True)print(list)------------------------------------[82,72,61,64,51,52,512,21,11]map()&filter()函数编程语言通常都会提供map,filter,reduce三个高阶函数。在Python3中,map和filter仍然是内置函数,但是由于引入了列表推导和生成器表达式,他们变得没有那么重要了。列表推导和生成器表达式具有了map和filter两个函数的功能,而且更易于阅读。map生成新的迭代类型list_m=map(lambdanum:num**2,[1,2,3,4,5,6,7])print(list(list_m))print(type(list_m))-------------------[1,4,9,16,25,36,49]<class'map'>列表推导实现map()list_m=[num**2fornumin[1,2,3,4,5,6,7]]print(list_m)-------------[1,4,9,16,25,36,49]同时迭代多个list,生成新的迭代类型的长度是最短list的长度list1=[10,20,30,40]list2=[1,2]list_m=map(lambdaa,b:a+b,list1,list2)print(list(list_m))-------------------[11,22]注意列表推倒中这里是使用的笛卡尔积list1=[10,20,30,40]list2=[1,2]list_m=[a+bforainlist1forbinlist2]print(list_m)-------------[11,12,21,22,31,32,41,42]filter过滤filter参数函数的返回值需要是布尔值list_m=filter(lambdanum:num%2,[1,2,3,4,5,6,7])print(list(list_m))-------------------[1,3,5,7]列表推导实现filter()list_m=[numfornumin[1,2,3,4,5,6,7]ifnum%2==1]print(list_m)-------------[1,3,5,7]reducePython3中reduce被移动到了functools模块中fromfunctoolsimportreduce用法两两做运算fromfunctoolsimportreducenum=reduce(lambdaa,b:a+b,[1,2,3,4,5,6,7],0)#0为a的初始值print(num)----------28