import ctypes import os import threading import time from ctypes import * from config import AUDIO_DEVICE_IP, AUDIO_CLIENT_IP, GAS_AUDIO_FILE, CONSTRUCTION_AUDIO_FILE from global_logger import logger # 错误类 class CustomError(Exception): def __init__(self, ErrorInfo): super().__init__(self) self.errorinfo = ErrorInfo def __str__(self): return self.errorinfo # 播放信息结构体 class PlayInfo(Structure): _fields_ = [ ("playstatus", c_int), ("progress", c_int), ("totaltime", c_int), ("samplerate", c_int), ("bitrate", c_int) ] # 底层接口封装类 class NAudioClient: def __init__(self, local_ip, device_ip): self.local_ip = local_ip self.device_ip = device_ip self.dll = cdll.LoadLibrary("./lib/libNAudioClient.so") self._init_client() def _init_client(self): # 回调函数定义 def PlayStatusChangeCallback(scene, inst, status): sceneList = ["文件播放", "TTS播放", "实时寻呼", "音频流播放"] statusList = ["停止", "播放中..", "暂停中.."] print(f"播放状态改变,场景: {sceneList[scene]}, 实例: {inst + 1}, 状态: {statusList[status]}") return 0 self.PSCCB = CFUNCTYPE(c_int, c_byte, c_byte, c_uint)(PlayStatusChangeCallback) # 初始化 char_p_localip = bytes(self.local_ip, "utf8") self.dll.na_client_init.restype = c_int self.dll.na_client_init.argtypes = [c_char_p] res = self.dll.na_client_init(char_p_localip) if res != 0: raise CustomError('通信设备初始化失败!') # 启动 self.dll.na_set_callback.restype = None self.dll.na_client_start.restype = c_int self.dll.na_client_start.argtypes = [CFUNCTYPE(c_int, c_byte, c_byte, c_uint)] res = self.dll.na_client_start(self.PSCCB) if res != 0: raise CustomError('启动客户端失败!') def set_play_volume(self, volume=80): self.dll.na_set_play_volume.restype = c_int self.dll.na_set_play_volume.argtypes = [c_byte, c_byte, c_int] res = self.dll.na_set_play_volume(0, 0, volume) if res != 0: raise CustomError('设置音量失败!') def clear_play_device(self): self.dll.na_clear_play_device.restype = c_int self.dll.na_clear_play_device.argtypes = [c_byte, c_byte] res = self.dll.na_clear_play_device(0, 0) if res != 0: raise CustomError('清空播放设备失败!') def add_play_device(self): self.dll.na_add_play_device.restype = c_int self.dll.na_add_play_device.argtypes = [c_byte, c_byte, c_char_p] char_p_devip = bytes(self.device_ip, "utf8") res = self.dll.na_add_play_device(0, 0, char_p_devip) if res != 0: raise CustomError('添加播放设备失败!') def start_play(self, filepath, timeout=10): self.dll.na_start_play_1.restype = c_int self.dll.na_start_play_1.argtypes = [c_byte, c_byte, c_char_p, c_int] char_p_filepath = bytes(filepath, "utf8") res = self.dll.na_start_play_1(0, 0, char_p_filepath, timeout) if res != 0: raise CustomError('启动播放失败!') def get_playinfo(self): playinfo = PlayInfo() self.dll.na_get_playinfo.restype = c_int self.dll.na_get_playinfo.argtypes = [c_byte, c_byte, POINTER(PlayInfo)] self.dll.na_get_playinfo(0, 0, byref(playinfo)) return playinfo def stop(self): self.dll.na_client_stop() # 单例音频播放器 class AudioPlayer: _instance = None _lock = threading.Lock() def __new__(cls): if not cls._instance: with cls._lock: if not cls._instance: cls._instance = super().__new__(cls) cls._instance._init_config() cls._instance._init_client() return cls._instance def _init_config(self): self.device_ip = AUDIO_DEVICE_IP self.local_ip = AUDIO_CLIENT_IP def _init_client(self): logger.info(f"初始化音频播放设备,设备IP: {self.device_ip}, 客户端IP: {self.local_ip}") self.client = NAudioClient(self.local_ip, self.device_ip) def play_audio_file(self, filepath): logger.info(f"开始播放音频文件: {filepath}") self.client.set_play_volume(80) self.client.clear_play_device() self.client.add_play_device() self.client.start_play(filepath, 10) while True: playinfo = self.client.get_playinfo() if playinfo.playstatus == 0: break time.sleep(1) def stop(self): self.client.stop() if __name__ == "__main__": player = AudioPlayer() player.play_audio_file(GAS_AUDIO_FILE) player.play_audio_file(CONSTRUCTION_AUDIO_FILE) # 用法示例: # player = AudioPlayer('config.ini') # player.play_audio_file('/path/to/audio.mp3')