Newer
Older
safe-algo-pro / scene_handler / block_scene_handler.py
import time
import traceback
from asyncio import Event
from copy import deepcopy
from datetime import datetime


from flatbuffers.builder import np
from scipy.spatial import ConvexHull

from algo.stream_loader import OpenCVStreamLoad
from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area
from common.device_status_manager import DeviceStatusManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.harmful_gas_manager import HarmfulGasManager
from common.image_plotting import Annotator
from entity.device import Device
from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.alarm_record_center import AlarmRecordCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from scene_handler.limit_space_scene_handler import is_overlapping
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager

from entity.device import Device
from common.http_utils import get_request
from ultralytics import YOLO

'''
alarmCategory: 
0 行为监管
1 环境监管
2 人员监管
3 围栏监管

handelType:
0 检测到报警
1 未检测到报警
2 人未穿戴报警
3 其他
'''
ALARM_DICT = [
    {
        'alarmCategory': 0,
        'alarmType': '1',
        'handelType': 1,
        'category_order': 1,
        'class_idx': [34],
        'alarm_name': 'no_fire_extinguisher',
        'alarmContent': '未检测到灭火器',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '2',
        'handelType': 1,
        'category_order': 2,
        'class_idx': [43],
        'alarm_name': 'no_barrier_tape',
        'alarmContent': '未检测到警戒线',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '3',
        'handelType': 1,
        'category_order': 3,
        'class_idx': [48],
        'alarm_name': 'no_cone',
        'alarmContent': '未检测到锥桶',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '4',
        'handelType': 1,
        'category_order': 4,
        'class_idx': [4, 5, 16],
        'alarm_name': 'no_board',
        'alarmContent': '未检测到指示牌',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '5',
        'handelType': 2,
        'category_order': -1,
        'class_idx': [18],
        'alarm_name': 'no_helmet',
        'alarmContent': '未佩戴安全帽',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '未佩戴安全帽',
    },
    # todo 明火
    {
        'alarmCategory': 1,
        'alarmType': '7',
        'handelType': 3,
        'category_order': 1,
        'class_idx': [],
        'alarm_name': 'gas_alarm',
        'alarmContent': '甲烷浓度超限',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 1,
        'alarmType': '8',
        'handelType': 3,
        'category_order': 2,
        'class_idx': [],
        'alarm_name': 'harmful_alarm',
        'alarmContent': '有害气体浓度超标',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 2,
        'alarmType': '9',
        'handelType': 3,
        'category_order': -1,
        'class_idx': [],
        'alarm_name': 'health_alarm',
        'alarmContent': '心率血氧异常',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '',
    },
    {
        'alarmCategory': 3,
        'alarmType': '10',
        'handelType': 2,
        'category_order': 4,
        'class_idx': [24],
        'alarm_name': 'break_in_alarm',
        'alarmContent': '非法闯入',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x12\x00\xA6',
        'label': '非法闯入',
    },

]

COLOR_RED = (0, 0, 255)
COLOR_BLUE = (255, 0, 0)


class BlockSceneHandler(BaseSceneHandler):
    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        self.__stop_event = Event(loop=main_loop)
        self.health_ts_dict = {}
        self.harmful_ts_dict = {}
        self.object_ts_dict = {}
        self.thread_pool = GlobalThreadPool()

        self.alarm_message_center = AlarmMessageCenter(device.id,main_loop=main_loop, tcp_manager=tcp_manager,
                                                       category_priority={2: 0, 1: 1, 3: 2, 0: 3})
        self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval,main_loop=main_loop)
        self.harmful_data_manager = HarmfulGasManager()
        self.device_status_manager = DeviceStatusManager()


        self.health_device_codes = ['HWIH061000056395']  # todo
        self.harmful_device_codes = []  # todo

        for helmet_code in self.health_device_codes:
            self.thread_pool.submit_task(self.health_data_task, helmet_code)
        for harmful_device_code in self.harmful_device_codes:
            self.thread_pool.submit_task(self.harmful_data_task, harmful_device_code)

        self.thread_pool.submit_task(self.alarm_message_center.process_messages)

        # todo 明火
        self.model = YOLO('weights/labor-v8-20241114.pt')
        self.model_classes = {
            0: '三脚架',
            3: '人',
            4: '作业信息公示牌',
            6: '危险告知牌',
            9: '反光衣',
            11: '呼吸面罩',
            13: '四合一',
            15: '头',
            16: '安全告知牌',
            18: '安全帽',
            20: '安全标识牌',
            24: '工服',
            34: '灭火器',
            43: '警戒线',
            48: '路锥',
            58: '鼓风机',
        }
        self.PERSON_CLASS_IDX = 3
        self.HEAD_CLASS_IDX = 15

        self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
                                              device_thread_id=thread_id)
        self.range_points = range_points
        self.abs_range_points = self.get_absolute_range()

        self.tracking_status = {}  # 跟踪每个行人的状态
        self.max_missing_frames = 25  # 报警的阈值
        self.disappear_threshold = 25 * 3  # 移除行人的阈值

    def get_absolute_range(self):
        fence_info = eval(self.range_points)
        if fence_info and len(fence_info) > 1:
            abs_points = []
            for p in fence_info:
                abs_points.append(
                    [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))])

            abs_points = np.array(abs_points, dtype=np.int32)
            hull = ConvexHull(abs_points)
            sorted_coordinates = abs_points[hull.vertices]
            # abs_points = abs_points.reshape((-1, 1, 2))
            return sorted_coordinates
        else:
            return None

    def harmful_data_task(self, harmful_device_code):
        while not self.__stop_event.is_set():
            harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code)
            for gas_type, gas_data in harmful_gas_data.items():
                ts_key = f'{harmful_device_code}_{gas_type}'
                last_ts = self.harmful_ts_dict.get(ts_key)
                gas_ts = gas_data.get('gas_ts')
                if last_ts is None or (gas_ts - last_ts).total_seconds() > 0:
                    self.harmful_ts_dict[ts_key] = gas_ts
                    self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data)

    def health_data_task(self, helmet_code):
        while not self.__stop_event.is_set():
            header = {
                'ak': 'fe80b2f021644b1b8c77fda743a83670',
                'sk': '8771ea6e931d4db646a26f67bcb89909',
            }
            url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}'
            response = get_request(url, headers=header)
            if response and response.get('data'):
                last_ts = self.health_ts_dict.get(helmet_code)
                vitalsigns_data = response.get('data').get('vitalSignsData')
                if vitalsigns_data:
                    upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S")
                    if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0:
                        self.health_ts_dict[helmet_code] = upload_timestamp.timestamp()
                        if time.time() - upload_timestamp.timestamp() < 10 * 60:  # 10分钟以前的数据不做处理
                            self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'),
                                                     vitalsigns_data.get('heartRate'),upload_timestamp)
            time.sleep(10)

    def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp):
        logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, '
                     f'upload_timestamp = {upload_timestamp}')
        if heartrate < 60 or heartrate > 120 or blood_oxygen < 85:
            alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 2]
            if alarm_dict:
                self.alarm_message_center.add_message(alarm_dict[0])
        # todo 需要生成报警记录吗 需要往后台发原始数据吗

    def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data):
        alarm = False
        gas_value = gas_data['gas_value']
        if gas_type == 3:  # h2s
            alarm = gas_value > 120.0
        elif gas_type == 4:  # co
            alarm = gas_value > 10.0
        elif gas_type == 5:  # o2
            alarm = gas_value < 15
        elif gas_type == 50:  # ex
            alarm = gas_value > 10

        if alarm:
            alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1]
            if alarm_dict:
                self.alarm_message_center.add_message(alarm_dict[0])
                # todo  需要生成报警记录吗

    def model_predict(self, frames):
        results_generator = self.model(frames, save_txt=False, save=False, verbose=False, conf=0.5,
                                       classes=list(self.model_classes.keys()),
                                       imgsz=640,
                                       stream=True)
        result_boxes = []
        for r in results_generator:
            result_boxes.append(r.boxes)

        pred_ids = [[int(box.cls) for box in sublist] for sublist in result_boxes]
        pred_names = [[self.model_classes[int(box.cls)] for box in sublist] for sublist in result_boxes]
        return result_boxes, pred_ids, pred_names

    def handle_behave_alarm(self, frames, result_boxes, pred_ids, pred_names):
        behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0]
        for alarm_dict in behave_alarm_dicts:
            for idx, frame_boxes in enumerate(result_boxes):
                frame = frames[idx]
                frame_ids, frame_names = pred_ids[idx], pred_names[idx]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']]
                if alarm_dict['handelType'] == 0:  # 检测到就报警
                    if object_boxes:
                        self.alarm_message_center.add_message(alarm_dict)
                        if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                            annotator = Annotator(deepcopy(frame))
                            for box in frame_boxes:
                                if int(box.cls) != self.HEAD_CLASS_IDX:
                                    box_color = COLOR_RED if int(box.cls) in alarm_dict['class_idx'] else COLOR_BLUE
                                    annotator.box_label(box.xyxy.cpu().squeeze(),
                                                        f"{self.model_classes[int(box.cls)]}",
                                                        color=box_color,
                                                        rotated=False)
                            self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                         annotator.result())

                elif alarm_dict['handelType'] == 1:  # 检测不到报警
                    if object_boxes:
                        self.object_ts_dict[alarm_dict['alarm_name']] = time.time()
                    else:
                        last_ts = self.object_ts_dict.get(alarm_dict['alarm_name'], 0)
                        if time.time() - last_ts > 5:
                            self.object_ts_dict[alarm_dict['alarm_name']] = time.time()
                            self.alarm_message_center.add_message(alarm_dict)
                            if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                                annotator = Annotator(deepcopy(frame))
                                for box in frame_boxes:
                                    if int(box.cls) != self.HEAD_CLASS_IDX:
                                        annotator.box_label(box.xyxy.cpu().squeeze(),
                                                            f"{self.model_classes[int(box.cls)]}",
                                                            color=COLOR_BLUE,
                                                            rotated=False)
                                self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                             annotator.result())
                elif alarm_dict['handelType'] == 2:  # 人未穿戴报警
                    person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
                    head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]
                    has_alarm = False
                    annotator = None
                    for person_box in person_boxes:
                        person_bbox = person_box.xyxy.cpu().squeeze()
                        # 检查这个人是否佩戴了安全帽
                        has_helmet = True
                        person_head = get_person_head(person_bbox, head_boxes)
                        if person_head is not None:
                            has_helmet = any(
                                is_overlapping(person_head.xyxy.cpu().squeeze(), helmet.xyxy.cpu().squeeze())
                                for helmet in object_boxes)
                        if not has_helmet:
                            has_alarm = True
                            if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                                annotator = Annotator(deepcopy(frame)) if annotator is None else annotator
                                annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)

                    if has_alarm:
                        self.alarm_message_center.add_message(alarm_dict)
                        if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                            for box in frame_boxes:
                                box_cls = box.cls
                                if box_cls != self.PERSON_CLASS_IDX and box_cls != self.HEAD_CLASS_IDX:
                                    annotator.box_label(box.xyxy.cpu().squeeze(),
                                                        f"{self.model_classes[int(box.cls)]}",
                                                        color=COLOR_BLUE,
                                                        rotated=False)
                            self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                         annotator.result())

    def handle_break_in_alarm(self, frames, result_boxes, pred_ids, pred_names):
        break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3]
        for alarm_dict in break_in_alarm_dicts:
            for idx, frame_boxes in enumerate(result_boxes):
                frame = frames[idx]
                frame_ids, frame_names = pred_ids[idx], pred_names[idx]
                person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
                head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']]
                has_alarm = False
                annotator = None
                for person_box in person_boxes:
                    person_bbox = person_box.xyxy.cpu().squeeze()
                    person_id = person_box.id
                    if is_within_alert_range(person_bbox, self.abs_range_points):
                        has_object = True
                        person_head = get_person_head(person_bbox, head_boxes)
                        if person_head is not None:
                            overlap_ratio = intersection_area(person_bbox, person_head.xyxy.cpu().squeeze()) / bbox_area(person_bbox)
                            if overlap_ratio < 0.5:  # 头占人<0.5,判断是否穿工服。不太准确
                                has_object = any(
                                    is_overlapping(person_head.xyxy.cpu().squeeze(), object_boxe.xyxy.cpu().squeeze())
                                    for object_boxe in object_boxes)
                        if not has_object:
                            self.tracking_status[person_box.id] = self.tracking_status.get(person_box.id, 0) + 1

                            has_alarm = True
                            if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                                annotator = Annotator(deepcopy(frame)) if annotator is None else annotator
                                annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)

                if has_alarm:
                    self.alarm_message_center.add_message(alarm_dict)
                    if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                     annotator.result())

    def run(self):
        while not self.stream_loader.init:
            if self.__stop_event.is_set():
                break  # 如果触发了停止事件,则退出循环
            self.stream_loader.init_cap()
        for frames in self.stream_loader:
            try:
                if self.__stop_event.is_set():
                    break  # 如果触发了停止事件,则退出循环
                if not frames:
                    continue

                self.device_status_manager.set_status(device_id=self.device.id)
                result_boxes, pred_ids, pred_names = self.model_predict(frames)  # 结果都是二维数组,对应batch中的每个frame
                # print(pred_names)
                self.handle_behave_alarm(frames, result_boxes, pred_ids, pred_names)
                self.handle_break_in_alarm(frames, result_boxes, pred_ids, pred_names)

            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)