Newer
Older
safe-algo-pro / scene_handler / zyn_limit_space_scene_handler.py
relax on 26 Feb 32 KB 1
import asyncio
import base64
import traceback
from asyncio import Event
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy, copy
from datetime import datetime
import time
import cv2
from datetime import datetime
import csv

from algo.model_manager import AlgoModelExec
from algo.stream_loader import OpenCVStreamLoad
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request, get_request
from common.image_plotting import Annotator
from entity.device import Device
import numpy as np
from ultralytics import YOLO

from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager

last_time = ""
def create_value_iterator(values):
    for value in values:
        yield value
fake_list = [   # 假如这是你从四合一后台请求来的数据
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
]
value_iterator = create_value_iterator(fake_list)


def flatten(lst):
    result = []
    for i in lst:
        if isinstance(i, list):
            result.extend(flatten(i))  # 递归调用以处理嵌套列表
        else:
            result.append(i)
    return result


def getGasGata_fake():
    # 模拟 读取四合一数据(四合一数据是假的)
    try:
        time.sleep(1)
        return next(value_iterator)
    except StopIteration:
        return None


def getGasData():
    # 从后台数据库读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'}
    harmful_device_code = '862635063168165A'
    url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
    response = get_request(url)
    if response and response.get('data'):
        return response.get('data')




def getNewGasData():
    global last_time
    data = getGasGata_fake()
    while True:
        if data == None: return None
        if last_time == "" or  datetime.strptime(data["uptime"], '%Y-%m-%d %H:%M:%S') > datetime.strptime(last_time , '%Y-%m-%d %H:%M:%S'): # 有最新数据产生了!
            last_time = data["uptime"]  # 更新最新数据时间
            break
        else:   # 获取的还是上次的重复数据,继续访问
            data = getGasGata_fake()
            time.sleep(2)
    return data






def writeFile(file_path, data):
    # print(f"写入{data}")
    with open(file_path, mode='a', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerows(data)

class EventController():
    def __init__(self):
        self.cancel_event = asyncio.Event()
        self.umd_complete = asyncio.Event()
        self.laobao_complete = asyncio.Event()

class SiHeYi():
    def __init__(self, harmful_device_code):
        self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'  # 后台访问数据的url
        self.harmful_device_code = harmful_device_code  # 四合一标识符
        self.last_ts = None # 上次读取的数据的生成时间戳

    def waitPowerOn(self, script_start_time):
        '''
        阻塞函数
        循环是否开机,只有检测到开机才会退出函数

        :param script_start_time:脚本启动的时间戳

        :return:
        '''
        print("检测四合一是否开机")

        while True:
            self.getNewData()
            flag = script_start_time < self.last_ts  # 当前时间T/F 开机/未开机
            if flag == True:
                print("检测到开机")
                return
            else:
                print("未开机")
                time.sleep(2)

    def getNewData(self):
        '''
        阻塞函数
        访问后台数据库 读取最新产生的四合一浓度
        如果有返回数据则记录 该数据产生的时间。
            如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间:
                则视为读取到新数据,返回新数据
        没有数据等待n秒后重复询问
        :return:
        '''
        while True:
            url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={self.harmful_device_code}'
            print("访问四合一数据...")
            # response = get_request(url)
            response = getGasGata_fake()
            if response and response.get('data'):
                print("访问到四合一数据")
                data = response.get('data')
                uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S")
                if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0:
                    self.last_ts = uptime.timestamp()
                    if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10:  # 10分钟以前的数据不做处理
                        ch4 = data.get('ch4')
                        co = data.get('co')
                        h2s = data.get('h2s')
                        o2 = data.get('o2')
                        return ch4, co, h2s, o2
            else:   # url没有返回数据
                print("四合一没有读取到数据")
            time.sleep(5)

    def isDataNormal(self, ch4, co, h2s, o2):
        '''
        判断四项气体是否正常
        :param ch4:
        :param co:
        :param h2s:
        :param o2:
        :return:
        '''
        if float(ch4) > 10.0 \
                or float(co) > 10.0 \
                or float(h2s) > 120.0 \
                or float(o2) < 15:
            return False # 气体异常
        else:
            return True # 气体正常

class AnQuanMao():
    def __init__(self, helmet_code):
        self.helmet_code = helmet_code
        self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={helmet_code}'  # 后台访问数据的url
        self.last_ts = None  # 上次读取的数据的生成时间戳

    def getNewData(self):
        '''
        阻塞进程
        :return:
        '''
        while True:
            header = {
                'ak': 'fe80b2f021644b1b8c77fda743a83670',
                'sk': '8771ea6e931d4db646a26f67bcb89909',
            }
            url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}'
            print("访问心率血氧数据...")
            response = get_request(url, headers=header)
            if response and response.get('data'):
                print("访问到心率血氧数据")
                vitalsigns_data = response.get('data').get('vitalSignsData')  # 访问而来的数据
                if vitalsigns_data:  # 访问成功
                    upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'),
                                                         "%Y-%m-%d %H:%M:%S")  # 访问数据的时间
                    if self.last_ts is None or (
                            upload_timestamp.timestamp() - self.last_ts) > 0:  # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据
                        self.last_ts= upload_timestamp.timestamp()  # 更新数据
                        if time.time() - upload_timestamp.timestamp() < 10 * 60:  # 访问到的数据是 10分钟内的数据
                            return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate')
            else:
                print("无法访问到心率血氧数据")
            time.sleep(5)

    def isDataNormal(self, blood_oxygen, heartrate):
        if heartrate < 60 or heartrate > 120 or blood_oxygen < 85:  # 心率和血氧异常
            return False
        else: return True


class Laobaocheck():
    def __init__(self, eventController=None, alarm=None):
        self.laobao_model = YOLO("weights/labor-v8-20241114.pt")
        self.jiaodi_model = YOLO("weights/jiaodi.pt")
        self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]}
        self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False}  # OD 模型有无检测这些目标
        self.jiaodi_flag = False    # 分类模型 有无检测到交底
        self.laobao_pool = {}

        self.eventController = eventController
        self.alarm = alarm

    def getUndetectedTarget(self):
        # 获取未检测目标的名称,返回str列表
        result = []
        for name, flag in self.target_flag.items():
            if flag == False: result.append(name)
        return result

    def getDetectedTarget(self):
        # 获取已检测目标的名称,返回str列表
        result = []
        for name, flag in self.target_flag.items():
            if flag == True: result.append(name)
        return result

    def name2id(self, input):
        # 检测名称 映射为 id
        if isinstance(input, str):
            return self.target[input]
        elif isinstance(input, list):
            result = []
            for item in input:
                result.append(self.target[item])
            if len(result) == 0:return []
            return list(set(np.concatenate([r for r in result]).astype(int).tolist()))

    def id2name(self, input):
        '''

        :param input: int 或 [int,int,int...]
        :return:
        '''
        # id -> 类别名称
        result = []
        if isinstance(input, int):
            input = [input]
        for id in input:
            for k,v in self.target.items(): # k: 类别名称 , v: id_list
                if id in v:result.append(k)

        return list(set(result))



    def predict_isJiaodi(self,frames):
        '''
        调用 jiaodi.pt 分类模型
        :return: True:交底,False:没检测到 交底
        '''

        jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False)
        jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results]
        for prob in jiaodi_prob:
            if prob > 0.6:
                return True
        return False

    def predict_laobao(self, frames):
        '''
        调用 labor-v8-20241114.pt OD 模型
        :param frames:
        :return: [类别1,类别2]
        '''
        target_idx = self.name2id(self.getUndetectedTarget())
        results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6,
                                            save=False, verbose=False)  # results:list(4) 4帧的检测结果
        pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(
            int).tolist()))  # 检测到的目标类别id_list,已去重
        return self.id2name(pred_c_list)

    def updateUnpredictedTargets(self, jiaodi_flag, pred_labels):
        '''
        更新 已检测到的目标 列表 和有无检测到交底
        :param pred_c_list: [str, str...]
        :return: None
        '''
        for pred_label in pred_labels:
            # print(f"检测到{pred_label}")
            self.target_flag[pred_label] = True
        self.jiaodi_flag = jiaodi_flag
        if self.jiaodi_flag == False and jiaodi_flag ==True:
            self.alarm.addAlarm("-------检测到交底")

    def model_predict_fake(self, video_path):
        cap = cv2.VideoCapture(video_path)
        while True:
            try:
                ret, frames = cap.read()
                frames = [frames]
                # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧")
                if not ret:
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出")
                    break
                # cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE)
                # cv2.resizeWindow('Video Frame', 800, 600)  # 宽度800像素,高度600像素
                # cv2.imshow("Video Frame", frames[0])
            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)
                cap.release()
                cv2.destroyAllWindows()
            # 等待1毫秒,检查是否按下了'q'键退出
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            if self.eventController != None and self.eventController.cancel_event.is_set():  # 超时退出
                cap.release()
                cv2.destroyAllWindows()
                return

            # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出)
            jiaodi_flag = self.predict_isJiaodi(frames)   # bool, 检测 新收集的这几帧有无交底
            pred_label = self.predict_laobao(frames)   # [str, str...]
            self.updateUnpredictedTargets(jiaodi_flag, pred_label)

            if self.eventController != None and  self.eventController.umd_complete.is_set():  # 上中下气体检测完毕

                # 检验所有物体都检验到了吗
                if self.getUndetectedTarget() == []:  # 如果全部检验到了
                    # print("劳保物品 通过")
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过")
                    self.alarm.addAlarm("-------劳保物品 通过")
                    self.eventController.laobao_complete.set()
                    cap.release()
                    cv2.destroyAllWindows()
                    return  # 退出检测
                else:  # 如果还有未检测到的

                    undetectedTargets = self.getUndetectedTarget()
                    # print(f"报警:劳保物品缺失:{wrong_class_list}")
                    self.alarm.addAlarm(f"-------报警:劳保物品缺失:{undetectedTargets[0]}")
        cap.release()
        cv2.destroyAllWindows()
    def model_predict(self, stream_loader):
        for frames in stream_loader:  # type : list (4),连续的4帧
            if self.eventController != None and self.eventController.cancel_event.is_set():  # 超时退出
                return
            # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出)
            jiaodi_flag = self.predict_isJiaodi(frames)  # bool, 检测 新收集的这几帧有无交底
            pred_label = self.predict_laobao(frames)  # [str, str...]
            self.updateUnpredictedTargets(jiaodi_flag, pred_label)

            if self.eventController != None and self.eventController.umd_complete.is_set():  # 上中下气体检测完毕

                # 检验所有物体都检验到了吗
                if self.getUndetectedTarget() == []:  # 如果全部检验到了
                    # print("劳保物品 通过")
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过")
                    self.alarm_list.append("劳保物品 通过")
                    self.eventController.laobao_complete.set()
                    return  # 退出检测
                else:  # 如果还有未检测到的

                    undetectedTargets = self.getUndetectedTarget()
                    # print(f"报警:劳保物品缺失:{wrong_class_list}")

                    for undetectedTarget in undetectedTargets:
                        alarm_content = f"报警:劳保物品缺失:{undetectedTarget}"
                        if alarm_content not in self.alarm_list:  # 不加入 重复的报警内容
                            self.alarm_list.append(alarm_content)
                            # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"),
                            #             f"报警:劳保物品缺失:{undetectedTarget}")
                        # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}")  # 对于同样内容的报警,删除 早的报警
                        # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}")  # 保留新的报警

class YinHuanCheck():
    def __init__(self, eventController=None, alarm_list=None):
        self.model = YOLO("weights/yinhuan.pt")
        self.eventController = eventController
        self.alarm_list = alarm_list


        self.anquanmao_tolerate = []
        self.xiyan_tolerate = []
        self.xiubiao_tolerate = []
        self.dadianhua_tolerate = []
        self.xianzaren_tolerate = []

    def id2name(self, input):
        '''

        :param input: int 或 [int,int,int...]
        :return:
        '''
        # id -> 类别名称
        result = []
        if isinstance(input, int):
            input = [input]
        for id in input:
            for k,v in self.model.names.items(): # k: id , v: label名称
                if k == id:result.append(v)

        return list(set(result))


    def judge_anquanmao(self, person_targets):
        '''
        判定有没有 没带安全帽的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:    # 每个人身上的物件
            if "安全帽" not in person_result:
                return False    # 没有通过安全帽检测
        return True # 通过安全帽检测


    def judge_xiyan(self, person_targets):
        '''
        判定有没有 吸烟的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:    # 每个人身上的物件
            if "吸烟" in person_result:
                return False    # 没有通过吸烟检测
        return True # 通过吸烟检测

    def judge_xiubiao(self, person_targets):
        '''
        判定有没有 吸烟的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "吸烟" in person_result:
                return False  # 没有通过吸烟检测
        return True  # 通过吸烟检测
    def judge_dadianhua(self, person_targets):
        '''
        判定有没有 打电话的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "打电话" in person_result:
                return False  # 没有通过打电话检测
        return True  # 通过打电话检测

    def judge_xianzaren(self, person_targets):
        '''
        判定有没有 打电话的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "安全帽" not in person_result and "工服" not in person_result:    # 既不穿安全帽也不穿安全帽
                return False  # 没有通过闲杂人检测
        return True  # 通过闲杂人检测
    def detect_person(self, frames):
        '''

        :param frames: list of pic or ndarray
        :return: list of ndarray of person
        '''
        people_results = self.model.predict(source=frames, conf=0.6,classes = [0],
                                            save=False, verbose=False) # 检测人
        result = []
        for people_result in people_results:
            orig_img = people_result.orig_img   # 这一帧的原图矩阵
            person_boxes = people_result.boxes.xyxy.tolist()   # 这一帧所有人的box的xyxy
            for box in person_boxes:
                cropped_image = orig_img[int(box[1]):int(box[3]),
                                int(box[0]):int(box[2])]  # y1:y2, x1:x2

                result.append(cropped_image)
        return result

    def isAlarm(self):
        def has_consecutive_true(bool_list, threshold=30):
            count = 0  # 计数器,用于记录连续 True 的次数
            for value in bool_list:
                if value:  # 如果当前值为 True
                    count += 1  # 增加计数器
                    if count > threshold:  # 如果计数器超过阈值
                        return True
                else:  # 如果当前值为 False
                    count = 0  # 重置计数器
            return False  # 遍历完成后没有发现连续超过 threshold 次的 True

        if has_consecutive_true(self.anquanmao_tolerate) == True:
            self.anquanmao_tolerate.clear()
            # print("安全帽报警")


    def detect_person_targets(self,person_crops):
        '''

        :param person_crops: list of ndarray of person
        :return: [[label1,label2..],  [label1...]]  每个人身上的目标
        '''
        if len(person_crops) == 0:return []
        results = self.model.predict(source=person_crops, conf=0.6, classes=[2, 3, 4, 5, 6],  # 安全帽、工服、烟头、电话、袖标
                                     save=False, verbose=False)  # 检测人
        person_detect_targets = []
        for result in results:  # 每个人的检测结果
            labels = result.boxes.cls.tolist()  # 这个人的检测标签,id形式
            labels = [int(num) for num in labels]  # 转换为 int
            labels = self.id2name(labels)  # 转换为label_name
            person_detect_targets.append(labels)
        return person_detect_targets

    def main_fake(self, video_path):
        cap = cv2.VideoCapture(video_path)
        ret, frames = cap.read()
        while True:
            # for frames in self.stream_loader:  # type : list (4),连续的4帧
            try:
                ret, frames = cap.read()
                print(f"读取帧{ret}")
                frames = [frames]
                # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧")
                if not ret:
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出")
                    cap.release()
                    cv2.destroyAllWindows()
                    break
                cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE)
                cv2.resizeWindow('Video Frame2', 800, 600)  # 宽度800像素,高度600像素
                cv2.imshow("Video Frame2", frames[0])
            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)
                cv2.destroyAllWindows()
            # 等待1毫秒,检查是否按下了'q'键退出
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            people_results = self.detect_person(frames) # 检测人
            person_detect_targets = self.detect_person_targets(people_results)
            print(person_detect_targets)
            self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets))
            self.isAlarm()

    def main(self, stream_loader):
        for frames in stream_loader:  # type : list (4),连续的4帧
            people_results = self.detect_person(frames) # 检测人
            person_detect_targets = self.detect_person_targets(people_results)
            self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets))
            self.isAlarm()

class Alarm():
    def __init__(self):
        self.pool = []
    def addAlarm(self, content):
        '''

        :param content:
        :return:
        '''
        if content in self.pool:
            self.pool.remove(content)
        self.pool.append(content)

    def main(self):
        for i in range(1000000):
            if len(self.pool) != 0:
                print(f"{self.pool.pop(0)},报警队列长度:{len(self.pool)}")
            time.sleep(1)

class ZynLimitSpaceSceneHandler(BaseSceneHandler):

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        self.start_time = time.time()   # 脚本启动时间戳
        self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager,
                                                       category_priority={2: 0, 1: 1, 3: 2,
                                                                          0: 3})  # alarmCategory:优先级 0代表优先级最高
        # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
        #                                       device_thread_id=thread_id)

        self.health_ts_dict = {}
        self.harmful_ts_dict = {}
        # todo 要改成通过后台接口读取设备编号
        self.health_device_codes = ['HWIH061000056395'] # 安全帽编号

        self.eventController = EventController()
        self.alarm = Alarm()

        self.laobao_check = Laobaocheck(self.eventController, self.alarm)
        self.yinhuan_check = YinHuanCheck(self.eventController, self.alarm)
        self.umd_pool = {}
        self.siHeyi = SiHeYi("862635063168165A")
        self.anQuanMao = AnQuanMao("HWIH061000056395")

    def addLog(self, pool, time, text):
        if time not in pool.keys():
            pool[time] = [text]
        else:
            pool[time].append(text)
            pool[time] = list(set(pool[time]))  # 不添加重复的日志

    async def laobaoCheck_task(self):
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, self.laobao_check.model_predict_fake,
                                   r"/home/pc/Desktop/project/safe-algo-pro/2025-02-25 15-25-48.mkv")


    async def uMDGasCheck_task(self,eventController=None):
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        tflag_pool = []  # 返回数据正常了几次
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?")
        await loop.run_in_executor(executor, self.siHeyi.waitPowerOn, self.start_time)  # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!")
        self.alarm_list.append("-------报警:检测到四合一已开机")

        for i in range(1000000):  # 模拟循环检测气体
            if eventController.cancel_event.is_set():  # 超时退出
                return
            # print(f"uMDGasCheck_task:{i}")

            ch4, co, h2s, o2 = await loop.run_in_executor(executor, self.siHeyi.getNewData) # 判断气体是否合规
            flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2)
            if flag == False:
                tflag_pool.clear()
                self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体"
                self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常")

            else:
                print(f"上中下气体检测正常次数:{tflag_pool}")
                tflag_pool.append(True)
                if len(tflag_pool) == 3:
                    break  # 退出检测
        self.alarm_list.append("-------报警:上中下气体检测通过")
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过")
        self.eventController.umd_complete.set()
        return
    async def alarm_task(self):
        def fun():
            for i in range(1000000):  # 检验中
                if self.eventController.laobao_complete.is_set():
                    if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过")
                    return
                # print(f"alarm_task:{len(self.alarm_list)}")
                if self.eventController.cancel_event.is_set():  # 超时退出
                    return
                if len(self.alarm_list) != 0:
                    print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}")
                time.sleep(1)

        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    async def logger_task(self):
        def fun():
            for i in range(10000000):
                now = datetime.now().strftime("%H:%M:%S")
                if now in self.laobao_pool.keys():laobaoevent = self.laobao_pool[now]
                else: laobaoevent = "-"
                if now in self.umd_pool.keys():
                    umdevent = self.umd_pool[now]
                else:
                    umdevent = "-"
                writeFile("events_table.csv", [[now, laobaoevent, umdevent]])

                time.sleep(1)

        headers = ["时间", "劳保用品检测", "上中下气体检测"]
        writeFile("events_table.csv", [[headers[0],headers[1],headers[2]]])
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    async def yinhuanCheck_task(self):
        '''
        检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头)
        :return:
        '''
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, self.yinhuan_check.main_fake, r"/home/pc/Desktop/project/safe-algo-pro/2025-02-26 08-49-39.mkv")

    async def xinlvCheck_task(self):


        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        blood_oxygen, heartrate = await loop.run_in_executor(executor, self.anQuanMao.getNewData)
        flag = self.anQuanMao.isDataNormal(blood_oxygen, heartrate)
        if flag == False:
            self.alarm_list.append("-------报警:gasCheck:心率血氧数据异常")
    async def gasCheck(self):
        '''
        四合一气体检测
        :return:
        '''
        def fun():
            ch4, co, h2s, o2 = self.siHeyi.getNewData()
            flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2)
            if flag == False:
                self.alarm_list.append("-------报警:gasCheck:四合一气体异常")
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    def run(self):
        async def fun():
            # 并行执行任务
            uMDGasCheck_task = asyncio.create_task(
                self.uMDGasCheck_task(self.eventController))
            laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task())
            alarm_task = asyncio.create_task(self.alarm_task())
            logger_task = asyncio.create_task(self.logger_task())

            done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0)

            if uMDGasCheck_task in done and laobaoCheck_task in done:
                await uMDGasCheck_task
                await laobaoCheck_task
                print("前置条件检查完成,退出")

            else:
                # 如果超时,则取消未完成的任务
                self.eventController.cancel_event.set()
                laobaoCheck_task.cancel()
                uMDGasCheck_task.cancel()
                print("前置条件检查时间过长,退出")

            # 并行执行任务
            print("开始工作")
            xinlvCheck = asyncio.create_task(self.xinlvCheck_task())
            yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task())
            gasCheck = asyncio.create_task(self.gasCheck())
            results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck)

            done1, pending1 = await asyncio.wait({alarm_task, logger_task}, timeout=300000.0)
        asyncio.run(fun())

if __name__=='__main__':
    # print(getNewGasData())
    model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt")
    print(model.names)