123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- from PyQt5.QtCore import QUrl, QTimer, QObject, pyqtSignal
- from PyQt5.QtWebSockets import QWebSocket
- import base64
- import hashlib
- import json
- import time
- import struct
- import hmac
- import random
- from urllib.parse import quote
- from util import constants
- from libs import lib_nlp_dipai
- import config
# --- DiPai ASR service settings ---------------------------------------------
# NOTE(review): the endpoint and credentials below are hard-coded in source.
# The commented-out assignments at the end of this section show the
# config-driven alternative; confirm whether it should be re-enabled so the
# secrets can be kept out of the repository.

# WebSocket endpoint of the recognition service.
BASE_URL = "ws://112.30.115.188:28080/dpwsAsr/recog"

# NOTE: this module-level name shadows the builtin ``type`` inside this
# module. It is kept as-is because other code may import it.
type = "wsAsr"

# Application id sent in the "common" section of every audio message.
APPID = "shuchuan"

# Key pair used to sign the handshake URL.
APIKey = "9863296d1a7234088c383e9e05b1a41b"
APISecret = "a924a14b977d1587242d31228d026a00"

# Config-driven alternative (currently disabled):
# BASE_URL = config.config["DiPai_ASR_URL"]
# type = "wsAsr"
# APPID = config.config["DiPai_ASR_APPID"]
# APIKey = config.config["DiPai_ASR_APIKey"]
# APISecret = config.config["DiPai_ASR_APISecret"]
def get_nonce():
    """Return a pseudo-random integer nonce in the inclusive range 1..100.

    The value comes from the ``random`` module, so it is not
    cryptographically strong.
    NOTE(review): only 100 distinct values are possible; confirm the range
    the service expects before widening it.
    """
    return random.randint(1, 100)
class DiPaiASRWebSocket(QObject):
    """WebSocket client for the DiPai streaming speech-recognition service.

    Typical flow: ``start_speech()`` opens a signed connection,
    ``audio_write()`` is called once per audio chunk, and ``stop_speech()``
    sends the closing segment. Recognised text arriving in ``on_message`` is
    accumulated; when the server marks the result as final, the full
    transcription and then the NLP result for it are emitted through
    ``trigger`` as ``{"code": ..., "data": ...}`` dicts.
    """

    # Carries the dicts produced by gen_q_data().
    trigger = pyqtSignal(dict)

    # Values of the "status" field of an audio message.
    _STATUS_FIRST = 1   # first audio segment of an utterance
    _STATUS_MIDDLE = 2  # every following segment
    _STATUS_LAST = 4    # closing segment

    _SAMPLE_RATE = 16000   # Hz, sent as "sr" in every audio message
    _SAMPLE_WIDTH = 2      # bytes per sample (16-bit PCM)
    _TRAILING_SILENCE_SECONDS = 0.1

    def __init__(self):
        super().__init__()
        # Status sent with the next audio segment; set by start_speech().
        self.status = None
        # Text recognised so far for the current utterance.
        self.complete_transcription = ""
        # Signed handshake URL; filled in by create_url().
        self.ws_url = None

        self.socket = QWebSocket()
        self.socket.connected.connect(self.on_connected)
        self.socket.textMessageReceived.connect(self.on_message)
        self.socket.error.connect(self.on_error)
        self.socket.disconnected.connect(self.on_close)

    @staticmethod
    def _build_payload(status, audio: bytes) -> str:
        """Serialise one audio segment as the JSON text message to send.

        Args:
            status: value for the "status" field (see the _STATUS_* values).
            audio: raw audio bytes; they are base64-encoded into "audio".

        Returns:
            The JSON string to pass to ``sendTextMessage``.
        """
        return json.dumps({
            "common": {
                "app_id": APPID
            },
            "data": {
                "status": status,
                "audio": base64.b64encode(audio).decode('utf-8'),
                "sr": DiPaiASRWebSocket._SAMPLE_RATE
            }
        })

    def on_connected(self):
        """Slot for ``connected``: log and notify listeners with code 1."""
        print("WebSocket 连接成功")
        # NOTE(review): the literal 1 is presumably a "connected" event code
        # from util.constants; confirm and replace with the named constant.
        self._callback(self.gen_q_data(1, ''))

    def on_error(self, error_code):
        """Slot for the socket ``error`` signal: log the error code."""
        print("WebSocket error:", error_code)

    def on_close(self):
        """Slot for ``disconnected``: log that the connection was closed."""
        print("连接正常关闭")

    def audio_write(self, buf: bytes):
        """Send one chunk of audio to the service.

        The chunk is sent with the current ``self.status``; afterwards the
        status becomes "middle" so only the first chunk after
        ``start_speech()`` is marked as first.

        Args:
            buf: raw audio bytes for this chunk.
        """
        payload = self._build_payload(self.status, buf)
        print(payload)
        self.socket.sendTextMessage(payload)
        self.status = self._STATUS_MIDDLE

    def on_message(self, message):
        """Slot for ``textMessageReceived``: accumulate text, emit final result.

        Each message is parsed as JSON; ``data.txt`` (with ``|`` removed) is
        appended to ``complete_transcription``. When ``data.isEnd`` equals the
        string ``"1"``, the full transcription is emitted, then the NLP result
        for it is fetched via ``lib_nlp_dipai.get_nlp_result`` and emitted.

        Args:
            message: text frame received from the server.
        """
        try:
            print("Received message:", message)
            result = json.loads(str(message))["data"]
            self.complete_transcription += result["txt"].replace("|", "")
            if result["isEnd"] != "1":
                return

            self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT,
                                           self.complete_transcription))
            print("最终识别结果:", self.complete_transcription)

            # Run the DiPai semantic engine on the final transcription.
            nlp_result = lib_nlp_dipai.get_nlp_result(self.complete_transcription)
            print("语义理解结果:", nlp_result)
            self._callback(self.gen_q_data(constants.AITEST_DIPAI_NLP, nlp_result))
        except Exception as e:
            # Kept broad on purpose: this runs as a Qt slot, so failures are
            # reported here instead of being raised. repr() is used because
            # str(KeyError) is only the bare key name.
            print("on_message failed:", repr(e))

    def _callback(self, data: dict):
        """Emit ``data`` through ``trigger``, reporting any failure."""
        try:
            self.trigger.emit(data)
        except Exception as e:
            print("trigger.emit failed:", repr(e))

    def create_url(self):
        """Build the signed handshake URL and store it in ``self.ws_url``.

        The query string holds APIKey, Nonce, Region, Task and Timestamp. It
        is signed with HMAC-SHA256 keyed by ``APISecret``; the base64 digest
        is URL-quoted and appended as ``Signature``.
        """
        timestamp = int(time.time())
        nonce = get_nonce()
        query = f"APIKey={APIKey}&Nonce={nonce}&Region=bj&Task=ASR&Timestamp={timestamp}"
        digest = hmac.new(APISecret.encode('utf-8'),
                          query.encode('utf-8'),
                          digestmod=hashlib.sha256).digest()
        signature = base64.b64encode(digest).decode(encoding='utf-8')
        # safe='' so that '/', '+' and '=' in the base64 text are escaped too.
        self.ws_url = BASE_URL + "?" + query + "&Signature=" + quote(signature, safe='')

    def start_speech(self):
        """Reset per-utterance state and open a freshly signed connection."""
        self.complete_transcription = ""
        self.status = self._STATUS_FIRST
        self.create_url()
        self.socket.open(QUrl(f"{self.ws_url}"))

    def stop_speech(self):
        """Send the closing segment, consisting of 100 ms of silence.

        The caller cannot tell when the audio stream ends, and the engine
        does not accept an empty last segment, so a short block of 16-bit PCM
        silence is sent with the "last" status instead.
        """
        print("发送结束标识")
        num_samples = int(self._SAMPLE_RATE * self._TRAILING_SILENCE_SECONDS)
        # One zero-valued 16-bit sample is _SAMPLE_WIDTH zero bytes, so the
        # buffer is num_samples * _SAMPLE_WIDTH bytes long (3200 bytes).
        silent_audio = bytes(num_samples * self._SAMPLE_WIDTH)
        payload = self._build_payload(self._STATUS_LAST, silent_audio)
        print(payload)
        self.socket.sendTextMessage(payload)

    def gen_q_data(self, code, data):
        """Return the ``{"code": code, "data": data}`` dict sent via ``trigger``."""
        return {"code": code, "data": data}

    def sendAudio(self):
        """Stream ``recording.wav`` to the service in 32000-byte segments.

        The first segment is sent with the "first" status, the final one with
        the "last" status and all others with "middle". A file that fits in a
        single segment is sent as "last". An empty file sends nothing.
        NOTE(review): the file is read from byte 0, so any WAV header is sent
        as audio too; confirm the service tolerates that.
        """
        print(self.socket.state())
        chunk_size = 32000
        with open('recording.wav', 'rb') as audio_file:
            status = self._STATUS_FIRST
            audio_chunk = audio_file.read(chunk_size)
            while audio_chunk:
                # Read ahead so the final segment is recognised even when the
                # file size is an exact multiple of chunk_size.
                next_chunk = audio_file.read(chunk_size)
                if not next_chunk:
                    status = self._STATUS_LAST
                self.socket.sendTextMessage(self._build_payload(status, audio_chunk))
                status = self._STATUS_MIDDLE
                audio_chunk = next_chunk
|