"""Qt WebSocket client that streams audio to the DiPai ASR service."""

import base64
import hashlib
import hmac
import json
import random
import struct
import time
from urllib.parse import quote

from PyQt5.QtCore import QUrl, QTimer, QObject, pyqtSignal
from PyQt5.QtWebSockets import QWebSocket

from util import constants
from libs import lib_nlp_dipai
import config

# NOTE(review): the endpoint and credentials are hard-coded in source. The
# commented-out assignments below read the same values from `config`; prefer
# that route (and rotate these keys) before this module is shared or shipped.
BASE_URL = "ws://112.30.115.188:28080/dpwsAsr/recog"
type = "wsAsr"  # NOTE: shadows the builtin `type`; name kept for compatibility
APPID = "shuchuan"
APIKey = "9863296d1a7234088c383e9e05b1a41b"
APISecret = "a924a14b977d1587242d31228d026a00"
# BASE_URL = config.config["DiPai_ASR_URL"]
# type = "wsAsr"
# APPID = config.config["DiPai_ASR_APPID"]
# APIKey = config.config["DiPai_ASR_APIKey"]
# APISecret = config.config["DiPai_ASR_APISecret"]


def get_nonce():
    """Return a random integer in [1, 100] used as the handshake nonce."""
    return random.randint(1, 100)


class DiPaiASRWebSocket(QObject):
    """Stream audio frames to the ASR endpoint and publish results via `trigger`.

    Every outgoing notification is a dict produced by `self.gen_q_data(...)` and
    emitted through the `trigger` signal.
    NOTE(review): `gen_q_data` is not defined in the part of the file that was
    visible for this review — presumably it lives further down; verify.
    """

    trigger = pyqtSignal(dict)

    def __init__(self):
        super().__init__()
        # Status value attached to the next audio frame (set by start_speech).
        self.status = None
        # Transcription accumulated across the messages of one utterance.
        self.complete_transcription = ""
        # WebSocket client instance and its signal wiring.
        self.socket = QWebSocket()
        self.socket.connected.connect(self.on_connected)
        self.socket.textMessageReceived.connect(self.on_message)
        self.socket.error.connect(self.on_error)
        self.socket.disconnected.connect(self.on_close)

    def on_connected(self):
        """Slot for `connected`: log and notify listeners with code 1."""
        print("WebSocket 连接成功")
        self._callback(self.gen_q_data(1, ''))

    def on_error(self, error_code):
        """Slot for the socket's `error` signal: log the error code."""
        print("WebSocket error:", error_code)

    def on_close(self):
        """Slot for `disconnected`: log that the connection closed."""
        print("连接正常关闭")

    def audio_write(self, buf: bytes):
        """Send one audio chunk as a JSON text message.

        The chunk is base64-encoded and sent together with the app id, the
        current `self.status` and a fixed `sr` of 16000.

        Args:
            buf: raw audio bytes for this frame.
        """
        audio_base64 = base64.b64encode(buf).decode('utf-8')
        data = {
            "common": {
                "app_id": APPID
            },
            "data": {
                "status": self.status,
                "audio": audio_base64,
                "sr": 16000
            }
        }
        payload = json.dumps(data)
        # Log a short summary instead of the whole base64 payload: printing
        # every frame's audio floods stdout.
        print(f"send audio frame: status={self.status}, bytes={len(buf)}")
        self.socket.sendTextMessage(payload)
        # After the first frame has been sent, intermediate frames carry status 2.
        self.status = 2

    def on_message(self, message):
        """Slot for `textMessageReceived`: accumulate text, publish when final.

        Reads `data.txt` (with "|" removed) and `data.isEnd` from the JSON
        message. When `isEnd` equals "1", emits the full transcription, runs it
        through `lib_nlp_dipai.get_nlp_result` and emits that result as well.
        Any failure is logged and swallowed so the Qt slot never raises.
        """
        try:
            print("Received message:", message)
            result = json.loads(str(message))["data"]
            self.complete_transcription += result["txt"].replace("|", "")
            if result["isEnd"] != "1":
                return
            # TODO: add a dedicated signal to notify the caller.
            self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, self.complete_transcription))
            print("最终识别结果:", self.complete_transcription)
            # Call the DiPai semantic (NLP) engine on the final text.
            nlp_result = lib_nlp_dipai.get_nlp_result(self.complete_transcription)
            print("语义理解结果:", nlp_result)
            self._callback(self.gen_q_data(constants.AITEST_DIPAI_NLP, nlp_result))
        except Exception as e:
            # Boundary handler for a Qt slot: report instead of propagating.
            print("DiPai ASR on_message failed:", repr(e))

    def _callback(self, data: dict):
        """Emit `data` through `trigger`, logging (not raising) on failure."""
        try:
            self.trigger.emit(data)
        except Exception as e:
            print("DiPai ASR trigger.emit failed:", repr(e))

    def create_url(self):
        """Build the signed handshake URL and store it in `self.ws_url`.

        The string `APIKey=…&Nonce=…&Region=bj&Task=ASR&Timestamp=…` is signed
        with HMAC-SHA256 keyed by `APISecret`; the base64 digest is
        percent-encoded and appended as `Signature`.

        Returns:
            The URL that was stored in `self.ws_url`.
        """
        timestamp = int(time.time())
        nonce = get_nonce()
        src = f"APIKey={APIKey}&Nonce={nonce}&Region=bj&Task=ASR&Timestamp={timestamp}"
        digest = hmac.new(APISecret.encode('utf-8'), src.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature = base64.b64encode(digest).decode(encoding='utf-8')
        # safe='' so that '/', '+' and '=' in the base64 text are escaped too.
        self.ws_url = f"{BASE_URL}?{src}&Signature={quote(signature, safe='')}"
        return self.ws_url

    def start_speech(self):
        """Reset per-utterance state and open the WebSocket connection."""
        self.complete_transcription = ""
        # Status carried by the first audio frame.
        self.status = 1
        self.create_url()
        self.socket.open(QUrl(self.ws_url))

    def stop_speech(self):
        """Signal the end of the audio stream.

        NOTE(review): only the beginning of this method was visible; see the
        note at the end of the body.
        """
        print("发送结束标识")
        # self.socket.sendBinaryMessage(bytes(end_tag.encode("utf-8")))
        # Status for the last audio frame.
        status = 4
        # Parameters for an empty (silent) audio frame.
        sample_rate = 16000  # 16 kHz sampling rate
        sample_width = 2  # 16-bit samples (2 bytes)
        duration = 0.1  # length of the silence, in seconds
        num_samples = int(sample_rate * duration)  # number of samples needed
        # NOTE(review): the reviewed source is cut off in the middle of the next
        # statement (`silent_sample = struct.pack("`). That statement and
        # everything after it — presumably building the silent frame and sending
        # it with `status` — were not visible and are NOT reproduced here.
        # Restore them from the original file before relying on this method.