diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..aeb844f --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,14 @@ +项目启动: +python main.py +如果无法启动,可能端口已经被占用。请更换端口再试试 + + +如果无法连接到,192.168.10.51:333,说明无法连接到 甲烷/语音报警器 +请连接wifi:AQSTYTJ_XXXX +密码:abcd1234 + +算法权重文件 放在 weights文件夹下 + +一些未解决的问题: +[zyn_limit_space_scene_handler.py](scene_handler/zyn_limit_space_scene_handler.py) +YinHuanCheck的main_fake和LaoBaoCheck的model_predict_fake,如果我在LaoBaoCheck实时预览cv2的窗口,就不能再YinHuanCheck实时预览窗口,暂时不知道为什么。 diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..aeb844f --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,14 @@ +项目启动: +python main.py +如果无法启动,可能端口已经被占用。请更换端口再试试 + + +如果无法连接到,192.168.10.51:333,说明无法连接到 甲烷/语音报警器 +请连接wifi:AQSTYTJ_XXXX +密码:abcd1234 + +算法权重文件 放在 weights文件夹下 + +一些未解决的问题: +[zyn_limit_space_scene_handler.py](scene_handler/zyn_limit_space_scene_handler.py) +YinHuanCheck的main_fake和LaoBaoCheck的model_predict_fake,如果我在LaoBaoCheck实时预览cv2的窗口,就不能再YinHuanCheck实时预览窗口,暂时不知道为什么。 diff --git a/common/http_utils.py b/common/http_utils.py index 405d951..ccfa59b 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -29,9 +29,9 @@ logger.error(f"Failed to push data: {e}") -def get_request(url, headers=None): +def get_request(url, headers=None, timeout=10): try: - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, timeout=timeout) return response.json() except requests.RequestException as e: logger.error(f"Failed to get data: {e}") diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..aeb844f --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,14 @@ +项目启动: +python main.py +如果无法启动,可能端口已经被占用。请更换端口再试试 + + +如果无法连接到,192.168.10.51:333,说明无法连接到 甲烷/语音报警器 +请连接wifi:AQSTYTJ_XXXX +密码:abcd1234 + +算法权重文件 放在 weights文件夹下 + +一些未解决的问题: +[zyn_limit_space_scene_handler.py](scene_handler/zyn_limit_space_scene_handler.py) +YinHuanCheck的main_fake和LaoBaoCheck的model_predict_fake,如果我在LaoBaoCheck实时预览cv2的窗口,就不能再YinHuanCheck实时预览窗口,暂时不知道为什么。 diff --git a/common/http_utils.py b/common/http_utils.py index 405d951..ccfa59b 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -29,9 +29,9 @@ logger.error(f"Failed to push data: {e}") -def get_request(url, headers=None): +def get_request(url, headers=None, timeout=10): try: - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, timeout=timeout) return response.json() except requests.RequestException as e: logger.error(f"Failed to get data: {e}") diff --git a/main.py b/main.py index 8da2462..2646193 100644 --- a/main.py +++ b/main.py @@ -45,4 +45,4 @@ uvicorn_logger.setLevel(logging.INFO) - uvicorn.run(app, host="0.0.0.0", port=9000, log_config=None) + uvicorn.run(app, host="0.0.0.0", port=9299, log_config=None) diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..aeb844f --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,14 @@ +项目启动: +python main.py +如果无法启动,可能端口已经被占用。请更换端口再试试 + + +如果无法连接到,192.168.10.51:333,说明无法连接到 甲烷/语音报警器 +请连接wifi:AQSTYTJ_XXXX +密码:abcd1234 + +算法权重文件 放在 weights文件夹下 + +一些未解决的问题: +[zyn_limit_space_scene_handler.py](scene_handler/zyn_limit_space_scene_handler.py) +YinHuanCheck的main_fake和LaoBaoCheck的model_predict_fake,如果我在LaoBaoCheck实时预览cv2的窗口,就不能再YinHuanCheck实时预览窗口,暂时不知道为什么。 diff --git a/common/http_utils.py b/common/http_utils.py index 405d951..ccfa59b 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -29,9 +29,9 @@ logger.error(f"Failed to push data: {e}") -def get_request(url, headers=None): +def get_request(url, headers=None, timeout=10): try: - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, timeout=timeout) return response.json() except requests.RequestException as e: logger.error(f"Failed to get data: {e}") diff --git a/main.py b/main.py index 8da2462..2646193 100644 --- a/main.py +++ b/main.py @@ -45,4 +45,4 @@ uvicorn_logger.setLevel(logging.INFO) - uvicorn.run(app, host="0.0.0.0", port=9000, log_config=None) + uvicorn.run(app, host="0.0.0.0", port=9299, log_config=None) diff --git a/scene_handler/zyn_limit_space_scene_handler.py b/scene_handler/zyn_limit_space_scene_handler.py index 7d24fa6..1d243fa 100644 --- a/scene_handler/zyn_limit_space_scene_handler.py +++ b/scene_handler/zyn_limit_space_scene_handler.py @@ -31,16 +31,16 @@ for value in values: yield value fake_list = [ # 假如这是你从四合一后台请求来的数据 - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, ] value_iterator = create_value_iterator(fake_list) @@ -65,13 +65,16 @@ def getGasData(): - # 从后台读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'} + # 从后台数据库读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'} harmful_device_code = '862635063168165A' url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' response = get_request(url) if response and response.get('data'): return response.get('data') + + + def getNewGasData(): global last_time data = getGasGata_fake() @@ -86,69 +89,531 @@ return data -def isPowerOn(): - ''' - 模拟询问开机, - :param flag:True:认定为开机,False,关机 - :return: - ''' - global last_time - print("检测四合一是否开机") - while True: - data = getNewGasData() - if data == None: - time.sleep(2) - else: - flag = datetime.now() < datetime.strptime(data.get('uptime'),"%Y-%m-%d %H:%M:%S") # T/F 开机/未开机 - if flag == True: - last_time = data["uptime"] - return - else: - print("重新访问") - time.sleep(2) -def getGasFlag(): - # 判断气体是否合规,T:合规。F:不合规 - try: - data = getNewGasData() - if float(data["ch4"]) > 10.0 \ - or float(data["co"]) > 10.0 \ - or float(data["h2s"]) > 120.0 \ - or float(data["o2"]) < 15: - return False - else:return True - except Exception as e: - print(str(e)) + def writeFile(file_path, data): - print(f"写入{data}") + # print(f"写入{data}") with open(file_path, mode='a', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerows(data) +class EventController(): + def __init__(self): + self.cancel_event = asyncio.Event() + self.umd_complete = asyncio.Event() + self.laobao_complete = asyncio.Event() -class TestLimitSpaceSceneHandler(BaseSceneHandler): +class SiHeYi(): + def __init__(self, harmful_device_code): + self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' # 后台访问数据的url + self.harmful_device_code = harmful_device_code # 四合一标识符 + self.last_ts = None # 上次读取的数据的生成时间戳 + + def waitPowerOn(self, script_start_time): + ''' + 阻塞函数 + 循环是否开机,只有检测到开机才会退出函数 + + :param script_start_time:脚本启动的时间戳 + + :return: + ''' + print("检测四合一是否开机") + + while True: + self.getNewData() + flag = script_start_time < self.last_ts # 当前时间T/F 开机/未开机 + if flag == True: + print("检测到开机") + return + else: + print("未开机") + time.sleep(2) + + def getNewData(self): + ''' + 阻塞函数 + 访问后台数据库 读取最新产生的四合一浓度 + 如果有返回数据则记录 该数据产生的时间。 + 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: + 则视为读取到新数据,返回新数据 + 没有数据等待n秒后重复询问 + :return: + ''' + while True: + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={self.harmful_device_code}' + print("访问四合一数据...") + # response = get_request(url) + response = getGasGata_fake() + if response and response.get('data'): + print("访问到四合一数据") + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0: + self.last_ts = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + return ch4, co, h2s, o2 + else: # url没有返回数据 + print("四合一没有读取到数据") + time.sleep(5) + + def isDataNormal(self, ch4, co, h2s, o2): + ''' + 判断四项气体是否正常 + :param ch4: + :param co: + :param h2s: + :param o2: + :return: + ''' + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + return False # 气体异常 + else: + return True # 气体正常 + +class AnQuanMao(): + def __init__(self, helmet_code): + self.helmet_code = helmet_code + self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={helmet_code}' # 后台访问数据的url + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + ''' + 阻塞进程 + :return: + ''' + while True: + header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + print("访问心率血氧数据...") + response = get_request(url, headers=header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts= upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: return True + + +class Laobaocheck(): + def __init__(self, eventController=None, alarm=None): + self.laobao_model = YOLO("weights/labor-v8-20241114.pt") + self.jiaodi_model = YOLO("weights/jiaodi.pt") + self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} + self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} # OD 模型有无检测这些目标 + self.jiaodi_flag = False # 分类模型 有无检测到交底 + self.laobao_pool = {} + + self.eventController = eventController + self.alarm = alarm + + def getUndetectedTarget(self): + # 获取未检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == False: result.append(name) + return result + + def getDetectedTarget(self): + # 获取已检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == True: result.append(name) + return result + + def name2id(self, input): + # 检测名称 映射为 id + if isinstance(input, str): + return self.target[input] + elif isinstance(input, list): + result = [] + for item in input: + result.append(self.target[item]) + if len(result) == 0:return [] + return list(set(np.concatenate([r for r in result]).astype(int).tolist())) + + def id2name(self, input): + ''' + + :param input: int 或 [int,int,int...] + :return: + ''' + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k,v in self.target.items(): # k: 类别名称 , v: id_list + if id in v:result.append(k) + + return list(set(result)) + + + + def predict_isJiaodi(self,frames): + ''' + 调用 jiaodi.pt 分类模型 + :return: True:交底,False:没检测到 交底 + ''' + + jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False) + jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results] + for prob in jiaodi_prob: + if prob > 0.6: + return True + return False + + def predict_laobao(self, frames): + ''' + 调用 labor-v8-20241114.pt OD 模型 + :param frames: + :return: [类别1,类别2] + ''' + target_idx = self.name2id(self.getUndetectedTarget()) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype( + int).tolist())) # 检测到的目标类别id_list,已去重 + return self.id2name(pred_c_list) + + def updateUnpredictedTargets(self, jiaodi_flag, pred_labels): + ''' + 更新 已检测到的目标 列表 和有无检测到交底 + :param pred_c_list: [str, str...] + :return: None + ''' + for pred_label in pred_labels: + # print(f"检测到{pred_label}") + self.target_flag[pred_label] = True + self.jiaodi_flag = jiaodi_flag + if self.jiaodi_flag == False and jiaodi_flag ==True: + self.alarm.addAlarm("-------检测到交底") + + def model_predict_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + while True: + try: + ret, frames = cap.read() + frames = [frames] + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") + if not ret: + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") + break + # cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) + # cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 + # cv2.imshow("Video Frame", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cap.release() + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + if self.eventController != None and self.eventController.cancel_event.is_set(): # 超时退出 + cap.release() + cv2.destroyAllWindows() + return + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + # print("劳保物品 通过") + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") + self.alarm.addAlarm("-------劳保物品 通过") + self.eventController.laobao_complete.set() + cap.release() + cv2.destroyAllWindows() + return # 退出检测 + else: # 如果还有未检测到的 + + undetectedTargets = self.getUndetectedTarget() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + self.alarm.addAlarm(f"-------报警:劳保物品缺失:{undetectedTargets[0]}") + cap.release() + cv2.destroyAllWindows() + def model_predict(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + if self.eventController != None and self.eventController.cancel_event.is_set(): # 超时退出 + return + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + # print("劳保物品 通过") + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") + self.alarm_list.append("劳保物品 通过") + self.eventController.laobao_complete.set() + return # 退出检测 + else: # 如果还有未检测到的 + + undetectedTargets = self.getUndetectedTarget() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + + for undetectedTarget in undetectedTargets: + alarm_content = f"报警:劳保物品缺失:{undetectedTarget}" + if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 + self.alarm_list.append(alarm_content) + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), + # f"报警:劳保物品缺失:{undetectedTarget}") + # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 + # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 + +class YinHuanCheck(): + def __init__(self, eventController=None, alarm_list=None): + self.model = YOLO("weights/yinhuan.pt") + self.eventController = eventController + self.alarm_list = alarm_list + + + self.anquanmao_tolerate = [] + self.xiyan_tolerate = [] + self.xiubiao_tolerate = [] + self.dadianhua_tolerate = [] + self.xianzaren_tolerate = [] + + def id2name(self, input): + ''' + + :param input: int 或 [int,int,int...] + :return: + ''' + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k,v in self.model.names.items(): # k: id , v: label名称 + if k == id:result.append(v) + + return list(set(result)) + + + def judge_anquanmao(self, person_targets): + ''' + 判定有没有 没带安全帽的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "安全帽" not in person_result: + return False # 没有通过安全帽检测 + return True # 通过安全帽检测 + + + def judge_xiyan(self, person_targets): + ''' + 判定有没有 吸烟的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "吸烟" in person_result: + return False # 没有通过吸烟检测 + return True # 通过吸烟检测 + + def judge_xiubiao(self, person_targets): + ''' + 判定有没有 吸烟的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "吸烟" in person_result: + return False # 没有通过吸烟检测 + return True # 通过吸烟检测 + def judge_dadianhua(self, person_targets): + ''' + 判定有没有 打电话的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "打电话" in person_result: + return False # 没有通过打电话检测 + return True # 通过打电话检测 + + def judge_xianzaren(self, person_targets): + ''' + 判定有没有 打电话的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "安全帽" not in person_result and "工服" not in person_result: # 既不穿安全帽也不穿安全帽 + return False # 没有通过闲杂人检测 + return True # 通过闲杂人检测 + def detect_person(self, frames): + ''' + + :param frames: list of pic or ndarray + :return: list of ndarray of person + ''' + people_results = self.model.predict(source=frames, conf=0.6,classes = [0], + save=False, verbose=False) # 检测人 + result = [] + for people_result in people_results: + orig_img = people_result.orig_img # 这一帧的原图矩阵 + person_boxes = people_result.boxes.xyxy.tolist() # 这一帧所有人的box的xyxy + for box in person_boxes: + cropped_image = orig_img[int(box[1]):int(box[3]), + int(box[0]):int(box[2])] # y1:y2, x1:x2 + + result.append(cropped_image) + return result + + def isAlarm(self): + def has_consecutive_true(bool_list, threshold=30): + count = 0 # 计数器,用于记录连续 True 的次数 + for value in bool_list: + if value: # 如果当前值为 True + count += 1 # 增加计数器 + if count > threshold: # 如果计数器超过阈值 + return True + else: # 如果当前值为 False + count = 0 # 重置计数器 + return False # 遍历完成后没有发现连续超过 threshold 次的 True + + if has_consecutive_true(self.anquanmao_tolerate) == True: + self.anquanmao_tolerate.clear() + # print("安全帽报警") + + + def detect_person_targets(self,person_crops): + ''' + + :param person_crops: list of ndarray of person + :return: [[label1,label2..], [label1...]] 每个人身上的目标 + ''' + if len(person_crops) == 0:return [] + results = self.model.predict(source=person_crops, conf=0.6, classes=[2, 3, 4, 5, 6], # 安全帽、工服、烟头、电话、袖标 + save=False, verbose=False) # 检测人 + person_detect_targets = [] + for result in results: # 每个人的检测结果 + labels = result.boxes.cls.tolist() # 这个人的检测标签,id形式 + labels = [int(num) for num in labels] # 转换为 int + labels = self.id2name(labels) # 转换为label_name + person_detect_targets.append(labels) + return person_detect_targets + + def main_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + ret, frames = cap.read() + while True: + # for frames in self.stream_loader: # type : list (4),连续的4帧 + try: + ret, frames = cap.read() + print(f"读取帧{ret}") + frames = [frames] + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") + if not ret: + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") + cap.release() + cv2.destroyAllWindows() + break + cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE) + cv2.resizeWindow('Video Frame2', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame2", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + people_results = self.detect_person(frames) # 检测人 + person_detect_targets = self.detect_person_targets(people_results) + print(person_detect_targets) + self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets)) + self.isAlarm() + + def main(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + people_results = self.detect_person(frames) # 检测人 + person_detect_targets = self.detect_person_targets(people_results) + self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets)) + self.isAlarm() + +class Alarm(): + def __init__(self): + self.pool = [] + def addAlarm(self, content): + ''' + + :param content: + :return: + ''' + if content in self.pool: + self.pool.remove(content) + self.pool.append(content) + + def main(self): + for i in range(1000000): + if len(self.pool) != 0: + print(f"{self.pool.pop(0)},报警队列长度:{len(self.pool)}") + time.sleep(1) + +class ZynLimitSpaceSceneHandler(BaseSceneHandler): def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) - print("__init__") + self.start_time = time.time() # 脚本启动时间戳 self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) # alarmCategory:优先级 0代表优先级最高 # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, # device_thread_id=thread_id) + self.health_ts_dict = {} + self.harmful_ts_dict = {} + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] # 安全帽编号 - self.laobao_complete = asyncio.Event() - self.umd_complete = asyncio.Event() - # self.umd_complete.set() - self.laobao_model = YOLO("weights/labor-v8-20241114.pt") - self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} - self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} - self.alarm_list = [] - self.laobao_pool = {} + self.eventController = EventController() + self.alarm = Alarm() + + self.laobao_check = Laobaocheck(self.eventController, self.alarm) + self.yinhuan_check = YinHuanCheck(self.eventController, self.alarm) self.umd_pool = {} + self.siHeyi = SiHeYi("862635063168165A") + self.anQuanMao = AnQuanMao("HWIH061000056395") def addLog(self, pool, time, text): if time not in pool.keys(): @@ -157,196 +622,51 @@ pool[time].append(text) pool[time] = list(set(pool[time])) # 不添加重复的日志 - async def laobaoCheck_task(self, executor, cancel_event=None): - # 报警只在 上中下气体进行完了之后进行,就算上中下一直没结束 也不报警 - - def getWrongClass(): - wrong_class_list = [] - for k, v in self.target_flag.items(): - if v == False: - wrong_class_list.append(k) - return wrong_class_list - - def getDetectedTarget(): - # 获取已检测目标的名称,返回str列表 - result = [] - for name,flag in self.target_flag.items(): - if flag == True:result.append(name) - return result - - def getUndetectedTarget(): - # 获取未检测目标的名称,返回str列表 - result = [] - for name, flag in self.target_flag.items(): - if flag == False: result.append(name) - return result - - def name2id(input): - # 检测名称 映射为 id - if isinstance(input, str): - return self.target[input] - elif isinstance(input, list): - result = [] - for item in input: - result.append(self.target[item]) - return list(set(np.concatenate([r for r in result]).astype(int).tolist())) - - def model_predict_test(video_path): - self.addLog(self.laobao_pool,datetime.now().strftime("%H:%M:%S"),"开始劳保检测") - cap = cv2.VideoCapture(video_path) - ret, frames = cap.read() - - # output_file = "output.avi" # 输出视频文件名 - # fps = 20.0 # 每秒帧数 - # frame_size = (1280, 720) # 帧大小,根据实际情况调整 - # fourcc = cv2.VideoWriter_fourcc(*'XVID') # 使用XVID编码器 - # out = cv2.VideoWriter(output_file, fourcc, fps, frame_size) - while True: - # for frames in self.stream_loader: # type : list (4),连续的4帧 - try: - ret, frames = cap.read() - frames = [frames] - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") - if not ret: - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") - cap.release() - cv2.destroyAllWindows() - break - # print(f"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx{len(frames)}") - cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) - cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 - cv2.imshow("Video Frame", frames[0]) - # out.write(frames[0]) - # out.write(frames[1]) - # out.write(frames[2]) - # out.write(frames[3]) - except Exception as ex: - traceback.print_exc() - logger.error(ex) - cv2.destroyAllWindows() - # 等待1毫秒,检查是否按下了'q'键退出 - if cv2.waitKey(1) & 0xFF == ord('q'): - break - - if cancel_event.is_set(): # 超时退出 - return - - # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) - try: - target_idx = name2id(getUndetectedTarget()) - results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, - save=False, verbose=False) # results:list(4) 4帧的检测结果 - pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) # 检测到的目标类别id_list,已去重 - for k, v in self.target.items(): # k:类别名称,v:类别id, 更新target_flag状态,如果之前存在c物体的报警,现在检测到c物体,移除它的报警 - for pred_c in pred_c_list: - if pred_c in v: - self.target_flag[k] = True - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"),f"检测到{k}") - print(f"检测到{k}") - if f"报警:劳保物品缺失:{k}" in self.alarm_list:self.alarm_list.remove(f"报警:劳保物品缺失:{k}") - break - if self.umd_complete.is_set(): # 上中下气体检测完毕 - # 检验所有物体都检验到了吗 - if getUndetectedTarget()==[]: # 如果全部检验到了 - # print("劳保物品 通过") - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") - self.alarm_list.append("劳保物品 通过") - self.laobao_complete.set() - cap.release() - cv2.destroyAllWindows() - return # 退出检测 - else: # 如果还有未检测到的 - undetectedTargets = getUndetectedTarget() - # print(f"报警:劳保物品缺失:{wrong_class_list}") - - for undetectedTarget in undetectedTargets: - alarm_content = f"报警:劳保物品缺失:{undetectedTarget}" - if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 - self.alarm_list.append(alarm_content) - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"报警:劳保物品缺失:{undetectedTarget}") - # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 - # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 - except Exception as e: - print(str(e)) - def model_predict(): - for frames in self.stream_loader: # type : list (4),连续的4帧 - if cancel_event.is_set(): # 超时退出 - return - # print(f"laobaoCheck_task:检测中,已检测到{self.complete_target}") - target_idx = [] - for k,v in self.target.items(): - if k not in self.complete_target: - target_idx.append(v) - results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, - save=False, verbose=False) # results:list(4) 4帧的检测结果 - pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) - for k, v in self.target.items(): - for pred_c in pred_c_list: - if pred_c in v: - self.target_flag[k] = True - self.complete_target.append(k) # 检测到 类别c 以后就不需要检测c - self.alarm_list.remove(f"报警:劳保物品缺失:{k}") - - break - if self.umd_complete.is_set(): # 上中下气体检测完毕 - # 检验所有物体都检验到了吗 - if all(self.target_flag.values()): # 如果全部检验到了 - # print("劳保物品 通过") - self.alarm_list.append("劳保物品 通过") - self.laobao_complete.set() - return # 退出检测 - else: - wrong_class_list = getWrongClass() - # print(f"报警:劳保物品缺失:{wrong_class_list}") - - for wrong_class in wrong_class_list: - alarm_content = f"报警:劳保物品缺失:{wrong_class}" - if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 - self.alarm_list.append(alarm_content) - # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 - # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 - - + async def laobaoCheck_task(self): + executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() - await loop.run_in_executor(executor, model_predict_test, r"/home/pc/桌面/2025-01-21 14-42-00.mkv") - # await loop.run_in_executor(executor, model_predict) - print("劳保用品退出检测") + await loop.run_in_executor(executor, self.laobao_check.model_predict_fake, + r"/home/pc/Desktop/project/safe-algo-pro/2025-02-25 15-25-48.mkv") - async def uMDGasCheck_task(self,cancel_event=None): + async def uMDGasCheck_task(self,eventController=None): executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() tflag_pool = [] # 返回数据正常了几次 self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?") - await loop.run_in_executor(executor, isPowerOn) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 + await loop.run_in_executor(executor, self.siHeyi.waitPowerOn, self.start_time) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!") + self.alarm_list.append("-------报警:检测到四合一已开机") for i in range(1000000): # 模拟循环检测气体 - if cancel_event.is_set(): # 超时退出 + if eventController.cancel_event.is_set(): # 超时退出 return - print(f"uMDGasCheck_task:{i}") + # print(f"uMDGasCheck_task:{i}") - flag = await loop.run_in_executor(executor, getGasFlag) # 判断气体是否合规 + ch4, co, h2s, o2 = await loop.run_in_executor(executor, self.siHeyi.getNewData) # 判断气体是否合规 + flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2) if flag == False: tflag_pool.clear() self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体" self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常") + else: + print(f"上中下气体检测正常次数:{tflag_pool}") tflag_pool.append(True) if len(tflag_pool) == 3: break # 退出检测 - print("上中下气体检测通过") + self.alarm_list.append("-------报警:上中下气体检测通过") self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过") - self.umd_complete.set() + self.eventController.umd_complete.set() return - async def alarm_task(self, cancel_event): + async def alarm_task(self): def fun(): - for i in range(10000): # 检验中 - if self.laobao_complete.is_set(): + for i in range(1000000): # 检验中 + if self.eventController.laobao_complete.is_set(): if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过") return # print(f"alarm_task:{len(self.alarm_list)}") - if cancel_event.is_set(): # 超时退出 + if self.eventController.cancel_event.is_set(): # 超时退出 return if len(self.alarm_list) != 0: print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}") @@ -375,56 +695,74 @@ executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() await loop.run_in_executor(executor, fun) - def run(self): - print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv") - async def fun(): - # d = { - # 'alarmCategory': 0, - # 'alarmType': '1', - # 'category_order': 1, # alarmCategory 下的优先级 - # 'alarm_name': 'no_fire_extinguisher', - # 'alarmContent': '未检测到灭火器', - # 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', - # 'label': '' - # } - # self.alarm_message_center.add_message(d) - # self.alarm_message_center.process_messages() + async def yinhuanCheck_task(self): + ''' + 检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头) + :return: + ''' + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, self.yinhuan_check.main_fake, r"/home/pc/Desktop/project/safe-algo-pro/2025-02-26 08-49-39.mkv") + + async def xinlvCheck_task(self): + + + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + blood_oxygen, heartrate = await loop.run_in_executor(executor, self.anQuanMao.getNewData) + flag = self.anQuanMao.isDataNormal(blood_oxygen, heartrate) + if flag == False: + self.alarm_list.append("-------报警:gasCheck:心率血氧数据异常") + async def gasCheck(self): + ''' + 四合一气体检测 + :return: + ''' + def fun(): + ch4, co, h2s, o2 = self.siHeyi.getNewData() + flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2) + if flag == False: + self.alarm_list.append("-------报警:gasCheck:四合一气体异常") + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, fun) + + def run(self): + async def fun(): # 并行执行任务 - cancel_event = asyncio.Event() uMDGasCheck_task = asyncio.create_task( - self.uMDGasCheck_task(cancel_event)) - laobaoCheck_executor = ThreadPoolExecutor(max_workers=3) - laobaoCheck_task = asyncio.create_task( - self.laobaoCheck_task(laobaoCheck_executor, cancel_event)) - alarm_task = asyncio.create_task(self.alarm_task(cancel_event)) + self.uMDGasCheck_task(self.eventController)) + laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task()) + alarm_task = asyncio.create_task(self.alarm_task()) logger_task = asyncio.create_task(self.logger_task()) - done, pending = await asyncio.wait({logger_task, uMDGasCheck_task, laobaoCheck_task, alarm_task}, timeout=300000.0) + + done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0) + if uMDGasCheck_task in done and laobaoCheck_task in done: await uMDGasCheck_task await laobaoCheck_task - # alarm_task.cancel() print("前置条件检查完成,退出") else: # 如果超时,则取消未完成的任务 - cancel_event.set() - laobaoCheck_executor.shutdown(wait=False) + self.eventController.cancel_event.set() laobaoCheck_task.cancel() uMDGasCheck_task.cancel() print("前置条件检查时间过长,退出") # 并行执行任务 - # print("开始工作") - # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000)) - # gasCheck = asyncio.create_task(gasCheck_task_fake(10000)) - # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000)) - # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck) + print("开始工作") + xinlvCheck = asyncio.create_task(self.xinlvCheck_task()) + yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task()) + gasCheck = asyncio.create_task(self.gasCheck()) + results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck) + done1, pending1 = await asyncio.wait({alarm_task, logger_task}, timeout=300000.0) asyncio.run(fun()) if __name__=='__main__': # print(getNewGasData()) - # print(getNewGasData()) + model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt") + print(model.names) - getGasFlag() \ No newline at end of file diff --git a/ReadMe.md b/ReadMe.md new file mode 100644 index 0000000..aeb844f --- /dev/null +++ b/ReadMe.md @@ -0,0 +1,14 @@ +项目启动: +python main.py +如果无法启动,可能端口已经被占用。请更换端口再试试 + + +如果无法连接到,192.168.10.51:333,说明无法连接到 甲烷/语音报警器 +请连接wifi:AQSTYTJ_XXXX +密码:abcd1234 + +算法权重文件 放在 weights文件夹下 + +一些未解决的问题: +[zyn_limit_space_scene_handler.py](scene_handler/zyn_limit_space_scene_handler.py) +YinHuanCheck的main_fake和LaoBaoCheck的model_predict_fake,如果我在LaoBaoCheck实时预览cv2的窗口,就不能再YinHuanCheck实时预览窗口,暂时不知道为什么。 diff --git a/common/http_utils.py b/common/http_utils.py index 405d951..ccfa59b 100644 --- a/common/http_utils.py +++ b/common/http_utils.py @@ -29,9 +29,9 @@ logger.error(f"Failed to push data: {e}") -def get_request(url, headers=None): +def get_request(url, headers=None, timeout=10): try: - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, timeout=timeout) return response.json() except requests.RequestException as e: logger.error(f"Failed to get data: {e}") diff --git a/main.py b/main.py index 8da2462..2646193 100644 --- a/main.py +++ b/main.py @@ -45,4 +45,4 @@ uvicorn_logger.setLevel(logging.INFO) - uvicorn.run(app, host="0.0.0.0", port=9000, log_config=None) + uvicorn.run(app, host="0.0.0.0", port=9299, log_config=None) diff --git a/scene_handler/zyn_limit_space_scene_handler.py b/scene_handler/zyn_limit_space_scene_handler.py index 7d24fa6..1d243fa 100644 --- a/scene_handler/zyn_limit_space_scene_handler.py +++ b/scene_handler/zyn_limit_space_scene_handler.py @@ -31,16 +31,16 @@ for value in values: yield value fake_list = [ # 假如这是你从四合一后台请求来的数据 - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}, - {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', - 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}}, + {"data":{'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, ] value_iterator = create_value_iterator(fake_list) @@ -65,13 +65,16 @@ def getGasData(): - # 从后台读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'} + # 从后台数据库读取四合一数据,返回示例:{'ch4': '0.00', 'co': '0.00', 'devcode': '862635063168165A', 'h2s': '0.00', 'id': '144203', 'logtime': '2025-01-16 17:48:01', 'o2': '20.90', 'uptime': '2025-01-16 17:48:01'} harmful_device_code = '862635063168165A' url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' response = get_request(url) if response and response.get('data'): return response.get('data') + + + def getNewGasData(): global last_time data = getGasGata_fake() @@ -86,69 +89,531 @@ return data -def isPowerOn(): - ''' - 模拟询问开机, - :param flag:True:认定为开机,False,关机 - :return: - ''' - global last_time - print("检测四合一是否开机") - while True: - data = getNewGasData() - if data == None: - time.sleep(2) - else: - flag = datetime.now() < datetime.strptime(data.get('uptime'),"%Y-%m-%d %H:%M:%S") # T/F 开机/未开机 - if flag == True: - last_time = data["uptime"] - return - else: - print("重新访问") - time.sleep(2) -def getGasFlag(): - # 判断气体是否合规,T:合规。F:不合规 - try: - data = getNewGasData() - if float(data["ch4"]) > 10.0 \ - or float(data["co"]) > 10.0 \ - or float(data["h2s"]) > 120.0 \ - or float(data["o2"]) < 15: - return False - else:return True - except Exception as e: - print(str(e)) + def writeFile(file_path, data): - print(f"写入{data}") + # print(f"写入{data}") with open(file_path, mode='a', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerows(data) +class EventController(): + def __init__(self): + self.cancel_event = asyncio.Event() + self.umd_complete = asyncio.Event() + self.laobao_complete = asyncio.Event() -class TestLimitSpaceSceneHandler(BaseSceneHandler): +class SiHeYi(): + def __init__(self, harmful_device_code): + self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' # 后台访问数据的url + self.harmful_device_code = harmful_device_code # 四合一标识符 + self.last_ts = None # 上次读取的数据的生成时间戳 + + def waitPowerOn(self, script_start_time): + ''' + 阻塞函数 + 循环是否开机,只有检测到开机才会退出函数 + + :param script_start_time:脚本启动的时间戳 + + :return: + ''' + print("检测四合一是否开机") + + while True: + self.getNewData() + flag = script_start_time < self.last_ts # 当前时间T/F 开机/未开机 + if flag == True: + print("检测到开机") + return + else: + print("未开机") + time.sleep(2) + + def getNewData(self): + ''' + 阻塞函数 + 访问后台数据库 读取最新产生的四合一浓度 + 如果有返回数据则记录 该数据产生的时间。 + 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: + 则视为读取到新数据,返回新数据 + 没有数据等待n秒后重复询问 + :return: + ''' + while True: + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={self.harmful_device_code}' + print("访问四合一数据...") + # response = get_request(url) + response = getGasGata_fake() + if response and response.get('data'): + print("访问到四合一数据") + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0: + self.last_ts = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + return ch4, co, h2s, o2 + else: # url没有返回数据 + print("四合一没有读取到数据") + time.sleep(5) + + def isDataNormal(self, ch4, co, h2s, o2): + ''' + 判断四项气体是否正常 + :param ch4: + :param co: + :param h2s: + :param o2: + :return: + ''' + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + return False # 气体异常 + else: + return True # 气体正常 + +class AnQuanMao(): + def __init__(self, helmet_code): + self.helmet_code = helmet_code + self.url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={helmet_code}' # 后台访问数据的url + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + ''' + 阻塞进程 + :return: + ''' + while True: + header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + print("访问心率血氧数据...") + response = get_request(url, headers=header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts= upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: return True + + +class Laobaocheck(): + def __init__(self, eventController=None, alarm=None): + self.laobao_model = YOLO("weights/labor-v8-20241114.pt") + self.jiaodi_model = YOLO("weights/jiaodi.pt") + self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} + self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} # OD 模型有无检测这些目标 + self.jiaodi_flag = False # 分类模型 有无检测到交底 + self.laobao_pool = {} + + self.eventController = eventController + self.alarm = alarm + + def getUndetectedTarget(self): + # 获取未检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == False: result.append(name) + return result + + def getDetectedTarget(self): + # 获取已检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == True: result.append(name) + return result + + def name2id(self, input): + # 检测名称 映射为 id + if isinstance(input, str): + return self.target[input] + elif isinstance(input, list): + result = [] + for item in input: + result.append(self.target[item]) + if len(result) == 0:return [] + return list(set(np.concatenate([r for r in result]).astype(int).tolist())) + + def id2name(self, input): + ''' + + :param input: int 或 [int,int,int...] + :return: + ''' + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k,v in self.target.items(): # k: 类别名称 , v: id_list + if id in v:result.append(k) + + return list(set(result)) + + + + def predict_isJiaodi(self,frames): + ''' + 调用 jiaodi.pt 分类模型 + :return: True:交底,False:没检测到 交底 + ''' + + jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False) + jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results] + for prob in jiaodi_prob: + if prob > 0.6: + return True + return False + + def predict_laobao(self, frames): + ''' + 调用 labor-v8-20241114.pt OD 模型 + :param frames: + :return: [类别1,类别2] + ''' + target_idx = self.name2id(self.getUndetectedTarget()) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype( + int).tolist())) # 检测到的目标类别id_list,已去重 + return self.id2name(pred_c_list) + + def updateUnpredictedTargets(self, jiaodi_flag, pred_labels): + ''' + 更新 已检测到的目标 列表 和有无检测到交底 + :param pred_c_list: [str, str...] + :return: None + ''' + for pred_label in pred_labels: + # print(f"检测到{pred_label}") + self.target_flag[pred_label] = True + self.jiaodi_flag = jiaodi_flag + if self.jiaodi_flag == False and jiaodi_flag ==True: + self.alarm.addAlarm("-------检测到交底") + + def model_predict_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + while True: + try: + ret, frames = cap.read() + frames = [frames] + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") + if not ret: + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") + break + # cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) + # cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 + # cv2.imshow("Video Frame", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cap.release() + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + if self.eventController != None and self.eventController.cancel_event.is_set(): # 超时退出 + cap.release() + cv2.destroyAllWindows() + return + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + # print("劳保物品 通过") + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") + self.alarm.addAlarm("-------劳保物品 通过") + self.eventController.laobao_complete.set() + cap.release() + cv2.destroyAllWindows() + return # 退出检测 + else: # 如果还有未检测到的 + + undetectedTargets = self.getUndetectedTarget() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + self.alarm.addAlarm(f"-------报警:劳保物品缺失:{undetectedTargets[0]}") + cap.release() + cv2.destroyAllWindows() + def model_predict(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + if self.eventController != None and self.eventController.cancel_event.is_set(): # 超时退出 + return + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + # print("劳保物品 通过") + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") + self.alarm_list.append("劳保物品 通过") + self.eventController.laobao_complete.set() + return # 退出检测 + else: # 如果还有未检测到的 + + undetectedTargets = self.getUndetectedTarget() + # print(f"报警:劳保物品缺失:{wrong_class_list}") + + for undetectedTarget in undetectedTargets: + alarm_content = f"报警:劳保物品缺失:{undetectedTarget}" + if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 + self.alarm_list.append(alarm_content) + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), + # f"报警:劳保物品缺失:{undetectedTarget}") + # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 + # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 + +class YinHuanCheck(): + def __init__(self, eventController=None, alarm_list=None): + self.model = YOLO("weights/yinhuan.pt") + self.eventController = eventController + self.alarm_list = alarm_list + + + self.anquanmao_tolerate = [] + self.xiyan_tolerate = [] + self.xiubiao_tolerate = [] + self.dadianhua_tolerate = [] + self.xianzaren_tolerate = [] + + def id2name(self, input): + ''' + + :param input: int 或 [int,int,int...] + :return: + ''' + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k,v in self.model.names.items(): # k: id , v: label名称 + if k == id:result.append(v) + + return list(set(result)) + + + def judge_anquanmao(self, person_targets): + ''' + 判定有没有 没带安全帽的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "安全帽" not in person_result: + return False # 没有通过安全帽检测 + return True # 通过安全帽检测 + + + def judge_xiyan(self, person_targets): + ''' + 判定有没有 吸烟的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "吸烟" in person_result: + return False # 没有通过吸烟检测 + return True # 通过吸烟检测 + + def judge_xiubiao(self, person_targets): + ''' + 判定有没有 吸烟的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "吸烟" in person_result: + return False # 没有通过吸烟检测 + return True # 通过吸烟检测 + def judge_dadianhua(self, person_targets): + ''' + 判定有没有 打电话的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "打电话" in person_result: + return False # 没有通过打电话检测 + return True # 通过打电话检测 + + def judge_xianzaren(self, person_targets): + ''' + 判定有没有 打电话的人 + param person_targets: [[label1,label2..], [label1...]] 每个人身上的目标 + :return: + ''' + for person_result in person_targets: # 每个人身上的物件 + if "安全帽" not in person_result and "工服" not in person_result: # 既不穿安全帽也不穿安全帽 + return False # 没有通过闲杂人检测 + return True # 通过闲杂人检测 + def detect_person(self, frames): + ''' + + :param frames: list of pic or ndarray + :return: list of ndarray of person + ''' + people_results = self.model.predict(source=frames, conf=0.6,classes = [0], + save=False, verbose=False) # 检测人 + result = [] + for people_result in people_results: + orig_img = people_result.orig_img # 这一帧的原图矩阵 + person_boxes = people_result.boxes.xyxy.tolist() # 这一帧所有人的box的xyxy + for box in person_boxes: + cropped_image = orig_img[int(box[1]):int(box[3]), + int(box[0]):int(box[2])] # y1:y2, x1:x2 + + result.append(cropped_image) + return result + + def isAlarm(self): + def has_consecutive_true(bool_list, threshold=30): + count = 0 # 计数器,用于记录连续 True 的次数 + for value in bool_list: + if value: # 如果当前值为 True + count += 1 # 增加计数器 + if count > threshold: # 如果计数器超过阈值 + return True + else: # 如果当前值为 False + count = 0 # 重置计数器 + return False # 遍历完成后没有发现连续超过 threshold 次的 True + + if has_consecutive_true(self.anquanmao_tolerate) == True: + self.anquanmao_tolerate.clear() + # print("安全帽报警") + + + def detect_person_targets(self,person_crops): + ''' + + :param person_crops: list of ndarray of person + :return: [[label1,label2..], [label1...]] 每个人身上的目标 + ''' + if len(person_crops) == 0:return [] + results = self.model.predict(source=person_crops, conf=0.6, classes=[2, 3, 4, 5, 6], # 安全帽、工服、烟头、电话、袖标 + save=False, verbose=False) # 检测人 + person_detect_targets = [] + for result in results: # 每个人的检测结果 + labels = result.boxes.cls.tolist() # 这个人的检测标签,id形式 + labels = [int(num) for num in labels] # 转换为 int + labels = self.id2name(labels) # 转换为label_name + person_detect_targets.append(labels) + return person_detect_targets + + def main_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + ret, frames = cap.read() + while True: + # for frames in self.stream_loader: # type : list (4),连续的4帧 + try: + ret, frames = cap.read() + print(f"读取帧{ret}") + frames = [frames] + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") + if not ret: + # self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") + cap.release() + cv2.destroyAllWindows() + break + cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE) + cv2.resizeWindow('Video Frame2', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame2", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + people_results = self.detect_person(frames) # 检测人 + person_detect_targets = self.detect_person_targets(people_results) + print(person_detect_targets) + self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets)) + self.isAlarm() + + def main(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + people_results = self.detect_person(frames) # 检测人 + person_detect_targets = self.detect_person_targets(people_results) + self.anquanmao_tolerate.append(self.judge_anquanmao(person_detect_targets)) + self.isAlarm() + +class Alarm(): + def __init__(self): + self.pool = [] + def addAlarm(self, content): + ''' + + :param content: + :return: + ''' + if content in self.pool: + self.pool.remove(content) + self.pool.append(content) + + def main(self): + for i in range(1000000): + if len(self.pool) != 0: + print(f"{self.pool.pop(0)},报警队列长度:{len(self.pool)}") + time.sleep(1) + +class ZynLimitSpaceSceneHandler(BaseSceneHandler): def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) - print("__init__") + self.start_time = time.time() # 脚本启动时间戳 self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) # alarmCategory:优先级 0代表优先级最高 # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, # device_thread_id=thread_id) + self.health_ts_dict = {} + self.harmful_ts_dict = {} + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] # 安全帽编号 - self.laobao_complete = asyncio.Event() - self.umd_complete = asyncio.Event() - # self.umd_complete.set() - self.laobao_model = YOLO("weights/labor-v8-20241114.pt") - self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} - self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} - self.alarm_list = [] - self.laobao_pool = {} + self.eventController = EventController() + self.alarm = Alarm() + + self.laobao_check = Laobaocheck(self.eventController, self.alarm) + self.yinhuan_check = YinHuanCheck(self.eventController, self.alarm) self.umd_pool = {} + self.siHeyi = SiHeYi("862635063168165A") + self.anQuanMao = AnQuanMao("HWIH061000056395") def addLog(self, pool, time, text): if time not in pool.keys(): @@ -157,196 +622,51 @@ pool[time].append(text) pool[time] = list(set(pool[time])) # 不添加重复的日志 - async def laobaoCheck_task(self, executor, cancel_event=None): - # 报警只在 上中下气体进行完了之后进行,就算上中下一直没结束 也不报警 - - def getWrongClass(): - wrong_class_list = [] - for k, v in self.target_flag.items(): - if v == False: - wrong_class_list.append(k) - return wrong_class_list - - def getDetectedTarget(): - # 获取已检测目标的名称,返回str列表 - result = [] - for name,flag in self.target_flag.items(): - if flag == True:result.append(name) - return result - - def getUndetectedTarget(): - # 获取未检测目标的名称,返回str列表 - result = [] - for name, flag in self.target_flag.items(): - if flag == False: result.append(name) - return result - - def name2id(input): - # 检测名称 映射为 id - if isinstance(input, str): - return self.target[input] - elif isinstance(input, list): - result = [] - for item in input: - result.append(self.target[item]) - return list(set(np.concatenate([r for r in result]).astype(int).tolist())) - - def model_predict_test(video_path): - self.addLog(self.laobao_pool,datetime.now().strftime("%H:%M:%S"),"开始劳保检测") - cap = cv2.VideoCapture(video_path) - ret, frames = cap.read() - - # output_file = "output.avi" # 输出视频文件名 - # fps = 20.0 # 每秒帧数 - # frame_size = (1280, 720) # 帧大小,根据实际情况调整 - # fourcc = cv2.VideoWriter_fourcc(*'XVID') # 使用XVID编码器 - # out = cv2.VideoWriter(output_file, fourcc, fps, frame_size) - while True: - # for frames in self.stream_loader: # type : list (4),连续的4帧 - try: - ret, frames = cap.read() - frames = [frames] - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取帧") - if not ret: - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), "读取视频结束,退出") - cap.release() - cv2.destroyAllWindows() - break - # print(f"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx{len(frames)}") - cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) - cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 - cv2.imshow("Video Frame", frames[0]) - # out.write(frames[0]) - # out.write(frames[1]) - # out.write(frames[2]) - # out.write(frames[3]) - except Exception as ex: - traceback.print_exc() - logger.error(ex) - cv2.destroyAllWindows() - # 等待1毫秒,检查是否按下了'q'键退出 - if cv2.waitKey(1) & 0xFF == ord('q'): - break - - if cancel_event.is_set(): # 超时退出 - return - - # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) - try: - target_idx = name2id(getUndetectedTarget()) - results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, - save=False, verbose=False) # results:list(4) 4帧的检测结果 - pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) # 检测到的目标类别id_list,已去重 - for k, v in self.target.items(): # k:类别名称,v:类别id, 更新target_flag状态,如果之前存在c物体的报警,现在检测到c物体,移除它的报警 - for pred_c in pred_c_list: - if pred_c in v: - self.target_flag[k] = True - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"),f"检测到{k}") - print(f"检测到{k}") - if f"报警:劳保物品缺失:{k}" in self.alarm_list:self.alarm_list.remove(f"报警:劳保物品缺失:{k}") - break - if self.umd_complete.is_set(): # 上中下气体检测完毕 - # 检验所有物体都检验到了吗 - if getUndetectedTarget()==[]: # 如果全部检验到了 - # print("劳保物品 通过") - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"劳保物品 通过") - self.alarm_list.append("劳保物品 通过") - self.laobao_complete.set() - cap.release() - cv2.destroyAllWindows() - return # 退出检测 - else: # 如果还有未检测到的 - undetectedTargets = getUndetectedTarget() - # print(f"报警:劳保物品缺失:{wrong_class_list}") - - for undetectedTarget in undetectedTargets: - alarm_content = f"报警:劳保物品缺失:{undetectedTarget}" - if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 - self.alarm_list.append(alarm_content) - self.addLog(self.laobao_pool, datetime.now().strftime("%H:%M:%S"), f"报警:劳保物品缺失:{undetectedTarget}") - # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 - # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 - except Exception as e: - print(str(e)) - def model_predict(): - for frames in self.stream_loader: # type : list (4),连续的4帧 - if cancel_event.is_set(): # 超时退出 - return - # print(f"laobaoCheck_task:检测中,已检测到{self.complete_target}") - target_idx = [] - for k,v in self.target.items(): - if k not in self.complete_target: - target_idx.append(v) - results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, - save=False, verbose=False) # results:list(4) 4帧的检测结果 - pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype(int).tolist())) - for k, v in self.target.items(): - for pred_c in pred_c_list: - if pred_c in v: - self.target_flag[k] = True - self.complete_target.append(k) # 检测到 类别c 以后就不需要检测c - self.alarm_list.remove(f"报警:劳保物品缺失:{k}") - - break - if self.umd_complete.is_set(): # 上中下气体检测完毕 - # 检验所有物体都检验到了吗 - if all(self.target_flag.values()): # 如果全部检验到了 - # print("劳保物品 通过") - self.alarm_list.append("劳保物品 通过") - self.laobao_complete.set() - return # 退出检测 - else: - wrong_class_list = getWrongClass() - # print(f"报警:劳保物品缺失:{wrong_class_list}") - - for wrong_class in wrong_class_list: - alarm_content = f"报警:劳保物品缺失:{wrong_class}" - if alarm_content not in self.alarm_list: # 不加入 重复的报警内容 - self.alarm_list.append(alarm_content) - # self.alarm_list.remove(f"报警:劳保物品缺失:{wrong_class}") # 对于同样内容的报警,删除 早的报警 - # self.alarm_list.append(f"报警:劳保物品缺失:{wrong_class}") # 保留新的报警 - - + async def laobaoCheck_task(self): + executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() - await loop.run_in_executor(executor, model_predict_test, r"/home/pc/桌面/2025-01-21 14-42-00.mkv") - # await loop.run_in_executor(executor, model_predict) - print("劳保用品退出检测") + await loop.run_in_executor(executor, self.laobao_check.model_predict_fake, + r"/home/pc/Desktop/project/safe-algo-pro/2025-02-25 15-25-48.mkv") - async def uMDGasCheck_task(self,cancel_event=None): + async def uMDGasCheck_task(self,eventController=None): executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() tflag_pool = [] # 返回数据正常了几次 self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"检测开机?") - await loop.run_in_executor(executor, isPowerOn) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 + await loop.run_in_executor(executor, self.siHeyi.waitPowerOn, self.start_time) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"已开机!") + self.alarm_list.append("-------报警:检测到四合一已开机") for i in range(1000000): # 模拟循环检测气体 - if cancel_event.is_set(): # 超时退出 + if eventController.cancel_event.is_set(): # 超时退出 return - print(f"uMDGasCheck_task:{i}") + # print(f"uMDGasCheck_task:{i}") - flag = await loop.run_in_executor(executor, getGasFlag) # 判断气体是否合规 + ch4, co, h2s, o2 = await loop.run_in_executor(executor, self.siHeyi.getNewData) # 判断气体是否合规 + flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2) if flag == False: tflag_pool.clear() self.umd_pool[datetime.now().strftime("%H:%M:%S")] = "报警:上中下气体" self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"报警:上中下气体异常") + else: + print(f"上中下气体检测正常次数:{tflag_pool}") tflag_pool.append(True) if len(tflag_pool) == 3: break # 退出检测 - print("上中下气体检测通过") + self.alarm_list.append("-------报警:上中下气体检测通过") self.addLog(self.umd_pool, datetime.now().strftime("%H:%M:%S"), f"上中下气体检测通过") - self.umd_complete.set() + self.eventController.umd_complete.set() return - async def alarm_task(self, cancel_event): + async def alarm_task(self): def fun(): - for i in range(10000): # 检验中 - if self.laobao_complete.is_set(): + for i in range(1000000): # 检验中 + if self.eventController.laobao_complete.is_set(): if "劳保物品 通过" in self.alarm_list: print("劳保物品 通过") return # print(f"alarm_task:{len(self.alarm_list)}") - if cancel_event.is_set(): # 超时退出 + if self.eventController.cancel_event.is_set(): # 超时退出 return if len(self.alarm_list) != 0: print(f"{self.alarm_list.pop(0)},报警队列长度:{len(self.alarm_list)}") @@ -375,56 +695,74 @@ executor = ThreadPoolExecutor(max_workers=3) loop = asyncio.get_running_loop() await loop.run_in_executor(executor, fun) - def run(self): - print("runrunrunrunrunrunrunrunrunrunrunrunrunrunrunv") - async def fun(): - # d = { - # 'alarmCategory': 0, - # 'alarmType': '1', - # 'category_order': 1, # alarmCategory 下的优先级 - # 'alarm_name': 'no_fire_extinguisher', - # 'alarmContent': '未检测到灭火器', - # 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', - # 'label': '' - # } - # self.alarm_message_center.add_message(d) - # self.alarm_message_center.process_messages() + async def yinhuanCheck_task(self): + ''' + 检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头) + :return: + ''' + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, self.yinhuan_check.main_fake, r"/home/pc/Desktop/project/safe-algo-pro/2025-02-26 08-49-39.mkv") + + async def xinlvCheck_task(self): + + + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + blood_oxygen, heartrate = await loop.run_in_executor(executor, self.anQuanMao.getNewData) + flag = self.anQuanMao.isDataNormal(blood_oxygen, heartrate) + if flag == False: + self.alarm_list.append("-------报警:gasCheck:心率血氧数据异常") + async def gasCheck(self): + ''' + 四合一气体检测 + :return: + ''' + def fun(): + ch4, co, h2s, o2 = self.siHeyi.getNewData() + flag = self.siHeyi.isDataNormal(ch4, co, h2s, o2) + if flag == False: + self.alarm_list.append("-------报警:gasCheck:四合一气体异常") + executor = ThreadPoolExecutor(max_workers=3) + loop = asyncio.get_running_loop() + await loop.run_in_executor(executor, fun) + + def run(self): + async def fun(): # 并行执行任务 - cancel_event = asyncio.Event() uMDGasCheck_task = asyncio.create_task( - self.uMDGasCheck_task(cancel_event)) - laobaoCheck_executor = ThreadPoolExecutor(max_workers=3) - laobaoCheck_task = asyncio.create_task( - self.laobaoCheck_task(laobaoCheck_executor, cancel_event)) - alarm_task = asyncio.create_task(self.alarm_task(cancel_event)) + self.uMDGasCheck_task(self.eventController)) + laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task()) + alarm_task = asyncio.create_task(self.alarm_task()) logger_task = asyncio.create_task(self.logger_task()) - done, pending = await asyncio.wait({logger_task, uMDGasCheck_task, laobaoCheck_task, alarm_task}, timeout=300000.0) + + done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0) + if uMDGasCheck_task in done and laobaoCheck_task in done: await uMDGasCheck_task await laobaoCheck_task - # alarm_task.cancel() print("前置条件检查完成,退出") else: # 如果超时,则取消未完成的任务 - cancel_event.set() - laobaoCheck_executor.shutdown(wait=False) + self.eventController.cancel_event.set() laobaoCheck_task.cancel() uMDGasCheck_task.cancel() print("前置条件检查时间过长,退出") # 并行执行任务 - # print("开始工作") - # yinhuanCheck_task = asyncio.create_task(yinhuanCheck_task_fake(10000)) - # gasCheck = asyncio.create_task(gasCheck_task_fake(10000)) - # xinlvCheck = asyncio.create_task(xinlvCheck_task_fake(10000)) - # results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck) + print("开始工作") + xinlvCheck = asyncio.create_task(self.xinlvCheck_task()) + yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task()) + gasCheck = asyncio.create_task(self.gasCheck()) + results = await asyncio.gather(yinhuanCheck_task, gasCheck, xinlvCheck) + done1, pending1 = await asyncio.wait({alarm_task, logger_task}, timeout=300000.0) asyncio.run(fun()) if __name__=='__main__': # print(getNewGasData()) - # print(getNewGasData()) + model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt") + print(model.names) - getGasFlag() \ No newline at end of file diff --git a/weights/yinhuan.pt b/weights/yinhuan.pt new file mode 100644 index 0000000..07a7900 --- /dev/null +++ b/weights/yinhuan.pt Binary files differ