import time
from datetime import datetime
from typing import Optional, Tuple

from common.http_utils import get_request
from scene_handler.alarm_record_center import AlarmRecordCenter

# Alarm descriptors passed as-is to AlarmRecordCenter.upload_alarm_record.
# NOTE(review): 'alarmSoundMessage' looks like a raw device command frame and
# 'handelType' looks like a misspelling of "handleType"; both are kept
# byte-for-byte because the consumers of this dict are not visible here.
ALARM_DICT = {
    'health_blood_oxygen': {
        'alarmCategory': 2,
        'alarmType': '18',
        'handelType': 3,
        'category_order': -1,
        'alarm_name': 'health_alarm',
        'alarmContent': '作业人员血氧异常',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA',
        'label': '',
    },
    'health_heartrate': {
        'alarmCategory': 2,
        'alarmType': '19',
        'handelType': 3,
        'category_order': -1,
        'alarm_name': 'health_alarm',
        'alarmContent': '作业人员心率异常',
        'alarmSoundMessage': b'\xaa\x01\x00\x93\x36\x00\xCA',
        'label': '',
    },
}


class HelmetDataProcessor:
    """Poll one helmet's vital-signs endpoint and report abnormal readings.

    A reading is a ``(blood_oxygen, heart_rate)`` pair. Either element may be
    ``None`` when no measurement is available; ``None`` is never treated as
    abnormal, so missing data does not raise an alarm.
    """

    # A blood-oxygen value below this is abnormal.
    BLOOD_OXYGEN_MIN = 85
    # A heart rate outside [HEART_RATE_MIN, HEART_RATE_MAX] is abnormal.
    HEART_RATE_MIN = 60
    HEART_RATE_MAX = 120
    # Readings uploaded this many seconds ago (or more) are considered stale.
    MAX_DATA_AGE_SECONDS = 10 * 60
    # Pause between polling attempts when retrying.
    RETRY_INTERVAL_SECONDS = 5
    # Format of the 'uploadTimestamp' field in the endpoint's response.
    TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, helmet_code, alarm_record_center: AlarmRecordCenter):
        """Bind the processor to one helmet and an alarm sink.

        :param helmet_code: identifier interpolated into the request URL and
            passed along with every uploaded alarm record.
        :param alarm_record_center: object whose ``upload_alarm_record`` is
            called for each abnormal reading.
        """
        self.helmet_code = helmet_code
        self.alarm_record_center = alarm_record_center
        # NOTE(review): the double slash after the host is in the original
        # URL; kept unchanged since the server's routing is not visible here.
        self.url = f'https://jls.huaweisoft.com//api/ih-log/v1.0/ih-api/helmetInfo/{self.helmet_code}'
        # NOTE(review): credentials are hardcoded in source; consider loading
        # them from configuration or the environment instead.
        self.header = {
            'ak': 'fe80b2f021644b1b8c77fda743a83670',
            'sk': '8771ea6e931d4db646a26f67bcb89909',
        }
        # Upload timestamp (epoch seconds) of the last reading consumed.
        self.last_ts = None

    def getNewData(self, retry_until_success=True):
        """Fetch the next new, recent reading; blocks the caller while polling.

        A reading is returned only when its upload timestamp is later than
        the previously consumed one and it is younger than
        ``MAX_DATA_AGE_SECONDS``.

        :param retry_until_success: when true, poll every
            ``RETRY_INTERVAL_SECONDS`` seconds until such a reading arrives;
            when false, make a single attempt.
        :return: ``(blood_oxygen, heart_rate)`` as found in the response, or
            ``(None, None)`` when a single attempt yields nothing usable.
        """
        while True:
            print(f"访问{self.helmet_code}心率血氧数据...")
            response = get_request(self.url, headers=self.header)
            if response and response.get('data'):
                print("访问到心率血氧数据")
                reading = self._extract_fresh_reading(
                    response.get('data').get('vitalSignsData'))
                if reading is not None:
                    return reading
            else:
                print("无法访问到心率血氧数据")

            if not retry_until_success:
                return None, None  # single attempt only: report "no data"
            time.sleep(self.RETRY_INTERVAL_SECONDS)

    def _extract_fresh_reading(self, vitalsigns_data) -> Optional[Tuple]:
        """Return the reading in *vitalsigns_data* if it is new and recent.

        Updates ``self.last_ts`` whenever the payload is newer than the last
        one seen, even if it is then rejected as stale, so the same stale
        payload is not re-examined on the next poll.

        :return: ``(blood_oxygen, heart_rate)`` or ``None`` when the payload
            is empty, has an unusable timestamp, is not newer than the last
            reading, or is too old.
        """
        if not vitalsigns_data:
            return None

        upload_ts = self._parse_upload_timestamp(
            vitalsigns_data.get('uploadTimestamp'))
        if upload_ts is None:
            return None

        # Skip payloads already seen (not strictly newer than the last one).
        if self.last_ts is not None and not (upload_ts - self.last_ts > 0):
            return None
        self.last_ts = upload_ts

        if not (time.time() - upload_ts < self.MAX_DATA_AGE_SECONDS):
            return None
        return vitalsigns_data.get('bloodOxygen'), vitalsigns_data.get('heartRate')

    def _parse_upload_timestamp(self, raw_timestamp) -> Optional[float]:
        """Convert the payload's upload time to epoch seconds, or ``None``.

        The parsed datetime is naive, so ``timestamp()`` interprets it in the
        local time zone.
        NOTE(review): assumes the server reports local time — confirm.
        """
        try:
            parsed = datetime.strptime(raw_timestamp, self.TIMESTAMP_FORMAT)
        except (TypeError, ValueError):
            # A missing (None) or malformed timestamp must not kill the
            # polling loop; treat it like "no usable data" instead.
            print(f"心率血氧数据时间戳无效: {raw_timestamp!r}")
            return None
        return parsed.timestamp()

    def isDataNormal(self, blood_oxygen, heartrate):
        """Return True when neither blood oxygen nor heart rate is abnormal."""
        return (self.isHeartRateNormal(heartrate)
                and self.isBloodOxygenNormal(blood_oxygen))

    def sendAlarmRecord(self, blood_oxygen, heartrate):
        """Upload one alarm record per abnormal value in the reading.

        Values that are ``None`` (no measurement) never trigger an alarm.
        """
        if not self.isBloodOxygenNormal(blood_oxygen):
            self.alarm_record_center.upload_alarm_record(
                self.helmet_code, ALARM_DICT['health_blood_oxygen'],
                alarm_value=blood_oxygen)
        if not self.isHeartRateNormal(heartrate):
            # NOTE(review): the value is passed positionally here but as
            # alarm_value= above; kept as in the original — confirm that the
            # third positional parameter of upload_alarm_record is alarm_value.
            self.alarm_record_center.upload_alarm_record(
                self.helmet_code, ALARM_DICT['health_heartrate'], heartrate)

    def isBloodOxygenNormal(self, blood_oxygen):
        """Return False only when *blood_oxygen* is below ``BLOOD_OXYGEN_MIN``.

        ``None`` means no measurement and is reported as normal.
        """
        if blood_oxygen is None:
            return True
        return not (blood_oxygen < self.BLOOD_OXYGEN_MIN)

    def isHeartRateNormal(self, heartrate):
        """Return False only when *heartrate* is outside the allowed range.

        The allowed range is ``[HEART_RATE_MIN, HEART_RATE_MAX]`` inclusive.
        ``None`` means no measurement and is reported as normal.
        """
        if heartrate is None:
            return True
        return not (heartrate < self.HEART_RATE_MIN
                    or heartrate > self.HEART_RATE_MAX)