diff --git a/.idea/misc.xml b/.idea/misc.xml
index b68d82f..ef7c64e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
-
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index b68d82f..ef7c64e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
-
+
+
+
+
\ No newline at end of file
diff --git a/.idea/safe-algo-pro.iml b/.idea/safe-algo-pro.iml
index 33f5c97..54fd3ad 100644
--- a/.idea/safe-algo-pro.iml
+++ b/.idea/safe-algo-pro.iml
@@ -2,7 +2,7 @@
-
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index b68d82f..ef7c64e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
-
+
+
+
+
\ No newline at end of file
diff --git a/.idea/safe-algo-pro.iml b/.idea/safe-algo-pro.iml
index 33f5c97..54fd3ad 100644
--- a/.idea/safe-algo-pro.iml
+++ b/.idea/safe-algo-pro.iml
@@ -2,7 +2,7 @@
-
+
diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db
index 668520b..08505c8 100644
--- a/db/safe-algo-pro.db
+++ b/db/safe-algo-pro.db
Binary files differ
diff --git a/.idea/misc.xml b/.idea/misc.xml
index b68d82f..ef7c64e 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
-
+
+
+
+
\ No newline at end of file
diff --git a/.idea/safe-algo-pro.iml b/.idea/safe-algo-pro.iml
index 33f5c97..54fd3ad 100644
--- a/.idea/safe-algo-pro.iml
+++ b/.idea/safe-algo-pro.iml
@@ -2,7 +2,7 @@
-
+
diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db
index 668520b..08505c8 100644
--- a/db/safe-algo-pro.db
+++ b/db/safe-algo-pro.db
Binary files differ
diff --git a/scene_handler/zyn_limit_space_scene_handler.py b/scene_handler/zyn_limit_space_scene_handler.py
new file mode 100644
index 0000000..7d24fa6
--- /dev/null
+++ b/scene_handler/zyn_limit_space_scene_handler.py
@@ -0,0 +1,430 @@
+import asyncio
+import base64
+import traceback
+from asyncio import Event
+from concurrent.futures import ThreadPoolExecutor
+from copy import deepcopy, copy
+from datetime import datetime
+import time
+import cv2
+from datetime import datetime
+import csv
+
+from algo.model_manager import AlgoModelExec
+from algo.stream_loader import OpenCVStreamLoad
+from common.device_status_manager import DeviceStatusManager
+from common.global_logger import logger
+from common.global_thread_pool import GlobalThreadPool
+from common.http_utils import send_request, get_request
+from common.image_plotting import Annotator
+from entity.device import Device
+import numpy as np
+from ultralytics import YOLO
+
+from scene_handler.alarm_message_center import AlarmMessageCenter
+from scene_handler.base_scene_handler import BaseSceneHandler
+from services.global_config import GlobalConfig
+from tcp.tcp_manager import TcpManager
+
+last_time = ""
+def create_value_iterator(values):
+ for value in values:
+ yield value
+fake_list = [ # 假如这是你从四合一后台请求来的数据
+ {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
+ 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
+ {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
+ 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'},
+ {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
+ 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'},
+ {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
+ 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'},
+ {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913',
+ 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'},
+]
+value_iterator = create_value_iterator(fake_list)
+
+
+def flatten(lst):
+ result = []
+ for i in lst:
+ if isinstance(i, list):
+ result.extend(flatten(i)) # 递归调用以处理嵌套列表
+ else:
+ result.append(i)
+ return result
+
+
+def getGasGata_fake():
+ # 模拟 读取四合一数据(四合一数据是假的)
+ try:
+ time.sleep(1)
+ return next(value_iterator)
+ except StopIteration:
+ return None
+
+
+def getGasData():
+ # 从后台读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'}
+ harmful_device_code = '862635063168165A'
+ url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
+ response = get_request(url)
+ if response and response.get('data'):
+ return response.get('data')
+
+def getNewGasData():
+ global last_time
+ data = getGasGata_fake()
+ while True:
+ if data == None: return None
+ if last_time == "" or datetime.strptime(data["uptime"], '%Y-%m-%d %H:%M:%S') > datetime.strptime(last_time , '%Y-%m-%d %H:%M:%S'): # 有最新数据产生了!
+ last_time = data["uptime"] # 更新最新数据时间
+ break
+ else: # 获取的还是上次的重复数据,继续访问
+ data = getGasGata_fake()
+ time.sleep(2)
+ return data
+
+
+def isPowerOn():
+ '''
+ 模拟询问开机,
+ :param flag:True:认定为开机,False,关机
+ :return:
+ '''
+ global last_time
+ print("检测四合一是否开机")
+
+ while True:
+ data = getNewGasData()
+ if data == None:
+ time.sleep(2)
+ else:
+ flag = datetime.now() < datetime.strptime(data.get('uptime'),"%Y-%m-%d %H:%M:%S") # T/F 开机/未开机
+ if flag == True:
+ last_time = data["uptime"]
+ return
+ else:
+ print("重新访问")
+ time.sleep(2)
+
+def getGasFlag():
+ # 判断气体是否合规,T:合规。F:不合规
+ try:
+ data = getNewGasData()
+ if float(data["ch4"]) > 10.0 \
+ or float(data["co"]) > 10.0 \
+ or float(data["h2s"]) > 120.0 \
+ or float(data["o2"]) < 15:
+ return False
+ else:return True
+ except Exception as e:
+ print(str(e))
+
+def writeFile(file_path, data):
+ print(f"写入{data}")
+ with open(file_path, mode='a', newline='', encoding='utf-8') as file:
+ writer = csv.writer(file)
+ writer.writerows(data)
+
+
+class TestLimitSpaceSceneHandler(BaseSceneHandler):
+
+ def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
+ super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
+ print("__init__")
+ self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager,
+ category_priority={2: 0, 1: 1, 3: 2,
+ 0: 3}) # alarmCategory:优先级 0代表优先级最高
+ # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
+ # device_thread_id=thread_id)
+
+
+ self.laobao_complete = asyncio.Event()
+ self.umd_complete = asyncio.Event()
+ # self.umd_complete.set()
+ self.laobao_model = YOLO("weights/labor-v8-20241114.pt")
+ self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]}
+ self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False}
+ self.alarm_list = []
+ self.laobao_pool = {}
+ self.umd_pool = {}
+
+ def addLog(self, pool, time, text):
+ if time not in pool.keys():
+ pool[time] = [text]
+ else:
+ pool[time].append(text)
+ pool[time] = list(set(pool[time])) # 不添加重复的日志
+
+ async def laobaoCheck_task(self, executor, cancel_event=None):
+ # 报警只在 上中下气体进行完了之后进行,就算上中下一直没结束 也不报警
+
+ def getWrongClass():
+ wrong_class_list = []
+ for k, v in self.target_flag.items():
+ if v == False:
+ wrong_class_list.append(k)
+ return wrong_class_list
+
+ def getDetectedTarget():
+ # 获取已检测目标的名称,返回str列表
+ result = []
+ for name,flag in self.target_flag.items():
+ if flag == True:result.append(name)
+ return result
+
+ def getUndetectedTarget():
+ # 获取未检测目标的名称,返回str列表
+ result = []
+ for name, flag in self.target_flag.items():
+ if flag == False: result.append(name)
+ return result
+
+ def name2id(input):
+ # 检测名称 映射为 id
+ if isinstance(input, str):
+ return self.target[input]
+ elif isinstance(input, list):
+ result = []
+ for item in input:
+ result.append(self.target[item])
+ return list(set(np.concatenate([r for r in result]).astype(int).tolist()))
+
+ def model_predict_test(video_path):
+ self.addLog(self.laobao_pool,datetime.now().strftime("%H:%M:%S"),"开始劳保检测")
+ cap = cv2.VideoCapture(video_path)
+ ret, frames = cap.read()
+
+ # output_file = "output.avi" # 输出视频文件名
+ # fps = 20.0 # 每秒帧数
+ # frame_size = (1280, 720) # 帧大小,根据实际情况调整
+ # fourcc = cv2.VideoWriter_fourcc(*'XVID') # 使用XVID编码器
+ # out = cv2.VideoWriter(output_file, fourcc, fps, frame_size)
+ while True:
+ # for frames in self.stream_loader: # type : list (4),连续的4帧
+ try:
+ ret, frames = cap.read()
+ frames = [frames]
+ self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧")
+ if not ret:
+ self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出")
+ cap.release()
+ cv2.destroyAllWindows()
+ break
+ # print(f"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx{len(frames)}")
+ cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE)
+ cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素
+ cv2.imshow("Video Frame", frames[0])
+ # out.write(frames[0])
+ # out.write(frames[1])
+ # out.write(frames[2])
+ # out.write(frames[3])
+ except Exception as ex:
+ traceback.print_exc()
+ logger.error(ex)
+ cv2.destroyAllWindows()
+ # 等待1毫秒,检查是否按下了'q'键退出
+ if cv2.waitKey(1) & 0xFF == ord('q'):
+ break
+
+ if cancel_event.is_set(): # 超时退出
+ return
+
+ # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出)
+ try:
+ target_idx = name2id(getUndetectedTarget())
+ results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6,
+ save=False, verbose=False) # results:list(4) 4帧的检测结果
+ pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) # 检测到的目标类别id_list,已去重
+ for k, v in self.target.items(): # k:类别名称,v:类别id, 更新target_flag状态,如果之前存在c物体的报警,现在检测到c物体,移除它的报警
+ for pred_c in pred_c_list:
+ if pred_c in v:
+ self.target_flag[k] = True
+ self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"),f"检测到{k}")
+ print(f"检测到{k}")
+ if f"报警:劳保物品缺失:{k}" in self.alarm_list:self.alarm_list.remove(f"报警:劳保物品缺失:{k}")
+ break
+ if self.umd_complete.is_set(): # 上中下气体检测完毕
+ # 检验所有物体都检验到了吗
+ if getUndetectedTarget()==[]: # 如果全部检验到了
+ # print("劳保物品 通过")
+ self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过")
+ self.alarm_list.append("劳保物品 通过")
+ self.laobao_complete.set()
+ cap.release()
+ cv2.destroyAllWindows()
+ return # 退出检测
+ else: # 如果还有未检测到的
+ undetectedTargets = getUndetectedTarget()
+ # print(f"报警:劳保物品缺失:{wrong_class_list}")
+
+ for undetectedTarget in undetectedTargets:
+ alarm_content = f"报警:劳保物品缺失:{undetectedTarget}"
+ if alarm_content not in self.alarm_list: # 不加入 重复的报警内容
+ self.alarm_list.append(alarm_content)
+ self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"报警:劳保物品缺失:{undetectedTarget}")
+ # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警
+ # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警
+ except Exception as e:
+ print(str(e))
+ def model_predict():
+ for frames in self.stream_loader: # type : list (4),连续的4帧
+ if cancel_event.is_set(): # 超时退出
+ return
+ # print(f"laobaoCheck_task:检测中,已检测到{self.complete_target}")
+ target_idx = []
+ for k,v in self.target.items():
+ if k not in self.complete_target:
+ target_idx.append(v)
+ results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6,
+ save=False, verbose=False) # results:list(4) 4帧的检测结果
+ pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist()))
+ for k, v in self.target.items():
+ for pred_c in pred_c_list:
+ if pred_c in v:
+ self.target_flag[k] = True
+ self.complete_target.append(k) # 检测到 类别c 以后就不需要检测c
+ self.alarm_list.remove(f"报警:劳保物品缺失:{k}")
+
+ break
+ if self.umd_complete.is_set(): # 上中下气体检测完毕
+ # 检验所有物体都检验到了吗
+ if all(self.target_flag.values()): # 如果全部检验到了
+ # print("劳保物品 通过")
+ self.alarm_list.append("劳保物品 通过")
+ self.laobao_complete.set()
+ return # 退出检测
+ else:
+ wrong_class_list = getWrongClass()
+ # print(f"报警:劳保物品缺失:{wrong_class_list}")
+
+ for wrong_class in wrong_class_list:
+ alarm_content = f"报警:劳保物品缺失:{wrong_class}"
+ if alarm_content not in self.alarm_list: # 不加入 重复的报警内容
+ self.alarm_list.append(alarm_content)
+ # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警
+ # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警
+
+
+ loop = asyncio.get_running_loop()
+ await loop.run_in_executor(executor, model_predict_test, r"/home/pc/桌面/2025-01-21 14-42-00.mkv")
+ # await loop.run_in_executor(executor, model_predict)
+ print("劳保用品退出检测")
+
+ async def uMDGasCheck_task(self,cancel_event=None):
+
+ executor = ThreadPoolExecutor(max_workers=3)
+ loop = asyncio.get_running_loop()
+ tflag_pool = [] # 返回数据正常了几次
+ self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?")
+ await loop.run_in_executor(executor, isPowerOn) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行
+ self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!")
+
+ for i in range(1000000): # 模拟循环检测气体
+ if cancel_event.is_set(): # 超时退出
+ return
+ print(f"uMDGasCheck_task:{i}")
+
+ flag = await loop.run_in_executor(executor, getGasFlag) # 判断气体是否合规
+ if flag == False:
+ tflag_pool.clear()
+ self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体"
+ self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常")
+ else:
+ tflag_pool.append(True)
+ if len(tflag_pool) == 3:
+ break # 退出检测
+ print("上中下气体检测通过")
+ self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过")
+ self.umd_complete.set()
+ return
+ async def alarm_task(self, cancel_event):
+ def fun():
+ for i in range(10000): # 检验中
+ if self.laobao_complete.is_set():
+ if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过")
+ return
+ # print(f"alarm_task:{len(self.alarm_list)}")
+ if cancel_event.is_set(): # 超时退出
+ return
+ if len(self.alarm_list) != 0:
+ print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}")
+ time.sleep(1)
+
+ executor = ThreadPoolExecutor(max_workers=3)
+ loop = asyncio.get_running_loop()
+ await loop.run_in_executor(executor, fun)
+
+ async def logger_task(self):
+ def fun():
+ for i in range(10000000):
+ now = datetime.now().strftime("%H:%M:%S")
+ if now in self.laobao_pool.keys():laobaoevent = self.laobao_pool[now]
+ else: laobaoevent = "-"
+ if now in self.umd_pool.keys():
+ umdevent = self.umd_pool[now]
+ else:
+ umdevent = "-"
+ writeFile("events_table.csv", [[now, laobaoevent, umdevent]])
+
+ time.sleep(1)
+
+ headers = ["时间", "劳保用品检测", "上中下气体检测"]
+ writeFile("events_table.csv", [[headers[0],headers[1],headers[2]]])
+ executor = ThreadPoolExecutor(max_workers=3)
+ loop = asyncio.get_running_loop()
+ await loop.run_in_executor(executor, fun)
+ def run(self):
+ print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv")
+ async def fun():
+ # d = {
+ # 'alarmCategory': 0,
+ # 'alarmType': '1',
+ # 'category_order': 1, # alarmCategory 下的优先级
+ # 'alarm_name': 'no_fire_extinguisher',
+ # 'alarmContent': '未检测到灭火器',
+ # 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
+ # 'label': ''
+ # }
+ # self.alarm_message_center.add_message(d)
+ # self.alarm_message_center.process_messages()
+
+ # 并行执行任务
+ cancel_event = asyncio.Event()
+ uMDGasCheck_task = asyncio.create_task(
+ self.uMDGasCheck_task(cancel_event))
+ laobaoCheck_executor = ThreadPoolExecutor(max_workers=3)
+ laobaoCheck_task = asyncio.create_task(
+ self.laobaoCheck_task(laobaoCheck_executor, cancel_event))
+ alarm_task = asyncio.create_task(self.alarm_task(cancel_event))
+ logger_task = asyncio.create_task(self.logger_task())
+ done, pending = await asyncio.wait({logger_task, uMDGasCheck_task, laobaoCheck_task, alarm_task}, timeout=300000.0)
+ if uMDGasCheck_task in done and laobaoCheck_task in done:
+ await uMDGasCheck_task
+ await laobaoCheck_task
+ # alarm_task.cancel()
+ print("前置条件检查完成,退出")
+
+ else:
+ # 如果超时,则取消未完成的任务
+ cancel_event.set()
+ laobaoCheck_executor.shutdown(wait=False)
+ laobaoCheck_task.cancel()
+ uMDGasCheck_task.cancel()
+ print("前置条件检查时间过长,退出")
+
+ # 并行执行任务
+ # print("开始工作")
+ # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000))
+ # gasCheck = asyncio.create_task(gasCheck_task_fake(10000))
+ # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000))
+ # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck)
+
+ asyncio.run(fun())
+
+if __name__=='__main__':
+ # print(getNewGasData())
+ # print(getNewGasData())
+
+ getGasFlag()
\ No newline at end of file