import asyncio import base64 import traceback from concurrent.futures import ThreadPoolExecutor from copy import deepcopy, copy import time from typing import Dict, List import cv2 from datetime import datetime import csv from algo.stream_loader import OpenCVStreamLoad from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager from common.http_utils import send_request, get_request from common.image_plotting import Annotator from entity.device import Device import numpy as np from ultralytics import YOLO from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter from scene_handler.base_scene_handler import BaseSceneHandler from scene_handler.helmet_data_processor import HelmetDataProcessor from services.global_config import GlobalConfig from tcp.tcp_client_manager import TcpClientManager def create_value_iterator(values): for value in values: yield value fake_list = [ # 假如这是你从四合一后台请求来的数据 {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, ] value_iterator = create_value_iterator(fake_list) COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) def flatten(lst): result = [] for i in lst: if isinstance(i, list): result.extend(flatten(i)) # 递归调用以处理嵌套列表 else: result.append(i) return result ''' alarmCategory: 0 劳保用品检测异常:三脚架、灭火器、鼓风机、指示牌、面罩、交底 1 作业过程隐患:闲杂人、安全帽、打电话、吸烟、袖标 2 人员健康异常 3 气体浓度异常 4 上中下气体浓度异常 ? handelType: 0 检测到报警 1 未检测到报警 2 人未穿戴报警 3 其他 4 人员检测到报警 ''' ALARM_DICT = { 'no_brief': { 'alarmCategory': 0, 'alarmType': '14', # todo 'handelType': 1, 'category_order': 6, 'alarm_name': 'no_brief', 'alarmContent': '未进行施工交底', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo 'label': '', }, 'no_tripod': { 'alarmCategory': 0, 'alarmType': '14', 'handelType': 1, 'category_order': 1, 'alarm_name': 'no_tripod', 'alarmContent': '未检测到三脚架', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo 'label': '', }, 'no_mask': { 'alarmCategory': 0, 'alarmType': '11', 'handelType': 1, 'category_order': 5, 'alarm_name': 'no_mask', 'alarmContent': '未佩戴呼吸防护设备', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '' }, 'no_blower': { 'alarmCategory': 0, 'alarmType': '13', 'handelType': 1, 'category_order': 3, 'alarm_name': 'no_blower', 'alarmContent': '没有检测到通风设备', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x1A\x00\xAE', 'label': '没有检测到通风设备' }, 'no_extinguisher': { 'alarmCategory': 0, 'alarmType': '14', 'handelType': 1, 'category_order': 2, 'alarm_name': 'no_fire_extinguisher', 'alarmContent': '未检测到灭火器', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', 'label': '', }, 'no_board': { 'alarmCategory': 0, 'alarmType': '17', 'handelType': 1, 'category_order': 4, 'alarm_name': 'no_board', 'alarmContent': '未检测到指示牌', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', 'label': '', }, 'harmful_gas': { 'alarmCategory': 3, 'alarmType': '', 'handelType': 3, 'category_order': -1, 'alarm_name': 'harmful_alarm', 'alarmContent': '有害气体浓度超标', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', 'label': '', }, 'umd_harmful_gas': { # todo 要跟上面区分吗 'alarmCategory': 4, 'alarmType': '', 'handelType': 3, 'category_order': -1, 'alarm_name': 'umd_harmful_gas', 'alarmContent': '有害气体浓度超标', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', 'label': '', }, 'health': { 'alarmCategory': 2, 'alarmType': '18', 'handelType': 3, 'category_order': -1, 'alarm_name': 'health_alarm', 'alarmContent': '作业人员心率血氧异常', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', 'label': '', }, 'smoke': { 'alarmCategory': 1, 'alarmType': '19', # todo 'handelType': 4, 'category_order': 4, 'alarm_name': 'cigarette', 'alarmContent': '吸烟', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', }, 'phone': { 'alarmCategory': 1, 'alarmType': '2', 'handelType': 4, 'category_order': 3, 'alarm_name': 'phone', 'alarmContent': '打电话', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', }, 'aqm': { 'alarmCategory': 1, 'alarmType': '18', 'handelType': 2, 'category_order': 2, 'alarm_name': 'no_helmet', 'alarmContent': '未佩戴安全帽', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', 'label': '未佩戴安全帽', }, 'armband': { 'alarmCategory': 1, 'alarmType': '20', # todo 'handelType': 2, 'category_order': 5, 'alarm_name': 'no_armband', 'alarmContent': '未佩戴袖标', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '未佩戴袖标', 'model_type': 'safe', }, 'break': { 'alarmCategory': 1, 'alarmType': '3', 'handelType': 2, 'category_order': 1, 'alarm_name': 'break_in_alarm', 'alarmContent': '非法闯入', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', 'label': '非法闯入', }, } # UMD_PASS_MESSAGE = b'' # 上中下气体检测通过 PREPARE_COMPLETE_MESSAGE = b'\xaa\x01\x00\x93\x19\x00\xAD' # 满足有限空间作业要求,可以作业 def writeFile(file_path, data): # print(f"写入{data}") with open(file_path, mode='a', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerows(data) class EventController(): def __init__(self): self.timeout_event = asyncio.Event() self.umd_complete = asyncio.Event() self.qianzhi_check_complete = asyncio.Event() self.laobao_complete = asyncio.Event() class SiHeYi(): def __init__(self, harmful_device_code, harmful_data_manager): self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' # 后台访问数据的url self.harmful_device_code = harmful_device_code # 四合一标识符 self.harmful_data_manager = harmful_data_manager self.last_ts = None # 上次读取的数据的生成时间戳 def waitPowerOn(self, script_start_time): """ 阻塞函数 循环是否开机,只有检测到开机才会退出函数 :param script_start_time:脚本启动的时间戳 :return: """ print("检测四合一是否开机") while True: self.getNewData() flag = script_start_time < self.last_ts # 当前时间T/F 开机/未开机 print(f'{script_start_time} {self.last_ts} {flag}') if flag: print("检测到开机") return else: print("未开机") time.sleep(2) def getNewData(self): """ 阻塞函数 访问后台数据库 读取最新产生的四合一浓度 如果有返回数据则记录 该数据产生的时间。 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: 则视为读取到新数据,返回新数据 没有数据等待n秒后重复询问 :return: """ while True: harmful_gas_data = self.harmful_data_manager.get_device_all_data(self.harmful_device_code) if harmful_gas_data: latest_gas_ts = max(harmful_gas_data.values(), key=lambda d: d['gas_ts'])['gas_ts'] if self.last_ts is None or (latest_gas_ts.timestamp() - self.last_ts) > 0: self.last_ts = latest_gas_ts.timestamp() if time.time() - latest_gas_ts.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 ch4 = harmful_gas_data.get(50).get('gas_value') co = harmful_gas_data.get(4).get('gas_value') h2s = harmful_gas_data.get(3).get('gas_value') o2 = harmful_gas_data.get(5).get('gas_value') return ch4, co, h2s, o2 else: print('ignore') else: logger.debug("四合一没有读取到数据") time.sleep(5) def getNewDataRemote(self): """ 阻塞函数 访问后台数据库 读取最新产生的四合一浓度 如果有返回数据则记录 该数据产生的时间。 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: 则视为读取到新数据,返回新数据 没有数据等待n秒后重复询问 :return: """ while True: url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={self.harmful_device_code}' print("访问四合一数据...") response = get_request(url) # response = getGasGata_fake() if response and response.get('data'): data = response.get('data') print(f"访问到四合一数据: {data}") uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0: self.last_ts = uptime.timestamp() if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 ch4 = data.get('ch4') co = data.get('co') h2s = data.get('h2s') o2 = data.get('o2') return ch4, co, h2s, o2 else: print('ignore') else: # url没有返回数据 logger.debug("四合一没有读取到数据") time.sleep(5) def isDataNormal(self, ch4, co, h2s, o2): """ 判断四项气体是否正常 :param ch4: :param co: :param h2s: :param o2: :return: """ if float(ch4) > 10.0 \ or float(co) > 10.0 \ or float(h2s) > 120.0 \ or float(o2) < 15: return False # 气体异常 else: return True # 气体正常 # class AnQuanMao(): # def __init__(self, helmet_code): # self.helmet_code = helmet_code # self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={helmet_code}' # 后台访问数据的url # self.last_ts = None # 上次读取的数据的生成时间戳 # # def getNewData(self): # """ # 阻塞进程 # :return: # """ # while True: # header = { # 'ak': 'fe80b2f021644b1b8c77fda743a83670', # 'sk': '8771ea6e931d4db646a26f67bcb89909', # } # url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' # print("访问心率血氧数据...") # response = get_request(url, headers=header) # if response and response.get('data'): # print("访问到心率血氧数据") # vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 # if vitalsigns_data: # 访问成功 # upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), # "%Y-%m-%d %H:%M:%S") # 访问数据的时间 # if self.last_ts is None or ( # upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 # self.last_ts = upload_timestamp.timestamp() # 更新数据 # if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 # return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') # else: # print("无法访问到心率血氧数据") # time.sleep(5) # # def isDataNormal(self, blood_oxygen, heartrate): # if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 # return False # else: # return True class Laobaocheck(): def __init__(self, eventController=None, alarm=None): self.laobao_model = YOLO("weights/labor-v8-20241114.pt") self.jiaodi_model = YOLO("weights/jiaodi.pt") self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, "工作指示牌": False} # OD 模型有无检测这些目标 self.jiaodi_flag = False # 分类模型 有无检测到交底 self.laobao_pool = {} self.eventController = eventController self.alarm = alarm def getUndetectedTarget(self): # 获取未检测目标的名称,返回str列表 result = [] for name, flag in self.target_flag.items(): if flag == False: result.append(name) if not self.jiaodi_flag: result.append("交底") return result def name2alarm(self, target_name): alarm_map = { '三脚架': 'no_tripod', '灭火器': 'no_extinguisher', '鼓风机': 'no_blower', '面罩': 'no_mask', '工作指示牌': 'no_board', '交底': 'no_brief' } return alarm_map.get(target_name, None) def getDetectedTarget(self): # 获取已检测目标的名称,返回str列表 result = [] for name, flag in self.target_flag.items(): if flag == True: result.append(name) return result def name2id(self, input): # 检测名称 映射为 id if isinstance(input, str): return self.target[input] elif isinstance(input, list): result = [] for item in input: if item in self.target: result.append(self.target[item]) if len(result) == 0: return [] return list(set(np.concatenate([r for r in result]).astype(int).tolist())) def id2name(self, input): """ :param input: int 或 [int,int,int...] :return: """ # id -> 类别名称 result = [] if isinstance(input, int): input = [input] for id in input: for k, v in self.target.items(): # k: 类别名称 , v: id_list if id in v: result.append(k) return list(set(result)) def predict_isJiaodi(self, frames): """ 调用 jiaodi.pt 分类模型 :return: True:交底,False:没检测到 交底 """ jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False) jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results] for prob in jiaodi_prob: if prob > 0.6: return True return False def predict_laobao(self, frames): ''' 调用 labor-v8-20241114.pt OD 模型 :param frames: :return: [类别1,类别2] ''' target_idx = self.name2id(self.getUndetectedTarget()) results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, save=False, verbose=False) # results:list(4) 4帧的检测结果 pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype( int).tolist())) # 检测到的目标类别id_list,已去重 return self.id2name(pred_c_list) def updateUnpredictedTargets(self, jiaodi_flag, pred_labels): ''' 更新 已检测到的目标 列表 和有无检测到交底 :param pred_labels: [str, str...] :return: None ''' for pred_label in pred_labels: print(f"检测到{pred_label}") self.target_flag[pred_label] = True if self.jiaodi_flag == False and jiaodi_flag == True: print(f"劳保检测:检测到交底") self.jiaodi_flag = jiaodi_flag def model_predict_fake(self, video_path): cap = cv2.VideoCapture(video_path) while True: try: ret, frames = cap.read() frames = [frames] if not ret: break cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 cv2.imshow("Video Frame", frames[0]) except Exception as ex: traceback.print_exc() logger.error(ex) cap.release() cv2.destroyAllWindows() # 等待1毫秒,检查是否按下了'q'键退出 if cv2.waitKey(1) & 0xFF == ord('q'): break if self.eventController != None and self.eventController.timeout_event.is_set(): # 超时退出 cap.release() cv2.destroyAllWindows() return # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 pred_label = self.predict_laobao(frames) # [str, str...] self.updateUnpredictedTargets(jiaodi_flag, pred_label) if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 # 检验所有物体都检验到了吗 if self.getUndetectedTarget() == []: # 如果全部检验到了 print("劳保物品 通过") self.eventController.laobao_complete.set() cap.release() cv2.destroyAllWindows() return # 退出检测 else: # 如果还有未检测到的 undetectedTargets = self.getUndetectedTarget() for target_name in undetectedTargets: alarm_dict = self.name2alarm(target_name) if alarm_dict: self.alarm.addAlarm(alarm_dict) cap.release() cv2.destroyAllWindows() def model_predict(self, stream_loader): for frames in stream_loader: # type : list (4),连续的4帧 if self.eventController is not None and self.eventController.timeout_event.is_set(): # 超时退出 return if not frames: continue # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 pred_label = self.predict_laobao(frames) # [str, str...] self.updateUnpredictedTargets(jiaodi_flag, pred_label) if self.eventController is not None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 # 检验所有物体都检验到了吗 if not self.getUndetectedTarget(): # 如果全部检验到了 print("劳保物品 通过") self.eventController.laobao_complete.set() return # 退出检测 else: # 如果还有未检测到的 # todo 这里是否要生成报警记录 undetectedTargets = self.getUndetectedTarget() for target_name in undetectedTargets: alarm_dict_key = self.name2alarm(target_name) if alarm_dict_key: self.alarm.addAlarm(ALARM_DICT[alarm_dict_key]) class YinHuanCheck: def __init__(self, device_code, eventController, alarm, frame_threshold=20): # 初始化YOLO模型及其他参数 self.model = YOLO("weights/yinhuan.pt") self.eventController = eventController self.alarm = alarm self.device_code = device_code self.frame_threshold = frame_threshold # 连续异常帧阈值 # 针对每个人的异常计数器,键为 person_id self.counters = { 'no_helmet': {}, # 未佩戴安全帽 'smoking': {}, # 吸烟(检测到烟头) 'phone': {}, # 打电话(检测到电话) 'illegal_intrusion': {} # 非法闯入(既没有安全帽也没有工服) } # 针对袖标异常为全局条件:如果当前帧中所有人均未检测到袖标,则更新全局计数器 self.armband_counter = 0 def id2name(self, input_id): """ 将检测到的类别ID转换为类别名称,支持单个ID或ID列表 """ result = [] if isinstance(input_id, int): input_id = [input_id] for id in input_id: for k, v in self.model.names.items(): # k: id , v: 类别名称 if k == id: result.append(v) return list(set(result)) def detect_person(self, frames): """ 对输入的一批视频帧进行人员检测与跟踪,返回每帧中检测到的人员信息。 返回格式:list,每项为字典,键为 person_id,值为 {'crop': 截取的人像, 'box': 人员检测框 [x1,y1,x2,y2]} """ people_results = self.model.track(source=frames, conf=0.6, classes=[0], save=False, verbose=False) # 检测人(类别0) results = [] for people_result in people_results: orig_img = people_result.orig_img # 当前帧原图 person_dict = {} for person_box in people_result.boxes: person_id = person_box.id.item() if person_id: box = person_box.xyxy.squeeze().tolist() # [x1, y1, x2, y2] # 截取检测到的人像区域 cropped_image = orig_img[int(box[1]):int(box[3]), int(box[0]):int(box[2])] person_dict[person_id] = {'crop': cropped_image, 'box': box} results.append(person_dict) return results def detect_person_targets(self, people_results): """ 针对每个人的图像区域进行目标检测(安全帽、工服、烟头、电话、袖标)。 返回格式:list,每项为字典,键为 person_id,值为该人身上检测到的目标字典 {label: xyxy} """ # 收集所有人的图像区域 person_images = [] for frame_persons in people_results: for info in frame_persons.values(): person_images.append(info['crop']) if len(person_images) == 0: return [] # 对所有人的区域进行目标检测 results = self.model.predict(source=person_images, conf=0.6, classes=[2, 3, 4, 5, 6], save=False, verbose=False) person_detect_targets = [] result_idx = 0 for frame_persons in people_results: frame_detection = {} for person_id in frame_persons.keys(): person_result = results[result_idx] detection_dict = {} for box in person_result.boxes: label = int(box.cls.item()) label_name = self.id2name(label) xyxy = box.xyxy.squeeze().tolist() # 假设每个box只对应一种类别,直接存入字典 detection_dict[label_name[0]] = xyxy frame_detection[person_id] = detection_dict result_idx += 1 person_detect_targets.append(frame_detection) return person_detect_targets def annotate_alarm(self, frame, condition, person_box=None, detection=None): """ 在报警图片上对异常情况进行标注: - person_box: 异常人员的检测框 - detection: 异常物品的检测框,格式为 {label: xyxy} """ annotator = Annotator(deepcopy(frame), None, 18, "Arial.ttf", False) if person_box is not None: annotator.box_label(person_box, condition, color=COLOR_RED, rotated=False) # 绘制异常物品的检测框(如烟头、电话等) if detection is not None: if person_box is not None: offset_x, offset_y = int(person_box[0]), int(person_box[1]) for label, box in detection.items(): # 将 detection 框坐标转换为全局坐标 global_box = [box[0] + offset_x, box[1] + offset_y, box[2] + offset_x, box[3] + offset_y] annotator.box_label(global_box, '', color=COLOR_RED, rotated=False) else: # 如果没有人的框,则直接使用 detection 框 for label, box in detection.items(): annotator.box_label(box, label, color=COLOR_RED, rotated=False) return annotator.result() def trigger_alarm(self, frame, condition, person_id=None, detection=None, person_box=None): """ 当某个异常条件达到连续帧阈值时触发报警: - condition: 异常类型,取值:'no_helmet', 'smoking', 'phone', 'illegal_intrusion', 'armband' - 对于人员级别的异常,会标注该人员检测框及异常物品 - 对于全局异常(armband)直接标注图片 """ # 定义异常对应的报警类型(需与系统定义的 ALARM_DICT 对应) alarm_mapping = { 'no_helmet': 'aqm', # 安全帽异常 'smoking': 'smoke', # 吸烟异常 'phone': 'phone', # 打电话异常 'illegal_intrusion': 'break', # 非法闯入 'armband': 'armband' # 袖标异常 } alarm_type = alarm_mapping.get(condition, 'unknown') # 播报异常语言 self.alarm.addAlarm(ALARM_DICT[alarm_type]) if self.alarm.alarm_record_center.need_alarm(self.device_code, ALARM_DICT[alarm_type]): alarm_detections = {} if alarm_type == 'smoke': alarm_detections = {key: detection[key] for key in detection if key == '烟头'} elif alarm_type == 'phone': alarm_detections = {key: detection[key] for key in detection if key == '电话'} # 生成报警图片:在报警图片上对异常人员与异常物品进行标注 annotated_image = self.annotate_alarm(frame, ALARM_DICT[alarm_type]['label'], person_box, alarm_detections) # 上传报警图片到后台 self.alarm.alarm_record_center.upload_alarm_record(self.device_code, ALARM_DICT[alarm_type], alarm_np_img=annotated_image) def annotate_alarm_aggregated(self, frame, condition, candidates, alarm_type): """ 对一张报警图片进行标注,候选列表 candidates 是一个列表,每项是字典, 包含 'person_box'(整图坐标)和 'detection'(相对于裁剪图的检测结果)。 在图片上标注所有候选人员及其异常检测框。 """ annotator = Annotator(deepcopy(frame), None, 18, "Arial.ttf", False) for candidate in candidates: person_box = candidate.get("person_box") detection = candidate.get("detection") alarm_detections = {} if alarm_type == 'smoke': alarm_detections = {key: detection[key] for key in detection if key == '烟头'} elif alarm_type == 'phone': alarm_detections = {key: detection[key] for key in detection if key == '电话'} if person_box is not None: annotator.box_label(person_box, condition, color=COLOR_RED, rotated=False) if alarm_detections is not None: offset_x, offset_y = int(person_box[0]), int(person_box[1]) for label, box in alarm_detections.items(): global_box = [box[0] + offset_x, box[1] + offset_y, box[2] + offset_x, box[3] + offset_y] annotator.box_label(global_box, label, color=COLOR_RED, rotated=False) else: if alarm_detections is not None: for label, box in alarm_detections.items(): annotator.box_label(box, label, color=COLOR_RED, rotated=False) return annotator.result() def trigger_alarm_aggregated(self, frame, condition, candidates): """ 针对一帧内某报警类别的所有候选人员生成一次报警记录和报警图片。 """ alarm_mapping = { 'no_helmet': 'aqm', # 安全帽异常 'smoking': 'smoke', # 吸烟异常 'phone': 'phone', # 打电话异常 'illegal_intrusion': 'break', # 非法闯入 'armband': 'armband' # 袖标异常 } alarm_type = alarm_mapping.get(condition, 'unknown') self.alarm.addAlarm(ALARM_DICT[alarm_type]) if self.alarm.alarm_record_center.need_alarm(self.device_code, ALARM_DICT[alarm_type]): annotated_image = self.annotate_alarm_aggregated(frame, ALARM_DICT[alarm_type]['label'], candidates, alarm_type) self.alarm.alarm_record_center.upload_alarm_record(self.device_code, ALARM_DICT[alarm_type], alarm_np_img=annotated_image) def cleanup_counters(self, current_ids): """ 清理各异常计数器中失去的 person id(即当前帧中不再检测到的人员) """ for cond in self.counters: lost_ids = [pid for pid in self.counters[cond] if pid not in current_ids] for pid in lost_ids: del self.counters[cond][pid] def process_batch(self, frames): """ 对一批视频帧进行处理: 1. 对每帧进行人员检测及目标检测 2. 针对每一帧中所有人员判断是否存在异常: - 未佩戴安全帽:若该人检测结果中没有 "安全帽" - 吸烟:若检测到 "烟头" - 打电话:若检测到 "电话" - 非法闯入:若既没有检测到 "安全帽" 也没有检测到 "工服" - 袖标异常:若该人检测结果中没有 "袖标" 3. 对每一帧按报警类别聚合候选人员,若连续异常帧达到阈值,则对该报警类别生成一次报警记录和报警图片 4. 清理当前帧中不再检测到的 person id(清除计数器中遗留的记录) """ # 第一步:检测人员及其区域目标 people_results = self.detect_person(frames) if all(not d for d in people_results): return person_detect_targets = self.detect_person_targets(people_results) # 收集当前帧所有检测到的 person id(各帧检测到的并集) current_ids = set() for frame_persons in people_results: current_ids.update(frame_persons.keys()) # 针对每一帧处理 for idx, frame in enumerate(frames): frame_targets = person_detect_targets[idx] # 当前帧中,各人员的检测结果 # 用于聚合每个报警类别的候选记录(每项包含 'person_box' 和 'detection') frame_candidates = { 'no_helmet': [], 'smoking': [], 'phone': [], 'illegal_intrusion': [] } # 遍历当前帧的每个检测到的人员 for person_id, detections in frame_targets.items(): person_box = people_results[idx].get(person_id, {}).get('box') no_helmet = "安全帽" not in detections smoking = "烟头" in detections phone = "电话" in detections illegal_intrusion = ("安全帽" not in detections) and ("工服" not in detections) for cond, abnormal in zip( ['no_helmet', 'smoking', 'phone', 'illegal_intrusion'], [no_helmet, smoking, phone, illegal_intrusion] ): if person_id not in self.counters[cond]: self.counters[cond][person_id] = 0 if abnormal: self.counters[cond][person_id] += 1 else: self.counters[cond][person_id] = 0 # 当连续异常帧达到阈值时,将该人员加入对应报警候选列表(每人每类只添加一次) if self.counters[cond][person_id] >= self.frame_threshold: already_added = any(cand.get("person_id") == person_id for cand in frame_candidates[cond]) if not already_added: frame_candidates[cond].append({ "person_id": person_id, "person_box": person_box, "detection": detections }) self.counters[cond][person_id] = 0 # 重置计数 # 针对袖标异常的处理:如果当前帧中所有人员均未检测到 "袖标" armband_candidates = [] if frame_targets: if all("袖标" not in det for det in frame_targets.values()): for person_id, detections in frame_targets.items(): person_box = people_results[idx].get(person_id, {}).get('box') armband_candidates.append({ "person_id": person_id, "person_box": person_box, "detection": detections }) self.armband_counter += 1 else: self.armband_counter = 0 # 触发各报警类别的聚合报警(一次报警记录、一次报警图片) for category, candidates in frame_candidates.items(): if candidates: self.trigger_alarm_aggregated(frame, category, candidates) # 针对袖标异常,如果连续帧达到阈值,则触发一次报警 if self.armband_counter >= self.frame_threshold and armband_candidates: self.trigger_alarm_aggregated(frame, 'armband', []) self.armband_counter = 0 def main(self, stream_loader): """ 主流程:从视频流中获取每一批帧,处理后检测异常并报警 """ for frames in stream_loader: # stream_loader 每次返回一批连续帧(例如4帧) try: self.process_batch(frames) except Exception as ex: traceback.print_exc() # 记录错误信息 logger.error(ex) def main_fake(self, video_path): cap = cv2.VideoCapture(video_path) ret, frames = cap.read() while True: try: ret, frames = cap.read() frames = [frames] if not ret: cap.release() cv2.destroyAllWindows() break cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE) # cv2.resizeWindow('Video Frame2', 800, 600) # 宽度800像素,高度600像素 cv2.imshow("Video Frame2", frames[0]) self.process_batch(frames) except Exception as ex: traceback.print_exc() logger.error(ex) cv2.destroyAllWindows() # 等待1毫秒,检查是否按下了'q'键退出 if cv2.waitKey(1) & 0xFF == ord('q'): break class Alarm(): def __init__(self, device: Device, thread_id: str, tcp_manager: TcpClientManager, main_loop, eventController=None): self.pool = [] self.device = device self.thread_id = thread_id self.tcp_manager = tcp_manager self.main_loop = main_loop self.eventController = eventController # self.alarm_interval_dict = {} # self.alarm_interval = device.alarm_interval # # self.socket_interval_dict = {} # self.socket_interval = device.alarm_interval # self.socket_retry = 3 self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_interval=30, message_send_interval=3, retention_time=10, category_priority={0: 0, 4: 1, 3: 2, 2: 3, 1: 4}) # (优先级:0 > 4 > 3 > 2 > 1) self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) # todo 跟下面的alarm task二选一 # self.thread_pool = GlobalThreadPool() # self.thread_pool.submit_task(self.alarm_message_center.process_messages) def addAlarm(self, alarm_dict): self.alarm_message_center.add_message(alarm_dict) # def addAlarm(self, content): # """ # 添加一条报警到报警队列中 # :param content: # :return: # """ # if content in self.pool: # self.pool.remove(content) # self.pool.append(content) def deleteAlarmOfLaoBao(self): """ 删除池子中有关劳保检测的报警 :return: """ # self.pool = [item for item in self.pool if "劳保" not in item] def laobao_condition(msg): return msg['alarmCategory'] == 0 self.alarm_message_center.delete_messages(laobao_condition) def deleteAlaramOfUmdGas(self): """ 删除池子中有关上中下气体的报警 :return: """ def umd_condition(msg): return msg['alarmCategory'] == 4 self.alarm_message_center.delete_messages(umd_condition) # self.pool = [item for item in self.pool if "劳保" not in item] # def main(self): # while True: # if self.eventController.timeout_event.is_set(): # 前置条件检查超时 # self.deleteAlarmOfLaoBao() # self.deleteAlaramOfUmdGas() # if len(self.pool) != 0: # content = self.pool.pop(0) # print(f"{content},报警队列长度:{len(self.pool)}") # # self.send_alarm_message("no_jiandu") # time.sleep(1) # # def send_tcp_message(self, message: bytes, have_response=False): # asyncio.run_coroutine_threadsafe( # self.tcp_manager.send_message_to_device(device_id=self.device.id, # message=message, # have_response=have_response), # self.main_loop) # def send_alarm_message(self, type): # if self.tcp_manager: # # if self.socket_interval_dict.get(type) is None \ # # or (datetime.now() - self.socket_interval_dict.get(type)).total_seconds() > int(self.socket_interval): # logger.debug("send alarm message %s %s", ALARM_DICT[type]['alarmContent'], # ALARM_DICT[type]['alarmSoundMessage']) # self.send_tcp_message(ALARM_DICT[type]['alarmSoundMessage'], have_response=True) # self.socket_interval_dict[type] = datetime.now() HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 def get_group_device_list(device_code): health_device_codes = [] harmful_device_codes = [] url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' response = get_request(url) if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes class IntranetLimitSpaceSceneHandler(BaseSceneHandler): def __init__(self, device: Device, thread_id: str, tcp_manager: TcpClientManager, main_loop, range_points): super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) self.start_time = time.time() # 脚本启动时间戳 print(f'start time = {self.start_time}') self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) self.executor = ThreadPoolExecutor(max_workers=10) self.loop = asyncio.get_running_loop() self.eventController = EventController() self.alarm = Alarm(device, thread_id, tcp_manager, main_loop, self.eventController) self.laobao_check = Laobaocheck(self.eventController, self.alarm) self.yinhuan_check = YinHuanCheck(device_code=device.code, eventController=self.eventController, alarm=self.alarm) self.anQuanMaoList = [] self.siHeyiList = [] self.siHeyiUmd = None # 上中下气体检测用的四合一设备 self.harmful_gas_manager = HarmfulGasManager() health_device_codes, harmful_device_codes = get_group_device_list(device.code) for health_device_code in health_device_codes: self.anQuanMaoList.append(HelmetDataProcessor(health_device_code, self.alarm.alarm_record_center)) for harmful_device_code in harmful_device_codes: self.siHeyiList.append(SiHeYi(harmful_device_code,self.harmful_gas_manager)) if self.siHeyiList: self.siHeyiUmd = self.siHeyiList[0] # todo 暂时先用第一个,后期要有标识标明用哪个 else: self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98',self.harmful_gas_manager) async def laobaoCheck_task(self): # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() # await loop.run_in_executor(executor, self.laobao_check.model_predict_fake, # r"D:\workspace\pythonProject\safe-algo-pro\2025-02-25 15-25-48.mkv") await self.loop.run_in_executor(self.executor, self.laobao_check.model_predict, self.stream_loader) async def uMDGasCheck_task(self, eventController=None): # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() tflag_pool = [] # 返回数据正常了几次 await self.loop.run_in_executor(self.executor, self.siHeyiUmd.waitPowerOn, self.start_time) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 print('上中下气体检测:四合一已开机') while True: # 模拟循环检测气体 if eventController.timeout_event.is_set(): # 超时退出 return ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, self.siHeyiUmd.getNewData) # 判断气体是否合规 flag = self.siHeyiUmd.isDataNormal(ch4, co, h2s, o2) if flag == False: tflag_pool.clear() self.alarm.addAlarm(ALARM_DICT['umd_harmful_gas']) else: tflag_pool.append(True) print(f"上中下气体检测正常次数:{tflag_pool}") if len(tflag_pool) == 3: break # 退出检测 print('上中下气体检测:上中下气体检测通过') # todo 需要语音吗 self.eventController.umd_complete.set() return async def alarm_task(self): # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() await self.loop.run_in_executor(self.executor, self.alarm.alarm_message_center.process_messages) async def yinhuanCheck_task(self): """ 检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头) :return: """ # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() await self.loop.run_in_executor(self.executor, self.yinhuan_check.main_fake, r"D:\workspace\pythonProject\safe-algo-pro\1.mp4") async def xinlvCheck_task(self): def fun(anQuanMao): blood_oxygen, heartrate = anQuanMao.getNewData() if not anQuanMao.isDataNormal(blood_oxygen, heartrate): self.alarm.addAlarm(ALARM_DICT['health']) anQuanMao.sendAlarmRecord(blood_oxygen, heartrate) # # flag = anQuanMao.isDataNormal(blood_oxygen, heartrate) # if flag == False: # self.alarm.addAlarm(ALARM_DICT['health']) # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() for aqm in self.anQuanMaoList: await self.loop.run_in_executor(self.executor, fun, aqm) async def gasCheck(self): """ 四合一气体检测 :return: """ def fun(siHeyi): ch4, co, h2s, o2 = siHeyi.getNewData() flag = siHeyi.isDataNormal(ch4, co, h2s, o2) if flag == False: self.alarm.addAlarm(ALARM_DICT['harmful_gas']) # executor = ThreadPoolExecutor(max_workers=3) # loop = asyncio.get_running_loop() for siHeYi in self.siHeyiList: await self.loop.run_in_executor(self.executor, fun, siHeYi) def run(self): async def fun(): try: self.loop = asyncio.get_running_loop() # 添加异常处理 def handle_task_exception(task): try: task.result() # 触发异常(如果有) except Exception as e: logger.exception(f"任务 {task.get_name()} 发生异常: {e}") # 并行执行任务 uMDGasCheck_task = asyncio.create_task( self.uMDGasCheck_task(self.eventController)) laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task()) alarm_task = asyncio.create_task(self.alarm_task()) # 给所有任务添加异常处理 for task in [uMDGasCheck_task, laobaoCheck_task, alarm_task]: task.add_done_callback(handle_task_exception) # alarm_task.add_done_callback(handle_task_exception) done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=60*10) if uMDGasCheck_task in done and laobaoCheck_task in done: await uMDGasCheck_task await laobaoCheck_task self.eventController.timeout_event.set() self.alarm.alarm_message_center.send_immediate_command(PREPARE_COMPLETE_MESSAGE) print("前置条件检查完成,退出") else: # 如果超时,则取消未完成的任务 self.eventController.timeout_event.set() laobaoCheck_task.cancel() uMDGasCheck_task.cancel() print("前置条件检查时间过长,退出") # 删除前面产生的报警 self.alarm.deleteAlarmOfLaoBao() self.alarm.deleteAlaramOfUmdGas() # 并行执行任务 print("开始工作") xinlvCheck_task = asyncio.create_task(self.xinlvCheck_task()) yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task()) gasCheck_task = asyncio.create_task(self.gasCheck()) # 也给这些任务添加异常处理 for task in [xinlvCheck_task, yinhuanCheck_task, gasCheck_task]: task.add_done_callback(handle_task_exception) try: results = await asyncio.gather(yinhuanCheck_task, gasCheck_task, xinlvCheck_task, return_exceptions=True) for result in results: if isinstance(result, Exception): logger.exception(f"任务发生异常: {result}") except Exception as e: logger.exception(f"gather 执行过程中发生异常: {e}") done1, pending1 = await asyncio.wait({alarm_task}, timeout=300000.0) except Exception as e: logger.exception(f"run 方法中的 fun 发生异常: {e}") asyncio.run(fun()) if __name__ == '__main__': # print(getNewGasData()) model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt") print(model.names)