Newer
Older
safe-algo-pro / scene_handler / zyn_limit_space_scene_handler.py
import asyncio
import base64
import traceback
from asyncio import Event
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy, copy
from datetime import datetime
import time
import cv2
from datetime import datetime
import csv

from algo.model_manager import AlgoModelExec
from algo.stream_loader import OpenCVStreamLoad
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request, get_request
from common.image_plotting import Annotator
from entity.device import Device
import numpy as np
from ultralytics import YOLO

from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from services.global_config import GlobalConfig
from tcp.tcp_client_manager import TcpClientManager

last_time = ""
def create_value_iterator(values):
    for value in values:
        yield value
fake_list = [   # 假如这是你从四合一后台请求来的数据
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}},
    {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
    {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
              'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
    {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
              'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
    {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
              'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
    {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
              'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
    {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
              'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}},
]
value_iterator = create_value_iterator(fake_list)


def flatten(lst):
    result = []
    for i in lst:
        if isinstance(i, list):
            result.extend(flatten(i))  # 递归调用以处理嵌套列表
        else:
            result.append(i)
    return result


def getGasGata_fake():
    # 模拟 读取四合一数据(四合一数据是假的)
    try:
        time.sleep(1)
        return next(value_iterator)
    except StopIteration:
        return None


def getGasData():
    # 从后台数据库读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'}
    harmful_device_code = '862635063168165A'
    url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
    response = get_request(url)
    if response and response.get('data'):
        return response.get('data')




def getNewGasData():
    global last_time
    data = getGasGata_fake()
    while True:
        if data == None: return None
        if last_time == "" or  datetime.strptime(data["uptime"], '%Y-%m-%d %H:%M:%S') > datetime.strptime(last_time , '%Y-%m-%d %H:%M:%S'): # 有最新数据产生了!
            last_time = data["uptime"]  # 更新最新数据时间
            break
        else:   # 获取的还是上次的重复数据,继续访问
            data = getGasGata_fake()
            time.sleep(2)
    return data


ALARM_DICT = {
    'hat_and_mask': {
        'alarmType': '11',
        'alarmContent': '未佩戴呼吸防护设备',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '未佩戴呼吸防护设备'
    },
    'no_jiandu': {
        'alarmType': '12',
        'alarmContent': '没有监护人员',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '没有监护人员'
    },
    'break': {
        'alarmType': '3',
        'alarmContent': '非法闯入',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x00\x00\x94',
        'label': '非法闯入'
    },
    'smoke': {
        'alarmType': '6',
        'alarmContent': '吸烟',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x03\x00\x97',
        'label': '吸烟'
    },
    'no_blower': {
        'alarmType': '13',
        'alarmContent': '没有检测到通风设备',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x1A\x00\xAE',
        'label': '没有检测到通风设备'
    },
    'no_extinguisher': {
        'alarmType': '14',
        'alarmContent': '没有检测到灭火器',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x1B\x00\xAF',
        'label': '没有检测到灭火器'
    }
}



def writeFile(file_path, data):
    # print(f"写入{data}")
    with open(file_path, mode='a', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerows(data)

class EventController():
    def __init__(self):
        self.timeout_event = asyncio.Event()
        self.umd_complete = asyncio.Event()
        self.qianzhi_check_complete  = asyncio.Event()
        self.laobao_complete = asyncio.Event()

class SiHeYi():
    def __init__(self, harmful_device_code):
        self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'  # 后台访问数据的url
        self.harmful_device_code = harmful_device_code  # 四合一标识符
        self.last_ts = None # 上次读取的数据的生成时间戳

    def waitPowerOn(self, script_start_time):
        '''
        阻塞函数
        循环是否开机,只有检测到开机才会退出函数

        :param script_start_time:脚本启动的时间戳

        :return:
        '''
        print("检测四合一是否开机")

        while True:
            self.getNewData()
            flag = script_start_time < self.last_ts  # 当前时间T/F 开机/未开机
            if flag == True:
                print("检测到开机")
                return
            else:
                print("未开机")
                time.sleep(2)

    def getNewData(self):
        '''
        阻塞函数
        访问后台数据库 读取最新产生的四合一浓度
        如果有返回数据则记录 该数据产生的时间。
            如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间:
                则视为读取到新数据,返回新数据
        没有数据等待n秒后重复询问
        :return:
        '''
        while True:
            url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={self.harmful_device_code}'
            print("访问四合一数据...")
            # response = get_request(url)
            response = getGasGata_fake()
            if response and response.get('data'):
                print("访问到四合一数据")
                data = response.get('data')
                uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S")
                if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0:
                    self.last_ts = uptime.timestamp()
                    if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10:  # 10分钟以前的数据不做处理
                        ch4 = data.get('ch4')
                        co = data.get('co')
                        h2s = data.get('h2s')
                        o2 = data.get('o2')
                        return ch4, co, h2s, o2
            else:   # url没有返回数据
                print("四合一没有读取到数据")
            time.sleep(5)

    def isDataNormal(self, ch4, co, h2s, o2):
        '''
        判断四项气体是否正常
        :param ch4:
        :param co:
        :param h2s:
        :param o2:
        :return:
        '''
        if float(ch4) > 10.0 \
                or float(co) > 10.0 \
                or float(h2s) > 120.0 \
                or float(o2) < 15:
            return False # 气体异常
        else:
            return True # 气体正常

class AnQuanMao():
    def __init__(self, helmet_code):
        self.helmet_code = helmet_code
        self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={helmet_code}'  # 后台访问数据的url
        self.last_ts = None  # 上次读取的数据的生成时间戳

    def getNewData(self):
        '''
        阻塞进程
        :return:
        '''
        while True:
            header = {
                'ak': 'fe80b2f021644b1b8c77fda743a83670',
                'sk': '8771ea6e931d4db646a26f67bcb89909',
            }
            url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}'
            print("访问心率血氧数据...")
            response = get_request(url, headers=header)
            if response and response.get('data'):
                print("访问到心率血氧数据")
                vitalsigns_data = response.get('data').get('vitalSignsData')  # 访问而来的数据
                if vitalsigns_data:  # 访问成功
                    upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'),
                                                         "%Y-%m-%d %H:%M:%S")  # 访问数据的时间
                    if self.last_ts is None or (
                            upload_timestamp.timestamp() - self.last_ts) > 0:  # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据
                        self.last_ts= upload_timestamp.timestamp()  # 更新数据
                        if time.time() - upload_timestamp.timestamp() < 10 * 60:  # 访问到的数据是 10分钟内的数据
                            return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate')
            else:
                print("无法访问到心率血氧数据")
            time.sleep(5)

    def isDataNormal(self, blood_oxygen, heartrate):
        if heartrate < 60 or heartrate > 120 or blood_oxygen < 85:  # 心率和血氧异常
            return False
        else: return True


class Laobaocheck():
    def __init__(self, eventController=None, alarm=None):
        self.laobao_model = YOLO("weights/labor-v8-20241114.pt")
        self.jiaodi_model = YOLO("weights/jiaodi.pt")
        self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]}
        self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False}  # OD 模型有无检测这些目标
        self.jiaodi_flag = False    # 分类模型 有无检测到交底
        self.laobao_pool = {}

        self.eventController = eventController
        self.alarm = alarm

    def getUndetectedTarget(self):
        # 获取未检测目标的名称,返回str列表
        result = []
        for name, flag in self.target_flag.items():
            if flag == False: result.append(name)
        return result

    def getDetectedTarget(self):
        # 获取已检测目标的名称,返回str列表
        result = []
        for name, flag in self.target_flag.items():
            if flag == True: result.append(name)
        return result

    def name2id(self, input):
        # 检测名称 映射为 id
        if isinstance(input, str):
            return self.target[input]
        elif isinstance(input, list):
            result = []
            for item in input:
                result.append(self.target[item])
            if len(result) == 0:return []
            return list(set(np.concatenate([r for r in result]).astype(int).tolist()))

    def id2name(self, input):
        '''

        :param input: int 或 [int,int,int...]
        :return:
        '''
        # id -> 类别名称
        result = []
        if isinstance(input, int):
            input = [input]
        for id in input:
            for k,v in self.target.items(): # k: 类别名称 , v: id_list
                if id in v:result.append(k)

        return list(set(result))



    def predict_isJiaodi(self,frames):
        '''
        调用 jiaodi.pt 分类模型
        :return: True:交底,False:没检测到 交底
        '''

        jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False)
        jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results]
        for prob in jiaodi_prob:
            if prob > 0.6:
                return True
        return False

    def predict_laobao(self, frames):
        '''
        调用 labor-v8-20241114.pt OD 模型
        :param frames:
        :return: [类别1,类别2]
        '''
        target_idx = self.name2id(self.getUndetectedTarget())
        results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6,
                                            save=False, verbose=False)  # results:list(4) 4帧的检测结果
        pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(
            int).tolist()))  # 检测到的目标类别id_list,已去重
        return self.id2name(pred_c_list)

    def updateUnpredictedTargets(self, jiaodi_flag, pred_labels):
        '''
        更新 已检测到的目标 列表 和有无检测到交底
        :param pred_c_list: [str, str...]
        :return: None
        '''
        for pred_label in pred_labels:
            print(f"检测到{pred_label}")
            self.target_flag[pred_label] = True
        self.jiaodi_flag = jiaodi_flag
        if self.jiaodi_flag == False and jiaodi_flag ==True:
            print(f"劳保检测:检测到交底")
            self.alarm.addAlarm("-------劳保检测:检测到交底")

    def model_predict_fake(self, video_path):
        cap = cv2.VideoCapture(video_path)
        while True:
            try:
                ret, frames = cap.read()
                frames = [frames]
                if not ret:
                    break
                # cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE)
                # cv2.resizeWindow('Video Frame', 800, 600)  # 宽度800像素,高度600像素
                # cv2.imshow("Video Frame", frames[0])
            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)
                cap.release()
                cv2.destroyAllWindows()
            # 等待1毫秒,检查是否按下了'q'键退出
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            if self.eventController != None and self.eventController.timeout_event.is_set():  # 超时退出
                cap.release()
                cv2.destroyAllWindows()
                return

            # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出)
            jiaodi_flag = self.predict_isJiaodi(frames)   # bool, 检测 新收集的这几帧有无交底
            pred_label = self.predict_laobao(frames)   # [str, str...]
            self.updateUnpredictedTargets(jiaodi_flag, pred_label)

            if self.eventController != None and  self.eventController.umd_complete.is_set():  # 上中下气体检测完毕

                # 检验所有物体都检验到了吗
                if self.getUndetectedTarget() == []:  # 如果全部检验到了
                    # print("劳保物品 通过")
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过")
                    self.alarm.addAlarm("-------劳保检测:劳保物品 通过")
                    self.eventController.laobao_complete.set()
                    cap.release()
                    cv2.destroyAllWindows()
                    return  # 退出检测
                else:  # 如果还有未检测到的
                    undetectedTargets = self.getUndetectedTarget()
                    self.alarm.addAlarm(f"-------报警:劳保检测:劳保物品缺失:{undetectedTargets[0]}")
        cap.release()
        cv2.destroyAllWindows()
    def model_predict(self, stream_loader):
        for frames in stream_loader:  # type : list (4),连续的4帧
            if self.eventController != None and self.eventController.timeout_event.is_set():  # 超时退出
                return
            # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出)
            jiaodi_flag = self.predict_isJiaodi(frames)  # bool, 检测 新收集的这几帧有无交底
            pred_label = self.predict_laobao(frames)  # [str, str...]
            self.updateUnpredictedTargets(jiaodi_flag, pred_label)

            if self.eventController != None and self.eventController.umd_complete.is_set():  # 上中下气体检测完毕

                # 检验所有物体都检验到了吗
                if self.getUndetectedTarget() == []:  # 如果全部检验到了
                    # print("劳保物品 通过")
                    # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过")
                    self.alarm.addAlarm(f"劳保检测:劳保物品 通过")
                    self.eventController.laobao_complete.set()
                    return  # 退出检测
                else:  # 如果还有未检测到的

                    undetectedTargets = self.getUndetectedTarget()
                    self.alarm.addAlarm(f"-------报警:劳保检测:劳保物品缺失:{undetectedTargets[0]}")

class YinHuanCheck():
    def __init__(self, eventController=None, alarm=None):
        self.model = YOLO("weights/yinhuan.pt")
        self.eventController = eventController
        self.alarm = alarm


        self.anquanmao_tolerate = []
        self.xiyan_tolerate = []
        self.xiubiao_tolerate = []
        self.dadianhua_tolerate = []
        self.xianzaren_tolerate = []

    def id2name(self, input):
        '''

        :param input: int 或 [int,int,int...]
        :return:
        '''
        # id -> 类别名称
        result = []
        if isinstance(input, int):
            input = [input]
        for id in input:
            for k,v in self.model.names.items(): # k: id , v: label名称
                if k == id:result.append(v)

        return list(set(result))


    def judge_anquanmao(self, person_targets):
        '''
        判定有没有 没带安全帽的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:    # 每个人身上的物件
            if "安全帽" not in person_result:
                return False    # 没有通过安全帽检测
        return True # 通过安全帽检测


    def judge_xiyan(self, person_targets):
        '''
        判定有没有 吸烟的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:    # 每个人身上的物件
            if "烟头" in person_result:   # 有一个人抽烟
                return False    # 没有通过吸烟检测
        return True # 通过吸烟检测

    def judge_xiubiao(self, person_targets):
        '''
        判定有没有 袖标的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "袖标" in person_result:   # 有一个人带袖标就通过
                return True  # 通过袖标检测
        return False  # 没有通过袖标检测
    def judge_dadianhua(self, person_targets):
        '''
        判定有没有 打电话的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "电话" in person_result:  # 有一个人打电话
                return False  # 没有通过打电话检测
        return True  # 通过打电话检测
    def judge_xianzaren(self, person_targets):
        '''
        判定有没有 打电话的人
        param person_targets: [[label1,label2..],  [label1...]]  每个人身上的目标
        :return:
        '''
        for person_result in person_targets:  # 每个人身上的物件
            if "安全帽" not in person_result and "工服" not in person_result:    # 既不穿安全帽也不穿安全帽
                return False  # 没有通过闲杂人检测
        return True  # 通过闲杂人检测
    def detect_person(self, frames):
        '''

        :param frames: list of pic or ndarray
        :return: list of ndarray of person
        '''
        people_results = self.model.predict(source=frames, conf=0.6,classes = [0],
                                            save=False, verbose=False) # 检测人
        result = []
        for people_result in people_results:
            orig_img = people_result.orig_img   # 这一帧的原图矩阵
            person_boxes = people_result.boxes.xyxy.tolist()   # 这一帧所有人的box的xyxy
            for box in person_boxes:
                cropped_image = orig_img[int(box[1]):int(box[3]),
                                int(box[0]):int(box[2])]  # y1:y2, x1:x2

                result.append(cropped_image)
        return result

    def isAlarm(self):
        def has_consecutive_false(bool_list, threshold=20):
            count = 0  # 计数器,用于记录连续 True 的次数
            for value in bool_list:
                if value == False:  # 如果当前值为 False
                    count += 1  # 增加计数器
                    if count > threshold:  # 如果计数器超过阈值
                        return True
                else:  # 如果当前值为 False
                    count = 0  # 重置计数器
            return False  # 遍历完成后没有发现连续超过 threshold 次的 True

        if has_consecutive_false(self.anquanmao_tolerate) == True and self.alarm != None:
            self.anquanmao_tolerate.clear()
            self.alarm.addAlarm("-------报警:未带安全帽")

        if has_consecutive_false(self.xiyan_tolerate) == True and self.alarm != None:
            self.xiyan_tolerate.clear()
            self.alarm.addAlarm("-------报警:吸烟")
        if has_consecutive_false(self.xiubiao_tolerate) == True and self.alarm != None:
            self.xiubiao_tolerate.clear()
            self.alarm.addAlarm("-------报警:袖标")

        if has_consecutive_false(self.dadianhua_tolerate) == True and self.alarm != None:
            self.dadianhua_tolerate.clear()
            self.alarm.addAlarm("-------报警:打电话")

        if has_consecutive_false(self.xianzaren_tolerate) == True and self.alarm != None:
            self.xianzaren_tolerate.clear()
            self.alarm.addAlarm("-------报警:闲杂人")
    def detect_person_targets(self,person_crops):
        '''

        :param person_crops: list of ndarray of person
        :return: [[label1,label2..],  [label1...]]  每个人身上的目标
        '''
        if len(person_crops) == 0:return []
        results = self.model.predict(source=person_crops, conf=0.6, classes=[2, 3, 4, 5, 6],  # 安全帽、工服、烟头、电话、袖标
                                     save=False, verbose=False)  # 检测人
        person_detect_targets = []
        for result in results:  # 每个人的检测结果
            labels = result.boxes.cls.tolist()  # 这个人的检测标签,id形式
            labels = [int(num) for num in labels]  # 转换为 int
            labels = self.id2name(labels)  # 转换为label_name
            person_detect_targets.append(labels)
        return person_detect_targets

    def main_fake(self, video_path):
        cap = cv2.VideoCapture(video_path)
        ret, frames = cap.read()
        while True:
            try:
                ret, frames = cap.read()
                frames = [frames]
                if not ret:
                    cap.release()
                    cv2.destroyAllWindows()
                    break
                cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE)
                cv2.resizeWindow('Video Frame2', 800, 600)  # 宽度800像素,高度600像素
                cv2.imshow("Video Frame2", frames[0])
            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)
                cv2.destroyAllWindows()
            # 等待1毫秒,检查是否按下了'q'键退出
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            people_results = self.detect_person(frames) # 检测人
            person_detect_targets = self.detect_person_targets(people_results)
            self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets))
            self.xiubiao_tolerate.append(self.judge_xiubiao(person_detect_targets))
            self.xiyan_tolerate.append(self.judge_xiyan(person_detect_targets))
            self.xianzaren_tolerate.append(self.judge_xianzaren(person_detect_targets))
            self.dadianhua_tolerate.append(self.judge_dadianhua(person_detect_targets))
            self.isAlarm()

    def main(self, stream_loader):
        for frames in stream_loader:  # type : list (4),连续的4帧
            people_results = self.detect_person(frames) # 检测人
            person_detect_targets = self.detect_person_targets(people_results)
            self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets))
            self.xiubiao_tolerate.append(self.judge_xiubiao(person_detect_targets))
            self.xiyan_tolerate.append(self.judge_xiyan(person_detect_targets))
            self.xianzaren_tolerate.append(self.judge_xianzaren(person_detect_targets))
            self.dadianhua_tolerate.append(self.judge_dadianhua(person_detect_targets))
            self.isAlarm()

class Alarm():
    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpClientManager, main_loop, eventController = None):
        self.pool = []
        self.device = device
        self.thread_id = thread_id
        self.tcp_manager =tcp_manager
        self.main_loop = main_loop
        self.eventController = eventController

        self.alarm_interval_dict = {}
        self.alarm_interval = device.alarm_interval

        self.socket_interval_dict = {}
        self.socket_interval = device.alarm_interval
        self.socket_retry = 3
    def addAlarm(self, content):
        '''
        添加一条报警到报警队列中
        :param content:
        :return:
        '''
        if content in self.pool:
            self.pool.remove(content)
        self.pool.append(content)

    def deleteAlarmOfLaoBao(self):
        '''
        删除池子中有关劳保检测的报警
        :return:
        '''
        self.pool = [item for item in self.pool if "劳保" not in item]

    def deleteAlaramOfUmdGas(self):
        '''
        删除池子中有关上中下气体的报警
        :return:
        '''
        self.pool = [item for item in self.pool if "劳保" not in item]
    def main(self):
        for i in range(1000000):
            if self.eventController.timeout_event.is_set():  # 前置条件检查超时
                self.deleteAlarmOfLaoBao()
                self.deleteAlaramOfUmdGas()
            if len(self.pool) != 0:
                content = self.pool.pop(0)
                print(f"{content},报警队列长度:{len(self.pool)}")
                # self.send_alarm_message("no_jiandu")
            time.sleep(1)

    def send_tcp_message(self, message: bytes, have_response=False):
        asyncio.run_coroutine_threadsafe(
            self.tcp_manager.send_message_to_device(device_id=self.device.id,
                                                    message=message,
                                                    have_response=have_response),
            self.main_loop)

    def send_alarm_message(self, type):
        if self.tcp_manager:
            # if self.socket_interval_dict.get(type) is None \
            #         or (datetime.now() - self.socket_interval_dict.get(type)).total_seconds() > int(self.socket_interval):
            logger.debug("send alarm message %s %s", ALARM_DICT[type]['alarmContent'],
                         ALARM_DICT[type]['alarmSoundMessage'])
            self.send_tcp_message(ALARM_DICT[type]['alarmSoundMessage'], have_response=True)
            self.socket_interval_dict[type] = datetime.now()

class ZynLimitSpaceSceneHandler(BaseSceneHandler):

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpClientManager, main_loop, range_points):
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)


        self.start_time = time.time()   # 脚本启动时间戳

        # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
        #                                       device_thread_id=thread_id)

        self.health_ts_dict = {}
        self.harmful_ts_dict = {}
        # todo 要改成通过后台接口读取设备编号
        self.health_device_codes = ['HWIH061000056395'] # 安全帽编号

        self.eventController = EventController()
        self.alarm = Alarm(device, thread_id,tcp_manager, main_loop, self.eventController)
        self.laobao_check = Laobaocheck(self.eventController, self.alarm)
        self.yinhuan_check = YinHuanCheck(self.eventController, self.alarm)
        self.umd_pool = {}
        self.siHeyi = SiHeYi("862635063168165A")
        self.anQuanMao = AnQuanMao("HWIH061000056395")





    def addLog(self, pool, time, text):
        if time not in pool.keys():
            pool[time] = [text]
        else:
            pool[time].append(text)
            pool[time] = list(set(pool[time]))  # 不添加重复的日志

    async def laobaoCheck_task(self):
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, self.laobao_check.model_predict_fake,
                                   r"/home/pc/Desktop/project/safe-algo-pro/2025-02-25 15-25-48.mkv")


    async def uMDGasCheck_task(self,eventController=None):
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        tflag_pool = []  # 返回数据正常了几次
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?")
        await loop.run_in_executor(executor, self.siHeyi.waitPowerOn, self.start_time)  # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!")
        self.alarm.addAlarm(f"-------报警:上中下气体检测:检测到四合一已开机")

        for i in range(1000000):  # 模拟循环检测气体
            if eventController.timeout_event.is_set():  # 超时退出
                return
            # print(f"uMDGasCheck_task:{i}")

            ch4, co, h2s, o2 = await loop.run_in_executor(executor, self.siHeyi.getNewData) # 判断气体是否合规
            flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2)
            if flag == False:
                tflag_pool.clear()
                self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体"
                # self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常")
                self.alarm.addAlarm(f"-------报警:上中下气体检测:上中下气体异常")
            else:
                print(f"上中下气体检测正常次数:{tflag_pool}")
                tflag_pool.append(True)
                if len(tflag_pool) == 3:
                    break  # 退出检测
        self.alarm.addAlarm(f"-------报警:上中下气体检测:上中下气体检测通过")
        self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过")
        self.eventController.umd_complete.set()
        return
    async def alarm_task(self):

        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, self.alarm.main)

    async def logger_task(self):
        def fun():
            for i in range(10000000):
                now = datetime.now().strftime("%H:%M:%S")
                if now in self.laobao_pool.keys():laobaoevent = self.laobao_pool[now]
                else: laobaoevent = "-"
                if now in self.umd_pool.keys():
                    umdevent = self.umd_pool[now]
                else:
                    umdevent = "-"
                writeFile("events_table.csv", [[now, laobaoevent, umdevent]])

                time.sleep(1)

        headers = ["时间", "劳保用品检测", "上中下气体检测"]
        writeFile("events_table.csv", [[headers[0],headers[1],headers[2]]])
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    async def yinhuanCheck_task(self):
        '''
        检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头)
        :return:
        '''
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, self.yinhuan_check.main_fake, r"/home/pc/Desktop/project/safe-algo-pro/2025-02-26 08-49-39.mkv")

    async def xinlvCheck_task(self):


        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        blood_oxygen, heartrate = await loop.run_in_executor(executor, self.anQuanMao.getNewData)
        flag = self.anQuanMao.isDataNormal(blood_oxygen, heartrate)
        if flag == False:
            self.alarm.addAlarm("-------报警:gasCheck:心率血氧数据异常")
    async def gasCheck(self):
        '''
        四合一气体检测
        :return:
        '''
        def fun():
            ch4, co, h2s, o2 = self.siHeyi.getNewData()
            flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2)
            if flag == False:
                self.alarm.addAlarm("------报警:gasCheck:四合一气体异常")
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    def run(self):
        async def fun():
            # 并行执行任务
            # uMDGasCheck_task = asyncio.create_task(
            #     self.uMDGasCheck_task(self.eventController))
            # laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task())
            alarm_task = asyncio.create_task(self.alarm_task())
            logger_task = asyncio.create_task(self.logger_task())

            # done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0)

            # if uMDGasCheck_task in done and laobaoCheck_task in done:
            #     await uMDGasCheck_task
            #     await laobaoCheck_task
            #     self.eventController.timeout_event.set()
            #     print("前置条件检查完成,退出")
            #
            # else:
            #     # 如果超时,则取消未完成的任务
            #     self.eventController.timeout_event.set()
            #     laobaoCheck_task.cancel()
            #     uMDGasCheck_task.cancel()
            #     print("前置条件检查时间过长,退出")

            # 并行执行任务
            print("开始工作")
            xinlvCheck = asyncio.create_task(self.xinlvCheck_task())
            yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task())
            gasCheck = asyncio.create_task(self.gasCheck())
            results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck)

            done1, pending1 = await asyncio.wait({alarm_task, logger_task}, timeout=300000.0)

        asyncio.run(fun())

if __name__=='__main__':
    # print(getNewGasData())
    model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt")
    print(model.names)