# -*- encoding: utf-8 -*-
"""
@Author: Cheng Yao (程尧)
@Desc: Speech synthesis (read-aloud) feature.

# Synthesizing a minority language requires sending text in that language,
# using a matching speaker (vcn), setting tte=unicode and changing the text
# encoding accordingly.
# Error code reference: https://www.xfyun.cn/document/error-code
# (required reading whenever ``code`` comes back as an error).
"""
import time
import websocket
import datetime
import hashlib
import base64
import hmac
import json
import ssl
import _thread as thread
import os
from threading import Thread
import sys
import random
from urllib.parse import quote, urlsplit

import config

# NOTE(review): the endpoint and credentials are hard-coded in source. The
# commented-out lines below read the same values from ``config`` instead;
# confirm which is intended before shipping.
URL = 'ws://112.30.115.188:28080/dpwsTts/v2/tts'
APPID = 'shuchuan'
APIKey = '988472d12573b52e0a9f07ff5368eeac'
APISecret = '1b802ee73ba0cb67e7a2331d59cb4fc8'
# URL = config.config["DiPai_TTS_URL"]
# APPID = config.config["DiPai_TTS_APPID"]
# APIKey = config.config["DiPai_TTS_APIKey"]
# APISecret = config.config["DiPai_TTS_APISecret"]


def get_proxy():
    """Parse the ``http_proxy`` environment variable.

    Returns:
        A 6-tuple ``(proxy_url, proxy_type, host, port, user, pswd)`` of
        strings. ``proxy_url`` is the raw environment value ("" when the
        variable is unset). The remaining fields are "" when the variable is
        unset, cannot be parsed, or does not contain that component.
    """
    proxy_url = os.environ.get("http_proxy", "")
    proxy_type = host = port = user = pswd = ""
    if not proxy_url:
        return proxy_url, proxy_type, host, port, user, pswd

    try:
        parts = urlsplit(proxy_url)
        # Accessing ``.port`` validates it; a malformed port raises ValueError.
        port_number = parts.port
    except ValueError:
        return proxy_url, proxy_type, host, port, user, pswd

    # Without a "//" authority section there is no hostname to report.
    if parts.hostname:
        proxy_type = parts.scheme
        host = parts.hostname
        # Kept as a string so callers receive the same type as before.
        port = "" if port_number is None else str(port_number)
        user = parts.username or ""
        pswd = parts.password or ""
    return proxy_url, proxy_type, host, port, user, pswd


def get_home_env():
    """Return the name of the environment variable holding the user's base directory.

    Returns:
        ``'APPDATA'`` when ``sys.platform`` is ``"win32"``, otherwise ``'HOME'``.
    """
    return 'APPDATA' if sys.platform == "win32" else 'HOME'


def gen_tmp_mp3_file():
    """Build a timestamp-named ``.mp3`` path under ``<home>/iFLYAssistant/tmp``.

    The directory is created if missing; the file itself is not created.
    The name is the current Unix time in whole seconds.

    Returns:
        The path as a string.

    Raises:
        KeyError: if the environment variable named by ``get_home_env()`` is unset.
    """
    tmp_dir = os.path.join(os.environ[get_home_env()], "iFLYAssistant", "tmp")
    # exist_ok avoids the check-then-create race between concurrent callers.
    os.makedirs(tmp_dir, exist_ok=True)
    return os.path.join(tmp_dir, f"{int(time.time())}.mp3")


def get_nonce():
    """Return a random integer in the inclusive range 1..100."""
    return random.randint(1, 100)


# TTS worker thread
class TTSManagerNew(Thread):
    """Daemon thread that sends one TTS request over a websocket.

    Results are delivered through ``queue``: decoded audio as ``bytes``, the
    string ``"--end--"`` when a message with ``status == 2`` arrives, and
    strings starting with ``"error:"`` on failures.
    """

    def __init__(self, text, vcn, speed_mode, queue):
        """Prepare the request payload.

        Args:
            text: Text to synthesize; sent UTF-8 encoded and base64-wrapped.
            vcn: Value placed in the ``vcn`` business parameter.
            speed_mode: Passed to ``get_speed_value``.
            queue: Object with a ``put`` method that receives the results.
        """
        super().__init__()
        self.daemon = True
        self.queue = queue
        self._ws = None
        self.code = None
        self.sid = None
        # NOTE(review): stored but not included in BusinessArgs below, so it
        # is not sent with the request - confirm whether that is intended.
        self._speed = self.get_speed_value(speed_mode)

        # Common parameters (common)
        self.CommonArgs = {"app_id": APPID}
        # Business parameters (business); more personalization options are
        # listed on the official website.
        self.BusinessArgs = {
            "aue": "raw",
            "auf": "audio/L16;rate=16000",
            "vcn": vcn,
            "tte": "UTF8",
        }
        self.Data = {
            "status": 2,
            "text": str(base64.b64encode(text.encode('utf-8')), "UTF8"),
        }
        self.d = {
            "common": self.CommonArgs,
            "business": self.BusinessArgs,
            "data": self.Data,
        }

    # Build the url
    def create_url(self):
        """Return the websocket URL with the signed handshake query string.

        The query string is signed with HMAC-SHA256 keyed by ``APISecret``;
        the base64 signature is percent-encoded and appended as ``Signature``.
        """
        # Assemble the handshake parameters
        timestamp = int(time.time())
        nonce = get_nonce()
        src_str = (
            "APIKey=" + APIKey
            + "&Nonce=" + str(nonce)
            + "&Region=bj&Task=TTS&Timestamp=" + str(timestamp)
        )
        digest = hmac.new(
            APISecret.encode('utf-8'),
            src_str.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode(encoding='utf-8')
        # safe='' so that '/', '+' and '=' in the base64 text are escaped too.
        signature_encoded = quote(signature, safe='')
        # Append the auth parameters to produce the final url
        return URL + "?" + src_str + "&Signature=" + signature_encoded

    def run(self):
        """Open the websocket and block until the connection ends.

        Proxy settings from ``get_proxy()`` are passed to ``run_forever`` when
        the proxy URL, type, host and port are all present; credentials are
        added only when both user and password are present.
        """
        ws_url = self.create_url()
        self._ws = websocket.WebSocketApp(
            ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )

        _proxy, proxy_type, host, port, user, pswd = get_proxy()
        # NOTE(review): certificate verification is disabled here.
        run_kwargs = {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
        if all([_proxy, proxy_type, host, port]):
            run_kwargs.update(
                http_proxy_host=host,
                http_proxy_port=port,
                proxy_type=proxy_type,
            )
            if user and pswd:
                run_kwargs["http_proxy_auth"] = (user, pswd)

        try:
            self._ws.run_forever(**run_kwargs)
        except Exception as exc:
            # Report through the queue so the consumer is not left waiting.
            self.on_error(self._ws, exc)

    # Handler for websocket connection established
    def on_open(self, ws):
        """Send the JSON request from a separate thread once connected."""
        def send_request(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(send_request, ())

    def on_message(self, ws, message):
        """Decode one server message and forward its content to the queue.

        Queue order for a single message: decoded audio (only when ``code`` is
        0 and audio is present), then ``"--end--"`` when ``status == 2``, then
        an ``"error:..."`` string when ``code`` is non-zero.
        """
        try:
            message = json.loads(message)
            self.code = message["code"]

            if self.code == 0:
                # Enqueue each frame's audio exactly once; a missing or null
                # "audio" field is skipped rather than raising.
                audio = message.get("audio")
                if audio:
                    self.queue.put(base64.b64decode(audio))

            if message.get("status") == 2:
                self.queue.put("--end--")

            if self.code != 0:
                err_msg = message.get("message")
                if "NoneType" not in str(err_msg):
                    self.queue.put(f"error:{err_msg},{self.code}")
        except Exception as e:
            if "NoneType" not in str(e):
                self.queue.put(f"error:{self.sid},{str(e)},{self.code}")

    # Handler for websocket closed
    @staticmethod
    def on_close(ws, *args):
        """Do nothing when the websocket closes."""
        pass

    # Handler for websocket errors
    def on_error(self, ws, error):
        """Put an ``"error:..."`` string on the queue.

        Errors whose text contains ``"NoneType"`` are dropped.
        """
        if "NoneType" not in str(error):
            self.queue.put(f"error:{self.sid},{str(error)},{self.code}")

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode to a speed value: 1 -> 80, 2 -> 30, anything else -> 50."""
        if speed_mode == 1:
            return 80
        if speed_mode == 2:
            return 30
        return 50

    def stop(self):
        """Close the websocket if ``run()`` has created one."""
        # _ws stays None until run() starts, so stop() may be called too early.
        if self._ws is not None:
            self._ws.close()