# safe-algo-pro / scene_handler / zyn_limit_space_scene_handler.py
import asyncio
import base64
import traceback
from asyncio import Event
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy, copy
from datetime import datetime
import time
import cv2
from datetime import datetime
import csv

from algo.model_manager import AlgoModelExec
from algo.stream_loader import OpenCVStreamLoad
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request, get_request
from common.image_plotting import Annotator
from entity.device import Device
import numpy as np
from ultralytics import YOLO

from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager

# Timestamp string ("%Y-%m-%d %H:%M:%S") of the newest gas reading accepted so far.
# "" means no reading has been accepted yet; updated by getNewGasData() and isPowerOn().
last_time = ""
def create_value_iterator(values):
    """Return a generator that lazily yields each element of ``values`` in order."""
    yield from values
# Canned readings used in place of the real backend. Each record carries the four gas
# values (ch4, co, h2s, o2) as strings plus 'logtime' and 'uptime' timestamps; only
# 'uptime' is compared by getNewGasData() / isPowerOn().
fake_list = [   # Pretend this is the data fetched from the four-in-one gas-detector backend
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
     'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'},
]
# Shared one-shot iterator consumed by getGasGata_fake(); it is never rewound.
value_iterator = create_value_iterator(fake_list)


def flatten(lst):
    """Return a flat list of the leaves of ``lst``, depth-first and left to right.

    Only ``list`` instances are descended into; tuples and any other values are
    kept as single elements.
    """
    flat = []
    for element in lst:
        if not isinstance(element, list):
            flat.append(element)
            continue
        # Nested list: splice in its flattened contents.
        flat += flatten(element)
    return flat


def getGasGata_fake():
    """Simulate one read of the four-in-one gas data (the data is fake).

    Sleeps for one second, then returns the next record from the module-level
    ``value_iterator``, or ``None`` once the iterator is exhausted.
    """
    time.sleep(1)
    return next(value_iterator, None)


def getGasData():
    """Fetch the latest four-in-one gas reading from the backend.

    Example of a returned record:
    {'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00',
     'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90',
     'uptime': '2025-01-16 17:48:01'}

    Returns:
        The ``data`` field of the response, or ``None`` when the response is
        empty or has no truthy ``data`` field.
    """
    harmful_device_code = '862635063168165A'
    url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
    response = get_request(url)
    if not response:
        return None
    payload = response.get('data')
    return payload if payload else None

def getNewGasData():
    """Return the next gas reading that is newer than the last accepted one.

    Readings come from ``getGasGata_fake``. A reading is accepted when no reading
    has been accepted yet (``last_time`` is "") or when its ``uptime`` is strictly
    later than ``last_time``; ``last_time`` is then updated to that ``uptime``.
    Stale readings are skipped, pausing two seconds after each re-fetch.

    Returns:
        The accepted reading, or ``None`` as soon as the source returns ``None``.
    """
    global last_time
    time_format = '%Y-%m-%d %H:%M:%S'
    reading = getGasGata_fake()
    while reading is not None:
        nothing_accepted_yet = last_time == ""
        if nothing_accepted_yet or datetime.strptime(reading["uptime"], time_format) > datetime.strptime(last_time, time_format):
            # Fresh data arrived: remember its timestamp and hand it out.
            last_time = reading["uptime"]
            return reading
        # Still the same reading as last time: ask again, then back off.
        reading = getGasGata_fake()
        time.sleep(2)
    return None


def isPowerOn():
    """Block until the four-in-one gas detector is considered powered on.

    Polls ``getNewGasData`` in a loop. When no reading is available it waits two
    seconds and retries. A reading counts as "powered on" when its ``uptime`` is
    later than the current local time; ``last_time`` is then set to that
    ``uptime`` and the function returns. Otherwise it prints a retry notice,
    waits two seconds and polls again.

    NOTE(review): the power-on criterion compares ``uptime`` against the current
    clock — confirm this is the intended rule for real device data.
    """
    global last_time
    print("检测四合一是否开机")

    while True:
        data = getNewGasData()
        if data is None:
            time.sleep(2)
            continue
        current_time = datetime.now()
        reading_time = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S")
        if current_time < reading_time:
            last_time = data["uptime"]
            return
        print("重新访问")
        time.sleep(2)

def getGasFlag():
    """Judge whether the newest gas reading is within limits.

    Fetches a reading via ``getNewGasData`` and checks it against fixed limits:
    ch4 <= 10.0, co <= 10.0, h2s <= 120.0 and o2 >= 15.

    Returns:
        True when every value is within its limit. False when any limit is
        violated, and also when no reading is available or it cannot be parsed:
        an unknown gas state must never be reported as compliant. (Previously
        these cases fell through and returned ``None``, which callers comparing
        with ``== False`` treated as a pass.)
    """
    try:
        data = getNewGasData()
        if data is None:
            print("未获取到四合一数据")
            return False
        ch4 = float(data["ch4"])
        co = float(data["co"])
        h2s = float(data["h2s"])
        o2 = float(data["o2"])
    except Exception as e:
        # Best-effort: report the problem, but fail safe instead of returning None.
        print(str(e))
        return False
    # Written as "all within limits" so that NaN values also yield False.
    return ch4 <= 10.0 and co <= 10.0 and h2s <= 120.0 and o2 >= 15

def writeFile(file_path, data):
    """Append the rows in ``data`` to the CSV file at ``file_path`` (UTF-8).

    Args:
        file_path: Path of the CSV file; it is opened in append mode.
        data: Iterable of rows, each row an iterable of cell values.
    """
    print(f"写入{data}")
    with open(file_path, mode='a', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.writer(csv_file)
        for row in data:
            csv_writer.writerow(row)


class TestLimitSpaceSceneHandler(BaseSceneHandler):

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        """Set up the alarm center, the detection model and the shared check state.

        Args:
            device: Device served by this handler; ``device.id`` is passed to the alarm center.
            thread_id: Forwarded to the base handler.
            tcp_manager: Forwarded to the base handler and to the alarm center.
            main_loop: Forwarded to the base handler and to the alarm center.
            range_points: Accepted but not used by this constructor.
        """
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        print("__init__")

        # alarmCategory -> priority, where 0 is the highest priority
        priority_by_category = {2: 0, 1: 1, 3: 2, 0: 3}
        self.alarm_message_center = AlarmMessageCenter(
            device.id,
            main_loop=main_loop,
            tcp_manager=tcp_manager,
            category_priority=priority_by_category,
        )
        # The live stream loader is currently disabled, so this constructor does not
        # assign self.stream_loader:
        # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
        #                                       device_thread_id=thread_id)

        # Completion flags: labor-protection item check and upper/middle/lower gas check.
        self.laobao_complete = asyncio.Event()
        self.umd_complete = asyncio.Event()
        # self.umd_complete.set()

        self.laobao_model = YOLO("weights/labor-v8-20241114.pt")
        # Item name -> model class ids that count as a detection of that item.
        self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]}
        # Item name -> whether it has been detected so far.
        self.target_flag = {item_name: False for item_name in self.target}
        # Queue of alarm / result messages waiting to be reported.
        self.alarm_list = []
        # Time key ("%H:%M:%S") -> messages logged by each of the two checks.
        self.laobao_pool = {}
        self.umd_pool = {}

    def addLog(self, pool, time, text):
        if time not in pool.keys():
            pool[time] = [text]
        else:
            pool[time].append(text)
            pool[time] = list(set(pool[time]))  # 不添加重复的日志

    async def laobaoCheck_task(self, executor, cancel_event=None):
        """Run the labor-protection item check on a worker thread until it finishes.

        Frames are read from a recorded video, shown in an OpenCV preview window and
        passed to ``self.laobao_model``. Every item in ``self.target`` is marked in
        ``self.target_flag`` once one of its class ids is detected.

        Results are only judged after the upper/middle/lower gas check has finished
        (``self.umd_complete`` is set); before that no alarm is raised, however long
        it takes. Once it is set:
          * if every item has been seen, "劳保物品 通过" is queued in
            ``self.alarm_list``, ``self.laobao_complete`` is set and the loop ends;
          * otherwise one missing-item alarm per undetected item is queued (never
            duplicated), and an alarm is withdrawn again if its item shows up later.

        Args:
            executor: Executor whose worker thread runs the blocking detection loop.
            cancel_event: Optional event; when it is set the loop stops at its next
                iteration. ``None`` means the loop cannot be cancelled this way.
        """

        def stamp():
            # Time key used for the per-second event log.
            return datetime.now().strftime("%H:%M:%S")

        def is_cancelled():
            return cancel_event is not None and cancel_event.is_set()

        def getUndetectedTarget():
            # Names of the items that have not been detected yet.
            return [name for name, found in self.target_flag.items() if not found]

        def name2id(names):
            # Map one item name, or a list of names, to the model class ids to look for.
            if isinstance(names, str):
                return list(self.target[names])
            return sorted({class_id for name in names for class_id in self.target[name]})

        def detect_step(frames):
            """Run one detection pass over ``frames``; return True once the check is finished."""
            pending = getUndetectedTarget()
            if pending:
                # Restrict the model to items that are still missing. When nothing is
                # pending the model call is skipped entirely; previously an empty id
                # list raised inside name2id and the completion check below was
                # never reached.
                results = self.laobao_model.predict(source=frames, classes=name2id(pending), conf=0.6,
                                                    save=False, verbose=False)
                seen_ids = {int(class_id) for result in results for class_id in result.boxes.cls.tolist()}
                for name in pending:
                    if seen_ids.isdisjoint(self.target[name]):
                        continue
                    self.target_flag[name] = True
                    self.addLog(self.laobao_pool, stamp(), f"检测到{name}")
                    print(f"检测到{name}")
                    # The item showed up after all, so withdraw its pending alarm.
                    alarm_content = f"报警:劳保物品缺失:{name}"
                    if alarm_content in self.alarm_list:
                        self.alarm_list.remove(alarm_content)

            if not self.umd_complete.is_set():
                # The gas check is still running: keep detecting, raise nothing yet.
                return False

            missing = getUndetectedTarget()
            if not missing:
                self.addLog(self.laobao_pool, stamp(), "劳保物品 通过")
                self.alarm_list.append("劳保物品 通过")
                self.laobao_complete.set()
                return True

            for name in missing:
                alarm_content = f"报警:劳保物品缺失:{name}"
                if alarm_content not in self.alarm_list:  # do not queue the same alarm twice
                    self.alarm_list.append(alarm_content)
                    self.addLog(self.laobao_pool, stamp(), alarm_content)
            return False

        def model_predict_test(video_path):
            """Detection loop over a recorded video file, with a preview window."""
            self.addLog(self.laobao_pool, stamp(), "开始劳保检测")
            cap = cv2.VideoCapture(video_path)
            try:
                while True:
                    if is_cancelled():  # timed out: stop
                        return
                    frame = None
                    try:
                        ret, frame = cap.read()
                        self.addLog(self.laobao_pool, stamp(), "读取帧")
                        if not ret:
                            self.addLog(self.laobao_pool, stamp(), "读取视频结束,退出")
                            break
                        cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE)
                        cv2.resizeWindow('Video Frame', 800, 600)  # 800 px wide, 600 px high
                        cv2.imshow("Video Frame", frame)
                    except Exception as ex:
                        traceback.print_exc()
                        logger.error(ex)
                        cv2.destroyAllWindows()
                    # Wait 1 ms and stop when the 'q' key was pressed.
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                    if frame is None:
                        # Reading failed this round, so there is nothing to analyse.
                        continue
                    try:
                        if detect_step([frame]):
                            return
                    except Exception as ex:
                        # Best effort: one bad frame must not end the whole check.
                        traceback.print_exc()
                        logger.error(ex)
            finally:
                # Release the capture and the window on every exit path.
                cap.release()
                cv2.destroyAllWindows()

        def model_predict():
            """Detection loop over the live stream (currently not scheduled below).

            Requires ``self.stream_loader``; each item it yields is passed to the
            model as one batch of frames.
            """
            for frames in self.stream_loader:
                if is_cancelled():  # timed out: stop
                    return
                if detect_step(frames):
                    return

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, model_predict_test, r"/home/pc/桌面/2025-01-21 14-42-00.mkv")
        # await loop.run_in_executor(executor, model_predict)
        print("劳保用品退出检测")

    async def uMDGasCheck_task(self, cancel_event=None):
        """Run the upper/middle/lower gas check until three readings in a row pass.

        First blocks (on a worker thread) in ``isPowerOn`` until the detector is
        considered powered on. Then repeatedly evaluates ``getGasFlag``: every
        result other than ``True`` logs an alarm and resets the streak, and three
        consecutive ``True`` results set ``self.umd_complete``. Progress is logged
        to ``self.umd_pool``.

        ``self.umd_complete`` is only set on a real pass; it is left unset when the
        task is cancelled or when the iteration budget runs out.

        Args:
            cancel_event: Optional event; when it is set the loop stops before the
                next reading. ``None`` means the loop cannot be cancelled this way.
        """

        def stamp():
            return datetime.now().strftime("%H:%M:%S")

        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        try:
            self.addLog(self.umd_pool, stamp(), "检测开机?")
            # Suspends this coroutine; nothing below runs until power-on is detected.
            await loop.run_in_executor(executor, isPowerOn)
            self.addLog(self.umd_pool, stamp(), "已开机!")

            passes_in_a_row = 0
            for i in range(1000000):
                if cancel_event is not None and cancel_event.is_set():  # timed out: stop
                    return
                print(f"uMDGasCheck_task:{i}")

                flag = await loop.run_in_executor(executor, getGasFlag)
                if flag is True:
                    passes_in_a_row += 1
                    if passes_in_a_row == 3:
                        break
                else:
                    # False, or no usable result at all: never count it as a pass.
                    passes_in_a_row = 0
                    # Log through addLog only; storing a bare string in the pool
                    # would break the next addLog call for the same second.
                    self.addLog(self.umd_pool, stamp(), "报警:上中下气体异常")
            else:
                # Budget exhausted without three passes in a row: do not report success.
                return

            print("上中下气体检测通过")
            self.addLog(self.umd_pool, stamp(), "上中下气体检测通过")
            self.umd_complete.set()
        finally:
            # Do not wait: a worker may still be blocked inside isPowerOn/getGasFlag.
            executor.shutdown(wait=False)
    async def alarm_task(self, cancel_event):
        def fun():
            for i in range(10000):  # 检验中
                if self.laobao_complete.is_set():
                    if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过")
                    return
                # print(f"alarm_task:{len(self.alarm_list)}")
                if cancel_event.is_set():  # 超时退出
                    return
                if len(self.alarm_list) != 0:
                    print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}")
                time.sleep(1)

        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, fun)

    async def logger_task(self):
        """Append one row per second to ``events_table.csv`` from a worker thread.

        A header row is written first. Each following row holds the current
        "%H:%M:%S" time plus whatever ``self.laobao_pool`` and ``self.umd_pool``
        recorded under that time key, or "-" when nothing was recorded.
        """
        def write_row_every_second():
            for _ in range(10000000):
                now = datetime.now().strftime("%H:%M:%S")
                laobao_event = self.laobao_pool.get(now, "-")
                umd_event = self.umd_pool.get(now, "-")
                writeFile("events_table.csv", [[now, laobao_event, umd_event]])

                time.sleep(1)

        header_row = ["时间", "劳保用品检测", "上中下气体检测"]
        writeFile("events_table.csv", [list(header_row)])
        worker_pool = ThreadPoolExecutor(max_workers=3)
        running_loop = asyncio.get_running_loop()
        await running_loop.run_in_executor(worker_pool, write_row_every_second)
    def run(self):
        """Run the pre-work checks to completion (or timeout) in a fresh event loop.

        Starts four tasks concurrently: the upper/middle/lower gas check, the
        labor-protection item check, the alarm reporter and the CSV event logger.
        Only the two check tasks decide when this method finishes: it returns once
        both are done, or after the timeout, in which case the checks are cancelled.
        """
        print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv")

        async def fun():
            # d = {
            #     'alarmCategory': 0,
            #     'alarmType': '1',
            #     'category_order': 1,  # priority within the alarmCategory
            #     'alarm_name': 'no_fire_extinguisher',
            #     'alarmContent': '未检测到灭火器',
            #     'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
            #     'label': ''
            # }
            # self.alarm_message_center.add_message(d)
            # self.alarm_message_center.process_messages()

            # Run the tasks concurrently.
            cancel_event = asyncio.Event()
            uMDGasCheck_task = asyncio.create_task(
                self.uMDGasCheck_task(cancel_event))
            laobaoCheck_executor = ThreadPoolExecutor(max_workers=3)
            laobaoCheck_task = asyncio.create_task(
                self.laobaoCheck_task(laobaoCheck_executor, cancel_event))
            # Helper tasks: keep references so they stay alive while the checks run.
            # asyncio.run() cancels whatever is still pending when fun() returns.
            alarm_task = asyncio.create_task(self.alarm_task(cancel_event))
            logger_task = asyncio.create_task(self.logger_task())

            # Wait for the two checks only. The logger task keeps running long after
            # the checks end, so waiting for it as well made the success branch
            # below unreachable before the timeout.
            check_tasks = {uMDGasCheck_task, laobaoCheck_task}
            done, pending = await asyncio.wait(check_tasks, timeout=300000.0)
            if not pending:
                # Awaiting re-raises any exception a check task ended with.
                await uMDGasCheck_task
                await laobaoCheck_task
                # alarm_task.cancel()
                print("前置条件检查完成,退出")

            else:
                # Timed out: cancel the checks that have not finished.
                cancel_event.set()
                laobaoCheck_executor.shutdown(wait=False)
                laobaoCheck_task.cancel()
                uMDGasCheck_task.cancel()
                print("前置条件检查时间过长,退出")

            # Run the follow-up tasks concurrently (not enabled yet).
            # print("开始工作")
            # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000))
            # gasCheck = asyncio.create_task(gasCheck_task_fake(10000))
            # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000))
            # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck)

        asyncio.run(fun())

if __name__ == '__main__':
    # Manual smoke test: evaluate one gas reading; the resulting flag is discarded.
    # print(getNewGasData())
    getGasFlag()