# Source: safe-algo-pro / scene_handler / limit_space_scene_handler.py
# Last commit: zhangyingjie on 6 Nov (2 KB) - "Add scene run scripts and tasks"
import asyncio
import threading
from asyncio import Event

from ultralytics import YOLO

from algo.model_manager import AlgoModelExec
from algo.stream_loader import OpenCVStreamLoad
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from entity.device import Device
from scene_handler.base_scene_handler import BaseSceneHandler
from tcp.tcp_manager import TcpManager


class LimitSpaceSceneHandler(BaseSceneHandler):
    """Scene handler that runs a YOLO model over a device's video stream.

    Each non-empty frame read from the stream loader is passed to the model;
    whenever the first result of a prediction contains at least one
    detection, a fixed TCP message is scheduled for delivery to the device
    on the main asyncio event loop.

    NOTE(review): ``self.device``, ``self.thread_id``, ``self.tcp_manager``
    and ``self.main_loop`` are presumably set by ``BaseSceneHandler.__init__``
    -- confirm against the base class.
    """

    # Payload sent to the device when a frame has detections.
    # NOTE(review): presumably a device-protocol command frame -- confirm the
    # meaning of these bytes against the device protocol specification.
    _DETECTION_MESSAGE = b'\xaa\x01\x00\x93\x07\x00\x9B'

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop):
        """Create the stream loader, status manager, model and stop flag.

        Args:
            device: Device whose ``input_stream_url`` and ``code`` configure
                the stream loader and whose ``id`` addresses TCP messages.
            thread_id: Identifier forwarded to the base class and to the
                stream loader.
            tcp_manager: Manager used to send messages to the device.
            main_loop: asyncio event loop on which TCP sends are scheduled.
        """
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
                                              device_thread_id=thread_id)
        self.device_status_manager = DeviceStatusManager()

        self.person_model = YOLO('weights/yolov8s.pt')

        # The stop flag is set by stop_task() and polled by run().
        # threading.Event is used instead of asyncio.Event because
        # asyncio.Event is not thread-safe and its ``loop`` argument was
        # removed in Python 3.10 (passing it raises TypeError).
        self.__stop_event = threading.Event()

    def stop_task(self, **kwargs):
        """Request the detection loop to stop and stop the stream loader.

        Args:
            **kwargs: Accepted for interface compatibility; unused here.
        """
        logger.info(f'stop detection task {self.device.id}, thread_id: {self.thread_id}')
        self.__stop_event.set()
        self.stream_loader.stop()  # stop the video stream loading thread

    def send_tcp_message(self, message: bytes, have_response=False):
        """Schedule a TCP message to this handler's device on the main loop.

        The send is fire-and-forget: the future returned by
        ``asyncio.run_coroutine_threadsafe`` is not awaited or inspected.

        Args:
            message: Raw bytes to send.
            have_response: Forwarded to ``send_message_to_device``.
        """
        asyncio.run_coroutine_threadsafe(
            self.tcp_manager.send_message_to_device(device_id=self.device.id,
                                                    message=message,
                                                    have_response=have_response),
            self.main_loop)

    def run(self):
        """Initialise the stream, then run detection on frames until stopped."""
        # Keep retrying stream initialisation until it succeeds or a stop is
        # requested.
        while not self.stream_loader.init:
            if self.__stop_event.is_set():
                # Stopped before the stream was ever initialised: there is
                # nothing to iterate, so leave instead of falling through to
                # the frame loop.
                return
            self.stream_loader.init_cap()

        for frame in self.stream_loader:
            if self.__stop_event.is_set():
                break  # exit the loop once the stop event has been triggered
            if frame is None:
                continue

            self.device_status_manager.set_status(device_id=self.device.id)
            # NOTE(review): no ``classes`` filter is passed, so any detected
            # class counts as a hit -- confirm whether only persons should.
            predictions = self.person_model.predict(source=frame, imgsz=640,
                                                    save_txt=False,
                                                    save=False,
                                                    verbose=False, stream=True)
            # stream=True yields lazily; materialise so the generator is
            # fully consumed, then inspect the first result for this frame.
            frame_results = list(predictions)
            if frame_results and len(frame_results[0]) > 0:
                self.send_tcp_message(self._DETECTION_MESSAGE, have_response=False)