# go-algo-server / audio_player.py
# Last change: zhangyingjie, 3 Jul (4 KB) - add voice broadcast feature
import ctypes
import os
import threading
import time

from ctypes import *
from config import AUDIO_DEVICE_IP, AUDIO_CLIENT_IP, GAS_AUDIO_FILE, CONSTRUCTION_AUDIO_FILE

from global_logger import logger


# 错误类
class CustomError(Exception):
    """Error raised when a call into the audio client library fails.

    Args:
        ErrorInfo: Human-readable description of the failure.

    Attributes:
        errorinfo: The description passed at construction time.
    """

    def __init__(self, ErrorInfo):
        # Hand the message (not ``self``) to ``Exception`` so that ``args``
        # holds the text; a self-referencing ``args`` makes ``repr()`` recurse.
        super().__init__(ErrorInfo)
        self.errorinfo = ErrorInfo

    def __str__(self):
        # ``__str__`` must return a ``str`` even if a non-string was passed in.
        return str(self.errorinfo)


# 播放信息结构体
class PlayInfo(Structure):
    """Playback-information record made of five C ``int`` members.

    Member order is playstatus, progress, totaltime, samplerate, bitrate.
    NOTE(review): units and value ranges are defined by the native library
    and are not visible in this file - confirm against its header.
    """

    # Every member is a plain C int, so the layout is derived from the
    # ordered list of member names.
    _fields_ = [
        (member, c_int)
        for member in (
            "playstatus",
            "progress",
            "totaltime",
            "samplerate",
            "bitrate",
        )
    ]


# 底层接口封装类
class NAudioClient:
    """Thin ctypes wrapper around the native ``libNAudioClient`` library.

    Every playback call in this class passes ``0, 0`` as the first two
    arguments of the native function.
    NOTE(review): these look like the scene / instance selectors reported by
    the status callback - confirm against the library header.

    Args:
        local_ip: IP address handed to ``na_client_init``.
        device_ip: IP address of the playback device added by
            ``add_play_device``.
        lib_path: Path of the shared library to load. Defaults to the
            location the original code used, relative to the working
            directory.

    Raises:
        OSError: If the shared library cannot be loaded.
        CustomError: If client initialisation or start-up fails.
    """

    # Display names indexed by the codes the native status callback reports.
    _SCENE_NAMES = ("文件播放", "TTS播放", "实时寻呼", "音频流播放")
    _STATUS_NAMES = ("停止", "播放中..", "暂停中..")

    def __init__(self, local_ip, device_ip, lib_path="./lib/libNAudioClient.so"):
        self.local_ip = local_ip
        self.device_ip = device_ip

        self.dll = cdll.LoadLibrary(lib_path)

        self._init_client()

    @staticmethod
    def _name_for(names, code):
        """Return ``names[code]``, or a placeholder for an out-of-range code."""
        # Explicit bounds check: a negative c_byte must not wrap around to
        # the end of the tuple, and an unknown code must not raise inside
        # the native callback.
        if 0 <= code < len(names):
            return names[code]
        return f"未知({code})"

    @classmethod
    def _describe_status(cls, scene, inst, status):
        """Build the status-change message printed by the native callback.

        Args:
            scene: Scene code reported by the library.
            inst: Zero-based instance index reported by the library.
            status: Play-status code reported by the library.

        Returns:
            The formatted message; unknown codes are rendered as ``未知(n)``.
        """
        scene_name = cls._name_for(cls._SCENE_NAMES, scene)
        status_name = cls._name_for(cls._STATUS_NAMES, status)
        return f"播放状态改变,场景: {scene_name}, 实例: {inst + 1}, 状态: {status_name}"

    def _init_client(self):
        """Initialise the native client and start it with a status callback.

        Raises:
            CustomError: If ``na_client_init`` or ``na_client_start`` returns
                a non-zero code.
        """
        describe = self._describe_status

        def on_play_status_change(scene, inst, status):
            print(describe(scene, inst, status))
            return 0

        callback_type = CFUNCTYPE(c_int, c_byte, c_byte, c_uint)
        # The callback object is stored on the instance because ctypes
        # requires it to stay referenced while native code may call it.
        self.PSCCB = callback_type(on_play_status_change)

        # Initialise the client with the local IP.
        local_ip_bytes = bytes(self.local_ip, "utf8")
        self.dll.na_client_init.restype = c_int
        self.dll.na_client_init.argtypes = [c_char_p]
        res = self.dll.na_client_init(local_ip_bytes)
        if res != 0:
            raise CustomError('通信设备初始化失败!')

        # Start the client.
        # NOTE(review): na_set_callback is configured here but never called
        # in this class; kept so symbol lookup behaves as before.
        self.dll.na_set_callback.restype = None
        self.dll.na_client_start.restype = c_int
        self.dll.na_client_start.argtypes = [callback_type]
        res = self.dll.na_client_start(self.PSCCB)
        if res != 0:
            raise CustomError('启动客户端失败!')

    def set_play_volume(self, volume=80):
        """Set the playback volume.

        Args:
            volume: Value passed as the third argument of
                ``na_set_play_volume``.

        Raises:
            CustomError: If the native call returns a non-zero code.
        """
        self.dll.na_set_play_volume.restype = c_int
        self.dll.na_set_play_volume.argtypes = [c_byte, c_byte, c_int]
        res = self.dll.na_set_play_volume(0, 0, volume)
        if res != 0:
            raise CustomError('设置音量失败!')

    def clear_play_device(self):
        """Clear the playback device list.

        Raises:
            CustomError: If the native call returns a non-zero code.
        """
        self.dll.na_clear_play_device.restype = c_int
        self.dll.na_clear_play_device.argtypes = [c_byte, c_byte]
        res = self.dll.na_clear_play_device(0, 0)
        if res != 0:
            raise CustomError('清空播放设备失败!')

    def add_play_device(self):
        """Add ``self.device_ip`` as a playback device.

        Raises:
            CustomError: If the native call returns a non-zero code.
        """
        self.dll.na_add_play_device.restype = c_int
        self.dll.na_add_play_device.argtypes = [c_byte, c_byte, c_char_p]
        device_ip_bytes = bytes(self.device_ip, "utf8")
        res = self.dll.na_add_play_device(0, 0, device_ip_bytes)
        if res != 0:
            raise CustomError('添加播放设备失败!')

    def start_play(self, filepath, timeout=10):
        """Start playing an audio file.

        Args:
            filepath: Path of the file, encoded as UTF-8 for the native call.
            timeout: Value passed as the last argument of
                ``na_start_play_1``.
                NOTE(review): unit is not visible here - confirm.

        Raises:
            CustomError: If the native call returns a non-zero code.
        """
        self.dll.na_start_play_1.restype = c_int
        self.dll.na_start_play_1.argtypes = [c_byte, c_byte, c_char_p, c_int]
        filepath_bytes = bytes(filepath, "utf8")
        res = self.dll.na_start_play_1(0, 0, filepath_bytes, timeout)
        if res != 0:
            raise CustomError('启动播放失败!')

    def get_playinfo(self):
        """Query the current playback information.

        Returns:
            A ``PlayInfo`` structure passed by reference to
            ``na_get_playinfo``.
        """
        playinfo = PlayInfo()
        self.dll.na_get_playinfo.restype = c_int
        self.dll.na_get_playinfo.argtypes = [c_byte, c_byte, POINTER(PlayInfo)]
        # NOTE(review): the return code is ignored, so a failed query yields
        # an all-zero structure; confirm whether callers should be told.
        self.dll.na_get_playinfo(0, 0, byref(playinfo))
        return playinfo

    def stop(self):
        """Stop the native client by calling ``na_client_stop``."""
        self.dll.na_client_stop()


# 单例音频播放器
class AudioPlayer:
    """Singleton audio player built on top of ``NAudioClient``.

    ``AudioPlayer()`` always returns the same instance. The instance is
    configured from ``AUDIO_DEVICE_IP`` and ``AUDIO_CLIENT_IP`` the first
    time it is created.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._init_config()
                    instance._init_client()
                    # Publish only after initialisation succeeded: a failed
                    # init must not leave a client-less singleton behind,
                    # and other threads must never see a half-built one.
                    cls._instance = instance
        return cls._instance

    def _init_config(self):
        """Copy the device and client IPs from the imported configuration."""
        self.device_ip = AUDIO_DEVICE_IP
        self.local_ip = AUDIO_CLIENT_IP

    def _init_client(self):
        """Create the underlying ``NAudioClient``."""
        logger.info(f"初始化音频播放设备,设备IP: {self.device_ip}, 客户端IP: {self.local_ip}")
        self.client = NAudioClient(self.local_ip, self.device_ip)

    def play_audio_file(self, filepath, volume=80, timeout=10, poll_interval=1):
        """Play an audio file and block until the reported status is 0.

        Args:
            filepath: Path of the audio file to play.
            volume: Volume passed to ``set_play_volume``.
            timeout: Timeout passed to ``start_play``.
            poll_interval: Seconds to sleep between status polls.

        Raises:
            CustomError: Propagated from the client when a native call
                reports failure.
        """
        logger.info(f"开始播放音频文件: {filepath}")
        self.client.set_play_volume(volume)
        self.client.clear_play_device()
        self.client.add_play_device()
        self.client.start_play(filepath, timeout)
        # Poll until the client reports play status 0.
        while self.client.get_playinfo().playstatus != 0:
            time.sleep(poll_interval)

    def stop(self):
        """Stop the underlying client."""
        self.client.stop()


if __name__ == "__main__":
    # Manual check: play the two configured audio files in sequence.
    player = AudioPlayer()
    try:
        for audio_file in (GAS_AUDIO_FILE, CONSTRUCTION_AUDIO_FILE):
            player.play_audio_file(audio_file)
    finally:
        # Stop the native client even if playback raised, so the client
        # started during construction is not left running.
        player.stop()
# Usage example:
# player = AudioPlayer()  # takes no arguments; the IPs are read from config.py
# player.play_audio_file('/path/to/audio.mp3')