编程杂谈

新博客:https://blog.bigdataboy.cn/article/451.html说明学校的小组课程,自行拟定项目,我们小组做的是客户端控制风扇、警报、大门、灯泡,打开摄像头并识别其中的车牌号,还实现了温度异常监控,自动开关灯,因为没有板子,用仿真软件模拟的控制设备。实现效果采用技术我是使用Python实现上述功能服务端:FastAPI、apscheduler客户端:PyQt5通信方式客户端---post--->服务端(单向)客户端<---WebSocket--->服务端(双向)服务端<--串口-->仿真平台架构图服务端启动事件在FastAPI的事件里,连接串口,启动后台监控任务@app.on_event("startup")asyncdefstartup_event():start_serial("COM41",115200)logger.info("串口连接成功")scheduler.add_job(MonitorService().run,trigger="interval",seconds=1,max_instances=10)scheduler.start()logger.info("后台监控任务启动成功")后台监控任务后台监控任务启动时,FastAPI没有启动完成,所以这时后台监控任务的WebSocket是不能及时连接上,所以需要做判断,没有连上或者断开了就连接self.ws:WebSocket=websocket.WebSocket()...try:#测试是否连通self.ws.recv()#骚操作第一次测试连接,后面防止断开exceptWebSocketConnectionClosedException:#尝试连接try:self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")exceptWebSocketExceptionase:logger.error(f"连接失败:{e}")exceptConnectionRefusedErrorase:logger.error(f"连接失败:{e}")else:logger.success("监控任务连接成功")exceptConnectionAbortedErrorase:try:self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")exceptWebSocketExceptionase:logger.error(f"连接失败:{e}")exceptConnectionRefusedErrorase:logger.error(f"连接失败:{e}")else:logger.success("重新连接成功")群发仿真平台环境数据后台监控任务连接上WebSocket,就会把环境信息发给WebSocket接口就会群发到其他连接的客户端上。群发的实现就是把连接后的WebSocket对象保存起来,然后循环发送。classConnectionManager:def__init__(self):#存放需要广播的的链接self.active_connections:List[Dict[str,WebSocket]]=[]asyncdefconnect(self,user:str,ws:WebSocket):#链接awaitws.accept()self.active_connections.append({"user":user,"ws":ws})defdisconnect(self,user:str,ws:WebSocket):#关闭时移除ws对象self.active_connections.remove({"user":user,"ws":ws})asyncdefbroadcast_json(self,data:dict):#广播消息forconnectioninself.active_connections:awaitconnection['ws'].send_json(data)manager=ConnectionManager()@ws_app.websocket("/data")asyncdefwebsocket_endpoint(websocket:WebSocket,user:str):awaitmanager.connect(user,websocket)try:whileTrue:data=awaitwebsocket.receive_json()#等待请求,才响应#logger.debug(data)ifdata.get("user")=="monitor":data.get("data")["park_all"]=ParkingService.get_all()#全部车位data.get("data")["park_remain_num"]=ParkingService.get_remain_num()#剩余车位awaitmanager.broadcast_json(data["data"])#是监控服务发来的直接广播消息exceptConnectionClosedError:logger.error(f"{user}ConnectionClosedError")manager.disconnect(user,websocket)exceptWebSocketDisconnect:logger.error(f"{user}WebSocketDisconnect")manager.disconnect(user,websocket)客户端为了防止界面“假死”,耗时操作使用线程完成,这里的WebSocket连接更新环境数据,摄像头的使用,识别车牌均使用了线程PyQt使用摄像摄像头使用的cv2库,界面显示的画面,其实一帧一帧的显示的显示到Label组件上的,所以子线程也是把画面一帧一帧的发送出来。classCameraThread(QThread):frame_signal=pyqtSignal(numpy.ndarray)#获取的换面其实一种矩阵is_open_signal=pyqtSignal(bool)close_video_signal=pyqtSignal(bool)def__init__(self,parent=None):super().__init__(parent)self.is_open_video=True#defrun(self)->None:capture=cv2.VideoCapture(0)whileself.is_open_video:#读取摄像头显示视频ifcapture.isOpened():ret,frame=capture.read()#读取画面frame=cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)self.frame_signal.emit(frame)#把图片发射出去else:self.is_open_signal.emit(capture.isOpened())#打开摄像头失败ifnotself.is_open_video:#摄像头关闭清理资源发送信号capture.release()self.close_video_signal.emit(True)#打开摄像头失败WebSocket连接classWSThread(QThread):info_signal=pyqtSignal(dict)ws:WebSocket=Nonedef__init__(self,parent=None):super().__init__(parent)defrun(self)->None:print("WSThreadrunning")whileTrue:date=json.loads(self.ws.recv())print(date)#发送信号self.info_signal.emit(date)time.sleep(0.2)全部代码:https://pan.bigdataboy.cn/s/dm9tk