diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index db0b841..a365713 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -103,25 +103,25 @@ }, { 'alarmCategory': 0, - 'alarmType': '19', # todo + 'alarmType': '19', # todo 'handelType': 4, 'category_order': -1, 'class_idx': [4], 'alarm_name': 'cigarette', 'alarmContent': '吸烟', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', 'model_type': 'safe', }, { 'alarmCategory': 0, 'alarmType': '2', - 'handelType': 4, # todo + 'handelType': 4, # todo 'category_order': -1, 'class_idx': [5], 'alarm_name': 'phone', 'alarmContent': '打电话', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', 'model_type': 'safe', }, @@ -176,8 +176,9 @@ COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) -HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 -HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + def get_group_device_list(device_code): health_device_codes = [] @@ -187,8 +188,10 @@ if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: - health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] - harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes @@ -251,7 +254,7 @@ self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} self.PERSON_CLASS_IDX = 0 self.HEAD_CLASS_IDX = 1 - self.SAFETY_CLASS_IDX = [2,3,6] + self.SAFETY_CLASS_IDX = [2, 3, 6] self.vid_stride = 3 @@ -307,7 +310,8 @@ if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], - alarm_np_img=None,alarm_value=gas_data.gas_value) + alarm_np_img=None, + alarm_value=gas_data.gas_value) except Exception as e: print(f"Error in latest_query: {e}") @@ -410,10 +414,10 @@ for r in results_generator: result_boxes.append(r.boxes) - safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.safe_model_classes.keys()), - imgsz=640, - stream=True) + safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) for s in safe_results_generator: safe_result_boxes.append(s.boxes) @@ -531,7 +535,7 @@ f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) - + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 @@ -551,7 +555,6 @@ ) has_object = person_object_box is not None - person_status = self.tracking_status[person_id] if alarm_dict['alarm_name'] not in person_status: person_status[alarm_dict['alarm_name']] = 0 @@ -567,7 +570,8 @@ annotator = Annotator(deepcopy(frame)) if annotator is None else annotator # 红色标注人、目标 annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) - annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, + rotated=False) # 已报警,清零,重新计数 person_status[alarm_dict['alarm_name']] = 0 @@ -590,7 +594,6 @@ rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index db0b841..a365713 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -103,25 +103,25 @@ }, { 'alarmCategory': 0, - 'alarmType': '19', # todo + 'alarmType': '19', # todo 'handelType': 4, 'category_order': -1, 'class_idx': [4], 'alarm_name': 'cigarette', 'alarmContent': '吸烟', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', 'model_type': 'safe', }, { 'alarmCategory': 0, 'alarmType': '2', - 'handelType': 4, # todo + 'handelType': 4, # todo 'category_order': -1, 'class_idx': [5], 'alarm_name': 'phone', 'alarmContent': '打电话', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', 'model_type': 'safe', }, @@ -176,8 +176,9 @@ COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) -HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 -HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + def get_group_device_list(device_code): health_device_codes = [] @@ -187,8 +188,10 @@ if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: - health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] - harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes @@ -251,7 +254,7 @@ self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} self.PERSON_CLASS_IDX = 0 self.HEAD_CLASS_IDX = 1 - self.SAFETY_CLASS_IDX = [2,3,6] + self.SAFETY_CLASS_IDX = [2, 3, 6] self.vid_stride = 3 @@ -307,7 +310,8 @@ if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], - alarm_np_img=None,alarm_value=gas_data.gas_value) + alarm_np_img=None, + alarm_value=gas_data.gas_value) except Exception as e: print(f"Error in latest_query: {e}") @@ -410,10 +414,10 @@ for r in results_generator: result_boxes.append(r.boxes) - safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.safe_model_classes.keys()), - imgsz=640, - stream=True) + safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) for s in safe_results_generator: safe_result_boxes.append(s.boxes) @@ -531,7 +535,7 @@ f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) - + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 @@ -551,7 +555,6 @@ ) has_object = person_object_box is not None - person_status = self.tracking_status[person_id] if alarm_dict['alarm_name'] not in person_status: person_status[alarm_dict['alarm_name']] = 0 @@ -567,7 +570,8 @@ annotator = Annotator(deepcopy(frame)) if annotator is None else annotator # 红色标注人、目标 annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) - annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, + rotated=False) # 已报警,清零,重新计数 person_status[alarm_dict['alarm_name']] = 0 @@ -590,7 +594,6 @@ rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py new file mode 100644 index 0000000..740e88f --- /dev/null +++ b/scene_handler/helmet_data_processor.py @@ -0,0 +1,89 @@ +import time +from datetime import datetime + +from common.http_utils import get_request +from scene_handler.alarm_record_center import AlarmRecordCenter + +ALARM_DICT = { + 'health_blood_oxygen': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'health_heartrate': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + } +} + + +class HelmetDataProcessor: + def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter): + self.helmet_code = helmet_code + self.alarm_record_center = alarm_record_center + self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + self.header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + """ + 阻塞进程 + :return: + """ + while True: + print(f"访问{self.helmet_code}心率血氧数据...") + response = get_request(self.url, headers=self.header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts = upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: + return True + + def sendAlarmRecord(self, blood_oxygen, heartrate): + if not self.isBloodOxygenNormal(blood_oxygen): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_blood_oxygen'], + alarm_value=blood_oxygen) + if not self.isHeartRateNormal(heartrate): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_heartrate'], + heartrate) + + def isBloodOxygenNormal(self, blood_oxygen): + if blood_oxygen < 85: + return False + else: + return True + + def isHeartRateNormal(self, heartrate): + if heartrate < 60 or heartrate > 120: + return False + else: + return True diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index db0b841..a365713 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -103,25 +103,25 @@ }, { 'alarmCategory': 0, - 'alarmType': '19', # todo + 'alarmType': '19', # todo 'handelType': 4, 'category_order': -1, 'class_idx': [4], 'alarm_name': 'cigarette', 'alarmContent': '吸烟', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', 'model_type': 'safe', }, { 'alarmCategory': 0, 'alarmType': '2', - 'handelType': 4, # todo + 'handelType': 4, # todo 'category_order': -1, 'class_idx': [5], 'alarm_name': 'phone', 'alarmContent': '打电话', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', 'model_type': 'safe', }, @@ -176,8 +176,9 @@ COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) -HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 -HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + def get_group_device_list(device_code): health_device_codes = [] @@ -187,8 +188,10 @@ if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: - health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] - harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes @@ -251,7 +254,7 @@ self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} self.PERSON_CLASS_IDX = 0 self.HEAD_CLASS_IDX = 1 - self.SAFETY_CLASS_IDX = [2,3,6] + self.SAFETY_CLASS_IDX = [2, 3, 6] self.vid_stride = 3 @@ -307,7 +310,8 @@ if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], - alarm_np_img=None,alarm_value=gas_data.gas_value) + alarm_np_img=None, + alarm_value=gas_data.gas_value) except Exception as e: print(f"Error in latest_query: {e}") @@ -410,10 +414,10 @@ for r in results_generator: result_boxes.append(r.boxes) - safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.safe_model_classes.keys()), - imgsz=640, - stream=True) + safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) for s in safe_results_generator: safe_result_boxes.append(s.boxes) @@ -531,7 +535,7 @@ f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) - + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 @@ -551,7 +555,6 @@ ) has_object = person_object_box is not None - person_status = self.tracking_status[person_id] if alarm_dict['alarm_name'] not in person_status: person_status[alarm_dict['alarm_name']] = 0 @@ -567,7 +570,8 @@ annotator = Annotator(deepcopy(frame)) if annotator is None else annotator # 红色标注人、目标 annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) - annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, + rotated=False) # 已报警,清零,重新计数 person_status[alarm_dict['alarm_name']] = 0 @@ -590,7 +594,6 @@ rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py new file mode 100644 index 0000000..740e88f --- /dev/null +++ b/scene_handler/helmet_data_processor.py @@ -0,0 +1,89 @@ +import time +from datetime import datetime + +from common.http_utils import get_request +from scene_handler.alarm_record_center import AlarmRecordCenter + +ALARM_DICT = { + 'health_blood_oxygen': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'health_heartrate': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + } +} + + +class HelmetDataProcessor: + def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter): + self.helmet_code = helmet_code + self.alarm_record_center = alarm_record_center + self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + self.header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + """ + 阻塞进程 + :return: + """ + while True: + print(f"访问{self.helmet_code}心率血氧数据...") + response = get_request(self.url, headers=self.header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts = upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: + return True + + def sendAlarmRecord(self, blood_oxygen, heartrate): + if not self.isBloodOxygenNormal(blood_oxygen): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_blood_oxygen'], + alarm_value=blood_oxygen) + if not self.isHeartRateNormal(heartrate): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_heartrate'], + heartrate) + + def isBloodOxygenNormal(self, blood_oxygen): + if blood_oxygen < 85: + return False + else: + return True + + def isHeartRateNormal(self, heartrate): + if heartrate < 60 or heartrate > 120: + return False + else: + return True diff --git a/scene_handler/intranet_block_scene_handler.py b/scene_handler/intranet_block_scene_handler.py new file mode 100644 index 0000000..953d18a --- /dev/null +++ b/scene_handler/intranet_block_scene_handler.py @@ -0,0 +1,718 @@ +import time +import traceback +from asyncio import Event +from copy import deepcopy +from datetime import datetime + +import numpy as np +import asyncio +from scipy.spatial import ConvexHull + +from algo.stream_loader import OpenCVStreamLoad +from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area, is_overlapping +from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.harmful_gas_manager import HarmfulGasManager +from common.image_plotting import Annotator, colors +from db.database import get_db +from entity.device import Device +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.alarm_record_center import AlarmRecordCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + +from entity.device import Device +from common.http_utils import get_request +from ultralytics import YOLO + +''' +alarmCategory: +0 行为监管 +1 环境监管 +2 人员监管 +3 围栏监管 + +handelType: +0 检测到报警 +1 未检测到报警 +2 人未穿戴报警 +3 其他 +4 人员检测到报警 +''' +ALARM_DICT = [ + { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 1, + 'class_idx': [34], + 'alarm_name': 'no_fire_extinguisher', + 'alarmContent': '未检测到灭火器', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '15', + 'handelType': 1, + 'category_order': 2, + 'class_idx': [43], + 'alarm_name': 'no_barrier_tape', + 'alarmContent': '未检测到警戒线', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '16', + 'handelType': 1, + 'category_order': 3, + 'class_idx': [48], + 'alarm_name': 'no_cone', + 'alarmContent': '未检测到锥桶', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '17', + 'handelType': 1, + 'category_order': 4, + 'class_idx': [4, 5, 16], + 'alarm_name': 'no_board', + 'alarmContent': '未检测到指示牌', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '18', + 'handelType': 2, + 'category_order': -1, + 'class_idx': [18], + 'alarm_name': 'no_helmet', + 'alarmContent': '未佩戴安全帽', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', + 'label': '未佩戴安全帽', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '19', # todo + 'handelType': 4, + 'category_order': -1, + 'class_idx': [4], + 'alarm_name': 'cigarette', + 'alarmContent': '吸烟', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '吸烟', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '2', + 'handelType': 4, # todo + 'category_order': -1, + 'class_idx': [5], + 'alarm_name': 'phone', + 'alarmContent': '打电话', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '打电话', + 'model_type': 'safe', + }, + # todo 明火 + { + 'alarmCategory': 1, + 'alarmType': '1', + 'handelType': 3, + 'category_order': 1, + 'class_idx': [], + 'alarm_name': 'gas_alarm', + 'alarmContent': '甲烷浓度超限', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', + 'label': '', + }, + { + 'alarmCategory': 1, + 'alarmType': '', + 'handelType': 3, + 'category_order': 2, + 'class_idx': [], + 'alarm_name': 'harmful_alarm', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'class_idx': [], + 'alarm_name': 'health_alarm', + 'alarmContent': '心率血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + { + 'alarmCategory': 3, + 'alarmType': '3', + 'handelType': 2, + 'category_order': 4, + 'class_idx': [3], + 'alarm_name': 'break_in_alarm', + 'alarmContent': '非法闯入', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', + 'label': '非法闯入', + 'model_type': 'safe', + }, + +] + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + +def get_group_device_list(device_code): + health_device_codes = [] + harmful_device_codes = [] + url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' + response = get_request(url) + if response and response.get('code') == 200 and response.get('data'): + data = response.get('data') + for item in data: + health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + return health_device_codes, harmful_device_codes + + +class IntranetBlockSceneHandler(BaseSceneHandler): + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + self.__stop_event = Event(loop=main_loop) + self.health_ts_dict = {} + self.harmful_ts_dict = {} + self.object_ts_dict = {} + self.thread_pool = GlobalThreadPool() + + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) + self.harmful_data_manager = HarmfulGasManager() + self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() + + # todo 要改成通过后台接口读取设备编号 + # self.health_device_codes = ['HWIH061000056395'] + # self.harmful_device_codes = ['862635063168165A'] + self.health_device_codes, self.harmful_device_codes = get_group_device_list(device.code) + + self.thread_pool.submit_task(self.gas_data_task, device.code) + for helmet_code in self.health_device_codes: + self.thread_pool.submit_task(self.health_data_task, helmet_code) + for harmful_device_code in self.harmful_device_codes: + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) + + self.thread_pool.submit_task(self.alarm_message_center.process_messages) + + # todo 明火 + # self.model = YOLO('weights/labor-v8-20250115-fp16.engine') + self.model = YOLO('weights/labor-v8-20241114.pt') + self.model_classes = { + # 0: '三脚架', + # 3: '人', + 4: '作业信息公示牌', + 6: '危险告知牌', + 9: '反光衣', + # 11: '呼吸面罩', + # 13: '四合一', + # 15: '头', + 16: '安全告知牌', + # 18: '安全帽', + 20: '安全标识牌', + # 24: '工服', + 34: '灭火器', + 43: '警戒线', + 48: '路锥', + 58: '鼓风机', + } + self.PERSON_CLASS_IDX = 3 + self.HEAD_CLASS_IDX = 15 + + self.safe_model = YOLO('weights/yinhuan.pt') + self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} + self.PERSON_CLASS_IDX = 0 + self.HEAD_CLASS_IDX = 1 + self.SAFETY_CLASS_IDX = [2,3,6] + + self.vid_stride = 3 + + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + device_thread_id=thread_id, vid_stride=self.vid_stride) + self.range_points = range_points + self.abs_range_points = self.get_absolute_range() + + self.tracking_status = {} # 跟踪每个行人的状态 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 + + self.frames_detected = 0 + self.fps_ts = None + + def get_absolute_range(self): + if not self.range_points: + return None + + fence_info = eval(self.range_points) + if fence_info and len(fence_info) > 1: + abs_points = [] + for p in fence_info: + abs_points.append( + [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))]) + + abs_points = np.array(abs_points, dtype=np.int32) + hull = ConvexHull(abs_points) + sorted_coordinates = abs_points[hull.vertices] + # abs_points = abs_points.reshape((-1, 1, 2)) + return sorted_coordinates + else: + return None + + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + + # 一体机直接接收四合一浓度 + def harmful_data_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) + for gas_type, gas_data in harmful_gas_data.items(): + ts_key = f'{harmful_device_code}_{gas_type}' + last_ts = self.harmful_ts_dict.get(ts_key) + gas_ts = gas_data.get('gas_ts') + if last_ts is None or (gas_ts - last_ts).total_seconds() > 0: + self.harmful_ts_dict[ts_key] = gas_ts + self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) + time.sleep(5) + + def health_data_task(self, helmet_code): + while not self.__stop_event.is_set(): + header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}' + response = get_request(url, headers=header) + if response and response.get('data'): + last_ts = self.health_ts_dict.get(helmet_code) + vitalsigns_data = response.get('data').get('vitalSignsData') + if vitalsigns_data: + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0: + self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), + vitalsigns_data.get('heartRate'), upload_timestamp) + time.sleep(10) + + def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): + logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' + f'upload_timestamp = {upload_timestamp}') + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 需要往后台发原始数据吗 + + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): + alarm = False + gas_value = gas_data['gas_value'] + if gas_type == 3: # h2s + alarm = gas_value > 120.0 + elif gas_type == 4: # co + alarm = gas_value > 10.0 + elif gas_type == 5: # o2 + alarm = gas_value < 15 + elif gas_type == 50: # ex + alarm = gas_value > 10 + + if alarm: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 + + def model_predict(self, frames): + result_boxes = [] + safe_result_boxes = [] + + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) + + for r in results_generator: + result_boxes.append(r.boxes) + + safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) + for s in safe_results_generator: + safe_result_boxes.append(s.boxes) + + return result_boxes, safe_result_boxes + + def handle_behave_alarm(self, frames, result_boxes, safe_result_boxes): + behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] + for alarm_dict in behave_alarm_dicts: + use_safe_model = alarm_dict.get('model_type', 'labor') == 'safe' + boxes = safe_result_boxes if use_safe_model else result_boxes + model_classes = self.safe_model_classes if use_safe_model else self.model_classes + for idx, frame_boxes in enumerate(boxes): + frame = frames[idx] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + if alarm_dict['handelType'] == 0: # 检测到就报警 + if object_boxes: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 红色标注目标设备,蓝色标注正常施工设备 + for box in frame_boxes: + box_color = COLOR_RED if int(box.cls) in alarm_dict['class_idx'] else COLOR_BLUE + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=box_color, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + elif alarm_dict['handelType'] == 1: # 检测不到报警 + if object_boxes: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + else: + last_ts = self.object_ts_dict.get(alarm_dict['alarm_name'], 0) + if time.time() - last_ts > 5: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 蓝色标注正常施工设备 + for box in frame_boxes: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 2: # 人未穿戴报警 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + # 检查这个人是否佩戴了安全帽 + has_helmet = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + has_helmet = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) + for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + person_object_box = max( + (box for box in object_boxes if is_overlapping(person_bbox, box.xyxy.cpu().squeeze())), + key=lambda box: box.conf.item(), + default=None + ) + has_object = person_object_box is not None + + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if has_object: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人、目标 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + + def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): + break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] + for alarm_dict in break_in_alarm_dicts: + for idx, frame_boxes in enumerate(safe_result_boxes): + frame = frames[idx] + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + annotator.result()) + + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + + def run(self): + while not self.stream_loader.init: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + self.stream_loader.init_cap() + for frames in self.stream_loader: + try: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + if not frames: + continue + + t1 = time.time() + self.device_status_manager.set_status(device_id=self.device.id) + result_boxes, safe_result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + t2 = time.time() + for idx, frame_boxes in enumerate(safe_result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes, safe_result_boxes) + self.handle_break_in_alarm(frames, result_boxes, safe_result_boxes) + + t3 = time.time() + for person_id in self.tracking_status.keys(): + print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + for s_box in safe_result_boxes[idx]: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') + self.log_fps(len(frames)) + + except Exception as ex: + traceback.print_exc() + logger.error(ex) diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index db0b841..a365713 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -103,25 +103,25 @@ }, { 'alarmCategory': 0, - 'alarmType': '19', # todo + 'alarmType': '19', # todo 'handelType': 4, 'category_order': -1, 'class_idx': [4], 'alarm_name': 'cigarette', 'alarmContent': '吸烟', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', 'model_type': 'safe', }, { 'alarmCategory': 0, 'alarmType': '2', - 'handelType': 4, # todo + 'handelType': 4, # todo 'category_order': -1, 'class_idx': [5], 'alarm_name': 'phone', 'alarmContent': '打电话', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', 'model_type': 'safe', }, @@ -176,8 +176,9 @@ COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) -HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 -HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + def get_group_device_list(device_code): health_device_codes = [] @@ -187,8 +188,10 @@ if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: - health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] - harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes @@ -251,7 +254,7 @@ self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} self.PERSON_CLASS_IDX = 0 self.HEAD_CLASS_IDX = 1 - self.SAFETY_CLASS_IDX = [2,3,6] + self.SAFETY_CLASS_IDX = [2, 3, 6] self.vid_stride = 3 @@ -307,7 +310,8 @@ if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], - alarm_np_img=None,alarm_value=gas_data.gas_value) + alarm_np_img=None, + alarm_value=gas_data.gas_value) except Exception as e: print(f"Error in latest_query: {e}") @@ -410,10 +414,10 @@ for r in results_generator: result_boxes.append(r.boxes) - safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.safe_model_classes.keys()), - imgsz=640, - stream=True) + safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) for s in safe_results_generator: safe_result_boxes.append(s.boxes) @@ -531,7 +535,7 @@ f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) - + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 @@ -551,7 +555,6 @@ ) has_object = person_object_box is not None - person_status = self.tracking_status[person_id] if alarm_dict['alarm_name'] not in person_status: person_status[alarm_dict['alarm_name']] = 0 @@ -567,7 +570,8 @@ annotator = Annotator(deepcopy(frame)) if annotator is None else annotator # 红色标注人、目标 annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) - annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, + rotated=False) # 已报警,清零,重新计数 person_status[alarm_dict['alarm_name']] = 0 @@ -590,7 +594,6 @@ rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py new file mode 100644 index 0000000..740e88f --- /dev/null +++ b/scene_handler/helmet_data_processor.py @@ -0,0 +1,89 @@ +import time +from datetime import datetime + +from common.http_utils import get_request +from scene_handler.alarm_record_center import AlarmRecordCenter + +ALARM_DICT = { + 'health_blood_oxygen': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'health_heartrate': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + } +} + + +class HelmetDataProcessor: + def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter): + self.helmet_code = helmet_code + self.alarm_record_center = alarm_record_center + self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + self.header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + """ + 阻塞进程 + :return: + """ + while True: + print(f"访问{self.helmet_code}心率血氧数据...") + response = get_request(self.url, headers=self.header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts = upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: + return True + + def sendAlarmRecord(self, blood_oxygen, heartrate): + if not self.isBloodOxygenNormal(blood_oxygen): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_blood_oxygen'], + alarm_value=blood_oxygen) + if not self.isHeartRateNormal(heartrate): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_heartrate'], + heartrate) + + def isBloodOxygenNormal(self, blood_oxygen): + if blood_oxygen < 85: + return False + else: + return True + + def isHeartRateNormal(self, heartrate): + if heartrate < 60 or heartrate > 120: + return False + else: + return True diff --git a/scene_handler/intranet_block_scene_handler.py b/scene_handler/intranet_block_scene_handler.py new file mode 100644 index 0000000..953d18a --- /dev/null +++ b/scene_handler/intranet_block_scene_handler.py @@ -0,0 +1,718 @@ +import time +import traceback +from asyncio import Event +from copy import deepcopy +from datetime import datetime + +import numpy as np +import asyncio +from scipy.spatial import ConvexHull + +from algo.stream_loader import OpenCVStreamLoad +from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area, is_overlapping +from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.harmful_gas_manager import HarmfulGasManager +from common.image_plotting import Annotator, colors +from db.database import get_db +from entity.device import Device +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.alarm_record_center import AlarmRecordCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + +from entity.device import Device +from common.http_utils import get_request +from ultralytics import YOLO + +''' +alarmCategory: +0 行为监管 +1 环境监管 +2 人员监管 +3 围栏监管 + +handelType: +0 检测到报警 +1 未检测到报警 +2 人未穿戴报警 +3 其他 +4 人员检测到报警 +''' +ALARM_DICT = [ + { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 1, + 'class_idx': [34], + 'alarm_name': 'no_fire_extinguisher', + 'alarmContent': '未检测到灭火器', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '15', + 'handelType': 1, + 'category_order': 2, + 'class_idx': [43], + 'alarm_name': 'no_barrier_tape', + 'alarmContent': '未检测到警戒线', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '16', + 'handelType': 1, + 'category_order': 3, + 'class_idx': [48], + 'alarm_name': 'no_cone', + 'alarmContent': '未检测到锥桶', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '17', + 'handelType': 1, + 'category_order': 4, + 'class_idx': [4, 5, 16], + 'alarm_name': 'no_board', + 'alarmContent': '未检测到指示牌', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '18', + 'handelType': 2, + 'category_order': -1, + 'class_idx': [18], + 'alarm_name': 'no_helmet', + 'alarmContent': '未佩戴安全帽', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', + 'label': '未佩戴安全帽', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '19', # todo + 'handelType': 4, + 'category_order': -1, + 'class_idx': [4], + 'alarm_name': 'cigarette', + 'alarmContent': '吸烟', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '吸烟', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '2', + 'handelType': 4, # todo + 'category_order': -1, + 'class_idx': [5], + 'alarm_name': 'phone', + 'alarmContent': '打电话', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '打电话', + 'model_type': 'safe', + }, + # todo 明火 + { + 'alarmCategory': 1, + 'alarmType': '1', + 'handelType': 3, + 'category_order': 1, + 'class_idx': [], + 'alarm_name': 'gas_alarm', + 'alarmContent': '甲烷浓度超限', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', + 'label': '', + }, + { + 'alarmCategory': 1, + 'alarmType': '', + 'handelType': 3, + 'category_order': 2, + 'class_idx': [], + 'alarm_name': 'harmful_alarm', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'class_idx': [], + 'alarm_name': 'health_alarm', + 'alarmContent': '心率血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + { + 'alarmCategory': 3, + 'alarmType': '3', + 'handelType': 2, + 'category_order': 4, + 'class_idx': [3], + 'alarm_name': 'break_in_alarm', + 'alarmContent': '非法闯入', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', + 'label': '非法闯入', + 'model_type': 'safe', + }, + +] + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + +def get_group_device_list(device_code): + health_device_codes = [] + harmful_device_codes = [] + url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' + response = get_request(url) + if response and response.get('code') == 200 and response.get('data'): + data = response.get('data') + for item in data: + health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + return health_device_codes, harmful_device_codes + + +class IntranetBlockSceneHandler(BaseSceneHandler): + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + self.__stop_event = Event(loop=main_loop) + self.health_ts_dict = {} + self.harmful_ts_dict = {} + self.object_ts_dict = {} + self.thread_pool = GlobalThreadPool() + + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) + self.harmful_data_manager = HarmfulGasManager() + self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() + + # todo 要改成通过后台接口读取设备编号 + # self.health_device_codes = ['HWIH061000056395'] + # self.harmful_device_codes = ['862635063168165A'] + self.health_device_codes, self.harmful_device_codes = get_group_device_list(device.code) + + self.thread_pool.submit_task(self.gas_data_task, device.code) + for helmet_code in self.health_device_codes: + self.thread_pool.submit_task(self.health_data_task, helmet_code) + for harmful_device_code in self.harmful_device_codes: + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) + + self.thread_pool.submit_task(self.alarm_message_center.process_messages) + + # todo 明火 + # self.model = YOLO('weights/labor-v8-20250115-fp16.engine') + self.model = YOLO('weights/labor-v8-20241114.pt') + self.model_classes = { + # 0: '三脚架', + # 3: '人', + 4: '作业信息公示牌', + 6: '危险告知牌', + 9: '反光衣', + # 11: '呼吸面罩', + # 13: '四合一', + # 15: '头', + 16: '安全告知牌', + # 18: '安全帽', + 20: '安全标识牌', + # 24: '工服', + 34: '灭火器', + 43: '警戒线', + 48: '路锥', + 58: '鼓风机', + } + self.PERSON_CLASS_IDX = 3 + self.HEAD_CLASS_IDX = 15 + + self.safe_model = YOLO('weights/yinhuan.pt') + self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} + self.PERSON_CLASS_IDX = 0 + self.HEAD_CLASS_IDX = 1 + self.SAFETY_CLASS_IDX = [2,3,6] + + self.vid_stride = 3 + + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + device_thread_id=thread_id, vid_stride=self.vid_stride) + self.range_points = range_points + self.abs_range_points = self.get_absolute_range() + + self.tracking_status = {} # 跟踪每个行人的状态 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 + + self.frames_detected = 0 + self.fps_ts = None + + def get_absolute_range(self): + if not self.range_points: + return None + + fence_info = eval(self.range_points) + if fence_info and len(fence_info) > 1: + abs_points = [] + for p in fence_info: + abs_points.append( + [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))]) + + abs_points = np.array(abs_points, dtype=np.int32) + hull = ConvexHull(abs_points) + sorted_coordinates = abs_points[hull.vertices] + # abs_points = abs_points.reshape((-1, 1, 2)) + return sorted_coordinates + else: + return None + + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + + # 一体机直接接收四合一浓度 + def harmful_data_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) + for gas_type, gas_data in harmful_gas_data.items(): + ts_key = f'{harmful_device_code}_{gas_type}' + last_ts = self.harmful_ts_dict.get(ts_key) + gas_ts = gas_data.get('gas_ts') + if last_ts is None or (gas_ts - last_ts).total_seconds() > 0: + self.harmful_ts_dict[ts_key] = gas_ts + self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) + time.sleep(5) + + def health_data_task(self, helmet_code): + while not self.__stop_event.is_set(): + header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}' + response = get_request(url, headers=header) + if response and response.get('data'): + last_ts = self.health_ts_dict.get(helmet_code) + vitalsigns_data = response.get('data').get('vitalSignsData') + if vitalsigns_data: + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0: + self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), + vitalsigns_data.get('heartRate'), upload_timestamp) + time.sleep(10) + + def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): + logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' + f'upload_timestamp = {upload_timestamp}') + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 需要往后台发原始数据吗 + + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): + alarm = False + gas_value = gas_data['gas_value'] + if gas_type == 3: # h2s + alarm = gas_value > 120.0 + elif gas_type == 4: # co + alarm = gas_value > 10.0 + elif gas_type == 5: # o2 + alarm = gas_value < 15 + elif gas_type == 50: # ex + alarm = gas_value > 10 + + if alarm: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 + + def model_predict(self, frames): + result_boxes = [] + safe_result_boxes = [] + + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) + + for r in results_generator: + result_boxes.append(r.boxes) + + safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) + for s in safe_results_generator: + safe_result_boxes.append(s.boxes) + + return result_boxes, safe_result_boxes + + def handle_behave_alarm(self, frames, result_boxes, safe_result_boxes): + behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] + for alarm_dict in behave_alarm_dicts: + use_safe_model = alarm_dict.get('model_type', 'labor') == 'safe' + boxes = safe_result_boxes if use_safe_model else result_boxes + model_classes = self.safe_model_classes if use_safe_model else self.model_classes + for idx, frame_boxes in enumerate(boxes): + frame = frames[idx] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + if alarm_dict['handelType'] == 0: # 检测到就报警 + if object_boxes: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 红色标注目标设备,蓝色标注正常施工设备 + for box in frame_boxes: + box_color = COLOR_RED if int(box.cls) in alarm_dict['class_idx'] else COLOR_BLUE + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=box_color, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + elif alarm_dict['handelType'] == 1: # 检测不到报警 + if object_boxes: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + else: + last_ts = self.object_ts_dict.get(alarm_dict['alarm_name'], 0) + if time.time() - last_ts > 5: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 蓝色标注正常施工设备 + for box in frame_boxes: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 2: # 人未穿戴报警 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + # 检查这个人是否佩戴了安全帽 + has_helmet = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + has_helmet = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) + for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + person_object_box = max( + (box for box in object_boxes if is_overlapping(person_bbox, box.xyxy.cpu().squeeze())), + key=lambda box: box.conf.item(), + default=None + ) + has_object = person_object_box is not None + + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if has_object: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人、目标 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + + def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): + break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] + for alarm_dict in break_in_alarm_dicts: + for idx, frame_boxes in enumerate(safe_result_boxes): + frame = frames[idx] + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + annotator.result()) + + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + + def run(self): + while not self.stream_loader.init: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + self.stream_loader.init_cap() + for frames in self.stream_loader: + try: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + if not frames: + continue + + t1 = time.time() + self.device_status_manager.set_status(device_id=self.device.id) + result_boxes, safe_result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + t2 = time.time() + for idx, frame_boxes in enumerate(safe_result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes, safe_result_boxes) + self.handle_break_in_alarm(frames, result_boxes, safe_result_boxes) + + t3 = time.time() + for person_id in self.tracking_status.keys(): + print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + for s_box in safe_result_boxes[idx]: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') + self.log_fps(len(frames)) + + except Exception as ex: + traceback.print_exc() + logger.error(ex) diff --git a/scene_handler/intranet_limit_space_scene_handler.py b/scene_handler/intranet_limit_space_scene_handler.py new file mode 100644 index 0000000..73dcbcf --- /dev/null +++ b/scene_handler/intranet_limit_space_scene_handler.py @@ -0,0 +1,1121 @@ +import asyncio +import base64 +import traceback +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy, copy + +import time +from typing import Dict, List + +import cv2 +from datetime import datetime +import csv + +from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.http_utils import send_request, get_request +from common.image_plotting import Annotator +from entity.device import Device +import numpy as np +from ultralytics import YOLO + +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.alarm_record_center import AlarmRecordCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from scene_handler.helmet_data_processor import HelmetDataProcessor +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + + +def create_value_iterator(values): + for value in values: + yield value + + +fake_list = [ # 假如这是你从四合一后台请求来的数据 + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, +] +value_iterator = create_value_iterator(fake_list) + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) + + +def flatten(lst): + result = [] + for i in lst: + if isinstance(i, list): + result.extend(flatten(i)) # 递归调用以处理嵌套列表 + else: + result.append(i) + return result + + +''' +alarmCategory: +0 劳保用品检测异常:三脚架、灭火器、鼓风机、指示牌、面罩、交底 +1 作业过程隐患:闲杂人、安全帽、打电话、吸烟、袖标 +2 人员健康异常 +3 气体浓度异常 +4 上中下气体浓度异常 ? + +handelType: +0 检测到报警 +1 未检测到报警 +2 人未穿戴报警 +3 其他 +4 人员检测到报警 + +''' +ALARM_DICT = { + 'no_brief': { + 'alarmCategory': 0, + 'alarmType': '14', # todo + 'handelType': 1, + 'category_order': 6, + 'alarm_name': 'no_brief', + 'alarmContent': '未进行施工交底', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo + 'label': '', + }, + 'no_tripod': { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 1, + 'alarm_name': 'no_tripod', + 'alarmContent': '未检测到三脚架', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo + 'label': '', + }, + 'no_mask': { + 'alarmCategory': 0, + 'alarmType': '11', + 'handelType': 1, + 'category_order': 5, + 'alarm_name': 'no_mask', + 'alarmContent': '未佩戴呼吸防护设备', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'label': '' + }, + 'no_blower': { + 'alarmCategory': 0, + 'alarmType': '13', + 'handelType': 1, + 'category_order': 3, + 'alarm_name': 'no_blower', + 'alarmContent': '没有检测到通风设备', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x1A\x00\xAE', + 'label': '没有检测到通风设备' + }, + 'no_extinguisher': { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 2, + 'alarm_name': 'no_fire_extinguisher', + 'alarmContent': '未检测到灭火器', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', + 'label': '', + }, + 'no_board': { + 'alarmCategory': 0, + 'alarmType': '17', + 'handelType': 1, + 'category_order': 4, + 'alarm_name': 'no_board', + 'alarmContent': '未检测到指示牌', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', + 'label': '', + }, + 'harmful_gas': { + 'alarmCategory': 3, + 'alarmType': '', # todo + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'harmful_alarm', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + 'umd_harmful_gas': { # todo 要跟上面区分吗 + 'alarmCategory': 4, + 'alarmType': '', # todo + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'umd_harmful_gas', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + 'health': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'smoke': { + 'alarmCategory': 1, + 'alarmType': '19', # todo + 'handelType': 4, + 'category_order': 4, + 'alarm_name': 'cigarette', + 'alarmContent': '吸烟', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '吸烟', + }, + 'phone': { + 'alarmCategory': 1, + 'alarmType': '2', + 'handelType': 4, + 'category_order': 3, + 'alarm_name': 'phone', + 'alarmContent': '打电话', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '打电话', + }, + 'aqm': { + 'alarmCategory': 1, + 'alarmType': '18', + 'handelType': 2, + 'category_order': 2, + 'alarm_name': 'no_helmet', + 'alarmContent': '未佩戴安全帽', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', + 'label': '未佩戴安全帽', + }, + 'armband': { + 'alarmCategory': 1, + 'alarmType': '18', # todo + 'handelType': 2, + 'category_order': 5, + 'alarm_name': 'no_armband', + 'alarmContent': '未佩戴袖标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '未佩戴袖标', + 'model_type': 'safe', + }, + 'break': { + 'alarmCategory': 1, + 'alarmType': '3', + 'handelType': 2, + 'category_order': 1, + 'alarm_name': 'break_in_alarm', + 'alarmContent': '非法闯入', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', + 'label': '非法闯入', + }, +} + +# UMD_PASS_MESSAGE = b'' # 上中下气体检测通过 +PREPARE_COMPLETE_MESSAGE = b'\xaa\x01\x00\x93\x19\x00\xAD' # 满足有限空间作业要求,可以作业 + + +def writeFile(file_path, data): + # print(f"写入{data}") + with open(file_path, mode='a', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + + +class EventController(): + def __init__(self): + self.timeout_event = asyncio.Event() + self.umd_complete = asyncio.Event() + self.qianzhi_check_complete = asyncio.Event() + self.laobao_complete = asyncio.Event() + + +class SiHeYi(): + def __init__(self, harmful_device_code): + self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' # 后台访问数据的url + self.harmful_device_code = harmful_device_code # 四合一标识符 + self.last_ts = None # 上次读取的数据的生成时间戳 + + def waitPowerOn(self, script_start_time): + """ + 阻塞函数 + 循环是否开机,只有检测到开机才会退出函数 + + :param script_start_time:脚本启动的时间戳 + + :return: + """ + print("检测四合一是否开机") + + while True: + self.getNewData() + flag = script_start_time < self.last_ts # 当前时间T/F 开机/未开机 + print(f'{script_start_time} {self.last_ts} {flag}') + if flag: + print("检测到开机") + return + else: + print("未开机") + time.sleep(2) + + def getNewData(self): + """ + 阻塞函数 + 访问后台数据库 读取最新产生的四合一浓度 + 如果有返回数据则记录 该数据产生的时间。 + 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: + 则视为读取到新数据,返回新数据 + 没有数据等待n秒后重复询问 + :return: + """ + while True: + url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={self.harmful_device_code}' + print("访问四合一数据...") + response = get_request(url) + # response = getGasGata_fake() + if response and response.get('data'): + + data = response.get('data') + print(f"访问到四合一数据: {data}") + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0: + self.last_ts = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + return ch4, co, h2s, o2 + else: + print('ignore') + else: # url没有返回数据 + print("四合一没有读取到数据") + time.sleep(5) + + def isDataNormal(self, ch4, co, h2s, o2): + """ + 判断四项气体是否正常 + :param ch4: + :param co: + :param h2s: + :param o2: + :return: + """ + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + return False # 气体异常 + else: + return True # 气体正常 + + +# class AnQuanMao(): +# def __init__(self, helmet_code): +# self.helmet_code = helmet_code +# self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={helmet_code}' # 后台访问数据的url +# self.last_ts = None # 上次读取的数据的生成时间戳 +# +# def getNewData(self): +# """ +# 阻塞进程 +# :return: +# """ +# while True: +# header = { +# 'ak': 'fe80b2f021644b1b8c77fda743a83670', +# 'sk': '8771ea6e931d4db646a26f67bcb89909', +# } +# url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' +# print("访问心率血氧数据...") +# response = get_request(url, headers=header) +# if response and response.get('data'): +# print("访问到心率血氧数据") +# vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 +# if vitalsigns_data: # 访问成功 +# upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), +# "%Y-%m-%d %H:%M:%S") # 访问数据的时间 +# if self.last_ts is None or ( +# upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 +# self.last_ts = upload_timestamp.timestamp() # 更新数据 +# if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 +# return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') +# else: +# print("无法访问到心率血氧数据") +# time.sleep(5) +# +# def isDataNormal(self, blood_oxygen, heartrate): +# if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 +# return False +# else: +# return True + + +class Laobaocheck(): + def __init__(self, eventController=None, alarm=None): + self.laobao_model = YOLO("weights/labor-v8-20241114.pt") + self.jiaodi_model = YOLO("weights/jiaodi.pt") + self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} + self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, + "工作指示牌": False} # OD 模型有无检测这些目标 + self.jiaodi_flag = False # 分类模型 有无检测到交底 + self.laobao_pool = {} + + self.eventController = eventController + self.alarm = alarm + + def getUndetectedTarget(self): + # 获取未检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == False: + result.append(name) + if not self.jiaodi_flag: + result.append("交底") + return result + + def name2alarm(self, target_name): + alarm_map = { + '三脚架': 'no_tripod', + '灭火器': 'no_extinguisher', + '鼓风机': 'no_blower', + '面罩': 'no_mask', + '工作指示牌': 'no_board', + '交底': 'no_brief' + } + return alarm_map.get(target_name, None) + + def getDetectedTarget(self): + # 获取已检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == True: + result.append(name) + return result + + def name2id(self, input): + # 检测名称 映射为 id + if isinstance(input, str): + return self.target[input] + elif isinstance(input, list): + result = [] + for item in input: + if item in self.target: + result.append(self.target[item]) + if len(result) == 0: return [] + return list(set(np.concatenate([r for r in result]).astype(int).tolist())) + + def id2name(self, input): + """ + + :param input: int 或 [int,int,int...] + :return: + """ + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k, v in self.target.items(): # k: 类别名称 , v: id_list + if id in v: result.append(k) + + return list(set(result)) + + def predict_isJiaodi(self, frames): + """ + 调用 jiaodi.pt 分类模型 + :return: True:交底,False:没检测到 交底 + """ + jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False) + jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results] + for prob in jiaodi_prob: + if prob > 0.6: + return True + return False + + def predict_laobao(self, frames): + ''' + 调用 labor-v8-20241114.pt OD 模型 + :param frames: + :return: [类别1,类别2] + ''' + target_idx = self.name2id(self.getUndetectedTarget()) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype( + int).tolist())) # 检测到的目标类别id_list,已去重 + return self.id2name(pred_c_list) + + def updateUnpredictedTargets(self, jiaodi_flag, pred_labels): + ''' + 更新 已检测到的目标 列表 和有无检测到交底 + :param pred_labels: [str, str...] + :return: None + ''' + for pred_label in pred_labels: + print(f"检测到{pred_label}") + self.target_flag[pred_label] = True + if self.jiaodi_flag == False and jiaodi_flag == True: + print(f"劳保检测:检测到交底") + self.jiaodi_flag = jiaodi_flag + + def model_predict_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + while True: + try: + ret, frames = cap.read() + frames = [frames] + if not ret: + break + cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) + cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cap.release() + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + if self.eventController != None and self.eventController.timeout_event.is_set(): # 超时退出 + cap.release() + cv2.destroyAllWindows() + return + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + print("劳保物品 通过") + self.eventController.laobao_complete.set() + cap.release() + cv2.destroyAllWindows() + return # 退出检测 + else: # 如果还有未检测到的 + undetectedTargets = self.getUndetectedTarget() + for target_name in undetectedTargets: + alarm_dict = self.name2alarm(target_name) + if alarm_dict: + self.alarm.addAlarm(alarm_dict) + cap.release() + cv2.destroyAllWindows() + + def model_predict(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + if self.eventController is not None and self.eventController.timeout_event.is_set(): # 超时退出 + return + + if not frames: + continue + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController is not None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if not self.getUndetectedTarget(): # 如果全部检验到了 + print("劳保物品 通过") + self.eventController.laobao_complete.set() + return # 退出检测 + else: # 如果还有未检测到的 + # todo 这里是否要生成报警记录 + undetectedTargets = self.getUndetectedTarget() + for target_name in undetectedTargets: + alarm_dict = self.name2alarm(target_name) + if alarm_dict: + self.alarm.addAlarm(alarm_dict) + + +class YinHuanCheck: + def __init__(self, device_code, eventController, alarm, frame_threshold=20): + # 初始化YOLO模型及其他参数 + self.model = YOLO("weights/yinhuan.pt") + self.eventController = eventController + self.alarm = alarm + self.device_code = device_code + self.frame_threshold = frame_threshold # 连续异常帧阈值 + + # 针对每个人的异常计数器,键为 person_id + self.counters = { + 'no_helmet': {}, # 未佩戴安全帽 + 'smoking': {}, # 吸烟(检测到烟头) + 'phone': {}, # 打电话(检测到电话) + 'illegal_intrusion': {} # 非法闯入(既没有安全帽也没有工服) + } + # 针对袖标异常为全局条件:如果当前帧中所有人均未检测到袖标,则更新全局计数器 + self.armband_counter = 0 + + def id2name(self, input_id): + """ + 将检测到的类别ID转换为类别名称,支持单个ID或ID列表 + """ + result = [] + if isinstance(input_id, int): + input_id = [input_id] + for id in input_id: + for k, v in self.model.names.items(): # k: id , v: 类别名称 + if k == id: + result.append(v) + return list(set(result)) + + def detect_person(self, frames): + """ + 对输入的一批视频帧进行人员检测与跟踪,返回每帧中检测到的人员信息。 + 返回格式:list,每项为字典,键为 person_id,值为 {'crop': 截取的人像, 'box': 人员检测框 [x1,y1,x2,y2]} + """ + people_results = self.model.track(source=frames, conf=0.6, classes=[0], + save=False, verbose=False) # 检测人(类别0) + results = [] + for people_result in people_results: + orig_img = people_result.orig_img # 当前帧原图 + person_dict = {} + for person_box in people_result.boxes: + person_id = person_box.id.item() + if person_id: + box = person_box.xyxy.squeeze().tolist() # [x1, y1, x2, y2] + # 截取检测到的人像区域 + cropped_image = orig_img[int(box[1]):int(box[3]), int(box[0]):int(box[2])] + person_dict[person_id] = {'crop': cropped_image, 'box': box} + results.append(person_dict) + return results + + def detect_person_targets(self, people_results): + """ + 针对每个人的图像区域进行目标检测(安全帽、工服、烟头、电话、袖标)。 + 返回格式:list,每项为字典,键为 person_id,值为该人身上检测到的目标字典 {label: xyxy} + """ + # 收集所有人的图像区域 + person_images = [] + for frame_persons in people_results: + for info in frame_persons.values(): + person_images.append(info['crop']) + if len(person_images) == 0: + return [] + # 对所有人的区域进行目标检测 + results = self.model.predict(source=person_images, conf=0.6, classes=[2, 3, 4, 5, 6], + save=False, verbose=False) + person_detect_targets = [] + result_idx = 0 + for frame_persons in people_results: + frame_detection = {} + for person_id in frame_persons.keys(): + person_result = results[result_idx] + detection_dict = {} + for box in person_result.boxes: + label = int(box.cls.item()) + label_name = self.id2name(label) + xyxy = box.xyxy.squeeze().tolist() + # 假设每个box只对应一种类别,直接存入字典 + detection_dict[label_name[0]] = xyxy + frame_detection[person_id] = detection_dict + result_idx += 1 + person_detect_targets.append(frame_detection) + return person_detect_targets + + def annotate_alarm(self, frame, condition, person_box=None, detection=None): + """ + 在报警图片上对异常情况进行标注: + - person_box: 异常人员的检测框 + - detection: 异常物品的检测框,格式为 {label: xyxy} + """ + annotator = Annotator(deepcopy(frame), None, 18, "Arial.ttf", False) + + if person_box is not None: + annotator.box_label(person_box, condition, color=COLOR_RED, rotated=False) + + # 绘制异常物品的检测框(如烟头、电话等) + if detection is not None: + if person_box is not None: + offset_x, offset_y = int(person_box[0]), int(person_box[1]) + for label, box in detection.items(): + # 将 detection 框坐标转换为全局坐标 + global_box = [box[0] + offset_x, box[1] + offset_y, + box[2] + offset_x, box[3] + offset_y] + annotator.box_label(global_box, '', color=COLOR_RED, rotated=False) + else: + # 如果没有人的框,则直接使用 detection 框 + for label, box in detection.items(): + annotator.box_label(box, label, color=COLOR_RED, rotated=False) + return annotator.result() + + def trigger_alarm(self, frame, condition, person_id=None, detection=None, person_box=None): + """ + 当某个异常条件达到连续帧阈值时触发报警: + - condition: 异常类型,取值:'no_helmet', 'smoking', 'phone', 'illegal_intrusion', 'armband' + - 对于人员级别的异常,会标注该人员检测框及异常物品 + - 对于全局异常(armband)直接标注图片 + """ + # 定义异常对应的报警类型(需与系统定义的 ALARM_DICT 对应) + alarm_mapping = { + 'no_helmet': 'aqm', # 安全帽异常 + 'smoking': 'smoke', # 吸烟异常 + 'phone': 'phone', # 打电话异常 + 'illegal_intrusion': 'break', # 非法闯入 + 'armband': 'armband' # 袖标异常 + } + alarm_type = alarm_mapping.get(condition, 'unknown') + # 播报异常语言 + self.alarm.addAlarm(ALARM_DICT[alarm_type]) + + if self.alarm.alarm_record_center.need_alarm(self.device_code, ALARM_DICT[alarm_type]): + alarm_detections = {} + if alarm_type == 'smoke': + alarm_detections = {key : detection[key] for key in detection if key == '烟头'} + elif alarm_type == 'phone': + alarm_detections = {key : detection[key] for key in detection if key == '电话'} + # 生成报警图片:在报警图片上对异常人员与异常物品进行标注 + annotated_image = self.annotate_alarm(frame, ALARM_DICT[alarm_type]['label'], person_box, alarm_detections) + + # 上传报警图片到后台 + self.alarm.alarm_record_center.upload_alarm_record(self.device_code, ALARM_DICT[alarm_type], + alarm_np_img=annotated_image) + + def cleanup_counters(self, current_ids): + """ + 清理各异常计数器中失去的 person id(即当前帧中不再检测到的人员) + """ + for cond in self.counters: + lost_ids = [pid for pid in self.counters[cond] if pid not in current_ids] + for pid in lost_ids: + del self.counters[cond][pid] + + def process_batch(self, frames): + """ + 对一批视频帧进行处理: + 1. 对每帧进行人员检测及目标检测 + 2. 针对每个人判断是否存在异常: + - 未佩戴安全帽:若该人检测结果中没有 "安全帽" + - 吸烟:若检测到 "烟头" + - 打电话:若检测到 "电话" + - 非法闯入:若既没有检测到 "安全帽" 也没有检测到 "工服" + 3. 更新每个人针对各异常的连续帧计数器,若达到阈值则触发报警 + 4. 对于袖标异常,若当前帧中所有人均未检测到 "袖标",则更新全局计数器 + 5. 清理当前帧中不再检测到的 person id(清除计数器中遗留的记录) + """ + # 第一步:检测人员及其区域目标 + people_results = self.detect_person(frames) + + empty = all(not d for d in people_results) + if empty: + return + + person_detect_targets = self.detect_person_targets(people_results) + # 收集当前帧所有检测到的 person id(假设所有帧中检测到的人 id 集合取并集) + current_ids = set() + for frame_persons in people_results: + current_ids.update(frame_persons.keys()) + + # 针对每一帧分别处理 + for idx, frame in enumerate(frames): + frame_targets = person_detect_targets[idx] # 当前帧中,各人员的检测结果 + # 遍历当前帧的每个检测到的人员 + for person_id, detections in frame_targets.items(): + # 定义各异常条件 + no_helmet = "安全帽" not in detections + smoking = "烟头" in detections + phone = "电话" in detections + illegal_intrusion = ("安全帽" not in detections) and ("工服" not in detections) + # 依次更新对应计数器(未佩戴安全帽、吸烟、打电话、非法闯入) + for cond, abnormal in zip(['no_helmet', 'smoking', 'phone', 'illegal_intrusion'], + [no_helmet, smoking, phone, illegal_intrusion]): + if person_id not in self.counters[cond]: + self.counters[cond][person_id] = 0 + if abnormal: + self.counters[cond][person_id] += 1 + else: + self.counters[cond][person_id] = 0 + + # 若连续异常帧数达到阈值,则触发报警 + if self.counters[cond][person_id] >= self.frame_threshold: + # 获取该人员的检测框(用于标注) + person_box = people_results[idx][person_id]['box'] + self.trigger_alarm(frame, cond, person_id, detections, person_box) + # 触发报警后重置该人员对应计数器 + self.counters[cond][person_id] = 0 + + # 针对袖标的全局情况:如果当前帧中所有人均未检测到 "袖标",则更新全局计数器 + # 注意:这里遍历当前帧中每个检测结果 + if not any("袖标" in det for det in frame_targets.values()): + self.armband_counter += 1 + else: + self.armband_counter = 0 + if self.armband_counter >= self.frame_threshold: + self.trigger_alarm(frame, 'armband') + self.armband_counter = 0 + + # 清除计数器中已丢失的 person id + self.cleanup_counters(current_ids) + + def main(self, stream_loader): + """ + 主流程:从视频流中获取每一批帧,处理后检测异常并报警 + """ + for frames in stream_loader: # stream_loader 每次返回一批连续帧(例如4帧) + try: + self.process_batch(frames) + except Exception as ex: + traceback.print_exc() + # 记录错误信息 + logger.error(ex) + + def main_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + ret, frames = cap.read() + while True: + try: + ret, frames = cap.read() + frames = [frames] + if not ret: + cap.release() + cv2.destroyAllWindows() + break + cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE) + # cv2.resizeWindow('Video Frame2', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame2", frames[0]) + + self.process_batch(frames) + + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + + +class Alarm(): + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, eventController=None): + self.pool = [] + self.device = device + self.thread_id = thread_id + self.tcp_manager = tcp_manager + self.main_loop = main_loop + self.eventController = eventController + + # self.alarm_interval_dict = {} + # self.alarm_interval = device.alarm_interval + # + # self.socket_interval_dict = {} + # self.socket_interval = device.alarm_interval + # self.socket_retry = 3 + + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={0: 0, 4: 1, 3: 2, 2: 3, + 1: 4}) # (优先级:0 > 4 > 3 > 2 > 1) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) + + # todo 跟下面的alarm task二选一 + # self.thread_pool = GlobalThreadPool() + # self.thread_pool.submit_task(self.alarm_message_center.process_messages) + + def addAlarm(self, alarm_dict): + self.alarm_message_center.add_message(alarm_dict) + + # def addAlarm(self, content): + # """ + # 添加一条报警到报警队列中 + # :param content: + # :return: + # """ + # if content in self.pool: + # self.pool.remove(content) + # self.pool.append(content) + + def deleteAlarmOfLaoBao(self): + """ + 删除池子中有关劳保检测的报警 + :return: + """ + + # self.pool = [item for item in self.pool if "劳保" not in item] + + def laobao_condition(msg): + return msg['alarmCategory'] == 0 + + self.alarm_message_center.delete_messages(laobao_condition) + + def deleteAlaramOfUmdGas(self): + """ + 删除池子中有关上中下气体的报警 + :return: + """ + + def umd_condition(msg): + return msg['alarmCategory'] == 4 + + self.alarm_message_center.delete_messages(umd_condition) + + # self.pool = [item for item in self.pool if "劳保" not in item] + + # def main(self): + # while True: + # if self.eventController.timeout_event.is_set(): # 前置条件检查超时 + # self.deleteAlarmOfLaoBao() + # self.deleteAlaramOfUmdGas() + # if len(self.pool) != 0: + # content = self.pool.pop(0) + # print(f"{content},报警队列长度:{len(self.pool)}") + # # self.send_alarm_message("no_jiandu") + # time.sleep(1) + # + # def send_tcp_message(self, message: bytes, have_response=False): + # asyncio.run_coroutine_threadsafe( + # self.tcp_manager.send_message_to_device(device_id=self.device.id, + # message=message, + # have_response=have_response), + # self.main_loop) + + # def send_alarm_message(self, type): + # if self.tcp_manager: + # # if self.socket_interval_dict.get(type) is None \ + # # or (datetime.now() - self.socket_interval_dict.get(type)).total_seconds() > int(self.socket_interval): + # logger.debug("send alarm message %s %s", ALARM_DICT[type]['alarmContent'], + # ALARM_DICT[type]['alarmSoundMessage']) + # self.send_tcp_message(ALARM_DICT[type]['alarmSoundMessage'], have_response=True) + # self.socket_interval_dict[type] = datetime.now() + + +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + + +def get_group_device_list(device_code): + health_device_codes = [] + harmful_device_codes = [] + url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' + response = get_request(url) + if response and response.get('code') == 200 and response.get('data'): + data = response.get('data') + for item in data: + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + return health_device_codes, harmful_device_codes + + +class IntranetLimitSpaceSceneHandler(BaseSceneHandler): + + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + + self.start_time = time.time() # 脚本启动时间戳 + print(f'start time = {self.start_time}') + + # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + # device_thread_id=thread_id) + + self.executor = ThreadPoolExecutor(max_workers=10) + self.loop = asyncio.get_running_loop() + + self.eventController = EventController() + self.alarm = Alarm(device, thread_id, tcp_manager, main_loop, self.eventController) + self.laobao_check = Laobaocheck(self.eventController, self.alarm) + self.yinhuan_check = YinHuanCheck(device_code=device.code, eventController=self.eventController, + alarm=self.alarm) + + self.anQuanMaoList = [] + self.siHeyiList = [] + self.siHeyiUmd = None # 上中下气体检测用的四合一设备 + health_device_codes, harmful_device_codes = get_group_device_list(device.code) + for health_device_code in health_device_codes: + self.anQuanMaoList.append(HelmetDataProcessor(health_device_code, self.alarm.alarm_record_center)) + for harmful_device_code in harmful_device_codes: + self.siHeyiList.append(SiHeYi(harmful_device_code)) + + if self.siHeyiList: + self.siHeyiUmd = self.siHeyiList[0] # todo 暂时先用第一个,后期要有标识标明用哪个 + else: + self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98') + + async def laobaoCheck_task(self): + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + # await loop.run_in_executor(executor, self.laobao_check.model_predict_fake, + # r"D:\workspace\pythonProject\safe-algo-pro\2025-02-25 15-25-48.mkv") + await self.loop.run_in_executor(self.executor, self.laobao_check.model_predict, self.stream_loader) + + async def uMDGasCheck_task(self, eventController=None): + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + tflag_pool = [] # 返回数据正常了几次 + await self.loop.run_in_executor(self.executor, self.siHeyiUmd.waitPowerOn, + self.start_time) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 + print('上中下气体检测:四合一已开机') + + while True: # 模拟循环检测气体 + if eventController.timeout_event.is_set(): # 超时退出 + return + + ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, self.siHeyiUmd.getNewData) # 判断气体是否合规 + flag = self.siHeyiUmd.isDataNormal(ch4, co, h2s, o2) + if flag == False: + tflag_pool.clear() + self.alarm.addAlarm(ALARM_DICT['umd_harmful_gas']) + else: + tflag_pool.append(True) + print(f"上中下气体检测正常次数:{tflag_pool}") + if len(tflag_pool) == 3: + break # 退出检测 + + print('上中下气体检测:上中下气体检测通过') # todo 需要语音吗 + self.eventController.umd_complete.set() + return + + async def alarm_task(self): + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + await self.loop.run_in_executor(self.executor, self.alarm.alarm_message_center.process_messages) + + async def yinhuanCheck_task(self): + """ + 检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头) + :return: + """ + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + await self.loop.run_in_executor(self.executor, self.yinhuan_check.main_fake, + r"D:\workspace\pythonProject\safe-algo-pro\2025-02-26 08-49-39.mkv") + + async def xinlvCheck_task(self): + + def fun(anQuanMao): + blood_oxygen, heartrate = anQuanMao.getNewData() + if not anQuanMao.isDataNormal(blood_oxygen, heartrate): + self.alarm.addAlarm(ALARM_DICT['health']) + anQuanMao.sendAlarmRecord(blood_oxygen, heartrate) + # + # flag = anQuanMao.isDataNormal(blood_oxygen, heartrate) + # if flag == False: + # self.alarm.addAlarm(ALARM_DICT['health']) + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + for aqm in self.anQuanMaoList: + await self.loop.run_in_executor(self.executor, fun, aqm) + + async def gasCheck(self): + """ + 四合一气体检测 + :return: + """ + + def fun(siHeyi): + ch4, co, h2s, o2 = siHeyi.getNewData() + flag = siHeyi.isDataNormal(ch4, co, h2s, o2) + if flag == False: + self.alarm.addAlarm(ALARM_DICT['harmful_gas']) + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + for siHeYi in self.siHeyiList: + await self.loop.run_in_executor(self.executor, fun, siHeYi) + + def run(self): + async def fun(): + try: + self.loop = asyncio.get_running_loop() + + # 添加异常处理 + def handle_task_exception(task): + try: + task.result() # 触发异常(如果有) + except Exception as e: + logger.exception(f"任务 {task.get_name()} 发生异常: {e}") + + # 并行执行任务 + # uMDGasCheck_task = asyncio.create_task( + # self.uMDGasCheck_task(self.eventController)) + # laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task()) + alarm_task = asyncio.create_task(self.alarm_task()) + + # 给所有任务添加异常处理 + # for task in [uMDGasCheck_task, laobaoCheck_task, alarm_task]: + # task.add_done_callback(handle_task_exception) + alarm_task.add_done_callback(handle_task_exception) + + # done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0) + # + # if uMDGasCheck_task in done and laobaoCheck_task in done: + # await uMDGasCheck_task + # await laobaoCheck_task + # self.eventController.timeout_event.set() + # self.alarm.alarm_message_center.send_immediate_command(PREPARE_COMPLETE_MESSAGE) + # print("前置条件检查完成,退出") + # + # else: + # # 如果超时,则取消未完成的任务 + # self.eventController.timeout_event.set() + # laobaoCheck_task.cancel() + # uMDGasCheck_task.cancel() + # print("前置条件检查时间过长,退出") + + # 删除前面产生的报警 + self.alarm.deleteAlarmOfLaoBao() + self.alarm.deleteAlaramOfUmdGas() + + # 并行执行任务 + print("开始工作") + xinlvCheck_task = asyncio.create_task(self.xinlvCheck_task()) + yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task()) + gasCheck_task = asyncio.create_task(self.gasCheck()) + + # 也给这些任务添加异常处理 + for task in [xinlvCheck_task, yinhuanCheck_task, gasCheck_task]: + task.add_done_callback(handle_task_exception) + + try: + results = await asyncio.gather(yinhuanCheck_task, gasCheck_task, xinlvCheck_task, + return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.exception(f"任务发生异常: {result}") + except Exception as e: + logger.exception(f"gather 执行过程中发生异常: {e}") + + done1, pending1 = await asyncio.wait({alarm_task}, timeout=300000.0) + except Exception as e: + logger.exception(f"run 方法中的 fun 发生异常: {e}") + + asyncio.run(fun()) + + +if __name__ == '__main__': + # print(getNewGasData()) + model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt") + print(model.names) diff --git a/algo/stream_loader.py b/algo/stream_loader.py index 2202399..b52ebc0 100644 --- a/algo/stream_loader.py +++ b/algo/stream_loader.py @@ -56,14 +56,14 @@ 尝试创建视频流捕获对象。 """ try: - # cap = cv2.VideoCapture(self.url) - gst_pipeline = ( - f"rtspsrc location={self.url} ! " - f"rtph264depay ! h264parse ! " - f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " - f"appsink" - ) - cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) + cap = cv2.VideoCapture(self.url) + # gst_pipeline = ( + # f"rtspsrc location={self.url} ! " + # f"rtph264depay ! h264parse ! " + # f"nvv4l2decoder ! nvvidconv ! video/x-raw, format=(string)BGRx ! videoconvert ! " + # f"appsink" + # ) + # cap = cv2.VideoCapture(gst_pipeline, cv2.CAP_GSTREAMER) # 可以在这里设置cap的一些属性,如果需要的话 return cap except Exception as e: diff --git a/app_instance.py b/app_instance.py index c648f6e..c48d657 100644 --- a/app_instance.py +++ b/app_instance.py @@ -17,8 +17,10 @@ from services.model_service import ModelService from services.scene_service import SceneService from services.schedule_job import start_scheduler +from tcp.harmful_device_handler import HarmfulGasHandler from tcp.tcp_manager import TcpManager -from tcp.tcp_server import start_server +from tcp.tcp_server import TcpServer +# from tcp.tcp_server import start_server _app = None # 创建一个私有变量来存储 app 实例 @@ -51,7 +53,7 @@ tcp_manager = TcpManager(device_service=device_service) app.state.tcp_manager = tcp_manager - await tcp_manager.start() + # await tcp_manager.start() algo_runner = AlgoRunner( device_service=device_service, @@ -71,7 +73,12 @@ app.state.scene_runner = scene_runner await scene_runner.start() - main_loop.create_task(start_server()) + tcp_server = TcpServer() + harmful_handler = HarmfulGasHandler(main_loop=main_loop) + tcp_server.register_data_callback(harmful_handler.parse) + # await tcp_server.start() + main_loop.create_task(tcp_server.start()) + # main_loop.create_task(start_server()) main_loop.create_task(start_scheduler()) diff --git a/scene_handler/alarm_message_center.py b/scene_handler/alarm_message_center.py index 051927e..95434bb 100644 --- a/scene_handler/alarm_message_center.py +++ b/scene_handler/alarm_message_center.py @@ -120,3 +120,35 @@ message=message['alarmSoundMessage'], have_response=False), self.main_loop) + + def send_immediate_command(self, command): + """ + 立即发送指定的指令,不经过消息队列。 + 参数: + command: 要发送的指令内容,可以是字符串、字典或其他数据结构,依据你的实现而定。 + """ + print(f"立即发送指令: {command}") + if self.tcp_manager: + # 直接调用 tcp_manager 的发送方法,将指令传递过去 + asyncio.run_coroutine_threadsafe( + self.tcp_manager.send_message_to_device( + device_id=self.device_id, + message=command, + have_response=False + ), + self.main_loop + ) + + def delete_messages(self, condition): + """ + 删除队列中符合条件的报警消息。 + + 参数: + condition: 一个函数,接受一个消息字典,返回 True 表示该消息需要被删除。 + 例如:lambda msg: msg['alarmCategory'] == 1 + """ + with self.lock: + original_length = len(self.queue) + # 过滤掉满足 condition 的消息,保留其余消息 + self.queue = deque([msg for msg in self.queue if not condition(msg)]) + print(f"删除了 {original_length - len(self.queue)} 条消息,剩余 {len(self.queue)} 条消息。") diff --git a/scene_handler/block_scene_handler.py b/scene_handler/block_scene_handler.py index db0b841..a365713 100644 --- a/scene_handler/block_scene_handler.py +++ b/scene_handler/block_scene_handler.py @@ -103,25 +103,25 @@ }, { 'alarmCategory': 0, - 'alarmType': '19', # todo + 'alarmType': '19', # todo 'handelType': 4, 'category_order': -1, 'class_idx': [4], 'alarm_name': 'cigarette', 'alarmContent': '吸烟', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '吸烟', 'model_type': 'safe', }, { 'alarmCategory': 0, 'alarmType': '2', - 'handelType': 4, # todo + 'handelType': 4, # todo 'category_order': -1, 'class_idx': [5], 'alarm_name': 'phone', 'alarmContent': '打电话', - 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo 'label': '打电话', 'model_type': 'safe', }, @@ -176,8 +176,9 @@ COLOR_RED = (0, 0, 255) COLOR_BLUE = (255, 0, 0) -HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 -HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + def get_group_device_list(device_code): health_device_codes = [] @@ -187,8 +188,10 @@ if response and response.get('code') == 200 and response.get('data'): data = response.get('data') for item in data: - health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] - harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] return health_device_codes, harmful_device_codes @@ -251,7 +254,7 @@ self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} self.PERSON_CLASS_IDX = 0 self.HEAD_CLASS_IDX = 1 - self.SAFETY_CLASS_IDX = [2,3,6] + self.SAFETY_CLASS_IDX = [2, 3, 6] self.vid_stride = 3 @@ -307,7 +310,8 @@ if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], - alarm_np_img=None,alarm_value=gas_data.gas_value) + alarm_np_img=None, + alarm_value=gas_data.gas_value) except Exception as e: print(f"Error in latest_query: {e}") @@ -410,10 +414,10 @@ for r in results_generator: result_boxes.append(r.boxes) - safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, - classes=list(self.safe_model_classes.keys()), - imgsz=640, - stream=True) + safe_results_generator = self.safe_model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) for s in safe_results_generator: safe_result_boxes.append(s.boxes) @@ -531,7 +535,7 @@ f"{self.model_classes[int(box.cls)]}", color=COLOR_BLUE, rotated=False) - + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 @@ -551,7 +555,6 @@ ) has_object = person_object_box is not None - person_status = self.tracking_status[person_id] if alarm_dict['alarm_name'] not in person_status: person_status[alarm_dict['alarm_name']] = 0 @@ -567,7 +570,8 @@ annotator = Annotator(deepcopy(frame)) if annotator is None else annotator # 红色标注人、目标 annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) - annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(), '', color=COLOR_RED, + rotated=False) # 已报警,清零,重新计数 person_status[alarm_dict['alarm_name']] = 0 @@ -590,7 +594,6 @@ rotated=False) self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, alarm_np_img=annotator.result()) - def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] diff --git a/scene_handler/helmet_data_processor.py b/scene_handler/helmet_data_processor.py new file mode 100644 index 0000000..740e88f --- /dev/null +++ b/scene_handler/helmet_data_processor.py @@ -0,0 +1,89 @@ +import time +from datetime import datetime + +from common.http_utils import get_request +from scene_handler.alarm_record_center import AlarmRecordCenter + +ALARM_DICT = { + 'health_blood_oxygen': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'health_heartrate': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + } +} + + +class HelmetDataProcessor: + def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter): + self.helmet_code = helmet_code + self.alarm_record_center = alarm_record_center + self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' + self.header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + self.last_ts = None # 上次读取的数据的生成时间戳 + + def getNewData(self): + """ + 阻塞进程 + :return: + """ + while True: + print(f"访问{self.helmet_code}心率血氧数据...") + response = get_request(self.url, headers=self.header) + if response and response.get('data'): + print("访问到心率血氧数据") + vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 + if vitalsigns_data: # 访问成功 + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), + "%Y-%m-%d %H:%M:%S") # 访问数据的时间 + if self.last_ts is None or ( + upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 + self.last_ts = upload_timestamp.timestamp() # 更新数据 + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 + return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') + else: + print("无法访问到心率血氧数据") + time.sleep(5) + + def isDataNormal(self, blood_oxygen, heartrate): + if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 + return False + else: + return True + + def sendAlarmRecord(self, blood_oxygen, heartrate): + if not self.isBloodOxygenNormal(blood_oxygen): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_blood_oxygen'], + alarm_value=blood_oxygen) + if not self.isHeartRateNormal(heartrate): + self.alarm_record_center.upload_alarm_record(self.helmet_code, ALARM_DICT['health_heartrate'], + heartrate) + + def isBloodOxygenNormal(self, blood_oxygen): + if blood_oxygen < 85: + return False + else: + return True + + def isHeartRateNormal(self, heartrate): + if heartrate < 60 or heartrate > 120: + return False + else: + return True diff --git a/scene_handler/intranet_block_scene_handler.py b/scene_handler/intranet_block_scene_handler.py new file mode 100644 index 0000000..953d18a --- /dev/null +++ b/scene_handler/intranet_block_scene_handler.py @@ -0,0 +1,718 @@ +import time +import traceback +from asyncio import Event +from copy import deepcopy +from datetime import datetime + +import numpy as np +import asyncio +from scipy.spatial import ConvexHull + +from algo.stream_loader import OpenCVStreamLoad +from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area, is_overlapping +from common.device_status_manager import DeviceStatusManager +from common.display_frame_manager import DisplayFrameManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.harmful_gas_manager import HarmfulGasManager +from common.image_plotting import Annotator, colors +from db.database import get_db +from entity.device import Device +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.alarm_record_center import AlarmRecordCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from scene_handler.limit_space_scene_handler import is_overlapping +from services.data_gas_service import DataGasService +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + +from entity.device import Device +from common.http_utils import get_request +from ultralytics import YOLO + +''' +alarmCategory: +0 行为监管 +1 环境监管 +2 人员监管 +3 围栏监管 + +handelType: +0 检测到报警 +1 未检测到报警 +2 人未穿戴报警 +3 其他 +4 人员检测到报警 +''' +ALARM_DICT = [ + { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 1, + 'class_idx': [34], + 'alarm_name': 'no_fire_extinguisher', + 'alarmContent': '未检测到灭火器', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '15', + 'handelType': 1, + 'category_order': 2, + 'class_idx': [43], + 'alarm_name': 'no_barrier_tape', + 'alarmContent': '未检测到警戒线', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '16', + 'handelType': 1, + 'category_order': 3, + 'class_idx': [48], + 'alarm_name': 'no_cone', + 'alarmContent': '未检测到锥桶', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '17', + 'handelType': 1, + 'category_order': 4, + 'class_idx': [4, 5, 16], + 'alarm_name': 'no_board', + 'alarmContent': '未检测到指示牌', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', + 'label': '', + }, + { + 'alarmCategory': 0, + 'alarmType': '18', + 'handelType': 2, + 'category_order': -1, + 'class_idx': [18], + 'alarm_name': 'no_helmet', + 'alarmContent': '未佩戴安全帽', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', + 'label': '未佩戴安全帽', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '19', # todo + 'handelType': 4, + 'category_order': -1, + 'class_idx': [4], + 'alarm_name': 'cigarette', + 'alarmContent': '吸烟', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '吸烟', + 'model_type': 'safe', + }, + { + 'alarmCategory': 0, + 'alarmType': '2', + 'handelType': 4, # todo + 'category_order': -1, + 'class_idx': [5], + 'alarm_name': 'phone', + 'alarmContent': '打电话', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '打电话', + 'model_type': 'safe', + }, + # todo 明火 + { + 'alarmCategory': 1, + 'alarmType': '1', + 'handelType': 3, + 'category_order': 1, + 'class_idx': [], + 'alarm_name': 'gas_alarm', + 'alarmContent': '甲烷浓度超限', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8', + 'label': '', + }, + { + 'alarmCategory': 1, + 'alarmType': '', + 'handelType': 3, + 'category_order': 2, + 'class_idx': [], + 'alarm_name': 'harmful_alarm', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'class_idx': [], + 'alarm_name': 'health_alarm', + 'alarmContent': '心率血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + { + 'alarmCategory': 3, + 'alarmType': '3', + 'handelType': 2, + 'category_order': 4, + 'class_idx': [3], + 'alarm_name': 'break_in_alarm', + 'alarmContent': '非法闯入', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', + 'label': '非法闯入', + 'model_type': 'safe', + }, + +] + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + +def get_group_device_list(device_code): + health_device_codes = [] + harmful_device_codes = [] + url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' + response = get_request(url) + if response and response.get('code') == 200 and response.get('data'): + data = response.get('data') + for item in data: + health_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + return health_device_codes, harmful_device_codes + + +class IntranetBlockSceneHandler(BaseSceneHandler): + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + self.__stop_event = Event(loop=main_loop) + self.health_ts_dict = {} + self.harmful_ts_dict = {} + self.object_ts_dict = {} + self.thread_pool = GlobalThreadPool() + + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={2: 0, 1: 1, 3: 2, + 0: 3}) # (优先级:2 > 1 > 3 > 0) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) + self.harmful_data_manager = HarmfulGasManager() + self.device_status_manager = DeviceStatusManager() + self.display_frame_manager = DisplayFrameManager() + + # todo 要改成通过后台接口读取设备编号 + # self.health_device_codes = ['HWIH061000056395'] + # self.harmful_device_codes = ['862635063168165A'] + self.health_device_codes, self.harmful_device_codes = get_group_device_list(device.code) + + self.thread_pool.submit_task(self.gas_data_task, device.code) + for helmet_code in self.health_device_codes: + self.thread_pool.submit_task(self.health_data_task, helmet_code) + for harmful_device_code in self.harmful_device_codes: + self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code) + + self.thread_pool.submit_task(self.alarm_message_center.process_messages) + + # todo 明火 + # self.model = YOLO('weights/labor-v8-20250115-fp16.engine') + self.model = YOLO('weights/labor-v8-20241114.pt') + self.model_classes = { + # 0: '三脚架', + # 3: '人', + 4: '作业信息公示牌', + 6: '危险告知牌', + 9: '反光衣', + # 11: '呼吸面罩', + # 13: '四合一', + # 15: '头', + 16: '安全告知牌', + # 18: '安全帽', + 20: '安全标识牌', + # 24: '工服', + 34: '灭火器', + 43: '警戒线', + 48: '路锥', + 58: '鼓风机', + } + self.PERSON_CLASS_IDX = 3 + self.HEAD_CLASS_IDX = 15 + + self.safe_model = YOLO('weights/yinhuan.pt') + self.safe_model_classes = {0: '人', 1: '头', 2: '安全帽', 3: '工服', 4: '烟头', 5: '电话', 6: '袖标'} + self.PERSON_CLASS_IDX = 0 + self.HEAD_CLASS_IDX = 1 + self.SAFETY_CLASS_IDX = [2,3,6] + + self.vid_stride = 3 + + self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + device_thread_id=thread_id, vid_stride=self.vid_stride) + self.range_points = range_points + self.abs_range_points = self.get_absolute_range() + + self.tracking_status = {} # 跟踪每个行人的状态 + self.max_missing_frames = 25 / self.vid_stride # 报警的阈值 + self.disappear_threshold = 25 * 3 / self.vid_stride # 移除行人的阈值 + + self.frames_detected = 0 + self.fps_ts = None + + def get_absolute_range(self): + if not self.range_points: + return None + + fence_info = eval(self.range_points) + if fence_info and len(fence_info) > 1: + abs_points = [] + for p in fence_info: + abs_points.append( + [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))]) + + abs_points = np.array(abs_points, dtype=np.int32) + hull = ConvexHull(abs_points) + sorted_coordinates = abs_points[hull.vertices] + # abs_points = abs_points.reshape((-1, 1, 2)) + return sorted_coordinates + else: + return None + + def gas_data_task(self, tree_device_code): + while not self.__stop_event.is_set(): + asyncio.run_coroutine_threadsafe( + self.handle_gas_alarm(tree_device_code), self.main_loop + ) + time.sleep(5) + + async def handle_gas_alarm(self, tree_device_code): + try: + async for db in get_db(): + gasService = DataGasService(db) + gas_data = await gasService.latest_query(tree_device_code) + if gas_data and time.time() - gas_data.ts.timestamp() < 60: + if gas_data.gas_value > 200.0: + alarm_dict = [d for d in ALARM_DICT if + d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict[0]): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict[0], + alarm_np_img=None,alarm_value=gas_data.gas_value) + except Exception as e: + print(f"Error in latest_query: {e}") + + # 一体机直接接收四合一浓度 + def harmful_data_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code) + for gas_type, gas_data in harmful_gas_data.items(): + ts_key = f'{harmful_device_code}_{gas_type}' + last_ts = self.harmful_ts_dict.get(ts_key) + gas_ts = gas_data.get('gas_ts') + if last_ts is None or (gas_ts - last_ts).total_seconds() > 0: + self.harmful_ts_dict[ts_key] = gas_ts + self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data) + + # 从后台读取四合一浓度 + def harmful_data_query_task(self, harmful_device_code): + while not self.__stop_event.is_set(): + url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' + response = get_request(url) + if response and response.get('data'): + last_ts = self.harmful_ts_dict.get(harmful_device_code) + data = response.get('data') + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (uptime.timestamp() - last_ts) > 0: + self.harmful_ts_dict[harmful_device_code] = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2) + time.sleep(5) + + def health_data_task(self, helmet_code): + while not self.__stop_event.is_set(): + header = { + 'ak': 'fe80b2f021644b1b8c77fda743a83670', + 'sk': '8771ea6e931d4db646a26f67bcb89909', + } + url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}' + response = get_request(url, headers=header) + if response and response.get('data'): + last_ts = self.health_ts_dict.get(helmet_code) + vitalsigns_data = response.get('data').get('vitalSignsData') + if vitalsigns_data: + upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S") + if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0: + self.health_ts_dict[helmet_code] = upload_timestamp.timestamp() + if time.time() - upload_timestamp.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'), + vitalsigns_data.get('heartRate'), upload_timestamp) + time.sleep(10) + + def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp): + logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, ' + f'upload_timestamp = {upload_timestamp}') + if heartrate < 50 or heartrate > 120 or blood_oxygen < 85: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 需要往后台发原始数据吗 + + def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2): + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + + def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data): + alarm = False + gas_value = gas_data['gas_value'] + if gas_type == 3: # h2s + alarm = gas_value > 120.0 + elif gas_type == 4: # co + alarm = gas_value > 10.0 + elif gas_type == 5: # o2 + alarm = gas_value < 15 + elif gas_type == 50: # ex + alarm = gas_value > 10 + + if alarm: + alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm'] + if alarm_dict: + self.alarm_message_center.add_message(alarm_dict[0]) + # todo 需要生成报警记录吗 + + def model_predict(self, frames): + result_boxes = [] + safe_result_boxes = [] + + results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.model_classes.keys()), + imgsz=640, + stream=True) + + for r in results_generator: + result_boxes.append(r.boxes) + + safe_results_generator = self.safe_model.track(frames,save_txt=False, save=False, verbose=False, conf=0.5, + classes=list(self.safe_model_classes.keys()), + imgsz=640, + stream=True) + for s in safe_results_generator: + safe_result_boxes.append(s.boxes) + + return result_boxes, safe_result_boxes + + def handle_behave_alarm(self, frames, result_boxes, safe_result_boxes): + behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0] + for alarm_dict in behave_alarm_dicts: + use_safe_model = alarm_dict.get('model_type', 'labor') == 'safe' + boxes = safe_result_boxes if use_safe_model else result_boxes + model_classes = self.safe_model_classes if use_safe_model else self.model_classes + for idx, frame_boxes in enumerate(boxes): + frame = frames[idx] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + if alarm_dict['handelType'] == 0: # 检测到就报警 + if object_boxes: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 红色标注目标设备,蓝色标注正常施工设备 + for box in frame_boxes: + box_color = COLOR_RED if int(box.cls) in alarm_dict['class_idx'] else COLOR_BLUE + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=box_color, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + elif alarm_dict['handelType'] == 1: # 检测不到报警 + if object_boxes: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + else: + last_ts = self.object_ts_dict.get(alarm_dict['alarm_name'], 0) + if time.time() - last_ts > 5: + self.object_ts_dict[alarm_dict['alarm_name']] = time.time() + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) + # 蓝色标注正常施工设备 + for box in frame_boxes: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注人和正常穿戴设备 + for box in safe_result_boxes[idx]: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX or box_cls == self.PERSON_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[box_cls]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 2: # 人未穿戴报警 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + # 检查这个人是否佩戴了安全帽 + has_helmet = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + has_helmet = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze()) + for helmet in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if not has_helmet: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + elif alarm_dict['handelType'] == 4: # 人检测到报警:吸烟、打电话 + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + person_object_box = max( + (box for box in object_boxes if is_overlapping(person_bbox, box.xyxy.cpu().squeeze())), + key=lambda box: box.conf.item(), + default=None + ) + has_object = person_object_box is not None + + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + if has_object: + person_status[alarm_dict['alarm_name']] += 1 + else: + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + # 红色标注人、目标 + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + annotator.box_label(person_object_box.xyxy.cpu().squeeze(),'',color=COLOR_RED,rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + # 蓝色标注正常穿戴设备 + for box in frame_boxes: + box_cls = int(box.cls) + if box_cls in self.SAFETY_CLASS_IDX: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + # 蓝色标注正常施工设备 + for box in result_boxes[idx]: + annotator.box_label(box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(box.cls)]}", + color=COLOR_BLUE, + rotated=False) + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + alarm_np_img=annotator.result()) + + + def handle_break_in_alarm(self, frames, result_boxes, safe_result_boxes): + break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3] + for alarm_dict in break_in_alarm_dicts: + for idx, frame_boxes in enumerate(safe_result_boxes): + frame = frames[idx] + person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX] + head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX] + object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']] + has_alarm = False + annotator = None + for person_box in person_boxes: + if person_box.id is None: + continue + + person_bbox = person_box.xyxy.cpu().squeeze() + person_id = int(person_box.id) + has_object = True + person_head = get_person_head(person_bbox, head_boxes) + if person_head is not None: + overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area( + person_bbox) + if overlap_ratio < 0.5: # 头占人<0.5,判断是否穿工服。不太准确 + has_object = any( + is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze()) + for object_boxe in object_boxes) + + person_status = self.tracking_status[person_id] + if alarm_dict['alarm_name'] not in person_status: + person_status[alarm_dict['alarm_name']] = 0 + + if not has_object and is_within_alert_range(person_bbox, self.abs_range_points): + # 未检测到帧数 +1 + person_status[alarm_dict['alarm_name']] += 1 + else: + # 未检测到帧数 清零 + person_status[alarm_dict['alarm_name']] = 0 + + person_alarm = person_status[alarm_dict['alarm_name']] > self.max_missing_frames + if person_alarm: + has_alarm = True + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + annotator = Annotator(deepcopy(frame)) if annotator is None else annotator + annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False) + # 已报警,清零,重新计数 + person_status[alarm_dict['alarm_name']] = 0 + + if has_alarm: + self.alarm_message_center.add_message(alarm_dict) + if self.alarm_record_center.need_alarm(self.device.code, alarm_dict): + self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict, + annotator.result()) + + def log_fps(self, frame_count): + self.frames_detected += frame_count + current_time = time.time() + # 每秒输出 FPS + if self.fps_ts is None or current_time - self.fps_ts >= 10: + fps = self.frames_detected / 10.0 + self.frames_detected = 0 + logger.info(f"FPS (detect) for device {self.device.code}: {fps}") + self.fps_ts = current_time + + def run(self): + while not self.stream_loader.init: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + self.stream_loader.init_cap() + for frames in self.stream_loader: + try: + if self.__stop_event.is_set(): + break # 如果触发了停止事件,则退出循环 + if not frames: + continue + + t1 = time.time() + self.device_status_manager.set_status(device_id=self.device.id) + result_boxes, safe_result_boxes = self.model_predict(frames) # 结果都是二维数组,对应batch中的每个frame + + t2 = time.time() + for idx, frame_boxes in enumerate(safe_result_boxes): + current_person_ids = {int(box.id) for box in frame_boxes + if box.cls is not None and box.id is not None and int( + box.cls) == self.PERSON_CLASS_IDX} + + for person_id in current_person_ids: + if person_id not in self.tracking_status: + self.tracking_status[person_id] = {} + self.tracking_status[person_id]['disappear_frames'] = 0 + for person_id in list(self.tracking_status.keys()): + if person_id not in current_person_ids: + self.tracking_status[person_id]['disappear_frames'] += 1 + if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold: + self.tracking_status.pop(person_id) + + self.handle_behave_alarm(frames, result_boxes, safe_result_boxes) + self.handle_break_in_alarm(frames, result_boxes, safe_result_boxes) + + t3 = time.time() + for person_id in self.tracking_status.keys(): + print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}') + + for idx, frame in enumerate(frames): + annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人") + frame_boxes = result_boxes[idx] + for s_box in frame_boxes: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + for s_box in safe_result_boxes[idx]: + annotator.box_label(s_box.xyxy.cpu().squeeze(), + f"{self.safe_model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}", + color=colors(int(s_box.cls)), + rotated=False) + self.display_frame_manager.add_frame(self.device.id, annotator.result()) + # self.display_frame_manager.add_frame(self.device.id, frames[idx]) + + t4 = time.time() + print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms') + self.log_fps(len(frames)) + + except Exception as ex: + traceback.print_exc() + logger.error(ex) diff --git a/scene_handler/intranet_limit_space_scene_handler.py b/scene_handler/intranet_limit_space_scene_handler.py new file mode 100644 index 0000000..73dcbcf --- /dev/null +++ b/scene_handler/intranet_limit_space_scene_handler.py @@ -0,0 +1,1121 @@ +import asyncio +import base64 +import traceback +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy, copy + +import time +from typing import Dict, List + +import cv2 +from datetime import datetime +import csv + +from algo.stream_loader import OpenCVStreamLoad +from common.device_status_manager import DeviceStatusManager +from common.global_logger import logger +from common.global_thread_pool import GlobalThreadPool +from common.http_utils import send_request, get_request +from common.image_plotting import Annotator +from entity.device import Device +import numpy as np +from ultralytics import YOLO + +from scene_handler.alarm_message_center import AlarmMessageCenter +from scene_handler.alarm_record_center import AlarmRecordCenter +from scene_handler.base_scene_handler import BaseSceneHandler +from scene_handler.helmet_data_processor import HelmetDataProcessor +from services.global_config import GlobalConfig +from tcp.tcp_manager import TcpManager + + +def create_value_iterator(values): + for value in values: + yield value + + +fake_list = [ # 假如这是你从四合一后台请求来的数据 + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:49'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:51'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:52'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, + {"data": {'ch4': '0.00', 'co': '0.00', 'h2s': '0.00', 'id': '142913', + 'logtime': '2025-01-14 15:40:49', 'o2': '15.90', 'uptime': '2026-01-14 15:40:53'}}, +] +value_iterator = create_value_iterator(fake_list) + +COLOR_RED = (0, 0, 255) +COLOR_BLUE = (255, 0, 0) + + +def flatten(lst): + result = [] + for i in lst: + if isinstance(i, list): + result.extend(flatten(i)) # 递归调用以处理嵌套列表 + else: + result.append(i) + return result + + +''' +alarmCategory: +0 劳保用品检测异常:三脚架、灭火器、鼓风机、指示牌、面罩、交底 +1 作业过程隐患:闲杂人、安全帽、打电话、吸烟、袖标 +2 人员健康异常 +3 气体浓度异常 +4 上中下气体浓度异常 ? + +handelType: +0 检测到报警 +1 未检测到报警 +2 人未穿戴报警 +3 其他 +4 人员检测到报警 + +''' +ALARM_DICT = { + 'no_brief': { + 'alarmCategory': 0, + 'alarmType': '14', # todo + 'handelType': 1, + 'category_order': 6, + 'alarm_name': 'no_brief', + 'alarmContent': '未进行施工交底', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo + 'label': '', + }, + 'no_tripod': { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 1, + 'alarm_name': 'no_tripod', + 'alarmContent': '未检测到三脚架', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', # todo + 'label': '', + }, + 'no_mask': { + 'alarmCategory': 0, + 'alarmType': '11', + 'handelType': 1, + 'category_order': 5, + 'alarm_name': 'no_mask', + 'alarmContent': '未佩戴呼吸防护设备', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6', + 'label': '' + }, + 'no_blower': { + 'alarmCategory': 0, + 'alarmType': '13', + 'handelType': 1, + 'category_order': 3, + 'alarm_name': 'no_blower', + 'alarmContent': '没有检测到通风设备', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x1A\x00\xAE', + 'label': '没有检测到通风设备' + }, + 'no_extinguisher': { + 'alarmCategory': 0, + 'alarmType': '14', + 'handelType': 1, + 'category_order': 2, + 'alarm_name': 'no_fire_extinguisher', + 'alarmContent': '未检测到灭火器', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4', + 'label': '', + }, + 'no_board': { + 'alarmCategory': 0, + 'alarmType': '17', + 'handelType': 1, + 'category_order': 4, + 'alarm_name': 'no_board', + 'alarmContent': '未检测到指示牌', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7', + 'label': '', + }, + 'harmful_gas': { + 'alarmCategory': 3, + 'alarmType': '', # todo + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'harmful_alarm', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + 'umd_harmful_gas': { # todo 要跟上面区分吗 + 'alarmCategory': 4, + 'alarmType': '', # todo + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'umd_harmful_gas', + 'alarmContent': '有害气体浓度超标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9', + 'label': '', + }, + 'health': { + 'alarmCategory': 2, + 'alarmType': '18', + 'handelType': 3, + 'category_order': -1, + 'alarm_name': 'health_alarm', + 'alarmContent': '作业人员心率血氧异常', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA', + 'label': '', + }, + 'smoke': { + 'alarmCategory': 1, + 'alarmType': '19', # todo + 'handelType': 4, + 'category_order': 4, + 'alarm_name': 'cigarette', + 'alarmContent': '吸烟', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '吸烟', + }, + 'phone': { + 'alarmCategory': 1, + 'alarmType': '2', + 'handelType': 4, + 'category_order': 3, + 'alarm_name': 'phone', + 'alarmContent': '打电话', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '打电话', + }, + 'aqm': { + 'alarmCategory': 1, + 'alarmType': '18', + 'handelType': 2, + 'category_order': 2, + 'alarm_name': 'no_helmet', + 'alarmContent': '未佩戴安全帽', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', + 'label': '未佩戴安全帽', + }, + 'armband': { + 'alarmCategory': 1, + 'alarmType': '18', # todo + 'handelType': 2, + 'category_order': 5, + 'alarm_name': 'no_armband', + 'alarmContent': '未佩戴袖标', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95', # todo + 'label': '未佩戴袖标', + 'model_type': 'safe', + }, + 'break': { + 'alarmCategory': 1, + 'alarmType': '3', + 'handelType': 2, + 'category_order': 1, + 'alarm_name': 'break_in_alarm', + 'alarmContent': '非法闯入', + 'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB', + 'label': '非法闯入', + }, +} + +# UMD_PASS_MESSAGE = b'' # 上中下气体检测通过 +PREPARE_COMPLETE_MESSAGE = b'\xaa\x01\x00\x93\x19\x00\xAD' # 满足有限空间作业要求,可以作业 + + +def writeFile(file_path, data): + # print(f"写入{data}") + with open(file_path, mode='a', newline='', encoding='utf-8') as file: + writer = csv.writer(file) + writer.writerows(data) + + +class EventController(): + def __init__(self): + self.timeout_event = asyncio.Event() + self.umd_complete = asyncio.Event() + self.qianzhi_check_complete = asyncio.Event() + self.laobao_complete = asyncio.Event() + + +class SiHeYi(): + def __init__(self, harmful_device_code): + self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={harmful_device_code}' # 后台访问数据的url + self.harmful_device_code = harmful_device_code # 四合一标识符 + self.last_ts = None # 上次读取的数据的生成时间戳 + + def waitPowerOn(self, script_start_time): + """ + 阻塞函数 + 循环是否开机,只有检测到开机才会退出函数 + + :param script_start_time:脚本启动的时间戳 + + :return: + """ + print("检测四合一是否开机") + + while True: + self.getNewData() + flag = script_start_time < self.last_ts # 当前时间T/F 开机/未开机 + print(f'{script_start_time} {self.last_ts} {flag}') + if flag: + print("检测到开机") + return + else: + print("未开机") + time.sleep(2) + + def getNewData(self): + """ + 阻塞函数 + 访问后台数据库 读取最新产生的四合一浓度 + 如果有返回数据则记录 该数据产生的时间。 + 如果之前没有记录 数据产生时间 或 访问到的数据产生时间 晚于 上次记录时间: + 则视为读取到新数据,返回新数据 + 没有数据等待n秒后重复询问 + :return: + """ + while True: + url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={self.harmful_device_code}' + print("访问四合一数据...") + response = get_request(url) + # response = getGasGata_fake() + if response and response.get('data'): + + data = response.get('data') + print(f"访问到四合一数据: {data}") + uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S") + if self.last_ts is None or (uptime.timestamp() - self.last_ts) > 0: + self.last_ts = uptime.timestamp() + if time.time() - uptime.timestamp() < 10 * 60: # 10分钟以前的数据不做处理 + ch4 = data.get('ch4') + co = data.get('co') + h2s = data.get('h2s') + o2 = data.get('o2') + return ch4, co, h2s, o2 + else: + print('ignore') + else: # url没有返回数据 + print("四合一没有读取到数据") + time.sleep(5) + + def isDataNormal(self, ch4, co, h2s, o2): + """ + 判断四项气体是否正常 + :param ch4: + :param co: + :param h2s: + :param o2: + :return: + """ + if float(ch4) > 10.0 \ + or float(co) > 10.0 \ + or float(h2s) > 120.0 \ + or float(o2) < 15: + return False # 气体异常 + else: + return True # 气体正常 + + +# class AnQuanMao(): +# def __init__(self, helmet_code): +# self.helmet_code = helmet_code +# self.url = f'http://172.27.46.84:30003/emergency/harmfulData?devcode={helmet_code}' # 后台访问数据的url +# self.last_ts = None # 上次读取的数据的生成时间戳 +# +# def getNewData(self): +# """ +# 阻塞进程 +# :return: +# """ +# while True: +# header = { +# 'ak': 'fe80b2f021644b1b8c77fda743a83670', +# 'sk': '8771ea6e931d4db646a26f67bcb89909', +# } +# url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}' +# print("访问心率血氧数据...") +# response = get_request(url, headers=header) +# if response and response.get('data'): +# print("访问到心率血氧数据") +# vitalsigns_data = response.get('data').get('vitalSignsData') # 访问而来的数据 +# if vitalsigns_data: # 访问成功 +# upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), +# "%Y-%m-%d %H:%M:%S") # 访问数据的时间 +# if self.last_ts is None or ( +# upload_timestamp.timestamp() - self.last_ts) > 0: # 如果这次访问是第一次访问 或者 访问数据的时间晚于上次时间的数据 +# self.last_ts = upload_timestamp.timestamp() # 更新数据 +# if time.time() - upload_timestamp.timestamp() < 10 * 60: # 访问到的数据是 10分钟内的数据 +# return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate') +# else: +# print("无法访问到心率血氧数据") +# time.sleep(5) +# +# def isDataNormal(self, blood_oxygen, heartrate): +# if heartrate < 60 or heartrate > 120 or blood_oxygen < 85: # 心率和血氧异常 +# return False +# else: +# return True + + +class Laobaocheck(): + def __init__(self, eventController=None, alarm=None): + self.laobao_model = YOLO("weights/labor-v8-20241114.pt") + self.jiaodi_model = YOLO("weights/jiaodi.pt") + self.target = {"三脚架": [0], "灭火器": [34], "鼓风机": [58], "面罩": [11], "工作指示牌": [4, 6, 16]} + self.target_flag = {"三脚架": False, "灭火器": False, "鼓风机": False, "面罩": False, + "工作指示牌": False} # OD 模型有无检测这些目标 + self.jiaodi_flag = False # 分类模型 有无检测到交底 + self.laobao_pool = {} + + self.eventController = eventController + self.alarm = alarm + + def getUndetectedTarget(self): + # 获取未检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == False: + result.append(name) + if not self.jiaodi_flag: + result.append("交底") + return result + + def name2alarm(self, target_name): + alarm_map = { + '三脚架': 'no_tripod', + '灭火器': 'no_extinguisher', + '鼓风机': 'no_blower', + '面罩': 'no_mask', + '工作指示牌': 'no_board', + '交底': 'no_brief' + } + return alarm_map.get(target_name, None) + + def getDetectedTarget(self): + # 获取已检测目标的名称,返回str列表 + result = [] + for name, flag in self.target_flag.items(): + if flag == True: + result.append(name) + return result + + def name2id(self, input): + # 检测名称 映射为 id + if isinstance(input, str): + return self.target[input] + elif isinstance(input, list): + result = [] + for item in input: + if item in self.target: + result.append(self.target[item]) + if len(result) == 0: return [] + return list(set(np.concatenate([r for r in result]).astype(int).tolist())) + + def id2name(self, input): + """ + + :param input: int 或 [int,int,int...] + :return: + """ + # id -> 类别名称 + result = [] + if isinstance(input, int): + input = [input] + for id in input: + for k, v in self.target.items(): # k: 类别名称 , v: id_list + if id in v: result.append(k) + + return list(set(result)) + + def predict_isJiaodi(self, frames): + """ + 调用 jiaodi.pt 分类模型 + :return: True:交底,False:没检测到 交底 + """ + jiaodi_results = self.jiaodi_model.predict(source=frames, save=False, verbose=False) + jiaodi_prob = [jiaodi_result.probs.data[0].item() for jiaodi_result in jiaodi_results] + for prob in jiaodi_prob: + if prob > 0.6: + return True + return False + + def predict_laobao(self, frames): + ''' + 调用 labor-v8-20241114.pt OD 模型 + :param frames: + :return: [类别1,类别2] + ''' + target_idx = self.name2id(self.getUndetectedTarget()) + results = self.laobao_model.predict(source=frames, classes=flatten(target_idx), conf=0.6, + save=False, verbose=False) # results:list(4) 4帧的检测结果 + pred_c_list = list(set(np.concatenate([result.boxes.cls.tolist() for result in results]).astype( + int).tolist())) # 检测到的目标类别id_list,已去重 + return self.id2name(pred_c_list) + + def updateUnpredictedTargets(self, jiaodi_flag, pred_labels): + ''' + 更新 已检测到的目标 列表 和有无检测到交底 + :param pred_labels: [str, str...] + :return: None + ''' + for pred_label in pred_labels: + print(f"检测到{pred_label}") + self.target_flag[pred_label] = True + if self.jiaodi_flag == False and jiaodi_flag == True: + print(f"劳保检测:检测到交底") + self.jiaodi_flag = jiaodi_flag + + def model_predict_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + while True: + try: + ret, frames = cap.read() + frames = [frames] + if not ret: + break + cv2.namedWindow("Video Frame", cv2.WINDOW_AUTOSIZE) + cv2.resizeWindow('Video Frame', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame", frames[0]) + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cap.release() + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + if self.eventController != None and self.eventController.timeout_event.is_set(): # 超时退出 + cap.release() + cv2.destroyAllWindows() + return + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController != None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if self.getUndetectedTarget() == []: # 如果全部检验到了 + print("劳保物品 通过") + self.eventController.laobao_complete.set() + cap.release() + cv2.destroyAllWindows() + return # 退出检测 + else: # 如果还有未检测到的 + undetectedTargets = self.getUndetectedTarget() + for target_name in undetectedTargets: + alarm_dict = self.name2alarm(target_name) + if alarm_dict: + self.alarm.addAlarm(alarm_dict) + cap.release() + cv2.destroyAllWindows() + + def model_predict(self, stream_loader): + for frames in stream_loader: # type : list (4),连续的4帧 + if self.eventController is not None and self.eventController.timeout_event.is_set(): # 超时退出 + return + + if not frames: + continue + + # 构造 要检测目标的 id_list(把之前检测的目标 从 要检测目标的集合移出) + jiaodi_flag = self.predict_isJiaodi(frames) # bool, 检测 新收集的这几帧有无交底 + pred_label = self.predict_laobao(frames) # [str, str...] + self.updateUnpredictedTargets(jiaodi_flag, pred_label) + + if self.eventController is not None and self.eventController.umd_complete.is_set(): # 上中下气体检测完毕 + + # 检验所有物体都检验到了吗 + if not self.getUndetectedTarget(): # 如果全部检验到了 + print("劳保物品 通过") + self.eventController.laobao_complete.set() + return # 退出检测 + else: # 如果还有未检测到的 + # todo 这里是否要生成报警记录 + undetectedTargets = self.getUndetectedTarget() + for target_name in undetectedTargets: + alarm_dict = self.name2alarm(target_name) + if alarm_dict: + self.alarm.addAlarm(alarm_dict) + + +class YinHuanCheck: + def __init__(self, device_code, eventController, alarm, frame_threshold=20): + # 初始化YOLO模型及其他参数 + self.model = YOLO("weights/yinhuan.pt") + self.eventController = eventController + self.alarm = alarm + self.device_code = device_code + self.frame_threshold = frame_threshold # 连续异常帧阈值 + + # 针对每个人的异常计数器,键为 person_id + self.counters = { + 'no_helmet': {}, # 未佩戴安全帽 + 'smoking': {}, # 吸烟(检测到烟头) + 'phone': {}, # 打电话(检测到电话) + 'illegal_intrusion': {} # 非法闯入(既没有安全帽也没有工服) + } + # 针对袖标异常为全局条件:如果当前帧中所有人均未检测到袖标,则更新全局计数器 + self.armband_counter = 0 + + def id2name(self, input_id): + """ + 将检测到的类别ID转换为类别名称,支持单个ID或ID列表 + """ + result = [] + if isinstance(input_id, int): + input_id = [input_id] + for id in input_id: + for k, v in self.model.names.items(): # k: id , v: 类别名称 + if k == id: + result.append(v) + return list(set(result)) + + def detect_person(self, frames): + """ + 对输入的一批视频帧进行人员检测与跟踪,返回每帧中检测到的人员信息。 + 返回格式:list,每项为字典,键为 person_id,值为 {'crop': 截取的人像, 'box': 人员检测框 [x1,y1,x2,y2]} + """ + people_results = self.model.track(source=frames, conf=0.6, classes=[0], + save=False, verbose=False) # 检测人(类别0) + results = [] + for people_result in people_results: + orig_img = people_result.orig_img # 当前帧原图 + person_dict = {} + for person_box in people_result.boxes: + person_id = person_box.id.item() + if person_id: + box = person_box.xyxy.squeeze().tolist() # [x1, y1, x2, y2] + # 截取检测到的人像区域 + cropped_image = orig_img[int(box[1]):int(box[3]), int(box[0]):int(box[2])] + person_dict[person_id] = {'crop': cropped_image, 'box': box} + results.append(person_dict) + return results + + def detect_person_targets(self, people_results): + """ + 针对每个人的图像区域进行目标检测(安全帽、工服、烟头、电话、袖标)。 + 返回格式:list,每项为字典,键为 person_id,值为该人身上检测到的目标字典 {label: xyxy} + """ + # 收集所有人的图像区域 + person_images = [] + for frame_persons in people_results: + for info in frame_persons.values(): + person_images.append(info['crop']) + if len(person_images) == 0: + return [] + # 对所有人的区域进行目标检测 + results = self.model.predict(source=person_images, conf=0.6, classes=[2, 3, 4, 5, 6], + save=False, verbose=False) + person_detect_targets = [] + result_idx = 0 + for frame_persons in people_results: + frame_detection = {} + for person_id in frame_persons.keys(): + person_result = results[result_idx] + detection_dict = {} + for box in person_result.boxes: + label = int(box.cls.item()) + label_name = self.id2name(label) + xyxy = box.xyxy.squeeze().tolist() + # 假设每个box只对应一种类别,直接存入字典 + detection_dict[label_name[0]] = xyxy + frame_detection[person_id] = detection_dict + result_idx += 1 + person_detect_targets.append(frame_detection) + return person_detect_targets + + def annotate_alarm(self, frame, condition, person_box=None, detection=None): + """ + 在报警图片上对异常情况进行标注: + - person_box: 异常人员的检测框 + - detection: 异常物品的检测框,格式为 {label: xyxy} + """ + annotator = Annotator(deepcopy(frame), None, 18, "Arial.ttf", False) + + if person_box is not None: + annotator.box_label(person_box, condition, color=COLOR_RED, rotated=False) + + # 绘制异常物品的检测框(如烟头、电话等) + if detection is not None: + if person_box is not None: + offset_x, offset_y = int(person_box[0]), int(person_box[1]) + for label, box in detection.items(): + # 将 detection 框坐标转换为全局坐标 + global_box = [box[0] + offset_x, box[1] + offset_y, + box[2] + offset_x, box[3] + offset_y] + annotator.box_label(global_box, '', color=COLOR_RED, rotated=False) + else: + # 如果没有人的框,则直接使用 detection 框 + for label, box in detection.items(): + annotator.box_label(box, label, color=COLOR_RED, rotated=False) + return annotator.result() + + def trigger_alarm(self, frame, condition, person_id=None, detection=None, person_box=None): + """ + 当某个异常条件达到连续帧阈值时触发报警: + - condition: 异常类型,取值:'no_helmet', 'smoking', 'phone', 'illegal_intrusion', 'armband' + - 对于人员级别的异常,会标注该人员检测框及异常物品 + - 对于全局异常(armband)直接标注图片 + """ + # 定义异常对应的报警类型(需与系统定义的 ALARM_DICT 对应) + alarm_mapping = { + 'no_helmet': 'aqm', # 安全帽异常 + 'smoking': 'smoke', # 吸烟异常 + 'phone': 'phone', # 打电话异常 + 'illegal_intrusion': 'break', # 非法闯入 + 'armband': 'armband' # 袖标异常 + } + alarm_type = alarm_mapping.get(condition, 'unknown') + # 播报异常语言 + self.alarm.addAlarm(ALARM_DICT[alarm_type]) + + if self.alarm.alarm_record_center.need_alarm(self.device_code, ALARM_DICT[alarm_type]): + alarm_detections = {} + if alarm_type == 'smoke': + alarm_detections = {key : detection[key] for key in detection if key == '烟头'} + elif alarm_type == 'phone': + alarm_detections = {key : detection[key] for key in detection if key == '电话'} + # 生成报警图片:在报警图片上对异常人员与异常物品进行标注 + annotated_image = self.annotate_alarm(frame, ALARM_DICT[alarm_type]['label'], person_box, alarm_detections) + + # 上传报警图片到后台 + self.alarm.alarm_record_center.upload_alarm_record(self.device_code, ALARM_DICT[alarm_type], + alarm_np_img=annotated_image) + + def cleanup_counters(self, current_ids): + """ + 清理各异常计数器中失去的 person id(即当前帧中不再检测到的人员) + """ + for cond in self.counters: + lost_ids = [pid for pid in self.counters[cond] if pid not in current_ids] + for pid in lost_ids: + del self.counters[cond][pid] + + def process_batch(self, frames): + """ + 对一批视频帧进行处理: + 1. 对每帧进行人员检测及目标检测 + 2. 针对每个人判断是否存在异常: + - 未佩戴安全帽:若该人检测结果中没有 "安全帽" + - 吸烟:若检测到 "烟头" + - 打电话:若检测到 "电话" + - 非法闯入:若既没有检测到 "安全帽" 也没有检测到 "工服" + 3. 更新每个人针对各异常的连续帧计数器,若达到阈值则触发报警 + 4. 对于袖标异常,若当前帧中所有人均未检测到 "袖标",则更新全局计数器 + 5. 清理当前帧中不再检测到的 person id(清除计数器中遗留的记录) + """ + # 第一步:检测人员及其区域目标 + people_results = self.detect_person(frames) + + empty = all(not d for d in people_results) + if empty: + return + + person_detect_targets = self.detect_person_targets(people_results) + # 收集当前帧所有检测到的 person id(假设所有帧中检测到的人 id 集合取并集) + current_ids = set() + for frame_persons in people_results: + current_ids.update(frame_persons.keys()) + + # 针对每一帧分别处理 + for idx, frame in enumerate(frames): + frame_targets = person_detect_targets[idx] # 当前帧中,各人员的检测结果 + # 遍历当前帧的每个检测到的人员 + for person_id, detections in frame_targets.items(): + # 定义各异常条件 + no_helmet = "安全帽" not in detections + smoking = "烟头" in detections + phone = "电话" in detections + illegal_intrusion = ("安全帽" not in detections) and ("工服" not in detections) + # 依次更新对应计数器(未佩戴安全帽、吸烟、打电话、非法闯入) + for cond, abnormal in zip(['no_helmet', 'smoking', 'phone', 'illegal_intrusion'], + [no_helmet, smoking, phone, illegal_intrusion]): + if person_id not in self.counters[cond]: + self.counters[cond][person_id] = 0 + if abnormal: + self.counters[cond][person_id] += 1 + else: + self.counters[cond][person_id] = 0 + + # 若连续异常帧数达到阈值,则触发报警 + if self.counters[cond][person_id] >= self.frame_threshold: + # 获取该人员的检测框(用于标注) + person_box = people_results[idx][person_id]['box'] + self.trigger_alarm(frame, cond, person_id, detections, person_box) + # 触发报警后重置该人员对应计数器 + self.counters[cond][person_id] = 0 + + # 针对袖标的全局情况:如果当前帧中所有人均未检测到 "袖标",则更新全局计数器 + # 注意:这里遍历当前帧中每个检测结果 + if not any("袖标" in det for det in frame_targets.values()): + self.armband_counter += 1 + else: + self.armband_counter = 0 + if self.armband_counter >= self.frame_threshold: + self.trigger_alarm(frame, 'armband') + self.armband_counter = 0 + + # 清除计数器中已丢失的 person id + self.cleanup_counters(current_ids) + + def main(self, stream_loader): + """ + 主流程:从视频流中获取每一批帧,处理后检测异常并报警 + """ + for frames in stream_loader: # stream_loader 每次返回一批连续帧(例如4帧) + try: + self.process_batch(frames) + except Exception as ex: + traceback.print_exc() + # 记录错误信息 + logger.error(ex) + + def main_fake(self, video_path): + cap = cv2.VideoCapture(video_path) + ret, frames = cap.read() + while True: + try: + ret, frames = cap.read() + frames = [frames] + if not ret: + cap.release() + cv2.destroyAllWindows() + break + cv2.namedWindow("Video Frame2", cv2.WINDOW_AUTOSIZE) + # cv2.resizeWindow('Video Frame2', 800, 600) # 宽度800像素,高度600像素 + cv2.imshow("Video Frame2", frames[0]) + + self.process_batch(frames) + + except Exception as ex: + traceback.print_exc() + logger.error(ex) + cv2.destroyAllWindows() + # 等待1毫秒,检查是否按下了'q'键退出 + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + + +class Alarm(): + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, eventController=None): + self.pool = [] + self.device = device + self.thread_id = thread_id + self.tcp_manager = tcp_manager + self.main_loop = main_loop + self.eventController = eventController + + # self.alarm_interval_dict = {} + # self.alarm_interval = device.alarm_interval + # + # self.socket_interval_dict = {} + # self.socket_interval = device.alarm_interval + # self.socket_retry = 3 + + self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager, + category_interval=30, message_send_interval=3, retention_time=10, + category_priority={0: 0, 4: 1, 3: 2, 2: 3, + 1: 4}) # (优先级:0 > 4 > 3 > 2 > 1) + self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop) + + # todo 跟下面的alarm task二选一 + # self.thread_pool = GlobalThreadPool() + # self.thread_pool.submit_task(self.alarm_message_center.process_messages) + + def addAlarm(self, alarm_dict): + self.alarm_message_center.add_message(alarm_dict) + + # def addAlarm(self, content): + # """ + # 添加一条报警到报警队列中 + # :param content: + # :return: + # """ + # if content in self.pool: + # self.pool.remove(content) + # self.pool.append(content) + + def deleteAlarmOfLaoBao(self): + """ + 删除池子中有关劳保检测的报警 + :return: + """ + + # self.pool = [item for item in self.pool if "劳保" not in item] + + def laobao_condition(msg): + return msg['alarmCategory'] == 0 + + self.alarm_message_center.delete_messages(laobao_condition) + + def deleteAlaramOfUmdGas(self): + """ + 删除池子中有关上中下气体的报警 + :return: + """ + + def umd_condition(msg): + return msg['alarmCategory'] == 4 + + self.alarm_message_center.delete_messages(umd_condition) + + # self.pool = [item for item in self.pool if "劳保" not in item] + + # def main(self): + # while True: + # if self.eventController.timeout_event.is_set(): # 前置条件检查超时 + # self.deleteAlarmOfLaoBao() + # self.deleteAlaramOfUmdGas() + # if len(self.pool) != 0: + # content = self.pool.pop(0) + # print(f"{content},报警队列长度:{len(self.pool)}") + # # self.send_alarm_message("no_jiandu") + # time.sleep(1) + # + # def send_tcp_message(self, message: bytes, have_response=False): + # asyncio.run_coroutine_threadsafe( + # self.tcp_manager.send_message_to_device(device_id=self.device.id, + # message=message, + # have_response=have_response), + # self.main_loop) + + # def send_alarm_message(self, type): + # if self.tcp_manager: + # # if self.socket_interval_dict.get(type) is None \ + # # or (datetime.now() - self.socket_interval_dict.get(type)).total_seconds() > int(self.socket_interval): + # logger.debug("send alarm message %s %s", ALARM_DICT[type]['alarmContent'], + # ALARM_DICT[type]['alarmSoundMessage']) + # self.send_tcp_message(ALARM_DICT[type]['alarmSoundMessage'], have_response=True) + # self.socket_interval_dict[type] = datetime.now() + + +HEALTH_DEVICE_TYPE = '2' # 安全帽设备类型 +HARMFUL_DEVICE_TYPE = '4' # 四合一设备类型 + + +def get_group_device_list(device_code): + health_device_codes = [] + harmful_device_codes = [] + url = f'http://172.27.46.84:30003/v3/device/listGroupDevs?devcode={device_code}' + response = get_request(url) + if response and response.get('code') == 200 and response.get('data'): + data = response.get('data') + for item in data: + health_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HEALTH_DEVICE_TYPE] + harmful_device_codes = [item.get('deviceCode', '') for item in data if + item.get('deviceType', '') == HARMFUL_DEVICE_TYPE] + return health_device_codes, harmful_device_codes + + +class IntranetLimitSpaceSceneHandler(BaseSceneHandler): + + def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points): + super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop) + + self.start_time = time.time() # 脚本启动时间戳 + print(f'start time = {self.start_time}') + + # self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code, + # device_thread_id=thread_id) + + self.executor = ThreadPoolExecutor(max_workers=10) + self.loop = asyncio.get_running_loop() + + self.eventController = EventController() + self.alarm = Alarm(device, thread_id, tcp_manager, main_loop, self.eventController) + self.laobao_check = Laobaocheck(self.eventController, self.alarm) + self.yinhuan_check = YinHuanCheck(device_code=device.code, eventController=self.eventController, + alarm=self.alarm) + + self.anQuanMaoList = [] + self.siHeyiList = [] + self.siHeyiUmd = None # 上中下气体检测用的四合一设备 + health_device_codes, harmful_device_codes = get_group_device_list(device.code) + for health_device_code in health_device_codes: + self.anQuanMaoList.append(HelmetDataProcessor(health_device_code, self.alarm.alarm_record_center)) + for harmful_device_code in harmful_device_codes: + self.siHeyiList.append(SiHeYi(harmful_device_code)) + + if self.siHeyiList: + self.siHeyiUmd = self.siHeyiList[0] # todo 暂时先用第一个,后期要有标识标明用哪个 + else: + self.siHeyiUmd = SiHeYi('ZA0024587CC4CA98') + + async def laobaoCheck_task(self): + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + # await loop.run_in_executor(executor, self.laobao_check.model_predict_fake, + # r"D:\workspace\pythonProject\safe-algo-pro\2025-02-25 15-25-48.mkv") + await self.loop.run_in_executor(self.executor, self.laobao_check.model_predict, self.stream_loader) + + async def uMDGasCheck_task(self, eventController=None): + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + tflag_pool = [] # 返回数据正常了几次 + await self.loop.run_in_executor(self.executor, self.siHeyiUmd.waitPowerOn, + self.start_time) # 阻塞 uMDGasCheck_task 协程, 检测不到开机不往后进行 + print('上中下气体检测:四合一已开机') + + while True: # 模拟循环检测气体 + if eventController.timeout_event.is_set(): # 超时退出 + return + + ch4, co, h2s, o2 = await self.loop.run_in_executor(self.executor, self.siHeyiUmd.getNewData) # 判断气体是否合规 + flag = self.siHeyiUmd.isDataNormal(ch4, co, h2s, o2) + if flag == False: + tflag_pool.clear() + self.alarm.addAlarm(ALARM_DICT['umd_harmful_gas']) + else: + tflag_pool.append(True) + print(f"上中下气体检测正常次数:{tflag_pool}") + if len(tflag_pool) == 3: + break # 退出检测 + + print('上中下气体检测:上中下气体检测通过') # todo 需要语音吗 + self.eventController.umd_complete.set() + return + + async def alarm_task(self): + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + await self.loop.run_in_executor(self.executor, self.alarm.alarm_message_center.process_messages) + + async def yinhuanCheck_task(self): + """ + 检查有无吸烟、袖标、安全帽、打电话、闲杂人(工服)等(隐含的类别:人,头) + :return: + """ + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + await self.loop.run_in_executor(self.executor, self.yinhuan_check.main_fake, + r"D:\workspace\pythonProject\safe-algo-pro\2025-02-26 08-49-39.mkv") + + async def xinlvCheck_task(self): + + def fun(anQuanMao): + blood_oxygen, heartrate = anQuanMao.getNewData() + if not anQuanMao.isDataNormal(blood_oxygen, heartrate): + self.alarm.addAlarm(ALARM_DICT['health']) + anQuanMao.sendAlarmRecord(blood_oxygen, heartrate) + # + # flag = anQuanMao.isDataNormal(blood_oxygen, heartrate) + # if flag == False: + # self.alarm.addAlarm(ALARM_DICT['health']) + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + for aqm in self.anQuanMaoList: + await self.loop.run_in_executor(self.executor, fun, aqm) + + async def gasCheck(self): + """ + 四合一气体检测 + :return: + """ + + def fun(siHeyi): + ch4, co, h2s, o2 = siHeyi.getNewData() + flag = siHeyi.isDataNormal(ch4, co, h2s, o2) + if flag == False: + self.alarm.addAlarm(ALARM_DICT['harmful_gas']) + + # executor = ThreadPoolExecutor(max_workers=3) + # loop = asyncio.get_running_loop() + for siHeYi in self.siHeyiList: + await self.loop.run_in_executor(self.executor, fun, siHeYi) + + def run(self): + async def fun(): + try: + self.loop = asyncio.get_running_loop() + + # 添加异常处理 + def handle_task_exception(task): + try: + task.result() # 触发异常(如果有) + except Exception as e: + logger.exception(f"任务 {task.get_name()} 发生异常: {e}") + + # 并行执行任务 + # uMDGasCheck_task = asyncio.create_task( + # self.uMDGasCheck_task(self.eventController)) + # laobaoCheck_task = asyncio.create_task(self.laobaoCheck_task()) + alarm_task = asyncio.create_task(self.alarm_task()) + + # 给所有任务添加异常处理 + # for task in [uMDGasCheck_task, laobaoCheck_task, alarm_task]: + # task.add_done_callback(handle_task_exception) + alarm_task.add_done_callback(handle_task_exception) + + # done, pending = await asyncio.wait({uMDGasCheck_task, laobaoCheck_task}, timeout=300000.0) + # + # if uMDGasCheck_task in done and laobaoCheck_task in done: + # await uMDGasCheck_task + # await laobaoCheck_task + # self.eventController.timeout_event.set() + # self.alarm.alarm_message_center.send_immediate_command(PREPARE_COMPLETE_MESSAGE) + # print("前置条件检查完成,退出") + # + # else: + # # 如果超时,则取消未完成的任务 + # self.eventController.timeout_event.set() + # laobaoCheck_task.cancel() + # uMDGasCheck_task.cancel() + # print("前置条件检查时间过长,退出") + + # 删除前面产生的报警 + self.alarm.deleteAlarmOfLaoBao() + self.alarm.deleteAlaramOfUmdGas() + + # 并行执行任务 + print("开始工作") + xinlvCheck_task = asyncio.create_task(self.xinlvCheck_task()) + yinhuanCheck_task = asyncio.create_task(self.yinhuanCheck_task()) + gasCheck_task = asyncio.create_task(self.gasCheck()) + + # 也给这些任务添加异常处理 + for task in [xinlvCheck_task, yinhuanCheck_task, gasCheck_task]: + task.add_done_callback(handle_task_exception) + + try: + results = await asyncio.gather(yinhuanCheck_task, gasCheck_task, xinlvCheck_task, + return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.exception(f"任务发生异常: {result}") + except Exception as e: + logger.exception(f"gather 执行过程中发生异常: {e}") + + done1, pending1 = await asyncio.wait({alarm_task}, timeout=300000.0) + except Exception as e: + logger.exception(f"run 方法中的 fun 发生异常: {e}") + + asyncio.run(fun()) + + +if __name__ == '__main__': + # print(getNewGasData()) + model = YOLO("/home/pc/Desktop/project/safe-algo-pro/weights/yinhuan.pt") + print(model.names) diff --git a/tcp/harmful_device_handler.py b/tcp/harmful_device_handler.py new file mode 100644 index 0000000..ecbdd9c --- /dev/null +++ b/tcp/harmful_device_handler.py @@ -0,0 +1,207 @@ +import asyncio +import struct +import traceback +import json +import base64 +import re +from datetime import datetime +from typing import Dict + +from common.harmful_gas_manager import HarmfulGasManager +from common.http_utils import send_request_async +from common.global_logger import logger +from scene_handler.alarm_record_center import AlarmRecordCenter +from services.global_config import GlobalConfig + + +# 有害气体数据解析器 +class HarmfulGasHandler: + """有害气体设备数据解析器""" + + gas_units = { + 0: "%LEL", + 1: "%VOL", + 2: "PPM", + 3: "umol/mol", + 4: "mg/m3", + 5: "ug/m3", + 6: "℃", + 7: "%" + } + + decimals = { + 0: "没有小数点", + 1: "有一位小数", + 2: "有两位小数", + 3: "有三位小数" + } + + gas_statuses = { + 0: "预热", + 1: "正常", + 3: "传感器故障", + 5: "低限报警", + 6: "高限报警" + } + + gas_types = { + 3: "硫化氢 (H2S)", + 4: "一氧化碳 (CO)", + 5: "氧气 (O2)", + 50: "可燃气体 (Ex)", + } + + alarm_dict = { + 3: { + 'alarmType': '5', + 'alarmContent': '硫化氢浓度过高', + }, + 4: { + 'alarmType': '7', + 'alarmContent': '一氧化碳浓度过高', + }, + 5: { + 'alarmType': '4', + 'alarmContent': '氧气浓度过低', + }, + 50: { + 'alarmType': '8', + 'alarmContent': '可燃气体浓度过高', + } + } + + def __init__(self, main_loop=None): + self._harmful_gas_manager = HarmfulGasManager() + self._push_ts_dict = {} + self.alarm_record_center = AlarmRecordCenter(main_loop=main_loop) + + async def parse(self, message) -> Dict: + + message = message.replace('\r\n', '\n').replace('\r', '\n') + + """解析有害气体设备数据""" + harmful_gas_pattern = r"^([A-Za-z0-9]+)\{(\"sensorDatas\":\[(.*?)\])\}$" + match = re.match(harmful_gas_pattern, message, re.DOTALL) + if not match: + print('有害气体浓度解析异常') + return {} + + device_code = match.group(1) # 设备号 + sensor_data_str = "{" + match.group(2) + "}" # JSON数组部分 + + try: + sensor_data_json = json.loads(sensor_data_str) + sensor_data = sensor_data_json.get('sensorDatas', []) + + if sensor_data: + await self._push_data(device_code, message) + + for data_item in sensor_data: + gas_type, gas_data = self._parse_sensor_item(device_code, data_item) + # 判断异常并上报 + if gas_type is not None and gas_data is not None: + if self._is_data_alarm(device_code, gas_type, gas_data): + print(f"报警: {device_code}, {gas_type}, {gas_data}") + self._save_and_send_alarm(device_code, gas_type, gas_data) + + print(self._harmful_gas_manager.get_device_all_data(device_code)) + + except json.JSONDecodeError: + logger.error(f"JSON解析错误: {message}") + return {} + except Exception as e: + logger.error(f"解析有害气体数据时出错: {e}") + logger.error(traceback.format_exc()) + return {} + + def _parse_sensor_item(self, device_code: str, data: Dict): + """解析单个传感器数据项""" + try: + flag = data.get("flag") + gas_value = data.get("gas_value") + gas_dec = data.get("gas_dec") + gas_status = data.get("gas_status") + gas_type = data.get("gas_type") + gas_unit = data.get("gas_unit") + + # 获取单位、精度、状态和气体类型的描述 + unit = self.gas_units.get(gas_unit, "未知单位") + precision = self.decimals.get(gas_dec, "未知精度") + status = self.gas_statuses.get(gas_status, "未知状态") + gas_type_name = self.gas_types.get(gas_type, "未知气体") + + # 格式化气体浓度(根据精度进行转换) + gas_value = self._handle_precision(gas_value, gas_dec) + + gas_data = { + "flag": flag, + "gas_value": gas_value, + "gas_unit": unit, + "gas_status": status, + "gas_type": gas_type_name, + "gas_type_code": gas_type, + "precision": precision, + 'gas_ts': datetime.now() + } + + self._harmful_gas_manager.set_device_data(device_code, gas_type, gas_data) + return gas_type, gas_data + except Exception as e: + logger.error(f"解析传感器数据项时出错: {e}") + return None + + def _is_data_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> bool: + """处理报警""" + gas_value = gas_data['gas_value'] + if gas_type == 3: + return gas_value > 10 + elif gas_type == 4: + return gas_value > 30 + elif gas_type == 5: + return gas_value < 19.5 + elif gas_type == 50: + return gas_value > 10 + else: + return False + + def _save_and_send_alarm(self, device_code: str, gas_type: int, gas_data: Dict) -> None: + self.alarm_record_center.upload_alarm_record(device_code=device_code, + alarm_dict=self.alarm_dict[gas_type], + alarm_value=gas_data['gas_value']) + + def _handle_precision(self, gas_value, gas_dec): + """处理气体浓度精度""" + if gas_dec == 0: + return gas_value + elif gas_dec == 1: + return gas_value / 10 + elif gas_dec == 2: + return gas_value / 100 + elif gas_dec == 3: + return gas_value / 1000 + else: + return gas_value + + async def _push_data(self, device_code: str, message: str) -> None: + """推送数据到外部系统""" + global_config = GlobalConfig() + harmful_push_config = global_config.get_harmful_gas_push_config() + + if harmful_push_config and harmful_push_config.push_url: + last_ts = self._push_ts_dict.get(device_code) + current_time = datetime.now() + + # 检查是否需要推送数据 + if last_ts is None or (current_time - last_ts).total_seconds() > harmful_push_config.push_interval: + # 将字符串编码为字节类型 + encoded_bytes = base64.b64encode(message.encode('utf-8')) + # 将字节编码结果转换为字符串 + encoded_string = encoded_bytes.decode('utf-8') + logger.debug(f'before encode: {message}') + logger.debug(f'after encode: {encoded_string}') + push_message = {"content": encoded_string} + logger.debug(f'body: {push_message}') + asyncio.create_task(send_request_async(harmful_push_config.push_url, push_message)) + self._push_ts_dict[device_code] = current_time # 更新推送时间戳 + else: + logger.debug('no harmful push config')