新博客:https://blog.bigdataboy.cn/article/451.html
说明
学校的小组课程,自行拟定项目,我们小组做的是客户端控制 风扇
、警报
、大门
、灯泡
,打开摄像头并识别
其中的车牌号
,还实现了 温度异常监控
,自动开关灯
,因为没有板子,用仿真软件模拟的控制设备。
实现效果
采用技术
我是使用 Python 实现上述功能
服务端:FastAPI、apscheduler
客户端:PyQt5
通信方式
客户端 --- post --->
服务端 (单向)
客户端 <--- WebSocket --->
服务端 (双向)
服务端 <-- 串口 -->
仿真平台
架构图
服务端
启动事件
在 FastAPI 的事件里,
连接串口
,启动后台监控任务
@app.on_event("startup")
async def startup_event():
start_serial("COM41", 115200)
logger.info("串口连接成功")
scheduler.add_job(MonitorService().run, trigger="interval", seconds=1, max_instances=10)
scheduler.start()
logger.info("后台监控任务启动成功")
后台监控任务
后台监控任务
启动
时,FastAPI 没有启动完成,所以这时 后台监控任务的 WebSocket 是不能及时连接上,所以需要做判断,没有连上
或者断开了
就连接
self.ws: WebSocket = websocket.WebSocket()
...
try:
# 测试是否连通
self.ws.recv() # 骚操作 第一次测试连接,后面防止断开
except WebSocketConnectionClosedException:
# 尝试连接
try:
self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")
except WebSocketException as e:
logger.error(f"连接失败:{e}")
except ConnectionRefusedError as e:
logger.error(f"连接失败:{e}")
else:
logger.success("监控任务连接成功")
except ConnectionAbortedError as e:
try:
self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")
except WebSocketException as e:
logger.error(f"连接失败:{e}")
except ConnectionRefusedError as e:
logger.error(f"连接失败:{e}")
else:
logger.success("重新连接成功")
群发仿真平台环境数据
后台监控任务 连接上 WebSocket ,就会把
环境信息
发给WebSocket 接口
就会群发
到其他连接的客户端
上。
群发的实现就是把连接后的 WebSocket 对象保存起来,然后循环发送。
class ConnectionManager:
def __init__(self):
# 存放需要广播的的链接
self.active_connections: List[Dict[str, WebSocket]] = []
async def connect(self, user: str, ws: WebSocket):
# 链接
await ws.accept()
self.active_connections.append({"user": user, "ws": ws})
def disconnect(self, user: str, ws: WebSocket):
# 关闭时 移除ws对象
self.active_connections.remove({"user": user, "ws": ws})
async def broadcast_json(self, data: dict):
# 广播消息
for connection in self.active_connections:
await connection['ws'].send_json(data)
manager = ConnectionManager()
@ws_app.websocket("/data")
async def websocket_endpoint(websocket: WebSocket, user: str):
await manager.connect(user, websocket)
try:
while True:
data = await websocket.receive_json() # 等待请求,才响应
# logger.debug(data)
if data.get("user") == "monitor":
data.get("data")["park_all"] = ParkingService.get_all() # 全部车位
data.get("data")["park_remain_num"] = ParkingService.get_remain_num() # 剩余车位
await manager.broadcast_json(data["data"]) # 是监控服务发来的 直接广播消息
except ConnectionClosedError:
logger.error(f"{user} ConnectionClosedError")
manager.disconnect(user, websocket)
except WebSocketDisconnect:
logger.error(f"{user} WebSocketDisconnect")
manager.disconnect(user, websocket)
客户端
为了防止界面“假死”,耗时操作使用线程完成,这里的
WebSocket连接更新环境数据
,摄像头的使用
,识别车牌
均使用了线程
PyQt 使用摄像
摄像头使用的 cv2 库,界面显示的画面,其实
一帧一帧
的显示的显示到Label组件
上的,所以子线程也是把画面一帧一帧的发送出来。
class CameraThread(QThread):
frame_signal = pyqtSignal(numpy.ndarray) # 获取的换面其实一种矩阵
is_open_signal = pyqtSignal(bool)
close_video_signal = pyqtSignal(bool)
def __init__(self, parent=None):
super().__init__(parent)
self.is_open_video = True #
def run(self) -> None:
capture = cv2.VideoCapture(0)
while self.is_open_video: # 读取摄像头显示视频
if capture.isOpened():
ret, frame = capture.read() # 读取画面
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frame_signal.emit(frame) # 把图片发射出去
else:
self.is_open_signal.emit(capture.isOpened()) # 打开摄像头失败
if not self.is_open_video: # 摄像头关闭 清理资源 发送信号
capture.release()
self.close_video_signal.emit(True) # 打开摄像头失败
WebSocket 连接
class WSThread(QThread):
info_signal = pyqtSignal(dict)
ws: WebSocket = None
def __init__(self, parent=None):
super().__init__(parent)
def run(self) -> None:
print("WSThread running")
while True:
date = json.loads(self.ws.recv())
print(date)
# 发送信号
self.info_signal.emit(date)
time.sleep(0.2)
版权声明:《 【项目总结】物联网课程实习项目总结 》为明妃原创文章,转载请注明出处!
最后编辑:2022-9-25 13:09:26