diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index fba1397..729d298 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -50,7 +50,7 @@ def _clean_old_messages(self): """清理长期堆积的消息""" - print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') + # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index fba1397..729d298 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -50,7 +50,7 @@ def _clean_old_messages(self): """清理长期堆积的消息""" - print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') + # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 3dd0b31..1f25d14 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,17 +4,17 @@ from copy import deepcopy from datetime import datetime - from flatbuffers.builder import np from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager -from common.image_plotting import Annotator +from common.image_plotting import Annotator, colors from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter @@ -157,20 +157,21 @@ self.object_ts_dict = {} self.thread_pool = GlobalThreadPool() - self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager, + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) - self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() - - self.health_device_codes = ['HWIH061000056395'] # todo - self.harmful_device_codes = [] # todo + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] + self.harmful_device_codes = ['862635063168165A'] for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: - self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code) + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) self.thread_pool.submit_task(self.alarm_message_center.process_messages) @@ -206,6 +207,9 @@ self.max_missing_frames = 25 # 报警的阈值 self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.frames_detected = 0 + self.fps_ts = None + def get_absolute_range(self): fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: @@ -222,6 +226,7 @@ else: return None + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) @@ -233,6 +238,25 @@ self.harmful_ts_dict[ts_key] = gas_ts self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + time.sleep(5) + def health_data_task(self, helmet_code): while not self.__stop_event.is_set(): header = { @@ -250,7 +274,7 @@ self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), - vitalsigns_data.get('heartRate'),upload_timestamp) + vitalsigns_data.get('heartRate'), upload_timestamp) time.sleep(10) def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): @@ -262,6 +286,15 @@ self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 需要往后台发原始数据吗 + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): alarm = False gas_value = gas_data['gas_value'] @@ -281,24 +314,21 @@ # todo 需要生成报警记录吗 def model_predict(self, frames): - results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.model_classes.keys()), - imgsz=640, - stream=True) + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) result_boxes = [] for r in results_generator: result_boxes.append(r.boxes) - pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes] - pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes] - return result_boxes, pred_ids, pred_names + return result_boxes - def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_behave_alarm(self, frames, result_boxes): behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] for alarm_dict in behave_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] if alarm_dict['handelType'] == 0: # 检测到就报警 if object_boxes: @@ -339,7 +369,11 @@ has_alarm = False annotator = None for person_box in person_boxes: + if person_box.id is None: + continue + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) # 检查这个人是否佩戴了安全帽 has_helmet = True person_head = get_person_head(person_bbox, head_boxes) @@ -347,11 +381,23 @@ has_helmet = any( is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -366,36 +412,51 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] for alarm_dict in break_in_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] has_alarm = False annotator = None for person_box in person_boxes: - person_bbox = person_box.xyxy.cpu().squeeze() - person_id = person_box.id - if is_within_alert_range(person_bbox, self.abs_range_points): - has_object = True - person_head = get_person_head(person_bbox, head_boxes) - if person_head is not None: - overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox) - if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 - has_object = any( - is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) - for object_boxe in object_boxes) - if not has_object: - self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1 + if person_box.id is None: + continue - has_alarm = True - if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): - annotator = Annotator(deepcopy(frame)) if annotator is None else annotator - annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -403,6 +464,16 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + def run(self): while not self.stream_loader.init: if self.__stop_event.is_set(): @@ -416,10 +487,40 @@ continue self.device_status_manager.set_status(device_id=self.device.id) - result_boxes, pred_ids, pred_names = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame - # print(pred_names) - self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names) - self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names) + result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + for idx, frame_boxes in enumerate(result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes) + self.handle_break_in_alarm(frames, result_boxes) + + # for person_id in self.tracking_status.keys(): + # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + # for idx, frame in enumerate(frames): + # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + # frame_boxes = result_boxes[idx] + # for s_box in frame_boxes: + # annotator.box_label(s_box.xyxy.cpu().squeeze(), + # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + # color=colors(int(s_box.cls)), + # rotated=False) + # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + self.log_fps(len(frames)) except Exception as ex: traceback.print_exc() diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index fba1397..729d298 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -50,7 +50,7 @@ def _clean_old_messages(self): """清理长期堆积的消息""" - print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') + # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 3dd0b31..1f25d14 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,17 +4,17 @@ from copy import deepcopy from datetime import datetime - from flatbuffers.builder import np from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager -from common.image_plotting import Annotator +from common.image_plotting import Annotator, colors from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter @@ -157,20 +157,21 @@ self.object_ts_dict = {} self.thread_pool = GlobalThreadPool() - self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager, + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) - self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() - - self.health_device_codes = ['HWIH061000056395'] # todo - self.harmful_device_codes = [] # todo + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] + self.harmful_device_codes = ['862635063168165A'] for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: - self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code) + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) self.thread_pool.submit_task(self.alarm_message_center.process_messages) @@ -206,6 +207,9 @@ self.max_missing_frames = 25 # 报警的阈值 self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.frames_detected = 0 + self.fps_ts = None + def get_absolute_range(self): fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: @@ -222,6 +226,7 @@ else: return None + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) @@ -233,6 +238,25 @@ self.harmful_ts_dict[ts_key] = gas_ts self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + time.sleep(5) + def health_data_task(self, helmet_code): while not self.__stop_event.is_set(): header = { @@ -250,7 +274,7 @@ self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), - vitalsigns_data.get('heartRate'),upload_timestamp) + vitalsigns_data.get('heartRate'), upload_timestamp) time.sleep(10) def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): @@ -262,6 +286,15 @@ self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 需要往后台发原始数据吗 + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): alarm = False gas_value = gas_data['gas_value'] @@ -281,24 +314,21 @@ # todo 需要生成报警记录吗 def model_predict(self, frames): - results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.model_classes.keys()), - imgsz=640, - stream=True) + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) result_boxes = [] for r in results_generator: result_boxes.append(r.boxes) - pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes] - pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes] - return result_boxes, pred_ids, pred_names + return result_boxes - def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_behave_alarm(self, frames, result_boxes): behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] for alarm_dict in behave_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] if alarm_dict['handelType'] == 0: # 检测到就报警 if object_boxes: @@ -339,7 +369,11 @@ has_alarm = False annotator = None for person_box in person_boxes: + if person_box.id is None: + continue + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) # 检查这个人是否佩戴了安全帽 has_helmet = True person_head = get_person_head(person_bbox, head_boxes) @@ -347,11 +381,23 @@ has_helmet = any( is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -366,36 +412,51 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] for alarm_dict in break_in_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] has_alarm = False annotator = None for person_box in person_boxes: - person_bbox = person_box.xyxy.cpu().squeeze() - person_id = person_box.id - if is_within_alert_range(person_bbox, self.abs_range_points): - has_object = True - person_head = get_person_head(person_bbox, head_boxes) - if person_head is not None: - overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox) - if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 - has_object = any( - is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) - for object_boxe in object_boxes) - if not has_object: - self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1 + if person_box.id is None: + continue - has_alarm = True - if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): - annotator = Annotator(deepcopy(frame)) if annotator is None else annotator - annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -403,6 +464,16 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + def run(self): while not self.stream_loader.init: if self.__stop_event.is_set(): @@ -416,10 +487,40 @@ continue self.device_status_manager.set_status(device_id=self.device.id) - result_boxes, pred_ids, pred_names = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame - # print(pred_names) - self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names) - self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names) + result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + for idx, frame_boxes in enumerate(result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes) + self.handle_break_in_alarm(frames, result_boxes) + + # for person_id in self.tracking_status.keys(): + # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + # for idx, frame in enumerate(frames): + # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + # frame_boxes = result_boxes[idx] + # for s_box in frame_boxes: + # annotator.box_label(s_box.xyxy.cpu().squeeze(), + # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + # color=colors(int(s_box.cls)), + # rotated=False) + # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + self.log_fps(len(frames)) except Exception as ex: traceback.print_exc() diff --git a/services/alarm_record_service.py b/services/alarm_record_service.py index a0a86f9..dc95806 100644 --- a/services/alarm_record_service.py +++ b/services/alarm_record_service.py @@ -7,8 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from entity.alarm_record import AlarmRecordCreate, AlarmRecord - - +from sqlmodel import select, delete +from sqlalchemy import func class AlarmRecordService: def __init__(self, db: AsyncSession): self.db = db @@ -44,3 +44,14 @@ await self.db.commit() await self.db.refresh(alarm_record) return alarm_record + + async def delete_frame(self, max_alarm_id: int): + statement = delete(AlarmRecord).where(AlarmRecord.id <= max_alarm_id) + await self.db.execute(statement) + await self.db.commit() + return max_alarm_id + + async def select_max_id(self, latest_time): + statement = select(func.max(AlarmRecord.id)).where(AlarmRecord.alarm_time < latest_time) + result = await self.db.execute(statement) + return result.scalar_one() or 0 diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index fba1397..729d298 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -50,7 +50,7 @@ def _clean_old_messages(self): """清理长期堆积的消息""" - print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') + # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 3dd0b31..1f25d14 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,17 +4,17 @@ from copy import deepcopy from datetime import datetime - from flatbuffers.builder import np from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager -from common.image_plotting import Annotator +from common.image_plotting import Annotator, colors from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter @@ -157,20 +157,21 @@ self.object_ts_dict = {} self.thread_pool = GlobalThreadPool() - self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager, + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) - self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() - - self.health_device_codes = ['HWIH061000056395'] # todo - self.harmful_device_codes = [] # todo + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] + self.harmful_device_codes = ['862635063168165A'] for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: - self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code) + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) self.thread_pool.submit_task(self.alarm_message_center.process_messages) @@ -206,6 +207,9 @@ self.max_missing_frames = 25 # 报警的阈值 self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.frames_detected = 0 + self.fps_ts = None + def get_absolute_range(self): fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: @@ -222,6 +226,7 @@ else: return None + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) @@ -233,6 +238,25 @@ self.harmful_ts_dict[ts_key] = gas_ts self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + time.sleep(5) + def health_data_task(self, helmet_code): while not self.__stop_event.is_set(): header = { @@ -250,7 +274,7 @@ self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), - vitalsigns_data.get('heartRate'),upload_timestamp) + vitalsigns_data.get('heartRate'), upload_timestamp) time.sleep(10) def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): @@ -262,6 +286,15 @@ self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 需要往后台发原始数据吗 + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): alarm = False gas_value = gas_data['gas_value'] @@ -281,24 +314,21 @@ # todo 需要生成报警记录吗 def model_predict(self, frames): - results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.model_classes.keys()), - imgsz=640, - stream=True) + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) result_boxes = [] for r in results_generator: result_boxes.append(r.boxes) - pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes] - pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes] - return result_boxes, pred_ids, pred_names + return result_boxes - def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_behave_alarm(self, frames, result_boxes): behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] for alarm_dict in behave_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] if alarm_dict['handelType'] == 0: # 检测到就报警 if object_boxes: @@ -339,7 +369,11 @@ has_alarm = False annotator = None for person_box in person_boxes: + if person_box.id is None: + continue + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) # 检查这个人是否佩戴了安全帽 has_helmet = True person_head = get_person_head(person_bbox, head_boxes) @@ -347,11 +381,23 @@ has_helmet = any( is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -366,36 +412,51 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] for alarm_dict in break_in_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] has_alarm = False annotator = None for person_box in person_boxes: - person_bbox = person_box.xyxy.cpu().squeeze() - person_id = person_box.id - if is_within_alert_range(person_bbox, self.abs_range_points): - has_object = True - person_head = get_person_head(person_bbox, head_boxes) - if person_head is not None: - overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox) - if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 - has_object = any( - is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) - for object_boxe in object_boxes) - if not has_object: - self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1 + if person_box.id is None: + continue - has_alarm = True - if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): - annotator = Annotator(deepcopy(frame)) if annotator is None else annotator - annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -403,6 +464,16 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + def run(self): while not self.stream_loader.init: if self.__stop_event.is_set(): @@ -416,10 +487,40 @@ continue self.device_status_manager.set_status(device_id=self.device.id) - result_boxes, pred_ids, pred_names = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame - # print(pred_names) - self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names) - self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names) + result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + for idx, frame_boxes in enumerate(result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes) + self.handle_break_in_alarm(frames, result_boxes) + + # for person_id in self.tracking_status.keys(): + # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + # for idx, frame in enumerate(frames): + # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + # frame_boxes = result_boxes[idx] + # for s_box in frame_boxes: + # annotator.box_label(s_box.xyxy.cpu().squeeze(), + # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + # color=colors(int(s_box.cls)), + # rotated=False) + # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + self.log_fps(len(frames)) except Exception as ex: traceback.print_exc() diff --git a/services/alarm_record_service.py b/services/alarm_record_service.py index a0a86f9..dc95806 100644 --- a/services/alarm_record_service.py +++ b/services/alarm_record_service.py @@ -7,8 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from entity.alarm_record import AlarmRecordCreate, AlarmRecord - - +from sqlmodel import select, delete +from sqlalchemy import func class AlarmRecordService: def __init__(self, db: AsyncSession): self.db = db @@ -44,3 +44,14 @@ await self.db.commit() await self.db.refresh(alarm_record) return alarm_record + + async def delete_frame(self, max_alarm_id: int): + statement = delete(AlarmRecord).where(AlarmRecord.id <= max_alarm_id) + await self.db.execute(statement) + await self.db.commit() + return max_alarm_id + + async def select_max_id(self, latest_time): + statement = select(func.max(AlarmRecord.id)).where(AlarmRecord.alarm_time < latest_time) + result = await self.db.execute(statement) + return result.scalar_one() or 0 diff --git a/services/schedule_job.py b/services/schedule_job.py index def2d1d..a6e5a2f 100644 --- a/services/schedule_job.py +++ b/services/schedule_job.py @@ -7,6 +7,7 @@ from common.global_logger import logger from db.database import get_db +from services.alarm_record_service import AlarmRecordService from services.device_frame_service import DeviceFrameService from apscheduler.schedulers.asyncio import AsyncIOScheduler @@ -18,6 +19,7 @@ async def start_scheduler(): await delete_frames() + await delete_alarms() await compress_day_logs() await organize_and_compress_month_logs() @@ -29,6 +31,8 @@ :return: ''' base_folder = './storage/frames' + if not os.path.exists(base_folder): + return async for db in get_db(): frame_service = DeviceFrameService(db) @@ -58,6 +62,43 @@ # 忽略无法解析为日期的文件夹名称 logger.warning(f"Skipping non-date folder: {folder_name}") +async def delete_alarms(save_month=3): + ''' + 删除3个月前的报警记录 + :param save_month: 默认保留3个月的报警记录 + :return: + ''' + base_folder = './storage/alarms' + if not os.path.exists(base_folder): + return + + async for db in get_db(): + alarm_service = AlarmRecordService(db) + + now = datetime.now() + latest_time = now - relativedelta(months=save_month) + latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0) + + max_alarm_id = await alarm_service.select_max_id(latest_time) + if max_alarm_id > 0: + await alarm_service.delete_frame(max_alarm_id) + logger.info(f"Delete alarms before {latest_time}, max_alarm_id = {max_alarm_id}") + + # 遍历子文件夹 + for folder_name in os.listdir(base_folder): + folder_path = os.path.join(base_folder, folder_name) + if os.path.isdir(folder_path): + try: + # 尝试将文件夹名称解析为日期 + folder_date = datetime.strptime(folder_name, "%Y-%m-%d") + folder_date = folder_date.replace(hour=0, minute=0, second=0, microsecond=0) + # 比较日期并删除早于3个月前的文件夹 + if folder_date < latest_time: + logger.info(f"Deleting folder: {folder_path}") + shutil.rmtree(folder_path) # 递归删除文件夹及其内容 + except ValueError: + # 忽略无法解析为日期的文件夹名称 + logger.warning(f"Skipping non-date folder: {folder_name}") async def compress_day_logs(): """ diff --git a/db/safe-algo-pro.db b/db/safe-algo-pro.db index 694b825..668520b 100644 --- a/db/safe-algo-pro.db +++ b/db/safe-algo-pro.db Binary files differ diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index fba1397..729d298 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -50,7 +50,7 @@ def _clean_old_messages(self): """清理长期堆积的消息""" - print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') + # print(f'清理长期堆积的消息 (队列长度: {len(self.queue)})') now = time.time() with self.lock: self.queue = deque([msg for msg in self.queue if now - msg['timestamp'] <= self.retention_time]) diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index 3dd0b31..1f25d14 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -4,17 +4,17 @@ from copy import deepcopy from datetime import datetime - from flatbuffers.builder import np from scipy.spatial import ConvexHull from algo.stream_loader import OpenCVStreamLoad from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager from common.global_logger import logger from common.global_thread_pool import GlobalThreadPool from common.harmful_gas_manager import HarmfulGasManager -from common.image_plotting import Annotator +from common.image_plotting import Annotator, colors from entity.device import Device from scene_handler.alarm_message_center import AlarmMessageCenter from scene_handler.alarm_record_center import AlarmRecordCenter @@ -157,20 +157,21 @@ self.object_ts_dict = {} self.thread_pool = GlobalThreadPool() - self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager, + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, category_priority={2: 0, 1: 1, 3: 2, 0: 3}) - self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) self.harmful_data_manager = HarmfulGasManager() self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() - - self.health_device_codes = ['HWIH061000056395'] # todo - self.harmful_device_codes = [] # todo + # todo 要改成通过后台接口读取设备编号 + self.health_device_codes = ['HWIH061000056395'] + self.harmful_device_codes = ['862635063168165A'] for helmet_code in self.health_device_codes: self.thread_pool.submit_task(self.health_data_task, helmet_code) for harmful_device_code in self.harmful_device_codes: - self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code) + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) self.thread_pool.submit_task(self.alarm_message_center.process_messages) @@ -206,6 +207,9 @@ self.max_missing_frames = 25 # 报警的阈值 self.disappear_threshold = 25 * 3 # 移除行人的阈值 + self.frames_detected = 0 + self.fps_ts = None + def get_absolute_range(self): fence_info = eval(self.range_points) if fence_info and len(fence_info) > 1: @@ -222,6 +226,7 @@ else: return None + # 一体机直接接收四合一浓度 def harmful_data_task(self, harmful_device_code): while not self.__stop_event.is_set(): harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) @@ -233,6 +238,25 @@ self.harmful_ts_dict[ts_key] = gas_ts self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60 * 60*24*10: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code,ch4, co, h2s, o2) + time.sleep(5) + def health_data_task(self, helmet_code): while not self.__stop_event.is_set(): header = { @@ -250,7 +274,7 @@ self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), - vitalsigns_data.get('heartRate'),upload_timestamp) + vitalsigns_data.get('heartRate'), upload_timestamp) time.sleep(10) def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): @@ -262,6 +286,15 @@ self.alarm_message_center.add_message(alarm_dict[0]) # todo 需要生成报警记录吗 需要往后台发原始数据吗 + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): alarm = False gas_value = gas_data['gas_value'] @@ -281,24 +314,21 @@ # todo 需要生成报警记录吗 def model_predict(self, frames): - results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.model_classes.keys()), - imgsz=640, - stream=True) + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) result_boxes = [] for r in results_generator: result_boxes.append(r.boxes) - pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes] - pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes] - return result_boxes, pred_ids, pred_names + return result_boxes - def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_behave_alarm(self, frames, result_boxes): behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] for alarm_dict in behave_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] if alarm_dict['handelType'] == 0: # 检测到就报警 if object_boxes: @@ -339,7 +369,11 @@ has_alarm = False annotator = None for person_box in person_boxes: + if person_box.id is None: + continue + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) # 检查这个人是否佩戴了安全帽 has_helmet = True person_head = get_person_head(person_bbox, head_boxes) @@ -347,11 +381,23 @@ has_helmet = any( is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: has_alarm = True if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): annotator = Annotator(deepcopy(frame)) if annotator is None else annotator annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -366,36 +412,51 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names): + def handle_break_in_alarm(self, frames, result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] for alarm_dict in break_in_alarm_dicts: for idx, frame_boxes in enumerate(result_boxes): frame = frames[idx] - frame_ids, frame_names = pred_ids[idx], pred_names[idx] person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] has_alarm = False annotator = None for person_box in person_boxes: - person_bbox = person_box.xyxy.cpu().squeeze() - person_id = person_box.id - if is_within_alert_range(person_bbox, self.abs_range_points): - has_object = True - person_head = get_person_head(person_bbox, head_boxes) - if person_head is not None: - overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox) - if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 - has_object = any( - is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) - for object_boxe in object_boxes) - if not has_object: - self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1 + if person_box.id is None: + continue - has_alarm = True - if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): - annotator = Annotator(deepcopy(frame)) if annotator is None else annotator - annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 if has_alarm: self.alarm_message_center.add_message(alarm_dict) @@ -403,6 +464,16 @@ self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, annotator.result()) + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + def run(self): while not self.stream_loader.init: if self.__stop_event.is_set(): @@ -416,10 +487,40 @@ continue self.device_status_manager.set_status(device_id=self.device.id) - result_boxes, pred_ids, pred_names = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame - # print(pred_names) - self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names) - self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names) + result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + for idx, frame_boxes in enumerate(result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes) + self.handle_break_in_alarm(frames, result_boxes) + + # for person_id in self.tracking_status.keys(): + # print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + # for idx, frame in enumerate(frames): + # annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + # frame_boxes = result_boxes[idx] + # for s_box in frame_boxes: + # annotator.box_label(s_box.xyxy.cpu().squeeze(), + # f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + # color=colors(int(s_box.cls)), + # rotated=False) + # self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + self.log_fps(len(frames)) except Exception as ex: traceback.print_exc() diff --git a/services/alarm_record_service.py b/services/alarm_record_service.py index a0a86f9..dc95806 100644 --- a/services/alarm_record_service.py +++ b/services/alarm_record_service.py @@ -7,8 +7,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from entity.alarm_record import AlarmRecordCreate, AlarmRecord - - +from sqlmodel import select, delete +from sqlalchemy import func class AlarmRecordService: def __init__(self, db: AsyncSession): self.db = db @@ -44,3 +44,14 @@ await self.db.commit() await self.db.refresh(alarm_record) return alarm_record + + async def delete_frame(self, max_alarm_id: int): + statement = delete(AlarmRecord).where(AlarmRecord.id <= max_alarm_id) + await self.db.execute(statement) + await self.db.commit() + return max_alarm_id + + async def select_max_id(self, latest_time): + statement = select(func.max(AlarmRecord.id)).where(AlarmRecord.alarm_time < latest_time) + result = await self.db.execute(statement) + return result.scalar_one() or 0 diff --git a/services/schedule_job.py b/services/schedule_job.py index def2d1d..a6e5a2f 100644 --- a/services/schedule_job.py +++ b/services/schedule_job.py @@ -7,6 +7,7 @@ from common.global_logger import logger from db.database import get_db +from services.alarm_record_service import AlarmRecordService from services.device_frame_service import DeviceFrameService from apscheduler.schedulers.asyncio import AsyncIOScheduler @@ -18,6 +19,7 @@ async def start_scheduler(): await delete_frames() + await delete_alarms() await compress_day_logs() await organize_and_compress_month_logs() @@ -29,6 +31,8 @@ :return: ''' base_folder = './storage/frames' + if not os.path.exists(base_folder): + return async for db in get_db(): frame_service = DeviceFrameService(db) @@ -58,6 +62,43 @@ # 忽略无法解析为日期的文件夹名称 logger.warning(f"Skipping non-date folder: {folder_name}") +async def delete_alarms(save_month=3): + ''' + 删除3个月前的报警记录 + :param save_month: 默认保留3个月的报警记录 + :return: + ''' + base_folder = './storage/alarms' + if not os.path.exists(base_folder): + return + + async for db in get_db(): + alarm_service = AlarmRecordService(db) + + now = datetime.now() + latest_time = now - relativedelta(months=save_month) + latest_time = latest_time.replace(hour=0, minute=0, second=0, microsecond=0) + + max_alarm_id = await alarm_service.select_max_id(latest_time) + if max_alarm_id > 0: + await alarm_service.delete_frame(max_alarm_id) + logger.info(f"Delete alarms before {latest_time}, max_alarm_id = {max_alarm_id}") + + # 遍历子文件夹 + for folder_name in os.listdir(base_folder): + folder_path = os.path.join(base_folder, folder_name) + if os.path.isdir(folder_path): + try: + # 尝试将文件夹名称解析为日期 + folder_date = datetime.strptime(folder_name, "%Y-%m-%d") + folder_date = folder_date.replace(hour=0, minute=0, second=0, microsecond=0) + # 比较日期并删除早于3个月前的文件夹 + if folder_date < latest_time: + logger.info(f"Deleting folder: {folder_path}") + shutil.rmtree(folder_path) # 递归删除文件夹及其内容 + except ValueError: + # 忽略无法解析为日期的文件夹名称 + logger.warning(f"Skipping non-date folder: {folder_name}") async def compress_day_logs(): """ diff --git a/tcp/tcp_client_connector.py b/tcp/tcp_client_connector.py index b3b8169..de6205f 100644 --- a/tcp/tcp_client_connector.py +++ b/tcp/tcp_client_connector.py @@ -139,8 +139,9 @@ try: logger.info(f"Start querying gas from {self.ip}...") while True: - await self.send_message(TREE_COMMAND.GAS_QUERY, have_response=True) - await asyncio.sleep(self.query_interval) + if not self.is_reconnecting: + await self.send_message(TREE_COMMAND.GAS_QUERY, have_response=True) + await asyncio.sleep(self.query_interval) except (ConnectionResetError, asyncio.IncompleteReadError) as e: logger.error(f"Error during query for {self.ip}:{self.port}: {e}") await self.reconnect() @@ -168,7 +169,7 @@ async def gas_push(self, data_gas): global_config = GlobalConfig() gas_push_config = global_config.get_gas_push_config() - if gas_push_config and gas_push_config.push_url: + if gas_push_config and gas_push_config.push_url and gas_push_config.push_interval > 0: last_ts = self.push_ts_dict.get(data_gas.device_code) current_time = datetime.now()