123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- # -*- encoding: utf-8 -*-
- """
- @Desc: 语音合成(朗读)功能
- # 合成小语种需要传输小语种文本、使用小语种发音人vcn、tte=unicode以及修改文本编码方式
- # 错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
- """
- import _thread as thread
- import base64
- import datetime
- import hashlib
- import hmac
- import json
- import os
- import ssl
- import sys
- import time
- from datetime import datetime
- from threading import Thread
- from time import mktime
- from urllib.parse import urlencode
- from wsgiref.handlers import format_date_time
- import websocket
# WebSocket endpoint of the iFlytek online TTS service.
URL = 'wss://tts-api.xfyun.cn/v2/tts'

# Service credentials. Each value can be overridden through an environment
# variable so deployments do not have to edit this file; the literals below
# are kept only as backward-compatible defaults.
# NOTE(review): secrets committed to source control should be treated as
# leaked -- rotate them and supply the real values via the environment.
APPID = os.environ.get('XFYUN_APPID', 'ab154863')
APIKey = os.environ.get('XFYUN_API_KEY', '5a584b63730d3450d64cdd65f17f56d0')
APISecret = os.environ.get('XFYUN_API_SECRET', 'NGE3NGIyZDBlNWU0YTg3YjQ5YTJlY2Q2')
def get_proxy():
    """Read the ``http_proxy`` environment variable and split it into parts.

    Returns:
        A 6-tuple of strings ``(raw, proxy_type, host, port, user, pswd)``:
        the raw variable value, the URL scheme, the host name, the port
        (as a string), and the optional user name and password.  Every
        field that is absent is ``""``.  When the variable is unset, empty,
        or cannot be parsed as ``scheme://[user[:pswd]@]host[:port]``, all
        fields except ``raw`` are ``""``.
    """
    # Local import: the module only imports ``urlencode`` from urllib.parse.
    from urllib.parse import unquote, urlsplit

    _proxy = os.environ.get("http_proxy", "")
    unparsed = (_proxy, "", "", "", "", "")
    if not _proxy:
        return unparsed

    try:
        parts = urlsplit(_proxy)
        # Accessing ``.port`` validates it; a non-numeric or out-of-range
        # port raises ValueError.
        port_number = parts.port
    except ValueError:
        return unparsed

    # A value without "scheme://host" (e.g. "host:8080") is not usable.
    if not parts.scheme or not parts.hostname:
        return unparsed

    port = "" if port_number is None else str(port_number)
    # Credentials inside a URL are percent-encoded; decode them so that
    # special characters such as "@" or ":" survive.
    user = unquote(parts.username or "")
    pswd = unquote(parts.password or "")
    return _proxy, parts.scheme, parts.hostname, port, user, pswd
def get_home_env():
    """Return the name of the environment variable used as the base directory.

    ``'APPDATA'`` when ``sys.platform`` is ``"win32"``, ``'HOME'`` otherwise.
    """
    on_windows = sys.platform == "win32"
    return 'APPDATA' if on_windows else 'HOME'
def gen_tmp_mp3_file():
    """Return a path for a temporary ``.mp3`` file, creating its directory.

    The file lives in ``<base>/xAssistant/tmp`` where ``<base>`` is the value
    of the environment variable named by ``get_home_env()``.  The file name
    is the current Unix time in whole seconds, so two calls within the same
    second return the same path.  The file itself is not created.

    Raises:
        KeyError: if the base-directory environment variable is not set.
    """
    tmp_dir = os.path.join(os.environ[get_home_env()], "xAssistant", "tmp")
    # exist_ok avoids the check-then-create race of exists() + makedirs(),
    # and makedirs creates the intermediate "xAssistant" directory as well.
    os.makedirs(tmp_dir, exist_ok=True)
    return os.path.join(tmp_dir, f"{int(time.time())}.mp3")
class WsManager:
    """Blocking iFlytek TTS WebSocket client that reports audio via a callback.

    ``run()`` opens the connection, sends one synthesis request and blocks
    until the socket closes.  The callback given to ``run()`` receives dicts:

    * ``{"buf": bytes}``   -- a chunk of synthesized audio,
    * ``{"end": 1}``       -- the final frame has been processed,
    * ``{"error": ...}``   -- either ``[sid, message, code]`` for an error
      frame sent by the server, or the exception object for a transport error.

    Audio frames are accumulated and flushed as one chunk whenever the
    synthesis progress value (``ced``) reported by the server increases.
    """

    # Synthesis progress reported with the previous audio frame.
    _last_ced = 0
    # Bytes of the previous audio frame.
    _last_buf = None

    def __init__(self):
        """Set up credentials, the common request section and the signed URL."""
        self.Data = None
        self.Text = None
        self.vcn = None
        self._ws = None
        self.BusinessArgs = None
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.callback = None
        self.d = None
        # Per-instance accumulation buffer.  It must not be a class attribute:
        # it is mutated in place, so a class-level bytearray would be shared
        # by every WsManager instance.
        self._audio = bytearray()
        self._last_ced = 0
        self._last_buf = None
        # Common parameters ("common" section of the request).
        self.CommonArgs = {"app_id": self.APPID}
        self.ws_url = self.create_url()
        websocket.enableTrace(False)

    def run(self, text: str, vcn: str, speed_mode: int, call_back):
        """Synthesize ``text`` and block until the WebSocket closes.

        Args:
            text: text to synthesize; a trailing newline is appended.
            vcn: voice name passed through to the service.
            speed_mode: 1 = fast, 2 = slow, anything else = normal
                (see ``get_speed_value``).
            call_back: callable receiving the event dicts described on the
                class.
        """
        self.callback = call_back
        speed = self.get_speed_value(speed_mode)
        self.add_parameters(text + "\n", speed, vcn)
        # Re-sign for every run: the URL embeds a signature over the current
        # date, so a URL created at construction time goes stale when the
        # instance is reused later (presumably rejected by the service --
        # see the iFlytek authentication docs).
        self.ws_url = self.create_url()
        self._ws = websocket.WebSocketApp(self.ws_url,
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)

        _proxy, proxy_type, host, port, user, pswd = get_proxy()
        if all([_proxy, proxy_type, host, port]):
            # Credentials are optional; only pass them when both are present.
            proxy_auth = (user, pswd) if user and pswd else None
        else:
            host = port = proxy_type = proxy_auth = None

        try:
            # NOTE(review): CERT_NONE disables TLS certificate verification.
            # Kept for backward compatibility; consider enabling verification.
            self._ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE},
                                 http_proxy_host=host,
                                 http_proxy_port=port,
                                 proxy_type=proxy_type,
                                 http_proxy_auth=proxy_auth)
        except Exception as exc:
            # Report instead of silently returning, in the same shape that
            # on_error uses.
            self.callback({"error": exc})

    def stop(self):
        """Close the WebSocket if one has been created."""
        if self._ws:
            self._ws.close()

    def _cb_audio(self):
        """Hand the accumulated audio (if any) to the callback and reset it."""
        buf = bytes(self._audio)
        if buf:
            self.callback({"buf": buf})
        self._audio.clear()

    def create_url(self):
        """Build the request URL carrying the HMAC-SHA256 authorization."""
        # RFC 1123 formatted timestamp of the current time.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign: host, date and request line.
        signature_origin = f"host: ws-api.xfyun.cn\ndate: {date}\nGET /v2/tts HTTP/1.1"
        digest = hmac.new(self.APISecret.encode('utf-8'),
                          signature_origin.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(digest).decode(encoding='utf-8')

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature_sha}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Authentication parameters are sent as the query string.
        query = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn",
        }
        return URL + '?' + urlencode(query)

    def add_parameters(self, word, speed, vcn):
        """Build the request payload for ``word`` and reset the audio state."""
        self.Text = word
        self.vcn = vcn
        # Business parameters; more options are listed in the service docs.
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "speed": speed, "vcn": vcn, "tte": "utf8"}
        # The text is sent base64-encoded from its UTF-8 bytes.
        # (Original author's note: minority-language voices need tte=unicode
        # and a UTF-16 encoded text instead.)
        self.Data = {"status": 2,
                     "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }
        self._audio.clear()
        self._last_ced = 0
        self._last_buf = None

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode to the service's speed value.

        1 -> 80 (fast), 2 -> 30 (slow), anything else -> 50 (normal).
        """
        if speed_mode == 1:
            return 80
        if speed_mode == 2:
            return 30
        return 50

    def on_open(self, ws):
        """Send the synthesis request once the connection is established."""
        def run(*args):
            self._ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one server frame: report errors, buffer audio, signal end."""
        message = json.loads(message)
        code = message["code"]
        sid = message["sid"]

        # Check the result code first: an error frame may carry no "data"
        # section, so reading it before this check would raise instead of
        # reporting the server's error.
        if code != 0:
            self.callback({"error": [sid, message.get("message"), code]})
            return

        data = message.get("data") or {}
        status = data.get("status")
        audio_str = data.get("audio")

        if audio_str is not None:
            audio = base64.b64decode(audio_str)
            ced = int(data.get("ced", self._last_ced))
            if self._last_ced < ced:
                # Progress advanced: flush what was collected so far, then
                # start the next chunk with this frame.
                self._cb_audio()
            self._audio += audio
            self._last_buf = bytearray(audio)
            self._last_ced = ced

        if status == 2:
            # Final frame: its audio was buffered above (it used to be
            # discarded), so flush everything before signalling the end.
            self._cb_audio()
            ws.close()
            self.callback({"end": 1})

    @staticmethod
    def on_close(ws, *args):
        """Nothing to do when the socket closes."""
        pass

    def on_error(self, ws, error):
        """Forward a WebSocket error to the callback."""
        self.callback({"error": error})
class TTSManager(Thread):
    """Worker thread that synthesizes one text through the iFlytek TTS service.

    Decoded audio frames are put on ``audio_queue``.  Control events go to
    the callback as dicts: ``{"end": 1}`` after the final frame and
    ``{"error": [sid, message, code]}`` for failures.
    """

    def __init__(self, text, vcn, speed_mode, cb_fn, audio_queue):
        """Prepare the request payload.

        Args:
            text: text to synthesize.
            vcn: voice name passed through to the service.
            speed_mode: 1 = fast, 2 = slow, anything else = normal.
            cb_fn: callable receiving the control-event dicts.
            audio_queue: queue-like object; ``put()`` is called with the raw
                bytes of each audio frame.
        """
        super(TTSManager, self).__init__()
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.callback = cb_fn
        self.audio_queue = audio_queue
        self._ws = None
        self.code = None
        self.sid = None
        self._speed = self.get_speed_value(speed_mode)
        # Common parameters ("common" section of the request).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are listed in the service docs.
        self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000",
                             "vcn": f"{vcn}", "speed": self._speed,
                             "tte": "utf8"}
        self.Data = {"status": 2,
                     "text": str(base64.b64encode(text.encode('utf-8')), "UTF8")}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }

    def create_url(self):
        """Build the request URL carrying the HMAC-SHA256 authorization."""
        # RFC 1123 formatted timestamp of the current time.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign: host, date and request line.
        signature_origin = f"host: ws-api.xfyun.cn\ndate: {date}\nGET /v2/tts HTTP/1.1"
        digest = hmac.new(self.APISecret.encode('utf-8'),
                          signature_origin.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(digest).decode(encoding='utf-8')

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature_sha}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Authentication parameters are sent as the query string.
        query = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn",
        }
        return URL + '?' + urlencode(query)

    def run(self):
        """Thread body: connect (through the env proxy if any) and block."""
        ws_url = self.create_url()
        self._ws = websocket.WebSocketApp(ws_url,
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)

        # NOTE(review): CERT_NONE disables TLS certificate verification.
        # Kept for backward compatibility; consider enabling verification.
        options = {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
        _proxy, proxy_type, host, port, user, pswd = get_proxy()
        if all([_proxy, proxy_type, host, port]):
            options.update(http_proxy_host=host,
                           http_proxy_port=port,
                           proxy_type=proxy_type)
            # Credentials are optional; only pass them when both are present.
            if user and pswd:
                options["http_proxy_auth"] = (user, pswd)

        try:
            self._ws.run_forever(**options)
        except Exception as exc:
            # Report instead of silently ending the thread.
            self._report_error(exc)

    def _report_error(self, detail):
        """Send a local/transport error to the callback.

        Errors whose text mentions "NoneType" are dropped, as the original
        code did (presumably noise raised while the socket is being torn
        down -- TODO confirm).
        """
        text = str(detail)
        if "NoneType" not in text:
            self.callback({"error": [self.sid, text, self.code]})

    def on_open(self, ws):
        """Send the synthesis request once the connection is established."""
        def run(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one server frame: report errors, queue audio, signal end."""
        try:
            message = json.loads(message)
            self.code = message["code"]
            self.sid = message["sid"]
            # An error frame may carry no "data" section.  Indexing it used
            # to raise a TypeError mentioning "NoneType", which the error
            # filter then swallowed -- so server errors were never reported.
            data = message.get("data") or {}

            if self.code != 0:
                self.callback(
                    {"error": [self.sid, message.get("message"), self.code]})
            else:
                audio = data.get("audio")
                if audio is not None:
                    self.audio_queue.put(base64.b64decode(audio))

            # Signal the end only after the final frame's audio is queued.
            if data.get("status") == 2:
                ws.close()
                self.callback({"end": 1})
        except Exception as e:
            self._report_error(e)

    @staticmethod
    def on_close(ws, *args):
        """Nothing to do when the socket closes."""
        pass

    def on_error(self, ws, error):
        """Forward a WebSocket error to the callback (filtered)."""
        self._report_error(error)

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode to the service's speed value.

        1 -> 80 (fast), 2 -> 30 (slow), anything else -> 50 (normal).
        """
        if speed_mode == 1:
            return 80
        if speed_mode == 2:
            return 30
        return 50

    def stop(self):
        """Close the WebSocket; a no-op if ``run()`` has not created it yet."""
        if self._ws:
            self._ws.close()
# TTS worker thread
class TTSManagerNew(Thread):
    """Daemon worker thread that streams iFlytek TTS results into a queue.

    Items put on ``queue``:

    * ``bytes``                      -- one decoded audio frame,
    * ``"--end--"``                  -- the final frame has been processed,
    * ``"error:<sid>,<msg>,<code>"`` -- a server or transport error.
    """

    def __init__(self, text, vcn, speed_mode, queue):
        """Prepare the request payload.

        Args:
            text: text to synthesize.
            vcn: voice name passed through to the service.
            speed_mode: 1 = fast, 2 = slow, anything else = normal.
            queue: queue-like object receiving the items described on the
                class via ``put()``.
        """
        super(TTSManagerNew, self).__init__()
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.daemon = True
        self.queue = queue
        self._ws = None
        self.code = None
        self.sid = None
        self._speed = self.get_speed_value(speed_mode)
        # Common parameters ("common" section of the request).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters; more options are listed in the service docs.
        self.BusinessArgs = {
            "aue": "raw",
            "auf": "audio/L16;rate=16000",
            "vcn": f"{vcn}",
            "speed": self._speed,
            "tte": "utf8"}
        self.Data = {"status": 2,
                     "text": str(base64.b64encode(text.encode('utf-8')), "UTF8")}
        self.d = {"common": self.CommonArgs,
                  "business": self.BusinessArgs,
                  "data": self.Data,
                  }

    def create_url(self):
        """Build the request URL carrying the HMAC-SHA256 authorization."""
        # RFC 1123 formatted timestamp of the current time.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign: host, date and request line.
        signature_origin = f"host: ws-api.xfyun.cn\ndate: {date}\nGET /v2/tts HTTP/1.1"
        digest = hmac.new(self.APISecret.encode('utf-8'),
                          signature_origin.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(digest).decode(encoding='utf-8')

        authorization_origin = (
            f'api_key="{self.APIKey}", algorithm="hmac-sha256", '
            f'headers="host date request-line", signature="{signature_sha}"'
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Authentication parameters are sent as the query string.
        query = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn",
        }
        return URL + '?' + urlencode(query)

    def run(self):
        """Thread body: connect (through the env proxy if any) and block."""
        ws_url = self.create_url()
        self._ws = websocket.WebSocketApp(ws_url,
                                          on_open=self.on_open,
                                          on_message=self.on_message,
                                          on_error=self.on_error,
                                          on_close=self.on_close)

        # NOTE(review): CERT_NONE disables TLS certificate verification.
        # Kept for backward compatibility; consider enabling verification.
        options = {"sslopt": {"cert_reqs": ssl.CERT_NONE}}
        _proxy, proxy_type, host, port, user, pswd = get_proxy()
        if all([_proxy, proxy_type, host, port]):
            options.update(http_proxy_host=host,
                           http_proxy_port=port,
                           proxy_type=proxy_type)
            # Credentials are optional; only pass them when both are present.
            if user and pswd:
                options["http_proxy_auth"] = (user, pswd)

        try:
            self._ws.run_forever(**options)
        except Exception as exc:
            # Report instead of silently ending the thread.
            self._report_error(exc)

    def _report_error(self, detail):
        """Queue a local/transport error string.

        Errors whose text mentions "NoneType" are dropped, as the original
        code did (presumably noise raised while the socket is being torn
        down -- TODO confirm).
        """
        text = str(detail)
        if "NoneType" not in text:
            self.queue.put(f"error:{self.sid},{text},{self.code}")

    def on_open(self, ws):
        """Send the synthesis request once the connection is established."""
        def run(*args):
            ws.send(json.dumps(self.d))

        thread.start_new_thread(run, ())

    def on_message(self, ws, message):
        """Handle one server frame: queue errors, audio and the end marker."""
        try:
            message = json.loads(message)
            self.code = message["code"]
            self.sid = message["sid"]
            # An error frame may carry no "data" section.  Indexing it used
            # to raise a TypeError mentioning "NoneType", which the error
            # filter then swallowed -- leaving the consumer without either
            # an error or an end marker.
            data = message.get("data") or {}

            if self.code != 0:
                self.queue.put(
                    f"error:{self.sid},{message.get('message')},{self.code}")
            else:
                audio = data.get("audio")
                if audio is not None:
                    self.queue.put(base64.b64decode(audio))

            # The end marker always comes last, after audio or error.
            if data.get("status") == 2:
                self.queue.put("--end--")
        except Exception as e:
            self._report_error(e)

    @staticmethod
    def on_close(ws, *args):
        """Nothing to do when the socket closes."""
        pass

    def on_error(self, ws, error):
        """Queue a WebSocket error (filtered)."""
        self._report_error(error)

    @staticmethod
    def get_speed_value(speed_mode):
        """Map a speed mode to the service's speed value.

        1 -> 80 (fast), 2 -> 30 (slow), anything else -> 50 (normal).
        """
        if speed_mode == 1:
            return 80
        if speed_mode == 2:
            return 30
        return 50

    def stop(self):
        """Close the WebSocket; a no-op if ``run()`` has not created it yet."""
        if self._ws:
            self._ws.close()
|