""" 新版播放器,包含划词朗读/AI朗读的TTS实时播放和语音合成的音频文件播放 sudo apt-get install python3-pyqt5.qtmultimedia sudo apt-get install libqt5multimedia5-plugins """ import threading from queue import Queue import sounddevice as sd import time import os import sys import config from PyQt5.QtWidgets import QApplication from libs import lib_tts, lib_tts_dipai import lib_tts_grpc from PyQt5.QtMultimedia import QMediaPlayer, QMediaContent from PyQt5.QtCore import QUrl, QObject, pyqtSignal # 获取家目录 def get_home_env(): if sys.platform == "win32": return 'APPDATA' return 'HOME' # 生成临时音频文件保存路径 def gen_audio_save_path(): _dir = os.path.join(os.environ[get_home_env()], "xAssistant", "tts_cache") if not os.path.exists(_dir): os.makedirs(_dir) _fp = f"{int(time.time())}.pcm" _audio_fp = os.path.join(_dir, _fp) print(_audio_fp) return _audio_fp # 计算待合成文本的音频时长(预估) def calculate_duration(text: str, speed_mode: int): """ :param text: 待合成文本 :param speed_mode: 语速模式 1:较快 2:较慢 其他为正常 """ if speed_mode == 1: _speed = 8.145 elif speed_mode == 2: _speed = 4.064 else: _speed = 5.358 _duration = len(text) / _speed return int(_duration) # TTS临时文件播放线程 class PlayTmpFile(threading.Thread): def __init__(self, fp, queue): super().__init__() self.daemon = True self._tmp_file = fp self._run_flag = True self._audio_queue = queue def run(self): if self._tmp_file is None or not os.path.exists(self._tmp_file): return self._audio_queue.queue.clear() with open(self._tmp_file, "rb") as f: while self._run_flag: data = f.read(4096) if not data: self._audio_queue.put("--end--") break self._audio_queue.put(data) time.sleep(0.02) def stop(self): self._run_flag = False # TTS播放器(支持播放、暂停、继续、停止) class TTSSteamPlayer(QObject): playing_signal = pyqtSignal() paused_signal = pyqtSignal() stopped_signal = pyqtSignal() def __init__(self, cb_fn): super().__init__() self._audio_queue = Queue() self._tmp_file = None self._is_paused = False # self._select_default_output_device() self._player = sd.RawOutputStream(samplerate=16000, channels=1, dtype='int16') self._player.start() self._tts_thread = None self._run_flag = True self._callback = cb_fn self.engine_type = config.config["ENGINE_TYPE"] # 设置默认输出设备 def _select_default_output_device(self): try: system_output_device = sd.query_devices(kind='output') sd.default.device = system_output_device['index'] except Exception as e: pass # 播放 def play(self, tts_info: dict): self.playing_signal.emit() if self._is_paused: self._is_paused = False return self._audio_queue.queue.clear() if tts_info: # 有文本时,调用语音合成来播放实时音频 self._tmp_file = gen_audio_save_path() text = tts_info.get("text") vcn = tts_info.get("vcn") speed = tts_info.get("speed") # self._tts_thread = lib_tts.TTSManagerNew(text, vcn, speed, self._audio_queue) # self._tts_thread = lib_tts_dipai.TTSManagerNew(text, vcn, speed, self._audio_queue) # self._tts_thread = lib_tts_grpc.TTSManagerNew(text, vcn, speed, self._audio_queue) if self.engine_type == "1": self._tts_thread = lib_tts.TTSManagerNew(text, vcn, speed, self._audio_queue) elif self.engine_type == "2": self._tts_thread = lib_tts_dipai.TTSManagerNew(text, vcn, speed, self._audio_queue) else: self._tts_thread = lib_tts_grpc.TTSManagerNew(text, vcn, speed, self._audio_queue) self._tts_thread.start() threading.Thread(target=self._write_stream, name="_write_stream", args=(False,), daemon=True).start() else: # 无文本时,播放临时文件 self._tts_thread = PlayTmpFile(self._tmp_file, self._audio_queue) self._tts_thread.start() threading.Thread(target=self._write_stream, name="_write_stream", args=(True,), daemon=True).start() # 暂停 def pause(self): self.paused_signal.emit() self._is_paused = True def resume(self): self._is_paused = False def is_paused(self): return self._is_paused # 停止 def stop(self): self._is_paused = False self._audio_queue.queue.clear() self._audio_queue.put("--stop--") # 销毁播放器 def quit(self): self._is_paused = False self._run_flag = False self._audio_queue.queue.clear() self._player.stop() # 向播放器写入音频流 def _write_stream(self, is_file=False): while self._run_flag: if self._is_paused: time.sleep(0.02) continue if not self._audio_queue.empty(): buf = self._audio_queue.get() if isinstance(buf, str): if buf == "--end--" or buf == "--stop--": if self._tts_thread: self._tts_thread.stop() self._callback(buf) else: # 异常 self._callback(buf) break if not is_file: self._write_tmp(buf) try: self._player.write(buf) except: continue self.stopped_signal.emit() self._audio_queue.queue.clear() # 写入临时文件用于重新播放 def _write_tmp(self, data): with open(self._tmp_file, "ab") as f: f.write(data) # 音频文件播放器 class AudioPlayer(QObject): playing_signal = pyqtSignal() paused_signal = pyqtSignal() stopped_signal = pyqtSignal() rate_changed_signal = pyqtSignal(dict) def __init__(self): super().__init__() self.duration = 1 self.position = 0 self.media_player = QMediaPlayer() self.play_state = None self.media_player.stateChanged.connect(self.state_changed) # 获取播放状态 self.media_player.positionChanged.connect(self.position_changed) # 获取播放进度 self.media_player.durationChanged.connect(self.duration_changed) # 获取音频总时长 def set_media(self, audio_path: str): self.media_player.setMedia(QMediaContent(QUrl.fromLocalFile(audio_path.replace('\\', '/')))) # 播放状态改变 def state_changed(self, state): if state == QMediaPlayer.State.PlayingState: self.play_state = QMediaPlayer.State.PlayingState self.playing_signal.emit() elif state == QMediaPlayer.State.PausedState: self.play_state = QMediaPlayer.State.PausedState self.paused_signal.emit() elif state == QMediaPlayer.State.StoppedState: self.play_state = QMediaPlayer.State.StoppedState self.stopped_signal.emit() # 播放进度改变 def position_changed(self, position): try: self.position = position rate = position / self.duration # 格式化时间 # duration = self.format_time(self.duration) # position = self.format_time(position) info = {"rate": rate, "position": position, "duration": self.duration} self.rate_changed_signal.emit(info) except ZeroDivisionError: pass @staticmethod def format_time(input_time): minute = int(input_time / 60000) second = int((input_time - minute * 60000) / 1000) # 单数前面加0 if minute < 10: minute = f"0{minute}" if second < 10: second = f"0{second}" output_time = f"{minute}:{second}" return output_time # 音频总时长改变 def duration_changed(self, duration): self.duration = duration # 设置播放进度 def set_position(self, float_val: float): val = float_val * self.duration self.media_player.setPosition(val) def set_time(self, val): self.media_player.setPosition(val) # 设置播放倍速 def set_playback_speed(self, speed: float): if speed == 0.5: speed = 0.8 self.media_player.setPlaybackRate(speed) # 播放 def play(self, *args): self.media_player.play() # 暂停 def pause(self): self.media_player.pause() # 继续 def resume(self): self.media_player.play() # 停止 def stop(self): try: self.media_player.stop() except: pass # 销毁播放器 def quit(self): pass def is_playing(self): return self.play_state == QMediaPlayer.State.PlayingState if __name__ == '__main__': app = QApplication([]) _play_audio_vlc_thread = AudioPlayer() _play_audio_vlc_thread.set_media(r"/media/llh/data/test/19.wav") _play_audio_vlc_thread.play() app.exec_()