from PyQt5.QtCore import QObject, pyqtSignal
import grpc
import iat_pb2
import iat_pb2_grpc
import threading
import queue
import time
import uuid
import hashlib
from util import constants
import lib_nlp_grpc
import config
from logger_config import logger

# Address of the gRPC speech-recognition (IAT) engine.
url = '124.243.226.40:17022'


class AudioRecorderClient(QObject):
    """Streams microphone audio to a gRPC IAT engine and emits results.

    Audio chunks are pushed into ``request_queue`` and forwarded to the
    engine through a request generator; a daemon thread listens for
    responses and emits them to the UI via the ``trigger`` signal.

    :param vad_flag: True to use the engine's rear-endpoint (VAD) mode,
        where the engine decides when the utterance ends; False when the
        caller (e.g. a push-to-talk presenter pen) calls ``stop_speech``.
    """

    # Emitted with a dict built by gen_q_data() whenever a recognition /
    # NLP result is available.
    trigger = pyqtSignal(object)

    def __init__(self, vad_flag):
        super().__init__()
        self.hibernate = False            # True while the client is asleep (wake-word mode)
        self.stub = None                  # gRPC stub, created per session in start_speech()
        self.channel = None               # gRPC channel, created per session in start_speech()
        self.vad_flag = vad_flag
        self.sid = get_auth_id()          # stable device id derived from the MAC address
        self.server_address = url
        self.is_engine_end = False        # set once the engine reports endFlag
        # NOTE: the channel/stub are deliberately NOT created here.  A
        # long-lived channel intermittently failed with
        # StatusCode.UNAVAILABLE "Connection reset" after long idle
        # periods in wake-word mode, so start_speech() recreates them for
        # every recognition session instead.
        self.request_queue = queue.Queue()  # audio chunks + control messages for the generator
        self.running = False
        # Timestamp used to decide when the NLP dialog id has expired
        # (the engine invalidates it after 24 hours).
        self.init_timestamp = time.time()
        # Ask the NLP engine for a dialog; it returns a dict with the
        # dialog id and the hotword list for this session.
        dialog_data = lib_nlp_grpc.create_dialog()
        self.did = dialog_data["did"]
        print('重新开启引擎连接' + str(self.did))
        self.hotwords = dialog_data["hotwords"]

    def _request_generator(self):
        """Yield queued IatRequest messages to the gRPC stream.

        A ``None`` sentinel in the queue ends the stream: push-to-talk
        mode enqueues it from stop_speech(); VAD mode enqueues it when
        the engine's endFlag response arrives.
        """
        while True:
            request = self.request_queue.get()
            if request is None:  # sentinel: utterance finished, close the stream
                print("Received None, closing the request generator.")
                break
            # The engine protocol docs suggest pacing simulated audio
            # (e.g. 40 ms per 1280-byte chunk at 16k/16bit); live
            # microphone input is already real-time, so no sleep here.
            yield request

    def _listen_for_responses(self):
        """Consume engine responses, accumulate text, and emit results.

        Runs on a daemon thread started by start_speech().  Partial
        results are concatenated into ``ans_str_cache``; once the engine
        signals endFlag the final text is emitted, then routed to either
        a local command, a search action, or the remote NLP engine.
        """
        ans_str_cache = ""  # accumulates partial recognition results
        try:
            endFlag = False
            for response in self.response_iterator:
                print(response)
                print("----")
                if response.ansStr:
                    ans_str_cache += response.ansStr
                if response.endFlag:
                    endFlag = True
                # In VAD mode the microphone keeps sending audio after the
                # engine has already closed the session, which produces
                # error responses such as errCode 30019
                # ("session already stop") with endFlag set — skip them.
                if response.endFlag and response.errCode:
                    print("errCode有值,直接跳出")
                    continue
                print("self.hibernate:", self.hibernate)
                if self.hibernate:
                    # Client was put to sleep mid-session; drop everything.
                    return
                if endFlag:
                    self.is_engine_end = True
                    self.running = False
                    # VAD mode never calls stop_speech(), so the queue must
                    # be terminated here when the engine reports the end.
                    if self.vad_flag:
                        self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
                        self.request_queue.put(None)
                        self.running = False
                    # Publish the raw recognition text first.
                    self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, ans_str_cache))
                    print("最终识别结果:", ans_str_cache)
                    if ans_str_cache != "":
                        if ans_str_cache.startswith("搜索"):
                            # "搜索..." is a search command: action_type 9
                            # carries the query text after the prefix.
                            nlp_result = '{"action_type":"9", "action_content":"' + ans_str_cache[2:] + '"}'
                            self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
                        else:
                            # Try the locally configured command table first.
                            local_command = False
                            local_commands = config.local_commands
                            for row_index, command in enumerate(local_commands):
                                # command[3] holds comma-separated trigger phrases.
                                action_list = command[3].split(',')
                                local_command = any(action in ans_str_cache for action in action_list)
                                if local_command:
                                    nlp_result = '{"action_type":"' + str(
                                        command[2]) + '", "action_content":"' + command[4] + '"}'
                                    self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
                                    break
                            if not local_command:
                                # No local match: ask the remote NLP engine.
                                curr_timestamp = time.time()
                                # The dialog id expires after 24 hours; renew
                                # it before querying if it is stale.
                                time_difference = curr_timestamp - self.init_timestamp
                                hours_difference = time_difference / 3600  # 1 hour = 3600 s
                                if hours_difference > 24:
                                    self.init_timestamp = curr_timestamp
                                    # BUG FIX: create_dialog() returns a dict
                                    # (see __init__); the old code assigned the
                                    # whole dict to self.did, breaking every
                                    # NLP query after the first renewal.
                                    self.did = lib_nlp_grpc.create_dialog()["did"]
                                nlp_result = lib_nlp_grpc.get_nlp_result(self.did, ans_str_cache)
                                self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
                    else:
                        # Empty recognition must still notify listeners so the
                        # wake-word microphone restarts recording.
                        nlp_result = '{}'
                        self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
                    # Reset the cache for the next utterance.
                    ans_str_cache = ""
        except grpc.RpcError as e:
            # Typical failure: StatusCode.UNAVAILABLE "Connection reset".
            print(f"RPC error: {e}")
            # In VAD mode the microphone must be told to restart recording
            # after a connection failure.
            # NOTE(review): repeated RpcErrors would emit this multiple
            # times, restarting recording more than once — known issue.
            if self.vad_flag:
                nlp_result = '{}'
                self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))

    def start_speech(self):
        """Open a fresh gRPC session and start streaming/listening.

        Creates a new channel and stub for every session (a long-lived
        channel caused intermittent "Connection reset" errors after long
        idle periods), sends the session parameters, and spawns the
        response-listener thread.
        """
        print("start_speech:", self.running)
        self.is_engine_end = False
        # Fresh queue so stale audio from a previous session is dropped.
        self.request_queue = queue.Queue()
        # (Removed dead code: the original set self.running = False and then
        # tested `if not self.running:`, which was always true.)
        self.channel = grpc.insecure_channel(self.server_address)
        self.stub = iat_pb2_grpc.IatStub(self.channel)
        self.running = True
        print("self.running被修改为True了")
        sid = str(uuid.uuid4())
        # First message carries the session parameters; VAD mode adds the
        # rear-endpoint settings (eos timeout + engine_param).
        if self.vad_flag:
            self.request_queue.put(iat_pb2.IatRequest(
                sessionParam={"sid": sid, "aue": "raw", "rst": "plain", "rate": "16k",
                              "hotword": self.hotwords, "domain": "", "appId": "", "bizId": "",
                              "resIdList": "", "personal": "", "eos": "1500",
                              "engine_param": "srec_param_bVadOn=true;srec_param_nVadOutPut=0"},
                endFlag=False))
            print("111111111111111111111")
        else:
            self.request_queue.put(iat_pb2.IatRequest(
                sessionParam={"sid": sid, "aue": "raw", "rst": "plain", "rate": "16k",
                              "hotword": self.hotwords, "domain": "", "appId": "", "bizId": "",
                              "resIdList": "", "personal": ""},
                endFlag=False))
            print("222222222222222222222")
        # Bidirectional stream: requests come from the generator, responses
        # are consumed on a daemon thread.
        self.response_iterator = self.stub.createRec(self._request_generator())
        threading.Thread(target=self._listen_for_responses, daemon=True).start()

    def audio_write(self, audio_chunk):
        """Queue one chunk of raw audio for the engine (no-op when idle)."""
        if self.running:
            self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False))

    def stop_speech(self):
        """End the utterance (push-to-talk mode): send endFlag and close."""
        if self.running:
            # Testing showed a short pause before the final chunk may be
            # needed for correct results (previously a 20 ms sleep here).
            self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
            # Sentinel closes the request generator.
            self.request_queue.put(None)
            self.running = False

    def stop_running(self):
        """Force-stop after a wake-mode timeout.

        Ensures self.running is False so a later wake-up can start a new
        session, and marks the client as hibernating.
        """
        print("stop_running")
        self.hibernate = True
        self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
        self.request_queue.put(None)
        self.running = False

    def start_running(self):
        """Wake the client up and reset the queue for a new session."""
        print("start_running")
        self.hibernate = False
        self.request_queue = queue.Queue()
        self.running = False

    def close(self):
        """Close the current gRPC channel."""
        self.channel.close()

    def _callback(self, data: dict):
        """Emit *data* on the trigger signal, swallowing emit errors."""
        try:
            self.trigger.emit(data)
        except Exception as e:
            print(2)
            print(e)

    def gen_q_data(self, code, data):
        """Build the standard {code, data} payload emitted to listeners."""
        return {"code": code, "data": data}


def get_auth_id():
    """Return a stable device id: MD5 of the colon-joined MAC address."""
    mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
    return hashlib.md5(":".join([mac[e:e + 2] for e in range(0, 11, 2)]).encode("utf-8")).hexdigest()