import time import traceback from asyncio import Event from copy import deepcopy from datetime import datetime from flatbuffers.builder import np from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area from common.device_status_manager import DeviceStatusManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager from common.image_plotting import Annotator from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter from scene_handler.base_scene_handler import BaseSceneHandler from scene_handler.limit_space_scene_handler import is_overlapping from services.global_config import GlobalConfig from tcp.tcp_manager import TcpManager from entity.device import Device from common.http_utils import get_request from ultralytics import YOLO ''' alarmCategory: 0 行为监管 1 环境监管 2 人员监管 3 围栏监管 handelType: 0 检测到报警 1 未检测到报警 2 人未穿戴报警 3 其他 ''' ALARM_DICT = [ { 'alarmCategory': 0, 'alarmType': '1', 'handelType': 1, 'category_order': 1, 'class_idx': [34], 'alarm_name': 'no_fire_extinguisher', 'alarmContent': '未检测到灭火器', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 0, 'alarmType': '2', 'handelType': 1, 'category_order': 2, 'class_idx': [43], 'alarm_name': 'no_barrier_tape', 'alarmContent': '未检测到警戒线', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 0, 'alarmType': '3', 'handelType': 1, 'category_order': 3, 'class_idx': [48], 'alarm_name': 'no_cone', 'alarmContent': '未检测到锥桶', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 0, 'alarmType': '4', 'handelType': 1, 'category_order': 4, 'class_idx': [4, 5, 16], 'alarm_name': 'no_board', 'alarmContent': '未检测到指示牌', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 0, 'alarmType': '5', 'handelType': 2, 'category_order': -1, 'class_idx': [18], 'alarm_name': 'no_helmet', 'alarmContent': '未佩戴安全帽', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '未佩戴安全帽', }, # todo 明火 { 'alarmCategory': 1, 'alarmType': '7', 'handelType': 3, 'category_order': 1, 'class_idx': [], 'alarm_name': 'gas_alarm', 'alarmContent': '甲烷浓度超限', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 1, 'alarmType': '8', 'handelType': 3, 'category_order': 2, 'class_idx': [], 'alarm_name': 'harmful_alarm', 'alarmContent': '有害气体浓度超标', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 2, 'alarmType': '9', 'handelType': 3, 'category_order': -1, 'class_idx': [], 'alarm_name': 'health_alarm', 'alarmContent': '心率血氧异常', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '', }, { 'alarmCategory': 3, 'alarmType': '10', 'handelType': 2, 'category_order': 4, 'class_idx': [24], 'alarm_name': 'break_in_alarm', 'alarmContent': '非法闯入', 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', 'label': '非法闯入', }, ] COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) class BlockSceneHandler(BaseSceneHandler): def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) self.__stop_event = Event(loop=main_loop) self.health_ts_dict = {} self.harmful_ts_dict = {} self.object_ts_dict = {} self.thread_pool = GlobalThreadPool() self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() self.health_device_codes = ['HWIH061000056395'] # todo self.harmful_device_codes = [] # todo for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code) self.thread_pool.submit_task(self.alarm_message_center.process_messages) # todo 明火 self.model = YOLO('weights/labor-v8-20241114.pt') self.model_classes = { 0: '三脚架', 3: '人', 4: '作业信息公示牌', 6: '危险告知牌', 9: '反光衣', 11: '呼吸面罩', 13: '四合一', 15: '头', 16: '安全告知牌', 18: '安全帽', 20: '安全标识牌', 24: '工服', 34: '灭火器', 43: '警戒线', 48: '路锥', 58: '鼓风机', } self.PERSON_CLASS_IDX = 3 self.HEAD_CLASS_IDX = 15 self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, device_thread_id=thread_id) self.range_points = range_points self.abs_range_points = self.get_absolute_range() self.tracking_status = {} # 跟踪每个行人的状态 self.max_missing_frames = 25 # 报警的阈值 self.disappear_threshold = 25 * 3 # 移除行人的阈值 def get_absolute_range(self): fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: abs_points = [] for p in fence_info: abs_points.append( [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))]) abs_points = np.array(abs_points, dtype=np.int32) hull = ConvexHull(abs_points) sorted_coordinates = abs_points[hull.vertices] # abs_points = abs_points.reshape((-1, 1, 2)) return sorted_coordinates else: return None def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) for gas_type, gas_data in harmful_gas_data.items(): ts_key = f'{harmful_device_code}_{gas_type}' last_ts = self.harmful_ts_dict.get(ts_key) gas_ts = gas_data.get('gas_ts') if last_ts is None or (gas_ts - last_ts).total_seconds() > 0: self.harmful_ts_dict[ts_key] = gas_ts self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) def health_data_task(self, helmet_code): while not self.__stop_event.is_set(): header = { 'ak': 'fe80b2f021644b1b8c77fda743a83670', 'sk': '8771ea6e931d4db646a26f67bcb89909', } url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}' response = get_request(url, headers=header) if response and response.get('data'): last_ts = self.health_ts_dict.get(helmet_code) vitalsigns_data = response.get('data').get('vitalSignsData') if vitalsigns_data: upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S") if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0: self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate'),upload_timestamp) time.sleep(10) def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' f'upload_timestamp = {upload_timestamp}') if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 需要往后台发原始数据吗 def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): alarm = False gas_value = gas_data['gas_value'] if gas_type == 3: # h2s alarm = gas_value > 120.0 elif gas_type == 4: # co alarm = gas_value > 10.0 elif gas_type == 5: # o2 alarm = gas_value < 15 elif gas_type == 50: # ex alarm = gas_value > 10 if alarm: alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] if alarm_dict: self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 def model_predict(self, frames): results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5, classes=list(self.model_classes.keys()), imgsz=640, stream=True) result_boxes = [] for r in results_generator: result_boxes.append(r.boxes) pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes] pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes] return result_boxes, pred_ids, pred_names def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names): behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] for alarm_dict in behave_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] frame_ids, frame_names = pred_ids[idx], pred_names[idx] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] if alarm_dict['handelType'] == 0: # 检测到就报警 if object_boxes: self.alarm_message_center.add_message(alarm_dict) if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) for box in frame_boxes: if int(box.cls) != self.HEAD_CLASS_IDX: box_color = COLOR_RED if int(box.cls) in alarm_dict['class_idx'] else COLOR_BLUE annotator.box_label(box.xyxy.cpu().squeeze(), f"{self.model_classes[int(box.cls)]}", color=box_color, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) elif alarm_dict['handelType'] == 1: # 检测不到报警 if object_boxes: self.object_ts_dict[alarm_dict['alarm_name']] = time.time() else: last_ts = self.object_ts_dict.get(alarm_dict['alarm_name'], 0) if time.time() - last_ts > 5: self.object_ts_dict[alarm_dict['alarm_name']] = time.time() self.alarm_message_center.add_message(alarm_dict) if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) for box in frame_boxes: if int(box.cls) != self.HEAD_CLASS_IDX: annotator.box_label(box.xyxy.cpu().squeeze(), f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) elif alarm_dict['handelType'] == 2: # 人未穿戴报警 person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] has_alarm = False annotator = None for person_box in person_boxes: person_bbox = person_box.xyxy.cpu().squeeze() # 检查这个人是否佩戴了安全帽 has_helmet = True person_head = get_person_head(person_bbox, head_boxes) if person_head is not None: has_helmet = any( is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) for helmet in object_boxes) if not has_helmet: has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) if has_alarm: self.alarm_message_center.add_message(alarm_dict) if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): for box in frame_boxes: box_cls = box.cls if box_cls != self.PERSON_CLASS_IDX and box_cls != self.HEAD_CLASS_IDX: annotator.box_label(box.xyxy.cpu().squeeze(), f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] for alarm_dict in break_in_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] frame_ids, frame_names = pred_ids[idx], pred_names[idx] person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] has_alarm = False annotator = None for person_box in person_boxes: person_bbox = person_box.xyxy.cpu().squeeze() person_id = person_box.id if is_within_alert_range(person_bbox, self.abs_range_points): has_object = True person_head = get_person_head(person_bbox, head_boxes) if person_head is not None: overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox) if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 has_object = any( is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) for object_boxe in object_boxes) if not has_object: self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1 has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) if has_alarm: self.alarm_message_center.add_message(alarm_dict) if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) def run(self): while not self.stream_loader.init: if self.__stop_event.is_set(): break # 如果触发了停止事件,则退出循环 self.stream_loader.init_cap() for frames in self.stream_loader: try: if self.__stop_event.is_set(): break # 如果触发了停止事件,则退出循环 if not frames: continue self.device_status_manager.set_status(device_id=self.device.id) result_boxes, pred_ids, pred_names = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame # print(pred_names) self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names) self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names) except Exception as ex: traceback.print_exc() logger.error(ex)