# lib_asr_dipai.py

  1. from PyQt5.QtCore import QUrl, QTimer, QObject, pyqtSignal
  2. from PyQt5.QtWebSockets import QWebSocket
  3. import base64
  4. import hashlib
  5. import json
  6. import time
  7. import struct
  8. import hmac
  9. import random
  10. from urllib.parse import quote
  11. from util import constants
  12. from libs import lib_nlp_dipai
  13. import config
  14. BASE_URL = "ws://112.30.115.188:28080/dpwsAsr/recog"
  15. type = "wsAsr"
  16. APPID = "shuchuan"
  17. APIKey = "9863296d1a7234088c383e9e05b1a41b"
  18. APISecret = "a924a14b977d1587242d31228d026a00"
  19. # BASE_URL = config.config["DiPai_ASR_URL"]
  20. # type = "wsAsr"
  21. # APPID = config.config["DiPai_ASR_APPID"]
  22. # APIKey = config.config["DiPai_ASR_APIKey"]
  23. # APISecret = config.config["DiPai_ASR_APISecret"]
  24. def get_nonce():
  25. random_number = random.randint(1, 100)
  26. return random_number
  27. class DiPaiASRWebSocket(QObject):
  28. trigger = pyqtSignal(dict)
  29. def __init__(self):
  30. super().__init__()
  31. # 发送首个音频段时的状态
  32. self.status = None
  33. self.complete_transcription = ""
  34. # 创建WebSocket客户端实例
  35. self.socket = QWebSocket()
  36. # 连接成功时的信号和槽函数
  37. self.socket.connected.connect(self.on_connected)
  38. # 接收到消息时的信号和槽函数
  39. self.socket.textMessageReceived.connect(self.on_message)
  40. # 发生错误时的信号和槽函数
  41. self.socket.error.connect(self.on_error)
  42. # 连接关闭时的信号和槽函数
  43. self.socket.disconnected.connect(self.on_close)
  44. def on_connected(self):
  45. print("WebSocket 连接成功")
  46. self._callback(self.gen_q_data(1, ''))
  47. pass
  48. def on_error(self, error_code):
  49. print("WebSocket error:", error_code)
  50. pass
  51. def on_close(self):
  52. print("连接正常关闭")
  53. # def on_close(self, code, reason=None):
  54. # if code == 1000:
  55. # print("连接正常关闭")
  56. # else:
  57. # print("连接异常关闭,code:" + str(code) + " ,reason:" + str(reason))
  58. # 发送音频数据
  59. def audio_write(self, buf: bytes):
  60. # print(f"send_message socket.state(): {self.socket.state()}")
  61. # self.socket.sendBinaryMessage(buf)
  62. audio_base64 = base64.b64encode(buf).decode('utf-8')
  63. data = {
  64. "common": {
  65. "app_id": APPID
  66. },
  67. "data": {
  68. "status": self.status,
  69. "audio": audio_base64,
  70. "sr": 16000
  71. }
  72. }
  73. payload = json.dumps(data) # 将字典对象转换为JSON字符串
  74. print(payload)
  75. self.socket.sendTextMessage(payload)
  76. # 发送完第一个音频后,中间音频status要发送2
  77. self.status = 2
  78. def on_message(self, message):
  79. try:
  80. print("Received message:", message)
  81. received_data = json.loads(str(message))
  82. txt = received_data["data"]["txt"]
  83. txt = txt.replace("|", "")
  84. self.complete_transcription += txt
  85. is_end = received_data["data"]["isEnd"]
  86. if is_end == "1":
  87. # TODO 增加信号,通知调用方
  88. self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, self.complete_transcription))
  89. print("最终识别结果:", self.complete_transcription)
  90. # 调用帝派语义引擎
  91. nlp_result = lib_nlp_dipai.get_nlp_result(self.complete_transcription)
  92. print("语义理解结果:", nlp_result)
  93. self._callback(self.gen_q_data(constants.AITEST_DIPAI_NLP, nlp_result))
  94. # _action = data["action"]
  95. # self._sid = data["sid"]
  96. # if _action == "started": # 正常连接
  97. # d = f"连接已建立,sid:{self._sid}"
  98. # self._callback(self.gen_q_data(constants.AITEST_AIUI_LOG, d))
  99. # self._callback(self.gen_q_data(constants.AITEST_AIUI_START, True))
  100. # elif _action == "result":
  101. # _data = self.parse_aiui_v2_result(data)
  102. # if _data.get("is_finish"):
  103. # self._callback(
  104. # self.gen_q_data(constants.AITEST_AIUI_NLP, json.dumps(data["data"], ensure_ascii=False)))
  105. # self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, _data["rawText"]))
  106. # else:
  107. # self._callback(self.gen_q_data(constants.AITEST_AIUI_ERROR, data))
  108. except Exception as e:
  109. print(1)
  110. print(e)
  111. def _callback(self, data: dict):
  112. try:
  113. self.trigger.emit(data)
  114. except Exception as e:
  115. print(2)
  116. print(e)
  117. def create_url(self):
  118. # 构造握手参数
  119. curTime = int(time.time())
  120. nonce = get_nonce()
  121. srcStr = "APIKey=" + APIKey + "&Nonce=" + str(nonce) + "&Region=bj&Task=ASR&Timestamp=" + str(curTime)
  122. signature_sha = hmac.new(APISecret.encode('utf-8'),
  123. srcStr.encode('utf-8'),
  124. digestmod=hashlib.sha256).digest()
  125. signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
  126. # print(signature_sha)
  127. signature_encoded = quote(signature_sha, safe='')
  128. # print(signature_encoded)
  129. connParam = "?" + srcStr + "&Signature=" + signature_encoded
  130. self.ws_url = BASE_URL + connParam
  131. def start_speech(self):
  132. self.complete_transcription = ""
  133. # 发送首个音频段时的状态
  134. self.status = 1
  135. self.create_url()
  136. self.socket.open(QUrl(f"{self.ws_url}"))
  137. def stop_speech(self):
  138. print("发送结束标识")
  139. # self.socket.sendBinaryMessage(bytes(end_tag.encode("utf-8")))
  140. # 发送末个音频段时的状态
  141. status = 4
  142. # 发送空音频段
  143. sample_rate = 16000 # 采样率为16kHz
  144. sample_width = 2 # 采样位数为16位(2字节)
  145. duration = 0.1 # 静音持续时间(秒)
  146. num_samples = int(sample_rate * duration) # 计算所需的采样点数量
  147. # 定义静音样本值
  148. silent_sample = struct.pack("<h", 0) # 对于16位PCM,使用零样本
  149. # 生成静音数据
  150. # main.py中无法知道音频流何时结束,帝派引擎最后一个音频端不能为空,只能手动在结束时,拼接上100ms的静音数据
  151. silent_audio = silent_sample * num_samples * sample_width
  152. audio_base64 = base64.b64encode(silent_audio).decode('utf-8')
  153. data = {
  154. "common": {
  155. "app_id": APPID
  156. },
  157. "data": {
  158. "status": status,
  159. "audio": audio_base64,
  160. "sr": 16000
  161. }
  162. }
  163. payload = json.dumps(data) # 将字典对象转换为JSON字符串
  164. print(payload)
  165. self.socket.sendTextMessage(payload)
  166. # 生成json数据模版
  167. def gen_q_data(self, code, data):
  168. return {"code": code, "data": data}
  169. def sendAudio(self):
  170. print(self.socket.state())
  171. chunk_size = 32000 # 32000字节为一段音频数据
  172. with open('recording.wav', 'rb') as audio_file:
  173. audio_chunk = audio_file.read(chunk_size)
  174. status = 1 # 发送首个音频段时的状态
  175. while audio_chunk:
  176. if len(audio_chunk) < chunk_size:
  177. status = 4 # 发送末个音频段时的状态
  178. audio_base64 = base64.b64encode(audio_chunk).decode('utf-8')
  179. data = {
  180. "common": {
  181. "app_id": APPID
  182. },
  183. "data": {
  184. "status": status,
  185. "audio": audio_base64,
  186. "sr": 16000
  187. }
  188. }
  189. payload = json.dumps(data) # 将字典对象转换为JSON字符串
  190. self.socket.sendTextMessage(payload)
  191. status = 2 # 发送中间音频段时的状态
  192. audio_chunk = audio_file.read(chunk_size)