import time
import traceback
from asyncio import Event
from copy import deepcopy
from datetime import datetime

import numpy as np
import asyncio
from scipy.spatial import ConvexHull

from algo.stream_loader import OpenCVStreamLoad
from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area, is_overlapping
from common.device_status_manager import DeviceStatusManager
from common.display_frame_manager import DisplayFrameManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.harmful_gas_manager import HarmfulGasManager
from common.image_plotting import Annotator, colors
from db.database import get_db
from entity.device import Device
from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.alarm_record_center import AlarmRecordCenter
from scene_handler.base_scene_handler import BaseSceneHandler
# NOTE(review): this re-binds `is_overlapping` imported from common.detect_utils
# above; the import order is kept so the same implementation stays in effect.
from scene_handler.limit_space_scene_handler import is_overlapping
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager
from entity.device import Device
from common.http_utils import get_request
from ultralytics import YOLO

# Legend for the ALARM_DICT fields.
#
# alarmCategory:
#     0  behaviour supervision
#     1  environment supervision
#     2  personnel supervision
#     3  fence supervision
#
# handelType:
#     0  alarm when the object is detected
#     1  alarm when the object is NOT detected
#     2  alarm when a person is not wearing the object
#     3  other (not driven by image detection)
#     4  alarm when the object is detected on a person
ALARM_DICT = [
    {
        'alarmCategory': 0,
        'alarmType': '14',
        'handelType': 1,
        'category_order': 1,
        'class_idx': [34],
        'alarm_name': 'no_fire_extinguisher',
        'alarmContent': '未检测到灭火器',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '15',
        'handelType': 1,
        'category_order': 2,
        'class_idx': [43],
        'alarm_name': 'no_barrier_tape',
        'alarmContent': '未检测到警戒线',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '16',
        'handelType': 1,
        'category_order': 3,
        'class_idx': [48],
        'alarm_name': 'no_cone',
        'alarmContent': '未检测到锥桶',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '17',
        'handelType': 1,
        'category_order': 4,
        'class_idx': [4, 5, 16],
        'alarm_name': 'no_board',
        'alarmContent': '未检测到指示牌',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '18',
        'handelType': 2,
        'category_order': -1,
        # This entry is evaluated against the 'safe' model, whose helmet class
        # is 2. The previous value 18 is the helmet index of the labor model and
        # can never occur in the 7-class safe model, so no helmet was ever
        # matched and every person with a visible head was reported.
        'class_idx': [2],
        'alarm_name': 'no_helmet',
        'alarmContent': '未佩戴安全帽',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95',
        'label': '未佩戴安全帽',
        'model_type': 'safe',
    },
    {
        'alarmCategory': 0,
        'alarmType': '19',  # TODO: confirm alarm type
        'handelType': 4,
        'category_order': -1,
        'class_idx': [4],
        'alarm_name': 'cigarette',
        'alarmContent': '吸烟',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95',  # TODO: dedicated sound message
        'label': '吸烟',
        'model_type': 'safe',
    },
    {
        'alarmCategory': 0,
        'alarmType': '2',
        'handelType': 4,  # TODO: confirm handle type
        'category_order': -1,
        'class_idx': [5],
        'alarm_name': 'phone',
        'alarmContent': '打电话',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95',  # TODO: dedicated sound message
        'label': '打电话',
        'model_type': 'safe',
    },
    # TODO: open-flame alarm
    {
        'alarmCategory': 1,
        'alarmType': '1',
        'handelType': 3,
        'category_order': 1,
        'class_idx': [],
        'alarm_name': 'gas_alarm',
        'alarmContent': '甲烷浓度超限',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8',
        'label': '',
    },
    {
        'alarmCategory': 1,
        'alarmType': '',
        'handelType': 3,
        'category_order': 2,
        'class_idx': [],
        'alarm_name': 'harmful_alarm',
        'alarmContent': '有害气体浓度超标',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9',
        'label': '',
    },
    {
        'alarmCategory': 2,
        'alarmType': '18',
        'handelType': 3,
        'category_order': -1,
        'class_idx': [],
        'alarm_name': 'health_alarm',
        'alarmContent': '心率血氧异常',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA',
        'label': '',
    },
    {
        'alarmCategory': 3,
        'alarmType': '3',
        'handelType': 2,
        'category_order': 4,
        'class_idx': [3],
        'alarm_name': 'break_in_alarm',
        'alarmContent': '非法闯入',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB',
        'label': '非法闯入',
        'model_type': 'safe',
    },
]

COLOR_RED = (0, 0, 255)
COLOR_BLUE = (255, 0, 0)

HEALTH_DEVICE_TYPE = '2'  # device type of the safety-helmet (vital signs) devices
HARMFUL_DEVICE_TYPE = '4'  # device type of the four-in-one gas detectors


def _find_alarm(category, alarm_name=None):
    """Return the first ALARM_DICT entry of `category` (optionally also matching `alarm_name`), or None."""
    for entry in ALARM_DICT:
        if entry['alarmCategory'] != category:
            continue
        if alarm_name is None or entry['alarm_name'] == alarm_name:
            return entry
    return None


def _as_float(value):
    """Return `value` converted to float, or None when it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def get_group_device_list(device_code):
    """Query the backend for the devices grouped with `device_code`.

    Returns:
        A tuple ``(health_device_codes, harmful_device_codes)``: the
        ``deviceCode`` values whose ``deviceType`` equals HEALTH_DEVICE_TYPE
        and HARMFUL_DEVICE_TYPE respectively. Both lists are empty when the
        request yields no usable response (falsy response, ``code`` != 200 or
        empty ``data``).
    """
    health_device_codes = []
    harmful_device_codes = []
    url = f'http://111.198.10.15:22006/v3/device/listGroupDevs?devcode={device_code}'
    response = get_request(url)
    if response and response.get('code') == 200 and response.get('data'):
        data = response.get('data')
        # The lists are built once; the former enclosing `for item in data`
        # loop recomputed both comprehensions for every element.
        health_device_codes = [item.get('deviceCode', '') for item in data
                               if item.get('deviceType', '') == HEALTH_DEVICE_TYPE]
        harmful_device_codes = [item.get('deviceCode', '') for item in data
                                if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE]
    return health_device_codes, harmful_device_codes


class BlockSceneHandler(BaseSceneHandler):
    """Scene handler that runs two YOLO models on a camera stream and raises alarms.

    Besides the per-frame detection loop in `run`, the constructor submits
    polling tasks to the global thread pool for gas data, helmet vital signs
    and four-in-one gas detector data.
    """

    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        """Set up alarm centers, start the polling tasks and load the models.

        Args:
            device: Device whose stream is analysed; `id`, `code`,
                `alarm_interval` and `input_stream_url` are read here.
            thread_id: Passed to the base class and to the stream loader.
            tcp_manager: Passed to the base class and the alarm message center.
            main_loop: Event loop passed to the base class and alarm centers.
            range_points: Textual fence description, parsed by
                `get_absolute_range`; falsy means "no fence".
        """
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        # NOTE(review): the `loop` argument of asyncio.Event was removed in
        # Python 3.10, so this line raises TypeError there - confirm the
        # targeted interpreter version.
        # NOTE(review): the attribute is name-mangled to
        # `_BlockSceneHandler__stop_event` and nothing in this file sets it;
        # confirm how the polling tasks and `run` are supposed to be stopped.
        self.__stop_event = Event(loop=main_loop)
        self.health_ts_dict = {}
        self.harmful_ts_dict = {}
        self.object_ts_dict = {}
        self.thread_pool = GlobalThreadPool()

        self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager,
                                                       category_interval=30, message_send_interval=3,
                                                       retention_time=10,
                                                       # priority: 2 > 1 > 3 > 0
                                                       category_priority={2: 0, 1: 1, 3: 2, 0: 3})
        self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop)
        self.harmful_data_manager = HarmfulGasManager()
        self.device_status_manager = DeviceStatusManager()
        self.display_frame_manager = DisplayFrameManager()

        # Device codes are read from the backend; they used to be hard-coded:
        # self.health_device_codes = ['HWIH061000056395']
        # self.harmful_device_codes = ['862635063168165A']
        self.health_device_codes, self.harmful_device_codes = get_group_device_list(device.code)

        self.thread_pool.submit_task(self.gas_data_task, device.code)
        for helmet_code in self.health_device_codes:
            self.thread_pool.submit_task(self.health_data_task, helmet_code)
        for harmful_device_code in self.harmful_device_codes:
            self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code)

        self.thread_pool.submit_task(self.alarm_message_center.process_messages)

        # TODO: open-flame detection
        # self.model = YOLO('weights/labor-v8-20250115-fp16.engine')
        self.model = YOLO('weights/labor-v8-20241114.pt')
        # Labor-model classes requested from the tracker. Indices that are
        # currently not requested: 0 tripod, 3 person, 11 respirator mask,
        # 13 four-in-one detector, 15 head, 18 safety helmet, 24 workwear.
        self.model_classes = {
            4: '作业信息公示牌',
            6: '危险告知牌',
            9: '反光衣',
            16: '安全告知牌',
            20: '安全标识牌',
            34: '灭火器',
            43: '警戒线',
            48: '路锥',
            58: '鼓风机',
        }

        self.safe_model = YOLO('weights/yinhuan.pt')
        self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'}
        # Person/head indices of the safe model. The labor-model values (3 and
        # 15) that used to be assigned first were immediately overwritten.
        self.PERSON_CLASS_IDX = 0
        self.HEAD_CLASS_IDX = 1
        self.SAFETY_CLASS_IDX = [2, 3, 6]

        self.vid_stride = 3
        self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
                                              device_thread_id=thread_id, vid_stride=self.vid_stride)
        self.range_points = range_points
        self.abs_range_points = self.get_absolute_range()

        self.tracking_status = {}  # per-person tracking state, keyed by track id
        self.max_missing_frames = 25 / self.vid_stride  # streak length above which a person alarm fires
        self.disappear_threshold = 25 * 3 / self.vid_stride  # unseen frames above which a person is dropped

        self.frames_detected = 0
        self.fps_ts = None

    def get_absolute_range(self):
        """Convert the relative fence points into pixel coordinates.

        Returns:
            An int32 array holding the convex-hull vertices of the fence in
            pixel coordinates, or None when no fence is configured or fewer
            than three points are given.
        """
        if not self.range_points:
            return None

        # SECURITY NOTE(review): eval() executes arbitrary expressions. If
        # range_points can come from an untrusted source, switch to
        # ast.literal_eval after confirming the stored format is a plain literal.
        fence_info = eval(self.range_points)
        # ConvexHull cannot be built from fewer than three 2-D points; the
        # former `> 1` check let a two-point fence crash the constructor.
        if not fence_info or len(fence_info) < 3:
            return None

        frame_width = int(self.stream_loader.frame_width)
        frame_height = int(self.stream_loader.frame_height)
        abs_points = np.array(
            [[int(p[0] * frame_width), int(p[1] * frame_height)] for p in fence_info],
            dtype=np.int32)
        hull = ConvexHull(abs_points)
        return abs_points[hull.vertices]

    def gas_data_task(self, tree_device_code):
        """Schedule `handle_gas_alarm` on the main loop every 5 seconds until stopped."""
        while not self.__stop_event.is_set():
            asyncio.run_coroutine_threadsafe(
                self.handle_gas_alarm(tree_device_code), self.main_loop
            )
            time.sleep(5)

    async def handle_gas_alarm(self, tree_device_code):
        """Raise the 'gas_alarm' when the latest reading is fresh and too high.

        A reading is used only when it is less than 60 seconds old; the alarm
        fires when its `gas_value` exceeds 200.0. Any exception is logged and
        swallowed so the periodic scheduling keeps running.
        """
        try:
            async for db in get_db():
                gas_service = DataGasService(db)
                gas_data = await gas_service.latest_query(tree_device_code)
                if not gas_data or time.time() - gas_data.ts.timestamp() >= 60:
                    continue
                if gas_data.gas_value <= 200.0:
                    continue
                alarm = _find_alarm(1, 'gas_alarm')
                if alarm is None:
                    continue
                self.alarm_message_center.add_message(alarm)
                if self.alarm_record_center.need_alarm(self.device.code, alarm):
                    self.alarm_record_center.upload_alarm_record(self.device.code, alarm,
                                                                 alarm_np_img=None,
                                                                 alarm_value=gas_data.gas_value)
        except Exception as e:
            logger.error(f"Error in latest_query: {e}")

    # Four-in-one gas concentrations received directly by the all-in-one unit.
    def harmful_data_task(self, harmful_device_code):
        """Poll the local HarmfulGasManager and evaluate every new reading.

        A reading is new when no timestamp is stored for its
        ``<device>_<gas_type>`` key or when its ``gas_ts`` is later than the
        stored one.
        """
        while not self.__stop_event.is_set():
            try:
                harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) or {}
                for gas_type, gas_data in harmful_gas_data.items():
                    ts_key = f'{harmful_device_code}_{gas_type}'
                    last_ts = self.harmful_ts_dict.get(ts_key)
                    gas_ts = gas_data.get('gas_ts')
                    if last_ts is None or (gas_ts - last_ts).total_seconds() > 0:
                        self.harmful_ts_dict[ts_key] = gas_ts
                        self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data)
            except Exception as ex:
                # Keep polling: one bad reading must not end the worker.
                logger.error(f'harmful_data_task failed for {harmful_device_code}: {ex}')
            # Without a pause this loop spun at full CPU speed.
            time.sleep(1)

    # Four-in-one gas concentrations read from the backend.
    def harmful_data_query_task(self, harmful_device_code):
        """Poll the backend every 5 seconds for four-in-one gas data.

        Only data with a newer ``uptime`` than the last processed one and not
        older than 10 minutes is evaluated.
        """
        while not self.__stop_event.is_set():
            try:
                url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
                response = get_request(url)
                if response and response.get('data'):
                    last_ts = self.harmful_ts_dict.get(harmful_device_code)
                    data = response.get('data')
                    uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S")
                    uptime_ts = uptime.timestamp()
                    if last_ts is None or (uptime_ts - last_ts) > 0:
                        self.harmful_ts_dict[harmful_device_code] = uptime_ts
                        if time.time() - uptime_ts < 10 * 60:  # ignore data older than 10 minutes
                            self.handle_query_harmful_gas_alarm(harmful_device_code,
                                                                data.get('ch4'), data.get('co'),
                                                                data.get('h2s'), data.get('o2'))
            except Exception as ex:
                # A malformed response (e.g. missing 'uptime') used to kill the
                # polling thread for good.
                logger.error(f'harmful_data_query_task failed for {harmful_device_code}: {ex}')
            time.sleep(5)

    def health_data_task(self, helmet_code):
        """Poll the helmet API every 10 seconds for vital signs.

        Only data with a newer ``uploadTimestamp`` than the last processed one
        and not older than 10 minutes is evaluated.
        """
        # SECURITY NOTE(review): API credentials are embedded in source; they
        # should be moved to configuration.
        header = {
            'ak': 'fe80b2f021644b1b8c77fda743a83670',
            'sk': '8771ea6e931d4db646a26f67bcb89909',
        }
        url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}'
        while not self.__stop_event.is_set():
            try:
                response = get_request(url, headers=header)
                if response and response.get('data'):
                    last_ts = self.health_ts_dict.get(helmet_code)
                    vitalsigns_data = response.get('data').get('vitalSignsData')
                    if vitalsigns_data:
                        upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'),
                                                             "%Y-%m-%d %H:%M:%S")
                        upload_ts = upload_timestamp.timestamp()
                        if last_ts is None or (upload_ts - last_ts) > 0:
                            self.health_ts_dict[helmet_code] = upload_ts
                            if time.time() - upload_ts < 10 * 60:  # ignore data older than 10 minutes
                                self.handle_health_alarm(helmet_code,
                                                         vitalsigns_data.get('bloodOxygen'),
                                                         vitalsigns_data.get('heartRate'),
                                                         upload_timestamp)
            except Exception as ex:
                # Keep polling: one malformed response must not end the worker.
                logger.error(f'health_data_task failed for {helmet_code}: {ex}')
            time.sleep(10)

    def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp):
        """Queue the personnel alarm when heart rate or blood oxygen is abnormal.

        Abnormal means heart rate below 50 or above 120, or blood oxygen below
        85. A reading that is None is skipped instead of raising TypeError.
        """
        logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, '
                     f'upload_timestamp = {upload_timestamp}')
        heart_abnormal = heartrate is not None and (heartrate < 50 or heartrate > 120)
        oxygen_abnormal = blood_oxygen is not None and blood_oxygen < 85
        if heart_abnormal or oxygen_abnormal:
            alarm = _find_alarm(2)
            if alarm is not None:
                self.alarm_message_center.add_message(alarm)
                # TODO: should an alarm record be created / raw data be sent to the backend?

    def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2):
        """Queue the 'harmful_alarm' when any backend gas reading is out of range.

        Limits: ch4 > 10.0, co > 10.0, h2s > 120.0, o2 < 15. Readings that are
        missing or not numeric are ignored instead of raising.
        """
        ch4, co, h2s, o2 = (_as_float(v) for v in (ch4, co, h2s, o2))
        out_of_range = (
            (ch4 is not None and ch4 > 10.0)
            or (co is not None and co > 10.0)
            or (h2s is not None and h2s > 120.0)
            or (o2 is not None and o2 < 15)
        )
        if out_of_range:
            alarm = _find_alarm(1, 'harmful_alarm')
            if alarm is not None:
                self.alarm_message_center.add_message(alarm)

    def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data):
        """Queue the 'harmful_alarm' when a locally received reading is out of range.

        Gas types: 3 = h2s (> 120.0), 4 = co (> 10.0), 5 = o2 (< 15),
        50 = ex (> 10). Other gas types never alarm.
        """
        gas_value = gas_data['gas_value']
        if gas_type == 3:  # h2s
            alarm = gas_value > 120.0
        elif gas_type == 4:  # co
            alarm = gas_value > 10.0
        elif gas_type == 5:  # o2
            alarm = gas_value < 15
        elif gas_type == 50:  # ex
            alarm = gas_value > 10
        else:
            alarm = False

        if alarm:
            alarm_entry = _find_alarm(1, 'harmful_alarm')
            if alarm_entry is not None:
                self.alarm_message_center.add_message(alarm_entry)
                # TODO: should an alarm record be created?

    def model_predict(self, frames):
        """Run both trackers on `frames`.

        Returns:
            ``(result_boxes, safe_result_boxes)``: for each model, a list with
            the `boxes` of every result yielded by the tracker.
        """
        results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5,
                                             classes=list(self.model_classes.keys()),
                                             imgsz=640,
                                             stream=True)
        result_boxes = [r.boxes for r in results_generator]

        safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5,
                                                       classes=list(self.safe_model_classes.keys()),
                                                       imgsz=640,
                                                       stream=True)
        safe_result_boxes = [s.boxes for s in safe_results_generator]

        return result_boxes, safe_result_boxes

    @staticmethod
    def _label_boxes(annotator, boxes, class_names, color, only_classes=None):
        """Draw `boxes` on `annotator` in `color`, optionally restricted to `only_classes`."""
        for box in boxes:
            box_cls = int(box.cls)
            if only_classes is not None and box_cls not in only_classes:
                continue
            annotator.box_label(box.xyxy.cpu().squeeze(), f"{class_names[box_cls]}",
                                color=color, rotated=False)

    def _label_people_and_wearables(self, annotator, safe_boxes):
        """Draw persons and worn safety items of the safe model in blue."""
        wanted = set(self.SAFETY_CLASS_IDX) | {self.PERSON_CLASS_IDX}
        self._label_boxes(annotator, safe_boxes, self.safe_model_classes, COLOR_BLUE, only_classes=wanted)

    def _bump_person_counter(self, person_id, alarm_name, triggered):
        """Advance or reset a person's streak counter; return (status dict, whether the alarm fires)."""
        # setdefault guards against ids that are not (or no longer) tracked,
        # which previously raised KeyError and aborted the whole batch.
        person_status = self.tracking_status.setdefault(person_id, {'disappear_frames': 0})
        person_status[alarm_name] = person_status.get(alarm_name, 0) + 1 if triggered else 0
        return person_status, person_status[alarm_name] > self.max_missing_frames

    def _alarm_on_detected(self, alarm_dict, frame, frame_boxes, object_boxes, model_classes, safe_boxes):
        """handelType 0: alarm as soon as a target object is present in the frame."""
        if not object_boxes:
            return
        self.alarm_message_center.add_message(alarm_dict)
        if not self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
            return
        annotator = Annotator(deepcopy(frame))
        # Target objects in red, everything else of this model in blue.
        for box in frame_boxes:
            box_cls = int(box.cls)
            box_color = COLOR_RED if box_cls in alarm_dict['class_idx'] else COLOR_BLUE
            annotator.box_label(box.xyxy.cpu().squeeze(), f"{model_classes[box_cls]}",
                                color=box_color, rotated=False)
        self._label_people_and_wearables(annotator, safe_boxes)
        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                     alarm_np_img=annotator.result())

    def _alarm_on_missing(self, alarm_dict, frame, frame_boxes, object_boxes, model_classes, safe_boxes):
        """handelType 1: alarm when the target object has not been seen for more than 5 seconds."""
        alarm_name = alarm_dict['alarm_name']
        if object_boxes:
            self.object_ts_dict[alarm_name] = time.time()
            return
        if time.time() - self.object_ts_dict.get(alarm_name, 0) <= 5:
            return
        # Restart the 5 second window so the alarm repeats at that pace.
        self.object_ts_dict[alarm_name] = time.time()
        self.alarm_message_center.add_message(alarm_dict)
        if not self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
            return
        annotator = Annotator(deepcopy(frame))
        self._label_boxes(annotator, frame_boxes, model_classes, COLOR_BLUE)
        self._label_people_and_wearables(annotator, safe_boxes)
        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                     alarm_np_img=annotator.result())

    def _alarm_on_person(self, alarm_dict, frame, frame_boxes, object_boxes, model_classes, labor_boxes):
        """handelType 2 / 4: per-person alarms driven by a streak counter.

        handelType 2 counts frames in which a person's head overlaps none of
        the target objects (e.g. helmet missing); handelType 4 counts frames
        in which a target object (cigarette, phone) overlaps the person.
        """
        alarm_name = alarm_dict['alarm_name']
        not_wearing_mode = alarm_dict['handelType'] == 2
        person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
        head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]

        has_alarm = False
        annotator = None
        for person_box in person_boxes:
            if person_box.id is None:
                continue
            person_bbox = person_box.xyxy.cpu().squeeze()
            person_id = int(person_box.id)

            evidence_box = None
            if not_wearing_mode:
                # Without a detected head the person counts as compliant.
                violating = False
                person_head = get_person_head(person_bbox, head_boxes)
                if person_head is not None:
                    head_bbox = person_head.xyxy.cpu().squeeze()
                    violating = not any(is_overlapping(head_bbox, worn.xyxy.cpu().squeeze())
                                        for worn in object_boxes)
            else:
                # Most confident target object overlapping this person.
                evidence_box = max(
                    (box for box in object_boxes if is_overlapping(person_bbox, box.xyxy.cpu().squeeze())),
                    key=lambda box: box.conf.item(),
                    default=None
                )
                violating = evidence_box is not None

            person_status, person_alarm = self._bump_person_counter(person_id, alarm_name, violating)
            if not person_alarm:
                continue

            has_alarm = True
            if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                annotator = Annotator(deepcopy(frame)) if annotator is None else annotator
                # Offending person (and the detected object) in red.
                annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)
                if evidence_box is not None:
                    annotator.box_label(evidence_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, rotated=False)
            # Alarm raised: restart counting for this person.
            person_status[alarm_name] = 0

        if not has_alarm:
            return
        self.alarm_message_center.add_message(alarm_dict)
        # `annotator` is only created when need_alarm() was true inside the
        # loop; checking it avoids an AttributeError if the second call
        # disagrees with the first.
        if annotator is not None and self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
            # Worn safety items in blue.
            self._label_boxes(annotator, frame_boxes, model_classes, COLOR_BLUE,
                              only_classes=self.SAFETY_CLASS_IDX)
            # Regular site equipment in blue.
            self._label_boxes(annotator, labor_boxes, self.model_classes, COLOR_BLUE)
            self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                         alarm_np_img=annotator.result())

    def handle_behave_alarm(self, frames, result_boxes, safe_result_boxes):
        """Evaluate all behaviour alarms (alarmCategory 0) for a batch of frames.

        Args:
            frames: Batch of images.
            result_boxes: Per-frame boxes of the labor model.
            safe_result_boxes: Per-frame boxes of the safe model.

        Each alarm entry is checked against the model named by its
        'model_type' ('labor' when absent) and dispatched on 'handelType'.
        """
        for alarm_dict in (d for d in ALARM_DICT if d['alarmCategory'] == 0):
            use_safe_model = alarm_dict.get('model_type', 'labor') == 'safe'
            boxes = safe_result_boxes if use_safe_model else result_boxes
            model_classes = self.safe_model_classes if use_safe_model else self.model_classes
            handle_type = alarm_dict['handelType']

            for idx, frame_boxes in enumerate(boxes):
                frame = frames[idx]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']]
                if handle_type == 0:
                    self._alarm_on_detected(alarm_dict, frame, frame_boxes, object_boxes,
                                            model_classes, safe_result_boxes[idx])
                elif handle_type == 1:
                    self._alarm_on_missing(alarm_dict, frame, frame_boxes, object_boxes,
                                           model_classes, safe_result_boxes[idx])
                elif handle_type in (2, 4):
                    self._alarm_on_person(alarm_dict, frame, frame_boxes, object_boxes,
                                          model_classes, result_boxes[idx])

    def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes):
        """Evaluate the fence alarms (alarmCategory 3) for a batch of frames.

        A person's counter advances while the person is judged not to carry
        the target class and is inside the alert range; the alarm fires once
        the counter exceeds `max_missing_frames`.
        """
        for alarm_dict in (d for d in ALARM_DICT if d['alarmCategory'] == 3):
            alarm_name = alarm_dict['alarm_name']
            for idx, frame_boxes in enumerate(safe_result_boxes):
                frame = frames[idx]
                person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
                head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']]

                has_alarm = False
                annotator = None
                for person_box in person_boxes:
                    if person_box.id is None:
                        continue
                    person_bbox = person_box.xyxy.cpu().squeeze()
                    person_id = int(person_box.id)

                    has_object = True
                    person_head = get_person_head(person_bbox, head_boxes)
                    if person_head is not None:
                        head_bbox = person_head.xyxy.cpu().squeeze()
                        overlap_ratio = intersection_area(person_bbox, head_bbox) / bbox_area(person_bbox)
                        # Only when the head covers less than half of the person
                        # box is the workwear check applied (noted as not very
                        # accurate by the original author).
                        if overlap_ratio < 0.5:
                            has_object = any(
                                is_overlapping(head_bbox, object_box.xyxy.cpu().squeeze())
                                for object_box in object_boxes)

                    intruding = not has_object and is_within_alert_range(person_bbox, self.abs_range_points)
                    person_status, person_alarm = self._bump_person_counter(person_id, alarm_name, intruding)
                    if not person_alarm:
                        continue

                    has_alarm = True
                    if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        annotator = Annotator(deepcopy(frame)) if annotator is None else annotator
                        annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)
                    # Alarm raised: restart counting for this person.
                    person_status[alarm_name] = 0

                if has_alarm:
                    self.alarm_message_center.add_message(alarm_dict)
                    # See _alarm_on_person: guard against a missing annotator.
                    if annotator is not None and self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                     annotator.result())

    def log_fps(self, frame_count):
        """Accumulate processed frames and log the detection FPS about every 10 seconds.

        The first call only starts the measuring window; afterwards the rate
        is the number of frames divided by the time that actually elapsed.
        """
        current_time = time.time()
        if self.fps_ts is None:
            # Start of the first window; frames handled before it do not count.
            self.fps_ts = current_time
            self.frames_detected = 0
            return

        self.frames_detected += frame_count
        elapsed = current_time - self.fps_ts
        if elapsed >= 10:
            fps = self.frames_detected / elapsed
            self.frames_detected = 0
            logger.info(f"FPS (detect) for device {self.device.code}: {fps}")
            self.fps_ts = current_time

    def _update_tracking(self, frame_boxes):
        """Refresh `tracking_status` with the persons visible in one frame of safe-model boxes."""
        current_person_ids = {
            int(box.id) for box in frame_boxes
            if box.cls is not None and box.id is not None and int(box.cls) == self.PERSON_CLASS_IDX
        }
        for person_id in current_person_ids:
            self.tracking_status.setdefault(person_id, {})['disappear_frames'] = 0

        # Iterate over a copy because entries may be removed.
        for person_id in list(self.tracking_status.keys()):
            if person_id in current_person_ids:
                continue
            self.tracking_status[person_id]['disappear_frames'] += 1
            if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold:
                self.tracking_status.pop(person_id)

    def run(self):
        """Main loop: read frame batches, detect, raise alarms and publish annotated frames.

        Errors raised while handling one batch are logged and the loop
        continues with the next batch.
        """
        while not self.stream_loader.init:
            if self.__stop_event.is_set():
                # Stop requested before the stream was opened: leave instead
                # of falling through to iterate an uninitialised loader.
                return
            self.stream_loader.init_cap()

        for frames in self.stream_loader:
            try:
                if self.__stop_event.is_set():
                    break  # stop requested
                if not frames:
                    continue

                t1 = time.time()
                self.device_status_manager.set_status(device_id=self.device.id)
                # Both results hold one entry per frame of the batch.
                result_boxes, safe_result_boxes = self.model_predict(frames)

                t2 = time.time()
                for frame_boxes in safe_result_boxes:
                    self._update_tracking(frame_boxes)

                self.handle_behave_alarm(frames, result_boxes, safe_result_boxes)
                self.handle_break_in_alarm(frames, result_boxes, safe_result_boxes)

                t3 = time.time()
                for person_id, status in self.tracking_status.items():
                    logger.debug(f'person_id: {person_id}, status: {status}')

                for idx, frame in enumerate(frames):
                    annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人")
                    for s_box in result_boxes[idx]:
                        annotator.box_label(s_box.xyxy.cpu().squeeze(),
                                            f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}",
                                            color=colors(int(s_box.cls)),
                                            rotated=False)
                    for s_box in safe_result_boxes[idx]:
                        annotator.box_label(s_box.xyxy.cpu().squeeze(),
                                            f"{self.safe_model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}",
                                            color=colors(int(s_box.cls)),
                                            rotated=False)
                    self.display_frame_manager.add_frame(self.device.id, annotator.result())

                t4 = time.time()
                logger.debug(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms')
                self.log_fps(len(frames))
            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)