【项目总结】物联网课程实习项目总结

新博客:https://blog.bigdataboy.cn/article/451.html

说明

学校的小组课程,自行拟定项目,我们小组做的是客户端控制 风扇警报大门灯泡,打开摄像头并识别其中的车牌号,还实现了 温度异常监控自动开关灯,因为没有板子,用仿真软件模拟的控制设备。

mark

实现效果

mark

采用技术

我是使用 Python 实现上述功能

服务端:FastAPI、apscheduler

客户端:PyQt5

通信方式

客户端 --- post ---> 服务端 (单向)

客户端 <--- WebSocket ---> 服务端 (双向)

服务端 <-- 串口 --> 仿真平台

架构图

mark

服务端

启动事件

在 FastAPI 的事件里,连接串口,启动 后台监控任务

@app.on_event("startup")
async def startup_event():
    start_serial("COM41", 115200)
    logger.info("串口连接成功")

    scheduler.add_job(MonitorService().run, trigger="interval", seconds=1, max_instances=10)
    scheduler.start()
    logger.info("后台监控任务启动成功")

后台监控任务

后台监控任务启动时,FastAPI 没有启动完成,所以这时 后台监控任务的 WebSocket 是不能及时连接上,所以需要做判断,没有连上 或者 断开了 就连接

self.ws: WebSocket = websocket.WebSocket()
...

try:
    # 测试是否连通
    self.ws.recv()  # 骚操作  第一次测试连接,后面防止断开
except WebSocketConnectionClosedException:
    # 尝试连接
    try:
        self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")
    except WebSocketException as e:
        logger.error(f"连接失败:{e}")
    except ConnectionRefusedError as e:
        logger.error(f"连接失败:{e}")
    else:
        logger.success("监控任务连接成功")
except ConnectionAbortedError as e:
    try:
        self.ws.connect(f"ws://127.0.0.1:8080/ws/data?user=monitor")
    except WebSocketException as e:
        logger.error(f"连接失败:{e}")
    except ConnectionRefusedError as e:
        logger.error(f"连接失败:{e}")
    else:
        logger.success("重新连接成功")

群发仿真平台环境数据

后台监控任务 连接上 WebSocket ,就会把环境信息发给 WebSocket 接口就会群发到其他连接的客户端上。

群发的实现就是把连接后的 WebSocket 对象保存起来,然后循环发送。

class ConnectionManager:
    def __init__(self):
        # 存放需要广播的的链接
        self.active_connections: List[Dict[str, WebSocket]] = []

    async def connect(self, user: str, ws: WebSocket):
        # 链接
        await ws.accept()
        self.active_connections.append({"user": user, "ws": ws})

    def disconnect(self, user: str, ws: WebSocket):
        # 关闭时 移除ws对象
        self.active_connections.remove({"user": user, "ws": ws})

    async def broadcast_json(self, data: dict):
        # 广播消息
        for connection in self.active_connections:
            await connection['ws'].send_json(data)

manager = ConnectionManager()

@ws_app.websocket("/data")
async def websocket_endpoint(websocket: WebSocket, user: str):
    await manager.connect(user, websocket)
    try:
        while True:
            data = await websocket.receive_json()  # 等待请求,才响应
            # logger.debug(data)
            if data.get("user") == "monitor":
                data.get("data")["park_all"] = ParkingService.get_all()  # 全部车位
                data.get("data")["park_remain_num"] = ParkingService.get_remain_num()  # 剩余车位
                await manager.broadcast_json(data["data"])  # 是监控服务发来的 直接广播消息

    except ConnectionClosedError:
        logger.error(f"{user} ConnectionClosedError")
        manager.disconnect(user, websocket)
    except WebSocketDisconnect:
        logger.error(f"{user} WebSocketDisconnect")
        manager.disconnect(user, websocket)

客户端

为了防止界面“假死”,耗时操作使用线程完成,这里的 WebSocket连接更新环境数据摄像头的使用识别车牌均使用了线程

PyQt 使用摄像

摄像头使用的 cv2 库,界面显示的画面,其实一帧一帧的显示的显示到 Label组件上的,所以子线程也是把画面一帧一帧的发送出来。

class CameraThread(QThread):
    frame_signal = pyqtSignal(numpy.ndarray) # 获取的换面其实一种矩阵
    is_open_signal = pyqtSignal(bool)
    close_video_signal = pyqtSignal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.is_open_video = True  #

    def run(self) -> None:
        capture = cv2.VideoCapture(0)
        while self.is_open_video:  # 读取摄像头显示视频
            if capture.isOpened():
                ret, frame = capture.read()  # 读取画面
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                self.frame_signal.emit(frame)  # 把图片发射出去
            else:
                self.is_open_signal.emit(capture.isOpened())  # 打开摄像头失败
        if not self.is_open_video:  # 摄像头关闭 清理资源 发送信号
            capture.release()
            self.close_video_signal.emit(True)  # 打开摄像头失败

WebSocket 连接

class WSThread(QThread):
    info_signal = pyqtSignal(dict)
    ws: WebSocket = None

    def __init__(self, parent=None):
        super().__init__(parent)

    def run(self) -> None:
        print("WSThread running")
        while True:
            date = json.loads(self.ws.recv())
            print(date)
            # 发送信号
            self.info_signal.emit(date)
            time.sleep(0.2)

全部代码:https://pan.bigdataboy.cn/s/dm9tk

发表评论 / Comment

用心评论~