|
- """
- 新版播放器,包含划词朗读/AI朗读的TTS实时播放和语音合成的音频文件播放
- sudo apt-get install python3-pyqt5.qtmultimedia
- sudo apt-get install libqt5multimedia5-plugins
- """
- import threading
- from queue import Queue
- import sounddevice as sd
- import time
- import os
- import sys
- import config
- from PyQt5.QtWidgets import QApplication
- from libs import lib_tts, lib_tts_dipai
- import lib_tts_grpc
- from PyQt5.QtMultimedia import QMediaPlayer, QMediaContent
- from PyQt5.QtCore import QUrl, QObject, pyqtSignal
- # 获取家目录
- def get_home_env():
- if sys.platform == "win32":
- return 'APPDATA'
- return 'HOME'
- # 生成临时音频文件保存路径
- def gen_audio_save_path():
- _dir = os.path.join(os.environ[get_home_env()], "xAssistant", "tts_cache")
- if not os.path.exists(_dir):
- os.makedirs(_dir)
- _fp = f"{int(time.time())}.pcm"
- _audio_fp = os.path.join(_dir, _fp)
- print(_audio_fp)
- return _audio_fp
- # 计算待合成文本的音频时长(预估)
- def calculate_duration(text: str, speed_mode: int):
- """
- :param text: 待合成文本
- :param speed_mode: 语速模式 1:较快 2:较慢 其他为正常
- """
- if speed_mode == 1:
- _speed = 8.145
- elif speed_mode == 2:
- _speed = 4.064
- else:
- _speed = 5.358
- _duration = len(text) / _speed
- return int(_duration)
- # TTS临时文件播放线程
- class PlayTmpFile(threading.Thread):
- def __init__(self, fp, queue):
- super().__init__()
- self.daemon = True
- self._tmp_file = fp
- self._run_flag = True
- self._audio_queue = queue
- def run(self):
- if self._tmp_file is None or not os.path.exists(self._tmp_file):
- return
- self._audio_queue.queue.clear()
- with open(self._tmp_file, "rb") as f:
- while self._run_flag:
- data = f.read(4096)
- if not data:
- self._audio_queue.put("--end--")
- break
- self._audio_queue.put(data)
- time.sleep(0.02)
- def stop(self):
- self._run_flag = False
- # TTS播放器(支持播放、暂停、继续、停止)
- class TTSSteamPlayer(QObject):
- playing_signal = pyqtSignal()
- paused_signal = pyqtSignal()
- stopped_signal = pyqtSignal()
- def __init__(self, cb_fn):
- super().__init__()
- self._audio_queue = Queue()
- self._tmp_file = None
- self._is_paused = False
- # self._select_default_output_device()
- self._player = sd.RawOutputStream(samplerate=16000, channels=1, dtype='int16')
- self._player.start()
- self._tts_thread = None
- self._run_flag = True
- self._callback = cb_fn
- self.engine_type = config.config["ENGINE_TYPE"]
- # 设置默认输出设备
- def _select_default_output_device(self):
- try:
- system_output_device = sd.query_devices(kind='output')
- sd.default.device = system_output_device['index']
- except Exception as e:
- pass
- # 播放
- def play(self, tts_info: dict):
- self.playing_signal.emit()
- if self._is_paused:
- self._is_paused = False
- return
- self._audio_queue.queue.clear()
- if tts_info: # 有文本时,调用语音合成来播放实时音频
- self._tmp_file = gen_audio_save_path()
- text = tts_info.get("text")
- vcn = tts_info.get("vcn")
- speed = tts_info.get("speed")
- # self._tts_thread = lib_tts.TTSManagerNew(text, vcn, speed, self._audio_queue)
- # self._tts_thread = lib_tts_dipai.TTSManagerNew(text, vcn, speed, self._audio_queue)
- # self._tts_thread = lib_tts_grpc.TTSManagerNew(text, vcn, speed, self._audio_queue)
- if self.engine_type == "1":
- self._tts_thread = lib_tts.TTSManagerNew(text, vcn, speed, self._audio_queue)
- elif self.engine_type == "2":
- self._tts_thread = lib_tts_dipai.TTSManagerNew(text, vcn, speed, self._audio_queue)
- else:
- self._tts_thread = lib_tts_grpc.TTSManagerNew(text, vcn, speed, self._audio_queue)
- self._tts_thread.start()
- threading.Thread(target=self._write_stream, name="_write_stream", args=(False,), daemon=True).start()
- else: # 无文本时,播放临时文件
- self._tts_thread = PlayTmpFile(self._tmp_file, self._audio_queue)
- self._tts_thread.start()
- threading.Thread(target=self._write_stream, name="_write_stream", args=(True,), daemon=True).start()
- # 暂停
- def pause(self):
- self.paused_signal.emit()
- self._is_paused = True
- def resume(self):
- self._is_paused = False
- def is_paused(self):
- return self._is_paused
- # 停止
- def stop(self):
- self._is_paused = False
- self._audio_queue.queue.clear()
- self._audio_queue.put("--stop--")
- # 销毁播放器
- def quit(self):
- self._is_paused = False
- self._run_flag = False
- self._audio_queue.queue.clear()
- self._player.stop()
- # 向播放器写入音频流
- def _write_stream(self, is_file=False):
- while self._run_flag:
- if self._is_paused:
- time.sleep(0.02)
- continue
- if not self._audio_queue.empty():
- buf = self._audio_queue.get()
- if isinstance(buf, str):
- if buf == "--end--" or buf == "--stop--":
- if self._tts_thread:
- self._tts_thread.stop()
- self._callback(buf)
- else: # 异常
- self._callback(buf)
- break
- if not is_file:
- self._write_tmp(buf)
- try:
- self._player.write(buf)
- except:
- continue
- self.stopped_signal.emit()
- self._audio_queue.queue.clear()
- # 写入临时文件用于重新播放
- def _write_tmp(self, data):
- with open(self._tmp_file, "ab") as f:
- f.write(data)
- # 音频文件播放器
- class AudioPlayer(QObject):
- playing_signal = pyqtSignal()
- paused_signal = pyqtSignal()
- stopped_signal = pyqtSignal()
- rate_changed_signal = pyqtSignal(dict)
- def __init__(self):
- super().__init__()
- self.duration = 1
- self.position = 0
- self.media_player = QMediaPlayer()
- self.play_state = None
- self.media_player.stateChanged.connect(self.state_changed) # 获取播放状态
- self.media_player.positionChanged.connect(self.position_changed) # 获取播放进度
- self.media_player.durationChanged.connect(self.duration_changed) # 获取音频总时长
- def set_media(self, audio_path: str):
- self.media_player.setMedia(QMediaContent(QUrl.fromLocalFile(audio_path.replace('\\', '/'))))
- # 播放状态改变
- def state_changed(self, state):
- if state == QMediaPlayer.State.PlayingState:
- self.play_state = QMediaPlayer.State.PlayingState
- self.playing_signal.emit()
- elif state == QMediaPlayer.State.PausedState:
- self.play_state = QMediaPlayer.State.PausedState
- self.paused_signal.emit()
- elif state == QMediaPlayer.State.StoppedState:
- self.play_state = QMediaPlayer.State.StoppedState
- self.stopped_signal.emit()
- # 播放进度改变
- def position_changed(self, position):
- try:
- self.position = position
- rate = position / self.duration
- # 格式化时间
- # duration = self.format_time(self.duration)
- # position = self.format_time(position)
- info = {"rate": rate, "position": position, "duration": self.duration}
- self.rate_changed_signal.emit(info)
- except ZeroDivisionError:
- pass
- @staticmethod
- def format_time(input_time):
- minute = int(input_time / 60000)
- second = int((input_time - minute * 60000) / 1000)
- # 单数前面加0
- if minute < 10:
- minute = f"0{minute}"
- if second < 10:
- second = f"0{second}"
- output_time = f"{minute}:{second}"
- return output_time
- # 音频总时长改变
- def duration_changed(self, duration):
- self.duration = duration
- # 设置播放进度
- def set_position(self, float_val: float):
- val = float_val * self.duration
- self.media_player.setPosition(val)
- def set_time(self, val):
- self.media_player.setPosition(val)
- # 设置播放倍速
- def set_playback_speed(self, speed: float):
- if speed == 0.5:
- speed = 0.8
- self.media_player.setPlaybackRate(speed)
- # 播放
- def play(self, *args):
- self.media_player.play()
- # 暂停
- def pause(self):
- self.media_player.pause()
- # 继续
- def resume(self):
- self.media_player.play()
- # 停止
- def stop(self):
- try:
- self.media_player.stop()
- except:
- pass
- # 销毁播放器
- def quit(self):
- pass
- def is_playing(self):
- return self.play_state == QMediaPlayer.State.PlayingState
- if __name__ == '__main__':
- app = QApplication([])
- _play_audio_vlc_thread = AudioPlayer()
- _play_audio_vlc_thread.set_media(r"/media/llh/data/test/19.wav")
- _play_audio_vlc_thread.play()
- app.exec_()
|