from PyQt5.QtCore import QObject, pyqtSignal import grpc import iat_pb2 import iat_pb2_grpc import threading import queue import time import uuid import hashlib from util import constants import lib_nlp_grpc import config url = '124.243.226.40:17022' class AudioRecorderClient(QObject): trigger = pyqtSignal(object) # 新信号,用于在收到响应时发射 def __init__(self): super().__init__() self.sid = get_auth_id() self.server_address = url self.channel = grpc.insecure_channel(self.server_address) self.stub = iat_pb2_grpc.IatStub(self.channel) self.request_queue = queue.Queue() # 请求队列,用于管理音频数据和控制消息 self.running = False # 获取当前系统时间戳,用于判断对话id是否过期,过期需要重新申请 self.init_timestamp = time.time() # 调用引擎初始化对话,返回的对话id dialog_data = lib_nlp_grpc.create_dialog() self.did = dialog_data["did"] self.hotwords = dialog_data["hotwords"] def _request_generator(self): # 生成器函数,用于发送音频数据和接收响应 while True: request = self.request_queue.get() # 从队列中获取请求 if request is None: # None 作为结束信号 print("Received None, closing the request generator.") break # grpc接口文档中3.3章节要求,发送音频有间隔,应该是针对模拟音频数据,此处已经是用户说话场景了,应该不需要发送间隔了 # if request.samples: # 如果请求包含音频样本 # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms # 打印请求信息,如是否为结束标志和样本数据长度 # print( # f"Sending request: sessionParam={request.sessionParam}, endFlag={request.endFlag}, samples length={len(request.samples)}") yield request # def _listen_for_responses(self): # # 监听响应 # try: # for response in self.response_iterator: # self.trigger.emit(response) # 发射响应信号 # except grpc.RpcError as e: # print(f"RPC error: {e}") def _listen_for_responses(self): ans_str_cache = "" # 缓存中间结果的字符串 try: for response in self.response_iterator: # 检查是否存在中间结果,如果存在则缓存 if response.ansStr and not response.endFlag: ans_str_cache += response.ansStr # 检查是否为最后一个响应,如果是,则发射信号 if response.endFlag: # 发射信号,将缓存的中间结果和最后一次的结果一起发送 # self.trigger.emit(ans_str_cache + response.ansStr) self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, ans_str_cache)) print("最终识别结果:", ans_str_cache) if ans_str_cache != "": if ans_str_cache.startswith("搜索"): nlp_result = '{"action_type":"9", "action_content":"' + ans_str_cache[2:] + '"}' self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result)) else: local_command = False # 新判断本地命令集合是否有满足的命令 local_commands = config.local_commands for row_index, command in enumerate(local_commands): # 将action_content按逗号分割为列表 action_list = command[3].split(',') # 检查列表中的任何一个字符串是否存在于ans_str_cache中 local_command = any(action in ans_str_cache for action in action_list) if local_command: nlp_result = '{"action_type":"' + str( command[2]) + '", "action_content":"' + command[4] + '"}' self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result)) break if not local_command: # 调用语义引擎 curr_timestamp = time.time() # 判断是否超过24小时,如果超过,需要调用接口重新申请 # 计算时间戳之间的差值(单位为秒) time_difference = curr_timestamp - self.init_timestamp # 将差值转换为小时 hours_difference = time_difference / 3600 # 1小时 = 3600秒 # 如果差值超过24小时 if hours_difference > 24: self.init_timestamp = curr_timestamp self.did = lib_nlp_grpc.create_dialog() nlp_result = lib_nlp_grpc.get_nlp_result(self.did, ans_str_cache) self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result)) # 重置缓存 ans_str_cache = "" except grpc.RpcError as e: print(f"RPC error: {e}") def start_speech(self): if not self.running: self.running = True # 初始化请求生成器并发送初始化请求 self.response_iterator = self.stub.createRec(self._request_generator()) threading.Thread(target=self._listen_for_responses, daemon=True).start() # 发送初始请求,例如会话参数 self.request_queue.put(iat_pb2.IatRequest( sessionParam={"sid": self.sid, "aue": "raw", "rst": "plain", "rate": "16k", "hotword": self.hotwords, "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": ""}, endFlag=False)) def audio_write(self, audio_chunk): if self.running: # 向队列中添加音频数据 self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False)) def stop_speech(self): if self.running: # 可选,发送最后一个音频块后等待 # 实际测试时,这个sleep必须有,不然识别结果不正确 # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms # 发送结束信号 self.request_queue.put(iat_pb2.IatRequest(endFlag=True)) # 发送None以关闭生成器 self.request_queue.put(None) self.running = False def close(self): self.channel.close() def _callback(self, data: dict): try: self.trigger.emit(data) except Exception as e: print(2) print(e) # 生成json数据模版 def gen_q_data(self, code, data): return {"code": code, "data": data} def get_auth_id(): mac = uuid.UUID(int=uuid.getnode()).hex[-12:] return hashlib.md5(":".join([mac[e:e + 2] for e in range(0, 11, 2)]).encode("utf-8")).hexdigest()