# -*- encoding: utf-8 -*-
"""
@Desc: Text-to-speech (read-aloud) client for the xfyun streaming TTS WebSocket API.

Synthesizing minority languages requires sending text in that language, a
matching speaker (``vcn``), ``tte=unicode`` and a different text encoding.
Error codes: https://www.xfyun.cn/document/error-code (read this whenever
``code`` comes back non-zero).
"""
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
import os
import ssl
import sys
import time
from datetime import datetime
from threading import Thread
from time import mktime
from urllib.parse import urlencode, urlsplit
from wsgiref.handlers import format_date_time

import websocket

URL = 'wss://tts-api.xfyun.cn/v2/tts'
# NOTE(review): these credentials are committed in source; consider loading
# them from configuration or the environment instead.
APPID = 'ab154863'
APIKey = '5a584b63730d3450d64cdd65f17f56d0'
APISecret = 'NGE3NGIyZDBlNWU0YTg3YjQ5YTJlY2Q2'

# Host name that is signed and sent as the ``host`` query parameter.
# NOTE(review): this differs from the host in URL; kept exactly as the
# original code had it -- confirm against the service documentation.
_SIGN_HOST = "ws-api.xfyun.cn"
_REQUEST_LINE = "GET /v2/tts HTTP/1.1"


def get_proxy():
    """Parse the ``http_proxy`` environment variable.

    Returns:
        A 6-tuple of strings ``(_proxy, proxy_type, host, port, user, pswd)``.
        ``_proxy`` is the raw variable value ("" when unset). The remaining
        fields are "" when the variable is unset, cannot be parsed, or lacks
        the corresponding component.
    """
    _proxy = os.environ.get("http_proxy", "")
    proxy_type = host = port = user = pswd = ""
    if _proxy:
        try:
            parts = urlsplit(_proxy)
            parsed_port = parts.port  # raises ValueError on a non-numeric port
        except ValueError:
            parts = None
            parsed_port = None
        # Require "scheme://host" so a bare "host:port" is not misread.
        if parts is not None and parts.scheme and parts.hostname:
            proxy_type = parts.scheme
            host = parts.hostname
            port = "" if parsed_port is None else str(parsed_port)
            user = parts.username or ""
            pswd = parts.password or ""
    return _proxy, proxy_type, host, port, user, pswd


def get_home_env():
    """Return the name of the environment variable holding the per-user base directory."""
    if sys.platform == "win32":
        return 'APPDATA'
    return 'HOME'


def gen_tmp_mp3_file():
    """Return a timestamped ``.mp3`` path under ``<home>/xAssistant/tmp``.

    The directory is created if needed; the file itself is not created.

    Raises:
        KeyError: if the home environment variable is not set.
    """
    _tmp_dir = os.path.join(os.environ[get_home_env()], "xAssistant", "tmp")
    os.makedirs(_tmp_dir, exist_ok=True)
    return os.path.join(_tmp_dir, f"{int(time.time())}.mp3")


def _build_auth_url(api_key, api_secret):
    """Build the request URL carrying the HMAC-SHA256 authorization parameters."""
    # RFC 1123 timestamp in GMT, taken at call time.
    date = format_date_time(time.time())
    signature_origin = f"host: {_SIGN_HOST}\ndate: {date}\n{_REQUEST_LINE}"
    digest = hmac.new(api_secret.encode('utf-8'),
                      signature_origin.encode('utf-8'),
                      digestmod=hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode('utf-8')
    authorization_origin = (
        f'api_key="{api_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature}"'
    )
    authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode('utf-8')
    params = {
        "authorization": authorization,
        "date": date,
        "host": _SIGN_HOST,
    }
    return URL + '?' + urlencode(params)


def _speed_for_mode(speed_mode):
    """Map a speed mode (1 -> 80, 2 -> 30, anything else -> 50) to the API speed value."""
    if speed_mode == 1:
        return 80
    if speed_mode == 2:
        return 30
    return 50


def _proxy_run_kwargs():
    """Return the proxy keyword arguments for ``run_forever`` ({} when no usable proxy)."""
    _proxy, proxy_type, host, port, user, pswd = get_proxy()
    if not all([_proxy, proxy_type, host, port]):
        return {}
    kwargs = {
        "http_proxy_host": host,
        "http_proxy_port": port,
        "proxy_type": proxy_type,
    }
    if user and pswd:
        kwargs["http_proxy_auth"] = (user, pswd)
    return kwargs


def _encode_text(text):
    """Return *text* as the base64 string (of its UTF-8 bytes) used in the request."""
    return str(base64.b64encode(text.encode('utf-8')), "UTF8")


class WsManager:
    """Synthesize text over a WebSocket and hand buffered audio to a callback.

    The callback receives dicts: ``{"buf": bytes}`` for audio, ``{"end": 1}``
    when synthesis finishes and ``{"error": ...}`` on failure.
    """

    def __init__(self):
        self.Data = None
        self.Text = None
        self.vcn = None
        self._ws = None
        self.BusinessArgs = None
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.callback = None
        self.d = None
        # Synthesis progress reported with the previous audio chunk.
        self._last_ced = 0
        # Previous audio chunk.
        self._last_buf = None
        # Pending audio; per instance so that instances do not share a buffer.
        self._audio = bytearray()
        # Common request parameters.
        self.CommonArgs = {"app_id": self.APPID}
        self.ws_url = self.create_url()
        websocket.enableTrace(False)

    def run(self, text: str, vcn: str, speed_mode: int, call_back):
        """Synthesize *text* and block until the WebSocket session ends.

        Args:
            text: text to synthesize (a trailing newline is appended).
            vcn: speaker name sent to the service.
            speed_mode: 1 or 2 select preset speeds; any other value is the default.
            call_back: callable receiving the result dicts described on the class.
        """
        self.callback = call_back
        speed = self.get_speed_value(speed_mode)
        self.add_parameters(text + "\n", speed, vcn)
        # Re-sign on every run: the URL embeds a timestamp taken at signing time.
        self.ws_url = self.create_url()
        self._ws = websocket.WebSocketApp(self.ws_url,
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)
        try:
            # NOTE(review): CERT_NONE disables TLS certificate verification.
            self._ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE},
                                 **_proxy_run_kwargs())
        except Exception as exc:
            # Report instead of silently dropping the failure.
            self.callback({"error": exc})

    def stop(self):
        """Close the WebSocket if one has been created."""
        if self._ws:
            self._ws.close()

    def _cb_audio(self):
        """Hand the buffered audio (if any) to the callback and clear the buffer."""
        buf = bytes(self._audio)
        if buf:
            self.callback({"buf": buf})
            self._audio.clear()

    def create_url(self):
        """Return a freshly signed request URL."""
        return _build_auth_url(self.APIKey, self.APISecret)

    def add_parameters(self, word, speed, vcn):
        """Build the request payload for *word* and reset the audio state."""
        self.Text = word
        self.vcn = vcn
        # Business parameters; more options are listed on the service website.
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "speed": speed, "vcn": vcn, "tte": "utf8"}
        # For minority languages the text must instead be encoded as "unicode",
        # which for this API means UTF-16 little-endian.
        self.Data = {"status": 2, "text": _encode_text(self.Text)}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }
        self._audio.clear()
        self._last_ced = 0
        self._last_buf = None

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode (1 -> 80, 2 -> 30, anything else -> 50) to the API speed value."""
        return _speed_for_mode(speed_mode)

    def on_open(self, ws):
        """Send the request payload once the connection is established."""
        def run(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one response frame: buffer audio, flush on progress, signal end/error."""
        message = json.loads(message)
        code = message["code"]
        sid = message.get("sid")
        # Check the error code before touching "data", which may be absent on errors.
        if code != 0:
            self.callback({"error": [sid, message.get("message"), code]})
            return
        data = message["data"]
        status = data["status"]
        ced = int(data["ced"])
        if status == 2:
            # NOTE(review): as in the original, audio carried by the final
            # frame itself is not appended -- confirm this is intended.
            self._cb_audio()
            ws.close()
            self.callback({"end": 1})
            return
        audio = base64.b64decode(data["audio"])
        if self._last_ced < ced:
            # Progress advanced: flush what was buffered before starting anew.
            self._cb_audio()
        self._audio += audio
        self._last_buf = bytearray(audio)
        self._last_ced = ced

    @staticmethod
    def on_close(ws, *args):
        """Connection closed; nothing to do."""
        pass

    def on_error(self, ws, error):
        """Forward a WebSocket error to the callback."""
        self.callback({"error": error})


class TTSManager(Thread):
    """Worker thread that synthesizes *text* and puts audio chunks on a queue.

    Audio bytes go to ``audio_queue``; ``cb_fn`` receives ``{"end": 1}`` when
    synthesis finishes and ``{"error": [sid, message, code]}`` on failure.
    """

    def __init__(self, text, vcn, speed_mode, cb_fn, audio_queue):
        super(TTSManager, self).__init__()
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.callback = cb_fn
        self.audio_queue = audio_queue
        self._ws = None
        self.code = None
        self.sid = None
        self._speed = self.get_speed_value(speed_mode)
        # Common request parameters.
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are listed on the service website.
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "vcn": f"{vcn}", "speed": self._speed, "tte": "utf8"}
        self.Data = {"status": 2, "text": _encode_text(text)}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }

    def create_url(self):
        """Return a freshly signed request URL."""
        return _build_auth_url(self.APIKey, self.APISecret)

    def run(self):
        """Open the WebSocket and block until the session ends."""
        self._ws = websocket.WebSocketApp(self.create_url(),
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)
        try:
            # NOTE(review): CERT_NONE disables TLS certificate verification.
            self._ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE},
                                 **_proxy_run_kwargs())
        except Exception as exc:
            # Report instead of silently dropping the failure.
            self.callback({"error": [self.sid, str(exc), self.code]})

    def on_open(self, ws):
        """Send the request payload once the connection is established."""
        def run(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one response frame: queue audio, then signal end or error."""
        try:
            message = json.loads(message)
            self.code = message["code"]
            self.sid = message["sid"]
            # Check the error code before touching "data", which may be absent on errors.
            if self.code != 0:
                err_msg = message.get("message")
                if "NoneType" not in str(err_msg):
                    self.callback({"error": [self.sid, err_msg, self.code]})
                return
            data = message["data"]
            audio = data.get("audio")
            if audio:
                self.audio_queue.put(base64.b64decode(audio))
            # Signal the end only after the final chunk has been queued.
            if data["status"] == 2:
                ws.close()
                self.callback({"end": 1})
        except Exception as e:
            if "NoneType" not in str(e):
                self.callback({"error": [self.sid, str(e), self.code]})

    @staticmethod
    def on_close(ws, *args):
        """Connection closed; nothing to do."""
        pass

    def on_error(self, ws, error):
        """Forward a WebSocket error, skipping errors whose text mentions NoneType."""
        if "NoneType" not in str(error):
            self.callback({"error": [self.sid, str(error), self.code]})

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode (1 -> 80, 2 -> 30, anything else -> 50) to the API speed value."""
        return _speed_for_mode(speed_mode)

    def stop(self):
        """Close the WebSocket if one has been created."""
        if self._ws:
            self._ws.close()


# TTS worker thread
class TTSManagerNew(Thread):
    """Daemon worker thread that reports everything through a single queue.

    The queue receives audio ``bytes``, the string ``"--end--"`` when synthesis
    finishes, and strings of the form ``"error:<sid>,<message>,<code>"`` on failure.
    """

    def __init__(self, text, vcn, speed_mode, queue):
        super(TTSManagerNew, self).__init__()
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.daemon = True
        self.queue = queue
        self._ws = None
        self.code = None
        self.sid = None
        self._speed = self.get_speed_value(speed_mode)
        # Common request parameters.
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are listed on the service website.
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "vcn": f"{vcn}", "speed": self._speed, "tte": "utf8"}
        self.Data = {"status": 2, "text": _encode_text(text)}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }

    def create_url(self):
        """Return a freshly signed request URL."""
        return _build_auth_url(self.APIKey, self.APISecret)

    def run(self):
        """Open the WebSocket and block until the session ends."""
        self._ws = websocket.WebSocketApp(self.create_url(),
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)
        try:
            # NOTE(review): CERT_NONE disables TLS certificate verification.
            self._ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE},
                                 **_proxy_run_kwargs())
        except Exception as exc:
            # Report instead of silently dropping the failure.
            self.queue.put(f"error:{self.sid},{str(exc)},{self.code}")

    def on_open(self, ws):
        """Send the request payload once the connection is established."""
        def run(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one response frame: queue audio, then the end marker or an error."""
        try:
            message = json.loads(message)
            self.code = message["code"]
            self.sid = message["sid"]
            # Check the error code before touching "data", which may be absent on errors.
            if self.code != 0:
                err_msg = message.get("message")
                if "NoneType" not in str(err_msg):
                    self.queue.put(f"error:{self.sid},{err_msg},{self.code}")
                return
            data = message["data"]
            audio = data.get("audio")
            if audio:
                self.queue.put(base64.b64decode(audio))
            # Queue the end marker once, after the final chunk.
            if data["status"] == 2:
                self.queue.put("--end--")
        except Exception as e:
            if "NoneType" not in str(e):
                self.queue.put(f"error:{self.sid},{str(e)},{self.code}")

    @staticmethod
    def on_close(ws, *args):
        """Connection closed; nothing to do."""
        pass

    def on_error(self, ws, error):
        """Queue a WebSocket error, skipping errors whose text mentions NoneType."""
        if "NoneType" not in str(error):
            self.queue.put(f"error:{self.sid},{str(error)},{self.code}")

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode (1 -> 80, 2 -> 30, anything else -> 50) to the API speed value."""
        return _speed_for_mode(speed_mode)

    def stop(self):
        """Close the WebSocket if one has been created."""
        if self._ws:
            self._ws.close()