# safe-algo-pro / scene_handler / alarm_record_center.py
import asyncio
import base64
import time
from datetime import datetime

import cv2

from common.global_thread_pool import GlobalThreadPool
from common.http_utils import send_request
from db.database import get_db
from entity.alarm_record import AlarmRecordCreate
from services.alarm_record_service import AlarmRecordService
from services.global_config import GlobalConfig


def image_to_base64(numpy_image, format='jpg'):
    """
    Encode an image array as a Base64 text string.

    The array is first compressed with ``cv2.imencode`` using the file
    extension ``.<format>``; the resulting byte buffer is then
    Base64-encoded and decoded to a UTF-8 ``str``. No ``data:`` URI prefix
    is added.

    :param numpy_image: image as a NumPy array, passed straight to ``cv2.imencode``
    :param format: image extension without the leading dot (e.g. 'jpg', 'png')
    :return: Base64 representation of the encoded image
    :raises ValueError: if ``cv2.imencode`` reports failure
    """
    ok, buffer = cv2.imencode(f'.{format}', numpy_image)
    if ok:
        # bytes -> Base64 bytes -> text
        return base64.b64encode(buffer).decode('utf-8')
    raise ValueError("Could not encode image")


class AlarmRecordCenter:
    """
    Throttle, persist and push alarm records.

    Two independent "last time" tables are kept, both keyed by device code and
    then by alarm type: one for records saved through ``AlarmRecordService``
    and one for records pushed to the configured HTTP endpoint. An alarm is
    handled again only after the corresponding interval has elapsed.
    """

    def __init__(self, save_interval=-1, main_loop=None):
        """
        Initialise the alarm record center.

        :param save_interval: minimum number of seconds between two saved
            records of the same alarm type on the same device; if <= 0,
            records are never saved.
        :param main_loop: event loop on which ``save_record`` coroutines are
            scheduled with ``asyncio.run_coroutine_threadsafe``.
        """
        self.main_loop = main_loop
        self.save_interval = save_interval
        self.thread_pool = GlobalThreadPool()
        self.global_config = GlobalConfig()
        # key: device_code, value: {alarm_type: time.time() of the last upload}
        self.device_alarm_upload_time = {}
        # key: device_code, value: {alarm_type: time.time() of the last save}
        self.device_alarm_save_time = {}

    def need_alarm(self, device_code, alarm_dict):
        """
        Tell whether an alarm would currently be saved or uploaded.

        :param device_code: device code
        :param alarm_dict: alarm description; its ``alarmType`` key is read
        :return: True if either the save or the upload throttle allows the alarm
        """
        current_time = time.time()
        alarm_type = alarm_dict['alarmType']

        return self.need_save_alarm_record(device_code, current_time, alarm_type) \
            or self.need_upload_alarm_record(device_code, current_time, alarm_type)

    def need_save_alarm_record(self, device_code, current_time, alarm_type):
        """
        Tell whether a record of ``alarm_type`` for ``device_code`` should be saved.

        :param device_code: device code
        :param current_time: reference time in seconds (as from ``time.time()``)
        :param alarm_type: alarm type used as the per-device throttle key
        :return: False when saving is disabled (``save_interval <= 0``) or the
            last save is not older than ``save_interval``; True otherwise
        """
        if self.save_interval <= 0:
            return False

        # Creates the per-device table on first sight of the device.
        last_save_time = self.device_alarm_save_time.setdefault(device_code, {}).get(alarm_type)
        return last_save_time is None or (current_time - last_save_time) > self.save_interval

    def need_upload_alarm_record(self, device_code, current_time, alarm_type):
        """
        Tell whether a record of ``alarm_type`` for ``device_code`` should be pushed.

        :param device_code: device code
        :param current_time: reference time in seconds (as from ``time.time()``)
        :param alarm_type: alarm type used as the per-device throttle key
        :return: False when there is no push configuration or it has no
            ``push_url``, or when the last upload is not older than the
            configured ``upload_interval``; True otherwise
        """
        push_config = self.global_config.get_alarm_push_config()
        if not push_config or not push_config.push_url:
            return False

        # Creates the per-device table on first sight of the device.
        last_upload_time = self.device_alarm_upload_time.setdefault(device_code, {}).get(alarm_type)
        return last_upload_time is None or (current_time - last_upload_time) > push_config.upload_interval

    async def save_record(self, alarm_data, alarm_np_img):
        """
        Store one alarm record through ``AlarmRecordService``.

        :param alarm_data: record payload handed to ``add_alarm``
        :param alarm_np_img: alarm image handed to ``add_alarm`` (may be None)
        """
        async for db in get_db():
            alarm_record_service = AlarmRecordService(db)
            await alarm_record_service.add_alarm(alarm_data=alarm_data, alarm_np_img=alarm_np_img)

    def upload_alarm_record(self, device_code, alarm_dict, alarm_np_img=None, alarm_value=None):
        """
        Save and/or push one alarm, each subject to its own throttle.

        :param device_code: device code
        :param alarm_dict: alarm description; the ``alarmType`` and
            ``alarmContent`` keys are read
        :param alarm_np_img: alarm image (np array), or None if there is none
        :param alarm_value: optional alarm value; falsy values are treated as absent
        """
        current_time = time.time()
        alarm_type = alarm_dict['alarmType']

        if self.need_save_alarm_record(device_code, current_time, alarm_type):
            alarm_record_data = AlarmRecordCreate(
                device_code=device_code,
                device_id=100,  # NOTE(review): hard-coded id — confirm this is intended
                alarm_type=alarm_type,
                alarm_content=alarm_dict['alarmContent'],
                alarm_time=datetime.now(),
                alarm_value=alarm_value if alarm_value else None,
            )
            # Scheduled on the main loop from the calling thread; the returned
            # future is discarded, so the save is fire-and-forget.
            asyncio.run_coroutine_threadsafe(
                self.save_record(alarm_record_data, alarm_np_img),
                self.main_loop)
            self.device_alarm_save_time.setdefault(device_code, {})[alarm_type] = current_time

        if self.need_upload_alarm_record(device_code, current_time, alarm_type):
            alarm_record = {
                'devcode': device_code,
                'alarmTime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'alarmType': alarm_dict['alarmType'],
                'alarmContent': alarm_dict['alarmContent'],
            }
            if alarm_value:
                alarm_record['alarmValue'] = alarm_value
            # Compare against None explicitly: the truth value of a NumPy array
            # with more than one element is ambiguous and raises ValueError.
            if alarm_np_img is not None:
                alarm_record['alarmImage'] = image_to_base64(alarm_np_img)

            push_config = self.global_config.get_alarm_push_config()
            # NOTE(review): the throttle check reads ``push_url`` but the request
            # goes to ``upload_url`` — confirm both attributes exist on the config.
            self.thread_pool.submit_task(send_request, push_config.upload_url, alarm_record)
            self.device_alarm_upload_time.setdefault(device_code, {})[alarm_type] = current_time