"""Test scene handler for the confined-space pre-work check.

Two checks run in parallel before work may start:

* a protective-equipment ("laobao") check that runs a YOLO model on video
  frames until every required item has been seen, and
* an upper/middle/lower gas check that polls a four-in-one gas detector
  (simulated here with ``fake_list``) until three compliant readings arrive.

An alarm task prints queued alarms and a logger task writes a per-second
event table to ``events_table.csv``.
"""
import asyncio
import base64
import traceback
from asyncio import Event
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy, copy
from datetime import datetime
import time
import cv2
import csv
from algo.model_manager import AlgoModelExec
from algo.stream_loader import OpenCVStreamLoad
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request, get_request
from common.image_plotting import Annotator
from entity.device import Device
import numpy as np
from ultralytics import YOLO
from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager

# Timestamp format of the 'uptime' / 'logtime' fields in the gas data.
_UPTIME_FORMAT = '%Y-%m-%d %H:%M:%S'
# Recorded video used by the test version of the protective-equipment check.
_TEST_VIDEO_PATH = r"/home/pc/桌面/2025-01-21 14-42-00.mkv"
# Alarm-queue entry that signals the protective-equipment check has passed.
_LAOBAO_PASS_MESSAGE = "劳保物品 通过"
# Upper bound (seconds) on how long run() waits for the two pre-checks.
_PRECHECK_TIMEOUT_SECONDS = 300000.0

# 'uptime' of the most recent reading handed out by getNewGasData ("" = none yet).
last_time = ""


def _parse_uptime(text):
    """Parse a gas-data timestamp string into a ``datetime``."""
    return datetime.strptime(text, _UPTIME_FORMAT)


def _now_hms():
    """Return the current wall-clock time as ``HH:MM:SS`` (the event-log key)."""
    return datetime.now().strftime("%H:%M:%S")


def _is_cancelled(cancel_event):
    """Return True when a cancel event was supplied and has been set."""
    return cancel_event is not None and cancel_event.is_set()


def create_value_iterator(values):
    """Yield the items of *values* one by one."""
    yield from values


fake_list = [
    # Pretend this is the data requested from the four-in-one backend.
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'},
    {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'},
]
value_iterator = create_value_iterator(fake_list)


def flatten(lst):
    """Return a flat list with the elements of an arbitrarily nested list."""
    result = []
    for item in lst:
        if isinstance(item, list):
            result.extend(flatten(item))  # recurse into nested lists
        else:
            result.append(item)
    return result


def getGasGata_fake():
    """Simulate one read of the four-in-one gas data (the data is fake).

    Sleeps one second, then returns the next entry of ``fake_list`` or
    ``None`` once the list is exhausted.
    """
    try:
        time.sleep(1)
        return next(value_iterator)
    except StopIteration:
        return None


def getGasData():
    """Read the four-in-one gas data from the backend.

    Example return value:
    {'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00',
     'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90',
     'uptime': '2025-01-16 17:48:01'}

    Returns ``None`` when the response is empty or carries no 'data'.
    """
    harmful_device_code = '862635063168165A'
    url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
    response = get_request(url)
    if response and response.get('data'):
        return response.get('data')
    return None


def getNewGasData():
    """Return the next reading whose 'uptime' is newer than ``last_time``.

    Readings with an 'uptime' that is not newer than the last one handed out
    are treated as repeats and skipped (with a 2 s pause between polls).
    Updates the module-level ``last_time``. Returns ``None`` when the data
    source has nothing more to give.
    """
    global last_time
    data = getGasGata_fake()
    while data is not None:
        if last_time == "" or _parse_uptime(data["uptime"]) > _parse_uptime(last_time):
            # A fresh reading has arrived: remember its timestamp.
            last_time = data["uptime"]
            return data
        # Still the same reading as last time: poll again.
        data = getGasGata_fake()
        time.sleep(2)
    return None


def isPowerOn(cancel_event=None):
    """Block until the four-in-one detector is considered powered on (simulated).

    The device counts as powered on when a new reading's 'uptime' lies after
    the current local time.

    :param cancel_event: optional event; when set, polling stops early.
    :return: True once the device is powered on, False when cancelled.
    """
    global last_time
    print("检测四合一是否开机")
    while not _is_cancelled(cancel_event):
        data = getNewGasData()
        if data is None:
            time.sleep(2)
            continue
        if datetime.now() < _parse_uptime(data.get('uptime')):
            last_time = data["uptime"]
            return True
        print("重新访问")
        time.sleep(2)
    return False


def getGasFlag():
    """Judge whether the latest gas reading is compliant.

    :return: True when compliant, False when not compliant, and None when no
        reading is available or it cannot be parsed. Callers must not treat
        None as a pass.
    """
    # Broad catch kept on purpose: this runs in a worker thread and has always
    # reported failures by printing instead of raising.
    try:
        data = getNewGasData()
        if data is None:
            return None
        non_compliant = (
            float(data["ch4"]) > 10.0
            or float(data["co"]) > 10.0
            or float(data["h2s"]) > 120.0
            or float(data["o2"]) < 15
        )
        return not non_compliant
    except Exception as e:
        print(str(e))
        return None


def writeFile(file_path, data):
    """Append the rows in *data* to the CSV file at *file_path*."""
    print(f"写入{data}")
    with open(file_path, mode='a', newline='', encoding='utf-8') as file:
        csv.writer(file).writerows(data)


class TestLimitSpaceSceneHandler(BaseSceneHandler):
    """Runs the protective-equipment and gas pre-checks for a confined space."""

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        print("__init__")
        # category_priority maps alarmCategory -> priority; 0 is the highest priority.
        self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop,
                                                       tcp_manager=tcp_manager,
                                                       category_priority={2: 0, 1: 1, 3: 2, 0: 3})
        # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
        #                                       device_thread_id=thread_id)
        # Set when the protective-equipment / gas check has passed.
        # NOTE(review): these asyncio.Events are only polled with is_set() from
        # worker threads and never awaited; asyncio.Event is not documented as
        # thread-safe - confirm this is acceptable.
        self.laobao_complete = asyncio.Event()
        self.umd_complete = asyncio.Event()
        # self.umd_complete.set()
        self.laobao_model = YOLO("weights/labor-v8-20241114.pt")
        # Required item name -> model class ids that count as that item.
        self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]}
        # Required item name -> whether it has been detected so far.
        self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False}
        self.alarm_list = []   # pending alarm messages, consumed by alarm_task
        self.laobao_pool = {}  # "HH:MM:SS" -> list of protective-equipment events
        self.umd_pool = {}     # "HH:MM:SS" -> list of gas-check events

    def addLog(self, pool, time, text):
        """Record *text* under timestamp *time* in *pool*, skipping duplicates."""
        entries = pool.setdefault(time, [])
        if text not in entries:  # do not add duplicate log entries
            entries.append(text)

    def _undetected_targets(self):
        """Return the names of the required items not detected yet."""
        return [name for name, found in self.target_flag.items() if not found]

    def _class_ids_for(self, names):
        """Map item names to the de-duplicated list of their model class ids."""
        class_ids = set()
        for name in names:
            class_ids.update(self.target[name])
        return sorted(class_ids)

    def _detect_targets(self, frames):
        """Run the model on *frames* and mark newly detected items as found.

        Only the classes of still-missing items are requested from the model.
        When an item is found, a pending "missing" alarm for it is withdrawn.
        """
        missing = self._undetected_targets()
        if not missing:
            # Nothing left to look for; an empty class filter must not reach the model.
            return
        results = self.laobao_model.predict(source=frames, classes=self._class_ids_for(missing),
                                            conf=0.6, save=False, verbose=False)
        detected_ids = set()
        for result in results:
            detected_ids.update(int(cls_id) for cls_id in result.boxes.cls.tolist())
        for name in missing:
            if detected_ids.isdisjoint(self.target[name]):
                continue
            self.target_flag[name] = True
            self.addLog(self.laobao_pool, _now_hms(), f"检测到{name}")
            print(f"检测到{name}")
            try:
                self.alarm_list.remove(f"报警:劳保物品缺失:{name}")
            except ValueError:
                pass  # no pending alarm for this item (or alarm_task already consumed it)

    def _report_after_gas_check(self):
        """Report the equipment result once the gas check is done.

        :return: True when every item was detected (check passed and
            ``laobao_complete`` set); False when alarms for the missing items
            were queued instead.
        """
        missing = self._undetected_targets()
        if not missing:
            self.addLog(self.laobao_pool, _now_hms(), f"劳保物品 通过")
            self.alarm_list.append(_LAOBAO_PASS_MESSAGE)
            self.laobao_complete.set()
            return True
        for name in missing:
            alarm_content = f"报警:劳保物品缺失:{name}"
            if alarm_content not in self.alarm_list:  # do not queue the same alarm twice
                self.alarm_list.append(alarm_content)
                self.addLog(self.laobao_pool, _now_hms(), f"报警:劳保物品缺失:{name}")
        return False

    async def laobaoCheck_task(self, executor, cancel_event=None, video_path=_TEST_VIDEO_PATH):
        """Run the protective-equipment check in *executor*.

        Alarms are only raised after the gas check has completed; while the gas
        check is still running, missing items are not reported.

        :param executor: executor that runs the blocking detection loop.
        :param cancel_event: optional event; when set, the loop exits.
        :param video_path: video file fed to the test detection loop.
        """

        def model_predict_test(path):
            """Detection loop over a recorded video (test version)."""
            self.addLog(self.laobao_pool, _now_hms(), "开始劳保检测")
            cap = cv2.VideoCapture(path)
            try:
                while True:
                    ret, frame = cap.read()
                    self.addLog(self.laobao_pool, _now_hms(), "读取帧")
                    if not ret:
                        self.addLog(self.laobao_pool, _now_hms(), "读取视频结束,退出")
                        break
                    try:
                        cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE)
                        cv2.resizeWindow('Video Frame', 800, 600)  # 800 px wide, 600 px high
                        cv2.imshow("Video Frame", frame)
                        # Wait 1 ms and quit when 'q' was pressed.
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break
                    except Exception as ex:
                        # Display is best-effort; detection continues without it.
                        traceback.print_exc()
                        logger.error(ex)
                    if _is_cancelled(cancel_event):  # timed out
                        return
                    try:
                        self._detect_targets([frame])
                        # Gas check finished: report, and stop once everything was seen.
                        if self.umd_complete.is_set() and self._report_after_gas_check():
                            return
                    except Exception as e:
                        print(str(e))
            finally:
                # Release the capture on every exit path.
                cap.release()
                try:
                    cv2.destroyAllWindows()
                except Exception as ex:
                    logger.error(ex)

        def model_predict():
            """Detection loop over the live stream.

            NOTE: requires ``self.stream_loader``, whose creation is currently
            commented out in ``__init__``. Each item is a list of frames.
            """
            for frames in self.stream_loader:
                if _is_cancelled(cancel_event):  # timed out
                    return
                self._detect_targets(frames)
                if self.umd_complete.is_set() and self._report_after_gas_check():
                    return

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(executor, model_predict_test, video_path)
        # await loop.run_in_executor(executor, model_predict)
        print("劳保用品退出检测")

    async def uMDGasCheck_task(self, cancel_event=None):
        """Run the upper/middle/lower gas check.

        Waits for the detector to power on, then polls readings until three
        compliant ones have been seen since the last non-compliant one, and
        sets ``umd_complete``. A non-compliant reading resets the count and
        logs an alarm. A missing reading (``None``) neither counts nor resets.

        :param cancel_event: optional event; when set, the check exits.
        """
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        try:
            self.addLog(self.umd_pool, _now_hms(), f"检测开机?")
            # Suspends this coroutine: nothing below runs until power-on is seen.
            powered_on = await loop.run_in_executor(executor, isPowerOn, cancel_event)
            if not powered_on:  # cancelled while waiting for power-on
                return
            self.addLog(self.umd_pool, _now_hms(), f"已开机!")
            ok_count = 0  # compliant readings since the last non-compliant one
            for i in range(1000000):  # simulated polling loop
                if _is_cancelled(cancel_event):  # timed out
                    return
                print(f"uMDGasCheck_task:{i}")
                flag = await loop.run_in_executor(executor, getGasFlag)
                if flag is True:
                    ok_count += 1
                    if ok_count == 3:
                        break  # check passed
                elif flag is False:
                    ok_count = 0
                    self.addLog(self.umd_pool, _now_hms(), f"报警:上中下气体异常")
            else:
                # Poll budget exhausted without three compliant readings: not a pass.
                return
            print("上中下气体检测通过")
            self.addLog(self.umd_pool, _now_hms(), f"上中下气体检测通过")
            self.umd_complete.set()
        finally:
            executor.shutdown(wait=False)

    async def alarm_task(self, cancel_event):
        """Print queued alarms once per second until the check passes or is cancelled."""

        def fun():
            for _ in range(10000):
                if self.laobao_complete.is_set() and _LAOBAO_PASS_MESSAGE in self.alarm_list:
                    print("劳保物品 通过")
                    return
                if cancel_event.is_set():  # timed out
                    return
                try:
                    message = self.alarm_list.pop(0)
                except IndexError:
                    message = None  # queue is empty
                if message == _LAOBAO_PASS_MESSAGE:
                    # The pass message can be queued just before laobao_complete is set.
                    print("劳保物品 通过")
                    return
                if message is not None:
                    print(f"{message},报警队列长度:{len(self.alarm_list)}")
                time.sleep(1)

        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(executor, fun)
        finally:
            executor.shutdown(wait=False)

    async def logger_task(self, cancel_event=None):
        """Write one row per second to ``events_table.csv`` with the events of that second.

        :param cancel_event: optional event; when set, logging stops.
        """

        def fun():
            for _ in range(10000000):
                if _is_cancelled(cancel_event):
                    return
                now = _now_hms()
                # NOTE: events recorded later within the same second are not
                # written, because the row for that second is already out.
                laobaoevent = self.laobao_pool.get(now, "-")
                umdevent = self.umd_pool.get(now, "-")
                writeFile("events_table.csv", [[now, laobaoevent, umdevent]])
                time.sleep(1)

        headers = ["时间", "劳保用品检测", "上中下气体检测"]
        writeFile("events_table.csv", [headers])
        executor = ThreadPoolExecutor(max_workers=3)
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(executor, fun)
        finally:
            executor.shutdown(wait=False)

    def run(self):
        """Run both pre-checks in parallel, with the alarm and logger tasks alongside."""
        print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv")

        async def fun():
            # d = {
            #     'alarmCategory': 0,
            #     'alarmType': '1',
            #     'category_order': 1,  # priority within the alarmCategory
            #     'alarm_name': 'no_fire_extinguisher',
            #     'alarmContent': '未检测到灭火器',
            #     'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
            #     'label': ''
            # }
            # self.alarm_message_center.add_message(d)
            # self.alarm_message_center.process_messages()

            # Run the tasks in parallel.
            cancel_event = asyncio.Event()
            laobaoCheck_executor = ThreadPoolExecutor(max_workers=3)
            uMDGasCheck_task = asyncio.create_task(self.uMDGasCheck_task(cancel_event))
            laobaoCheck_task = asyncio.create_task(
                self.laobaoCheck_task(laobaoCheck_executor, cancel_event))
            alarm_task = asyncio.create_task(self.alarm_task(cancel_event))
            logger_task = asyncio.create_task(self.logger_task(cancel_event))
            try:
                # Wait only on the two checks: the logger never finishes on its
                # own, so including it would always wait for the full timeout.
                done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task},
                                                   timeout=_PRECHECK_TIMEOUT_SECONDS)
                if not pending:
                    await uMDGasCheck_task
                    await laobaoCheck_task
                    print("前置条件检查完成,退出")
                else:
                    # Timed out: cancel whatever has not finished.
                    cancel_event.set()
                    for task in pending:
                        task.cancel()
                    print("前置条件检查时间过长,退出")
            finally:
                # Stop the worker threads behind the helper tasks and wait for them.
                cancel_event.set()
                laobaoCheck_executor.shutdown(wait=False)
                await asyncio.gather(alarm_task, logger_task, return_exceptions=True)

            # Run the work-phase tasks in parallel.
            # print("开始工作")
            # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000))
            # gasCheck = asyncio.create_task(gasCheck_task_fake(10000))
            # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000))
            # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck)

        asyncio.run(fun())


if __name__ == '__main__':
    # print(getNewGasData())
    # print(getNewGasData())
    getGasFlag()