diff --git a/common/global_thread_pool.py b/common/global_thread_pool.py index ef9a356..917d262 100644 --- a/common/global_thread_pool.py +++ b/common/global_thread_pool.py @@ -30,7 +30,10 @@ def _initialize(self, max_workers): self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + try: + self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + except: + self.loop = asyncio.new_event_loop() self.task_map = {} # def __new__(cls, *args, **kwargs): diff --git a/common/global_thread_pool.py b/common/global_thread_pool.py index ef9a356..917d262 100644 --- a/common/global_thread_pool.py +++ b/common/global_thread_pool.py @@ -30,7 +30,10 @@ def _initialize(self, max_workers): self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + try: + self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + except: + self.loop = asyncio.new_event_loop() self.task_map = {} # def __new__(cls, *args, **kwargs): diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 191f99f..08505c8 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/common/global_thread_pool.py b/common/global_thread_pool.py index ef9a356..917d262 100644 --- a/common/global_thread_pool.py +++ b/common/global_thread_pool.py @@ -30,7 +30,10 @@ def _initialize(self, max_workers): self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + try: + self.loop = asyncio.get_running_loop() # 获取当前的事件循环 + except: + self.loop = asyncio.new_event_loop() self.task_map = {} # def __new__(cls, *args, **kwargs): diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 191f99f..08505c8 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/zyn_limit_space_scene_handler.py b/scene_handler/zyn_limit_space_scene_handler.py new file mode 100644 index 0000000..7d24fa6 --- /dev/null +++ b/scene_handler/zyn_limit_space_scene_handler.py @@ -0,0 +1,430 @@ +import asyncio +import base64 +import traceback +from asyncio import Event +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy, copy +from datetime import datetime +import time +import cv2 +from datetime import datetime +import csv + +from algo.model_manager import AlgoModelExec +from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.http_utils import send_request, get_request +from common.image_plotting import Annotator +from entity.device import Device +import numpy as np +from ultralytics import YOLO + +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + +last_time = "" +def create_value_iterator(values): + for value in values: + yield value +fake_list = [ # 假如这是你从四合一后台请求来的数据 + {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, + {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, + {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}, + {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}, + {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}, +] +value_iterator = create_value_iterator(fake_list) + + +def flatten(lst): + result = [] + for i in lst: + if isinstance(i, list): + result.extend(flatten(i)) # 递归调用以处理嵌套列表 + else: + result.append(i) + return result + + +def getGasGata_fake(): + # 模拟 读取四合一数据(四合一数据是假的) + try: + time.sleep(1) + return next(value_iterator) + except StopIteration: + return None + + +def getGasData(): + # 从后台读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'} + harmful_device_code = '862635063168165A' + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + return response.get('data') + +def getNewGasData(): + global last_time + data = getGasGata_fake() + while True: + if data == None: return None + if last_time == "" or datetime.strptime(data["uptime"], '%Y-%m-%d %H:%M:%S') > datetime.strptime(last_time , '%Y-%m-%d %H:%M:%S'): # 有最新数据产生了! + last_time = data["uptime"] # 更新最新数据时间 + break + else: # 获取的还是上次的重复数据,继续访问 + data = getGasGata_fake() + time.sleep(2) + return data + + +def isPowerOn(): + ''' + 模拟询问开机, + :param flag:True:认定为开机,False,关机 + :return: + ''' + global last_time + print("检测四合一是否开机") + + while True: + data = getNewGasData() + if data == None: + time.sleep(2) + else: + flag = datetime.now() < datetime.strptime(data.get('uptime'),"%Y-%m-%d %H:%M:%S") # T/F 开机/未开机 + if flag == True: + last_time = data["uptime"] + return + else: + print("重新访问") + time.sleep(2) + +def getGasFlag(): + # 判断气体是否合规,T:合规。F:不合规 + try: + data = getNewGasData() + if float(data["ch4"]) > 10.0 \ + or float(data["co"]) > 10.0 \ + or float(data["h2s"]) > 120.0 \ + or float(data["o2"]) < 15: + return False + else:return True + except Exception as e: + print(str(e)) + +def writeFile(file_path, data): + print(f"写入{data}") + with open(file_path, mode='a', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + + +class TestLimitSpaceSceneHandler(BaseSceneHandler): + + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + print("__init__") + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # alarmCategory:优先级 0代表优先级最高 + # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + # device_thread_id=thread_id) + + + self.laobao_complete = asyncio.Event() + self.umd_complete = asyncio.Event() + # self.umd_complete.set() + self.laobao_model = YOLO("weights/labor-v8-20241114.pt") + self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} + self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} + self.alarm_list = [] + self.laobao_pool = {} + self.umd_pool = {} + + def addLog(self, pool, time, text): + if time not in pool.keys(): + pool[time] = [text] + else: + pool[time].append(text) + pool[time] = list(set(pool[time])) # 不添加重复的日志 + + async def laobaoCheck_task(self, executor, cancel_event=None): + # 报警只在 上中下气体进行完了之后进行,就算上中下一直没结束 也不报警 + + def getWrongClass(): + wrong_class_list = [] + for k, v in self.target_flag.items(): + if v == False: + wrong_class_list.append(k) + return wrong_class_list + + def getDetectedTarget(): + # 获取已检测目标的名称,返回str列表 + result = [] + for name,flag in self.target_flag.items(): + if flag == True:result.append(name) + return result + + def getUndetectedTarget(): + # 获取未检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == False: result.append(name) + return result + + def name2id(input): + # 检测名称 映射为 id + if isinstance(input, str): + return self.target[input] + elif isinstance(input, list): + result = [] + for item in input: + result.append(self.target[item]) + return list(set(np.concatenate([r for r in result]).astype(int).tolist())) + + def model_predict_test(video_path): + self.addLog(self.laobao_pool,datetime.now().strftime("%H:%M:%S"),"开始劳保检测") + cap = cv2.VideoCapture(video_path) + ret, frames = cap.read() + + # output_file = "output.avi" # 输出视频文件名 + # fps = 20.0 # 每秒帧数 + # frame_size = (1280, 720) # 帧大小,根据实际情况调整 + # fourcc = cv2.VideoWriter_fourcc(*'XVID') # 使用XVID编码器 + # out = cv2.VideoWriter(output_file, fourcc, fps, frame_size) + while True: + # for frames in self.stream_loader: # type : list (4),连续的4帧 + try: + ret, frames = cap.read() + frames = [frames] + self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") + if not ret: + self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") + cap.release() + cv2.destroyAllWindows() + break + # print(f"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx{len(frames)}") + cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) + cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame", frames[0]) + # out.write(frames[0]) + # out.write(frames[1]) + # out.write(frames[2]) + # out.write(frames[3]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + if cancel_event.is_set(): # 超时退出 + return + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + try: + target_idx = name2id(getUndetectedTarget()) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) # 检测到的目标类别id_list,已去重 + for k, v in self.target.items(): # k:类别名称,v:类别id, 更新target_flag状态,如果之前存在c物体的报警,现在检测到c物体,移除它的报警 + for pred_c in pred_c_list: + if pred_c in v: + self.target_flag[k] = True + self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"),f"检测到{k}") + print(f"检测到{k}") + if f"报警:劳保物品缺失:{k}" in self.alarm_list:self.alarm_list.remove(f"报警:劳保物品缺失:{k}") + break + if self.umd_complete.is_set(): # 上中下气体检测完毕 + # 检验所有物体都检验到了吗 + if getUndetectedTarget()==[]: # 如果全部检验到了 + # print("劳保物品 通过") + self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") + self.alarm_list.append("劳保物品 通过") + self.laobao_complete.set() + cap.release() + cv2.destroyAllWindows() + return # 退出检测 + else: # 如果还有未检测到的 + undetectedTargets = getUndetectedTarget() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + + for undetectedTarget in undetectedTargets: + alarm_content = f"报警:劳保物品缺失:{undetectedTarget}" + if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 + self.alarm_list.append(alarm_content) + self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"报警:劳保物品缺失:{undetectedTarget}") + # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 + # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 + except Exception as e: + print(str(e)) + def model_predict(): + for frames in self.stream_loader: # type : list (4),连续的4帧 + if cancel_event.is_set(): # 超时退出 + return + # print(f"laobaoCheck_task:检测中,已检测到{self.complete_target}") + target_idx = [] + for k,v in self.target.items(): + if k not in self.complete_target: + target_idx.append(v) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) + for k, v in self.target.items(): + for pred_c in pred_c_list: + if pred_c in v: + self.target_flag[k] = True + self.complete_target.append(k) # 检测到 类别c 以后就不需要检测c + self.alarm_list.remove(f"报警:劳保物品缺失:{k}") + + break + if self.umd_complete.is_set(): # 上中下气体检测完毕 + # 检验所有物体都检验到了吗 + if all(self.target_flag.values()): # 如果全部检验到了 + # print("劳保物品 通过") + self.alarm_list.append("劳保物品 通过") + self.laobao_complete.set() + return # 退出检测 + else: + wrong_class_list = getWrongClass() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + + for wrong_class in wrong_class_list: + alarm_content = f"报警:劳保物品缺失:{wrong_class}" + if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 + self.alarm_list.append(alarm_content) + # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 + # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 + + + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, model_predict_test, r"/home/pc/桌面/2025-01-21 14-42-00.mkv") + # await loop.run_in_executor(executor, model_predict) + print("劳保用品退出检测") + + async def uMDGasCheck_task(self,cancel_event=None): + + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + tflag_pool = [] # 返回数据正常了几次 + self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?") + await loop.run_in_executor(executor, isPowerOn) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 + self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!") + + for i in range(1000000): # 模拟循环检测气体 + if cancel_event.is_set(): # 超时退出 + return + print(f"uMDGasCheck_task:{i}") + + flag = await loop.run_in_executor(executor, getGasFlag) # 判断气体是否合规 + if flag == False: + tflag_pool.clear() + self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体" + self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常") + else: + tflag_pool.append(True) + if len(tflag_pool) == 3: + break # 退出检测 + print("上中下气体检测通过") + self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过") + self.umd_complete.set() + return + async def alarm_task(self, cancel_event): + def fun(): + for i in range(10000): # 检验中 + if self.laobao_complete.is_set(): + if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过") + return + # print(f"alarm_task:{len(self.alarm_list)}") + if cancel_event.is_set(): # 超时退出 + return + if len(self.alarm_list) != 0: + print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}") + time.sleep(1) + + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, fun) + + async def logger_task(self): + def fun(): + for i in range(10000000): + now = datetime.now().strftime("%H:%M:%S") + if now in self.laobao_pool.keys():laobaoevent = self.laobao_pool[now] + else: laobaoevent = "-" + if now in self.umd_pool.keys(): + umdevent = self.umd_pool[now] + else: + umdevent = "-" + writeFile("events_table.csv", [[now, laobaoevent, umdevent]]) + + time.sleep(1) + + headers = ["时间", "劳保用品检测", "上中下气体检测"] + writeFile("events_table.csv", [[headers[0],headers[1],headers[2]]]) + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, fun) + def run(self): + print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv") + async def fun(): + # d = { + # 'alarmCategory': 0, + # 'alarmType': '1', + # 'category_order': 1, # alarmCategory 下的优先级 + # 'alarm_name': 'no_fire_extinguisher', + # 'alarmContent': '未检测到灭火器', + # 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + # 'label': '' + # } + # self.alarm_message_center.add_message(d) + # self.alarm_message_center.process_messages() + + # 并行执行任务 + cancel_event = asyncio.Event() + uMDGasCheck_task = asyncio.create_task( + self.uMDGasCheck_task(cancel_event)) + laobaoCheck_executor = ThreadPoolExecutor(max_workers=3) + laobaoCheck_task = asyncio.create_task( + self.laobaoCheck_task(laobaoCheck_executor, cancel_event)) + alarm_task = asyncio.create_task(self.alarm_task(cancel_event)) + logger_task = asyncio.create_task(self.logger_task()) + done, pending = await asyncio.wait({logger_task, uMDGasCheck_task, laobaoCheck_task, alarm_task}, timeout=300000.0) + if uMDGasCheck_task in done and laobaoCheck_task in done: + await uMDGasCheck_task + await laobaoCheck_task + # alarm_task.cancel() + print("前置条件检查完成,退出") + + else: + # 如果超时,则取消未完成的任务 + cancel_event.set() + laobaoCheck_executor.shutdown(wait=False) + laobaoCheck_task.cancel() + uMDGasCheck_task.cancel() + print("前置条件检查时间过长,退出") + + # 并行执行任务 + # print("开始工作") + # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000)) + # gasCheck = asyncio.create_task(gasCheck_task_fake(10000)) + # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000)) + # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck) + + asyncio.run(fun()) + +if __name__=='__main__': + # print(getNewGasData()) + # print(getNewGasData()) + + getGasFlag() \ No newline at end of file