# safe-algo-pro / scene_handler / helmet_data_processor.py
import time
from datetime import datetime

from common.http_utils import get_request
from scene_handler.alarm_record_center import AlarmRecordCenter

def _health_alarm(alarm_type, alarm_content):
    """Build one health-alarm descriptor.

    Every health alarm shares the same category, handling type, ordering,
    name, sound message and (empty) label; only the alarm type code and the
    human-readable content differ. A fresh dict is returned on each call so
    the entries never alias each other.

    NOTE(review): the meaning of the numeric codes (category 2, handling
    type 3, order -1) and of the sound-message byte frame is defined by the
    consumers of these descriptors, not in this file -- confirm there.
    The key spelling 'handelType' is runtime data and is kept as-is.
    """
    return {
        'alarmCategory': 2,
        'alarmType': alarm_type,
        'handelType': 3,
        'category_order': -1,
        'alarm_name': 'health_alarm',
        'alarmContent': alarm_content,
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA',
        'label': '',
    }


# Alarm descriptors keyed by the health metric that triggered them.
ALARM_DICT = {
    'health_blood_oxygen': _health_alarm('18', '作业人员血氧异常'),
    'health_heartrate': _health_alarm('19', '作业人员心率异常'),
}


class HelmetDataProcessor:
    """Poll one helmet's vital-sign endpoint and report abnormal readings.

    The processor fetches the helmet's latest vital-sign record over HTTP,
    hands back blood-oxygen / heart-rate values that are both new and
    recent, and uploads alarm records through the supplied alarm record
    center when a value falls outside the configured bounds.
    """

    # Bounds used by the is*Normal predicates. Readings strictly below a
    # minimum or strictly above the maximum are abnormal.
    # NOTE(review): units are presumably SpO2 percent and beats per minute
    # -- confirm against the helmet API.
    BLOOD_OXYGEN_MIN = 85
    HEART_RATE_MIN = 60
    HEART_RATE_MAX = 120

    # Seconds to wait between two polls of the endpoint.
    POLL_INTERVAL_SECONDS = 5
    # A record older than this many seconds is not returned to the caller.
    MAX_DATA_AGE_SECONDS = 10 * 60
    # Format of the 'uploadTimestamp' field in the endpoint's payload.
    UPLOAD_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter):
        """Bind the processor to one helmet and one alarm record center.

        :param helmet_code: identifier of the helmet; interpolated into the
            request URL and passed along with every uploaded alarm record.
        :param alarm_record_center: object whose ``upload_alarm_record``
            method receives the alarms raised by ``sendAlarmRecord``.
        """
        self.helmet_code = helmet_code
        self.alarm_record_center = alarm_record_center
        # NOTE(review): the URL contains a doubled slash before 'api'. It is
        # kept unchanged because the server's handling of it is not visible
        # from here -- confirm whether it is intentional.
        self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}'
        # NOTE(security): credentials are hard-coded in source. They should
        # be loaded from configuration or the environment; the values are
        # left untouched here so existing deployments keep working.
        self.header = {
            'ak': 'fe80b2f021644b1b8c77fda743a83670',
            'sk': '8771ea6e931d4db646a26f67bcb89909',
        }
        self.last_ts = None  # generation timestamp of the last record read

    def getNewData(self):
        """Block until a new, recent vital-sign record is available.

        Polls the endpoint every ``POLL_INTERVAL_SECONDS`` seconds and never
        gives up. Records with a missing or malformed upload timestamp are
        skipped rather than aborting the loop.

        :return: ``(bloodOxygen, heartRate)`` exactly as found in the
            payload; either value may be ``None`` if the field is absent.
        """
        while True:
            print(f"访问{self.helmet_code}心率血氧数据...")
            response = get_request(self.url, headers=self.header)
            if response and response.get('data'):
                print("访问到心率血氧数据")
                reading = self._extract_new_reading(response)
                if reading is not None:
                    return reading
            else:
                print("无法访问到心率血氧数据")
            time.sleep(self.POLL_INTERVAL_SECONDS)

    def _extract_new_reading(self, response, now=None):
        """Return the reading in ``response`` if it is new and recent.

        :param response: decoded payload with a truthy ``'data'`` mapping.
        :param now: current epoch seconds; defaults to ``time.time()``.
        :return: ``(bloodOxygen, heartRate)`` or ``None`` when the record is
            absent, has an unusable timestamp, was already seen, or is older
            than ``MAX_DATA_AGE_SECONDS``.
        """
        vitalsigns_data = response.get('data').get('vitalSignsData')
        if not vitalsigns_data:
            return None

        raw_timestamp = vitalsigns_data.get('uploadTimestamp')
        try:
            # The parsed datetime is naive, so timestamp() interprets it in
            # the local timezone of this process.
            upload_ts = datetime.strptime(
                raw_timestamp, self.UPLOAD_TIMESTAMP_FORMAT).timestamp()
        except (TypeError, ValueError, OverflowError, OSError):
            # Missing or malformed timestamp: skip this record instead of
            # crashing the polling loop.
            print(f"{self.helmet_code}心率血氧数据时间戳无效: {raw_timestamp!r}")
            return None

        # Only accept a record generated strictly after the last one seen.
        if self.last_ts is not None and upload_ts <= self.last_ts:
            return None
        # Remember the record even when it turns out to be stale, so the
        # same stale record is not re-examined on every poll.
        self.last_ts = upload_ts

        if now is None:
            now = time.time()
        if now - upload_ts >= self.MAX_DATA_AGE_SECONDS:
            return None
        return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate')

    def isDataNormal(self, blood_oxygen, heartrate):
        """Return True when both heart rate and blood oxygen are in range."""
        # Heart rate is checked first, matching the original evaluation order.
        return self.isHeartRateNormal(heartrate) and self.isBloodOxygenNormal(blood_oxygen)

    def sendAlarmRecord(self, blood_oxygen, heartrate):
        """Upload one alarm record per out-of-range measurement.

        Each abnormal value is sent to ``alarm_record_center`` together with
        the helmet code and the matching ``ALARM_DICT`` descriptor. The value
        is passed as the ``alarm_value`` keyword in both calls so the two
        uploads cannot diverge if the callee's positional order changes.
        """
        if not self.isBloodOxygenNormal(blood_oxygen):
            self.alarm_record_center.upload_alarm_record(
                self.helmet_code, ALARM_DICT['health_blood_oxygen'],
                alarm_value=blood_oxygen)
        if not self.isHeartRateNormal(heartrate):
            self.alarm_record_center.upload_alarm_record(
                self.helmet_code, ALARM_DICT['health_heartrate'],
                alarm_value=heartrate)

    def isBloodOxygenNormal(self, blood_oxygen):
        """Return False when blood oxygen is below ``BLOOD_OXYGEN_MIN``."""
        return not (blood_oxygen < self.BLOOD_OXYGEN_MIN)

    def isHeartRateNormal(self, heartrate):
        """Return False when heart rate is outside the allowed range."""
        return not (heartrate < self.HEART_RATE_MIN or heartrate > self.HEART_RATE_MAX)