# Source: safe-algo-pro / scene_handler / block_scene_handler.py
# Last change: zhangyingjie on 4 Mar, 27 KB, commit message "部署版本" (deployment version)
import time
import traceback
from asyncio import Event
from copy import deepcopy
from datetime import datetime

import numpy as np
import asyncio
from scipy.spatial import ConvexHull

from algo.stream_loader import OpenCVStreamLoad
from common.detect_utils import is_within_alert_range, get_person_head, intersection_area, bbox_area
from common.device_status_manager import DeviceStatusManager
from common.display_frame_manager import DisplayFrameManager
from common.global_logger import logger
from common.global_thread_pool import GlobalThreadPool
from common.harmful_gas_manager import HarmfulGasManager
from common.image_plotting import Annotator, colors
from db.database import get_db
from entity.device import Device
from scene_handler.alarm_message_center import AlarmMessageCenter
from scene_handler.alarm_record_center import AlarmRecordCenter
from scene_handler.base_scene_handler import BaseSceneHandler
from scene_handler.limit_space_scene_handler import is_overlapping
from services.data_gas_service import DataGasService
from services.global_config import GlobalConfig
from tcp.tcp_manager import TcpManager

from entity.device import Device
from common.http_utils import get_request
from ultralytics import YOLO

'''
alarmCategory: 
0 行为监管
1 环境监管
2 人员监管
3 围栏监管

handelType:
0 检测到报警
1 未检测到报警
2 人未穿戴报警
3 其他
'''
ALARM_DICT = [
    {
        'alarmCategory': 0,
        'alarmType': '14',
        'handelType': 1,
        'category_order': 1,
        'class_idx': [34],
        'alarm_name': 'no_fire_extinguisher',
        'alarmContent': '未检测到灭火器',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x30\x00\xC4',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '15',
        'handelType': 1,
        'category_order': 2,
        'class_idx': [43],
        'alarm_name': 'no_barrier_tape',
        'alarmContent': '未检测到警戒线',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x32\x00\xC6',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '16',
        'handelType': 1,
        'category_order': 3,
        'class_idx': [48],
        'alarm_name': 'no_cone',
        'alarmContent': '未检测到锥桶',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x31\x00\xC5',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '17',
        'handelType': 1,
        'category_order': 4,
        'class_idx': [4, 5, 16],
        'alarm_name': 'no_board',
        'alarmContent': '未检测到指示牌',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x33\x00\xC7',
        'label': '',
    },
    {
        'alarmCategory': 0,
        'alarmType': '2',
        'handelType': 2,
        'category_order': -1,
        'class_idx': [18],
        'alarm_name': 'no_helmet',
        'alarmContent': '未佩戴安全帽',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x01\x00\x95',
        'label': '未佩戴安全帽',
    },
    # todo 明火
    {
        'alarmCategory': 1,
        'alarmType': '1',
        'handelType': 3,
        'category_order': 1,
        'class_idx': [],
        'alarm_name': 'gas_alarm',
        'alarmContent': '甲烷浓度超限',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x34\x00\xC8',
        'label': '',
    },
    {
        'alarmCategory': 1,
        'alarmType': '',
        'handelType': 3,
        'category_order': 2,
        'class_idx': [],
        'alarm_name': 'harmful_alarm',
        'alarmContent': '有害气体浓度超标',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x35\x00\xC9',
        'label': '',
    },
    {
        'alarmCategory': 2,
        'alarmType': '18',
        'handelType': 3,
        'category_order': -1,
        'class_idx': [],
        'alarm_name': 'health_alarm',
        'alarmContent': '心率血氧异常',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA',
        'label': '',
    },
    {
        'alarmCategory': 3,
        'alarmType': '3',
        'handelType': 2,
        'category_order': 4,
        'class_idx': [24],
        'alarm_name': 'break_in_alarm',
        'alarmContent': '非法闯入',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x37\x00\xCB',
        'label': '非法闯入',
    },

]

COLOR_RED = (0, 0, 255)
COLOR_BLUE = (255, 0, 0)


class BlockSceneHandler(BaseSceneHandler):
    def __init__(self, device: Device, thread_id: str, tcp_manager: TcpManager, main_loop, range_points):
        """Create the handler, start its background polling tasks and load the detector.

        Args:
            device: Device this handler monitors; its ``id``, ``code``,
                ``alarm_interval`` and ``input_stream_url`` are read here.
            thread_id: Identifier forwarded to the base class and the stream loader.
            tcp_manager: Forwarded to the base class and the alarm message centre.
            main_loop: asyncio event loop used for coroutine scheduling.
            range_points: Textual fence description, parsed by
                ``get_absolute_range`` (may be empty/None for "no fence").
        """
        super().__init__(device=device, thread_id=thread_id, tcp_manager=tcp_manager, main_loop=main_loop)
        try:
            self.__stop_event = Event(loop=main_loop)
        except TypeError:
            # Python >= 3.10 removed the ``loop`` argument of asyncio.Event and
            # raises TypeError when it is passed; the event binds lazily there.
            self.__stop_event = Event()
        # Last-seen data timestamps, used to skip readings already processed.
        self.health_ts_dict = {}
        self.harmful_ts_dict = {}
        # Per-alarm timestamps used by the "object not detected" alarms.
        self.object_ts_dict = {}
        self.thread_pool = GlobalThreadPool()

        # category_priority maps alarmCategory -> rank (priority: 2 > 1 > 3 > 0).
        self.alarm_message_center = AlarmMessageCenter(device.id, main_loop=main_loop, tcp_manager=tcp_manager,
                                                       category_interval=30, message_send_interval=3, retention_time=10,
                                                       category_priority={2: 0, 1: 1, 3: 2,
                                                                          0: 3})
        self.alarm_record_center = AlarmRecordCenter(save_interval=device.alarm_interval, main_loop=main_loop)
        self.harmful_data_manager = HarmfulGasManager()
        self.device_status_manager = DeviceStatusManager()
        self.display_frame_manager = DisplayFrameManager()

        # TODO: read these device codes from the backend API instead of hard-coding them.
        self.health_device_codes = ['HWIH061000056395']
        self.harmful_device_codes = ['862635063168165A']

        # Background pollers; each loops until the stop event is set.
        self.thread_pool.submit_task(self.gas_data_task, device.code)
        for helmet_code in self.health_device_codes:
            self.thread_pool.submit_task(self.health_data_task, helmet_code)
        for harmful_device_code in self.harmful_device_codes:
            self.thread_pool.submit_task(self.harmful_data_query_task, harmful_device_code)

        self.thread_pool.submit_task(self.alarm_message_center.process_messages)

        # TODO: open-flame detection
        self.model = YOLO('weights/labor-v8-20250115-fp16.engine')
        # Detector class id -> display label; only these classes are requested
        # from the model (see model_predict).
        self.model_classes = {
            # 0: '三脚架' (tripod) -- disabled
            3: '人',  # person
            4: '作业信息公示牌',  # work information notice board
            6: '危险告知牌',  # hazard notice board
            9: '反光衣',  # reflective vest
            # 11: '呼吸面罩' (breathing mask) -- disabled
            # 13: '四合一' (four-in-one gas detector) -- disabled
            15: '头',  # head
            16: '安全告知牌',  # safety notice board
            18: '安全帽',  # safety helmet
            20: '安全标识牌',  # safety sign
            24: '工服',  # work uniform
            34: '灭火器',  # fire extinguisher
            43: '警戒线',  # barrier tape
            48: '路锥',  # traffic cone
            58: '鼓风机',  # blower
        }
        self.PERSON_CLASS_IDX = 3
        self.HEAD_CLASS_IDX = 15

        self.vid_stride = 3

        self.stream_loader = OpenCVStreamLoad(camera_url=device.input_stream_url, camera_code=device.code,
                                              device_thread_id=thread_id, vid_stride=self.vid_stride)
        self.range_points = range_points
        # Needs stream_loader (frame size) and range_points to be set first.
        self.abs_range_points = self.get_absolute_range()

        self.tracking_status = {}  # per-person state, keyed by tracker id
        self.max_missing_frames = 25 / self.vid_stride  # violation frames before a person alarm fires
        self.disappear_threshold = 25 * 3 / self.vid_stride  # missing frames before a person is forgotten

        self.frames_detected = 0
        self.fps_ts = None

    def get_absolute_range(self):
        if not self.range_points:
            return None

        fence_info = eval(self.range_points)
        if fence_info and len(fence_info) > 1:
            abs_points = []
            for p in fence_info:
                abs_points.append(
                    [int(p[0] * int(self.stream_loader.frame_width)), int(p[1] * int(self.stream_loader.frame_height))])

            abs_points = np.array(abs_points, dtype=np.int32)
            hull = ConvexHull(abs_points)
            sorted_coordinates = abs_points[hull.vertices]
            # abs_points = abs_points.reshape((-1, 1, 2))
            return sorted_coordinates
        else:
            return None

    def gas_data_task(self, tree_device_code):
        while not self.__stop_event.is_set():
            asyncio.run_coroutine_threadsafe(
                self.handle_gas_alarm(tree_device_code), self.main_loop
            )
            time.sleep(5)

    async def handle_gas_alarm(self, tree_device_code):
        """Raise the ``gas_alarm`` when the latest stored gas reading is too high.

        The latest reading for ``tree_device_code`` is fetched through
        ``DataGasService``. Readings older than 60 seconds are ignored; a value
        above 200.0 queues the alarm message and, if the record centre agrees,
        uploads an alarm record carrying the measured value (no image).

        Errors are logged rather than raised so the scheduler keeps polling.
        """
        max_age_s = 60
        gas_limit = 200.0
        try:
            async for db in get_db():
                gas_service = DataGasService(db)
                gas_data = await gas_service.latest_query(tree_device_code)
                is_fresh = gas_data and time.time() - gas_data.ts.timestamp() < max_age_s
                if not (is_fresh and gas_data.gas_value > gas_limit):
                    continue

                alarm_dict = next((d for d in ALARM_DICT
                                   if d['alarmCategory'] == 1 and d['alarm_name'] == 'gas_alarm'), None)
                if not alarm_dict:
                    continue

                self.alarm_message_center.add_message(alarm_dict)
                if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                    self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                 alarm_np_img=None,
                                                                 alarm_value=gas_data.gas_value)
        except Exception as e:
            # Report through the shared logger (like run() does) instead of a
            # bare print, so failures show up in the service logs.
            traceback.print_exc()
            logger.error(f"Error in latest_query: {e}")

    # 一体机直接接收四合一浓度
    def harmful_data_task(self, harmful_device_code):
        while not self.__stop_event.is_set():
            harmful_gas_data = self.harmful_data_manager.get_device_all_data(harmful_device_code)
            for gas_type, gas_data in harmful_gas_data.items():
                ts_key = f'{harmful_device_code}_{gas_type}'
                last_ts = self.harmful_ts_dict.get(ts_key)
                gas_ts = gas_data.get('gas_ts')
                if last_ts is None or (gas_ts - last_ts).total_seconds() > 0:
                    self.harmful_ts_dict[ts_key] = gas_ts
                    self.handle_harmful_gas_alarm(harmful_device_code, gas_type, gas_data)

    # 从后台读取四合一浓度
    def harmful_data_query_task(self, harmful_device_code):
        while not self.__stop_event.is_set():
            url = f'http://111.198.10.15:22006/emergency/harmfulData?devcode={harmful_device_code}'
            response = get_request(url)
            if response and response.get('data'):
                last_ts = self.harmful_ts_dict.get(harmful_device_code)
                data = response.get('data')
                uptime = datetime.strptime(data.get('uptime'), "%Y-%m-%d %H:%M:%S")
                if last_ts is None or (uptime.timestamp() - last_ts) > 0:
                    self.harmful_ts_dict[harmful_device_code] = uptime.timestamp()
                    if time.time() - uptime.timestamp() < 10 * 60:  # 10分钟以前的数据不做处理
                        ch4 = data.get('ch4')
                        co = data.get('co')
                        h2s = data.get('h2s')
                        o2 = data.get('o2')
                        self.handle_query_harmful_gas_alarm(harmful_device_code, ch4, co, h2s, o2)
            time.sleep(5)

    def health_data_task(self, helmet_code):
        while not self.__stop_event.is_set():
            header = {
                'ak': 'fe80b2f021644b1b8c77fda743a83670',
                'sk': '8771ea6e931d4db646a26f67bcb89909',
            }
            url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{helmet_code}'
            response = get_request(url, headers=header)
            if response and response.get('data'):
                last_ts = self.health_ts_dict.get(helmet_code)
                vitalsigns_data = response.get('data').get('vitalSignsData')
                if vitalsigns_data:
                    upload_timestamp = datetime.strptime(vitalsigns_data.get('uploadTimestamp'), "%Y-%m-%d %H:%M:%S")
                    if last_ts is None or (upload_timestamp.timestamp() - last_ts) > 0:
                        self.health_ts_dict[helmet_code] = upload_timestamp.timestamp()
                        if time.time() - upload_timestamp.timestamp() < 10 * 60:  # 10分钟以前的数据不做处理
                            self.handle_health_alarm(helmet_code, vitalsigns_data.get('bloodOxygen'),
                                                     vitalsigns_data.get('heartRate'), upload_timestamp)
            time.sleep(10)

    def handle_health_alarm(self, helmet_code, blood_oxygen, heartrate, upload_timestamp):
        """Queue the personnel (category 2) alarm when vital signs are abnormal.

        Abnormal means heart rate below 50 or above 120, or blood oxygen below
        85. A value of ``None`` (field missing from the payload) is skipped
        instead of raising ``TypeError``; the other value is still checked.
        """
        logger.debug(f'health_data: {helmet_code}, blood_oxygen = {blood_oxygen}, heartrate = {heartrate}, '
                     f'upload_timestamp = {upload_timestamp}')
        heartrate_abnormal = False
        if heartrate is not None:
            heartrate_value = float(heartrate)
            heartrate_abnormal = heartrate_value < 50 or heartrate_value > 120
        oxygen_abnormal = blood_oxygen is not None and float(blood_oxygen) < 85

        if not (heartrate_abnormal or oxygen_abnormal):
            return

        alarm_dict = next((d for d in ALARM_DICT if d['alarmCategory'] == 2), None)
        if alarm_dict:
            self.alarm_message_center.add_message(alarm_dict)
        # TODO: decide whether an alarm record must be created and whether the
        # raw data has to be sent to the backend.

    def handle_query_harmful_gas_alarm(self, device_code, ch4, co, h2s, o2):
        if float(ch4) > 10.0 \
                or float(co) > 10.0 \
                or float(h2s) > 120.0 \
                or float(o2) < 15:
            alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm']
            if alarm_dict:
                self.alarm_message_center.add_message(alarm_dict[0])

    def handle_harmful_gas_alarm(self, device_code, gas_type, gas_data):
        alarm = False
        gas_value = gas_data['gas_value']
        if gas_type == 3:  # h2s
            alarm = gas_value > 120.0
        elif gas_type == 4:  # co
            alarm = gas_value > 10.0
        elif gas_type == 5:  # o2
            alarm = gas_value < 15
        elif gas_type == 50:  # ex
            alarm = gas_value > 10

        if alarm:
            alarm_dict = [d for d in ALARM_DICT if d['alarmCategory'] == 1 and d['alarm_name'] == 'harmful_alarm']
            if alarm_dict:
                self.alarm_message_center.add_message(alarm_dict[0])
                # todo  需要生成报警记录吗

    def model_predict(self, frames):
        results_generator = self.model.track(frames, save_txt=False, save=False, verbose=False, conf=0.5,
                                             classes=list(self.model_classes.keys()),
                                             imgsz=640,
                                             stream=True)
        result_boxes = []
        for r in results_generator:
            result_boxes.append(r.boxes)

        return result_boxes

    def _label_detections(self, annotator, frame_boxes, skip_classes, highlight_classes=()):
        """Draw class-name labels for ``frame_boxes`` on ``annotator``.

        Boxes whose class is in ``skip_classes`` are not drawn; boxes whose
        class is in ``highlight_classes`` are red, all others blue.
        """
        for box in frame_boxes:
            box_cls = int(box.cls)
            if box_cls in skip_classes:
                continue
            box_color = COLOR_RED if box_cls in highlight_classes else COLOR_BLUE
            annotator.box_label(box.xyxy.cpu().squeeze(),
                                f"{self.model_classes[box_cls]}",
                                color=box_color,
                                rotated=False)

    def handle_behave_alarm(self, frames, result_boxes):
        """Evaluate all behaviour (category 0) alarms for a batch of frames.

        Depending on the alarm's ``handelType``:
          * 0 - alarm as soon as one of the alarm's classes is detected;
          * 1 - alarm when none of the alarm's classes has been seen for more
            than 5 seconds (re-armed after every alarm);
          * 2 - alarm when a tracked person's head is not overlapped by any of
            the alarm's objects for more than ``self.max_missing_frames``
            consecutive frames.

        Every alarm queues a message; an annotated image is uploaded only when
        the record centre's ``need_alarm`` allows it.

        Args:
            frames: Frames of the batch.
            result_boxes: Detection boxes per frame, aligned with ``frames``.
        """
        behave_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 0]
        for alarm_dict in behave_alarm_dicts:
            alarm_name = alarm_dict['alarm_name']
            alarm_classes = alarm_dict['class_idx']
            handle_type = alarm_dict['handelType']
            for idx, frame_boxes in enumerate(result_boxes):
                frame = frames[idx]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_classes]

                if handle_type == 0:  # alarm when detected
                    if not object_boxes:
                        continue
                    self.alarm_message_center.add_message(alarm_dict)
                    if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        annotator = Annotator(deepcopy(frame))
                        self._label_detections(annotator, frame_boxes,
                                               skip_classes=(self.HEAD_CLASS_IDX,),
                                               highlight_classes=alarm_classes)
                        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                     alarm_np_img=annotator.result())

                elif handle_type == 1:  # alarm when not detected
                    if object_boxes:
                        self.object_ts_dict[alarm_name] = time.time()
                        continue
                    last_ts = self.object_ts_dict.get(alarm_name, 0)
                    if time.time() - last_ts <= 5:
                        continue
                    # Restart the 5 s window so the alarm repeats at most every 5 s.
                    self.object_ts_dict[alarm_name] = time.time()
                    self.alarm_message_center.add_message(alarm_dict)
                    if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        annotator = Annotator(deepcopy(frame))
                        self._label_detections(annotator, frame_boxes,
                                               skip_classes=(self.HEAD_CLASS_IDX,))
                        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                     alarm_np_img=annotator.result())

                elif handle_type == 2:  # alarm when a person is not wearing the item
                    person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
                    head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]
                    alarmed_bboxes = []
                    for person_box in person_boxes:
                        if person_box.id is None:  # not tracked (yet): cannot count frames
                            continue

                        person_bbox = person_box.xyxy.cpu().squeeze()
                        person_id = int(person_box.id)
                        # Without a visible head the person gets the benefit of the doubt.
                        has_helmet = True
                        person_head = get_person_head(person_bbox, head_boxes)
                        if person_head is not None:
                            head_bbox = person_head.xyxy.cpu().squeeze()
                            has_helmet = any(
                                is_overlapping(head_bbox, helmet.xyxy.cpu().squeeze())
                                for helmet in object_boxes)

                        person_status = self.tracking_status.setdefault(person_id, {'disappear_frames': 0})
                        violation_frames = 0 if has_helmet else person_status.get(alarm_name, 0) + 1
                        if violation_frames > self.max_missing_frames:
                            alarmed_bboxes.append(person_bbox)
                            violation_frames = 0  # alarmed: restart counting
                        person_status[alarm_name] = violation_frames

                    if not alarmed_bboxes:
                        continue
                    self.alarm_message_center.add_message(alarm_dict)
                    # need_alarm is asked once per frame. Asking it separately
                    # while collecting persons and again here could disagree and
                    # leave the annotator unset when the image is built.
                    if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                        annotator = Annotator(deepcopy(frame))
                        for person_bbox in alarmed_bboxes:
                            annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)
                        self._label_detections(annotator, frame_boxes,
                                               skip_classes=(self.PERSON_CLASS_IDX, self.HEAD_CLASS_IDX))
                        self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                     alarm_np_img=annotator.result())

    def handle_break_in_alarm(self, frames, result_boxes):
        """Evaluate the fence (category 3) alarms for a batch of frames.

        A tracked person counts as an intruder for a frame when no object of
        the alarm's classes overlaps their head and their box is within the
        alert range (``self.abs_range_points``). After more than
        ``self.max_missing_frames`` consecutive such frames the alarm is queued
        and, if the record centre allows it, an image with the intruders marked
        in red is uploaded.

        Args:
            frames: Frames of the batch.
            result_boxes: Detection boxes per frame, aligned with ``frames``.
        """
        break_in_alarm_dicts = [d for d in ALARM_DICT if d['alarmCategory'] == 3]
        for alarm_dict in break_in_alarm_dicts:
            alarm_name = alarm_dict['alarm_name']
            for idx, frame_boxes in enumerate(result_boxes):
                frame = frames[idx]
                person_boxes = [box for box in frame_boxes if int(box.cls) == self.PERSON_CLASS_IDX]
                head_boxes = [box for box in frame_boxes if int(box.cls) == self.HEAD_CLASS_IDX]
                object_boxes = [box for box in frame_boxes if int(box.cls) in alarm_dict['class_idx']]
                alarmed_bboxes = []
                for person_box in person_boxes:
                    if person_box.id is None:  # not tracked (yet): cannot count frames
                        continue

                    person_bbox = person_box.xyxy.cpu().squeeze()
                    person_id = int(person_box.id)
                    # Without a usable head box the person is assumed authorised.
                    has_object = True
                    person_head = get_person_head(person_bbox, head_boxes)
                    if person_head is not None:
                        head_bbox = person_head.xyxy.cpu().squeeze()
                        overlap_ratio = intersection_area(person_bbox, head_bbox) / bbox_area(person_bbox)
                        # Only judge the uniform when the head covers less than
                        # half of the person box (original author: "not very accurate").
                        # NOTE(review): the uniform box is compared with the
                        # head box, not the person box -- confirm this is intended.
                        if overlap_ratio < 0.5:
                            has_object = any(
                                is_overlapping(head_bbox, object_box.xyxy.cpu().squeeze())
                                for object_box in object_boxes)

                    person_status = self.tracking_status.setdefault(person_id, {'disappear_frames': 0})
                    if not has_object and is_within_alert_range(person_bbox, self.abs_range_points):
                        violation_frames = person_status.get(alarm_name, 0) + 1
                    else:
                        violation_frames = 0
                    if violation_frames > self.max_missing_frames:
                        alarmed_bboxes.append(person_bbox)
                        violation_frames = 0  # alarmed: restart counting
                    person_status[alarm_name] = violation_frames

                if not alarmed_bboxes:
                    continue
                self.alarm_message_center.add_message(alarm_dict)
                # need_alarm is asked once per frame. Asking it separately while
                # collecting persons and again here could disagree and leave the
                # annotator unset when the image is built.
                if self.alarm_record_center.need_alarm(self.device.code, alarm_dict):
                    annotator = Annotator(deepcopy(frame))
                    for person_bbox in alarmed_bboxes:
                        annotator.box_label(person_bbox, alarm_dict['label'], color=COLOR_RED, rotated=False)
                    # Passed by keyword, like every other upload in this class.
                    self.alarm_record_center.upload_alarm_record(self.device.code, alarm_dict,
                                                                 alarm_np_img=annotator.result())

    def log_fps(self, frame_count):
        self.frames_detected += frame_count
        current_time = time.time()
        # 每秒输出 FPS
        if self.fps_ts is None or current_time - self.fps_ts >= 10:
            fps = self.frames_detected / 10.0
            self.frames_detected = 0
            logger.info(f"FPS (detect) for device {self.device.code}: {fps}")
            self.fps_ts = current_time

    def run(self):
        while not self.stream_loader.init:
            if self.__stop_event.is_set():
                break  # 如果触发了停止事件,则退出循环
            self.stream_loader.init_cap()
        for frames in self.stream_loader:
            try:
                if self.__stop_event.is_set():
                    break  # 如果触发了停止事件,则退出循环
                if not frames:
                    continue

                t1 = time.time()
                self.device_status_manager.set_status(device_id=self.device.id)
                result_boxes = self.model_predict(frames)  # 结果都是二维数组,对应batch中的每个frame

                t2 = time.time()
                for idx, frame_boxes in enumerate(result_boxes):
                    current_person_ids = {int(box.id) for box in frame_boxes
                                          if box.cls is not None and box.id is not None and int(
                            box.cls) == self.PERSON_CLASS_IDX}

                    for person_id in current_person_ids:
                        if person_id not in self.tracking_status:
                            self.tracking_status[person_id] = {}
                        self.tracking_status[person_id]['disappear_frames'] = 0
                    for person_id in list(self.tracking_status.keys()):
                        if person_id not in current_person_ids:
                            self.tracking_status[person_id]['disappear_frames'] += 1
                            if self.tracking_status[person_id]['disappear_frames'] > self.disappear_threshold:
                                self.tracking_status.pop(person_id)

                self.handle_behave_alarm(frames, result_boxes)
                self.handle_break_in_alarm(frames, result_boxes)

                t3 = time.time()
                # for person_id in self.tracking_status.keys():
                #     print(f'person_id: {person_id}, status: {self.tracking_status[person_id]}')

                for idx, frame in enumerate(frames):
                    annotator = Annotator(frame, None, 18, "Arial.ttf", False, example="人")
                    frame_boxes = result_boxes[idx]
                    for s_box in frame_boxes:
                        annotator.box_label(s_box.xyxy.cpu().squeeze(),
                                            f"{self.model_classes[int(s_box.cls)]} {float(s_box.conf):.2f}",
                                            color=colors(int(s_box.cls)),
                                            rotated=False)
                    self.display_frame_manager.add_frame(self.device.id, annotator.result())
                # self.display_frame_manager.add_frame(self.device.id, frames[idx])

                t4 = time.time()
                print(f'============={(t2 - t1) * 1000}ms {(t3 - t2) * 1000}ms {(t4 - t3) * 1000}ms')
                self.log_fps(len(frames))

            except Exception as ex:
                traceback.print_exc()
                logger.error(ex)