123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- from PyQt5.QtCore import QObject, pyqtSignal
- import grpc
- import iat_pb2
- import iat_pb2_grpc
- import threading
- import queue
- import time
- import uuid
- import hashlib
- from util import constants
- import lib_nlp_grpc
- import config
- from logger_config import logger
# host:port of the gRPC speech-recognition (IAT) service; used as the
# default server address for every AudioRecorderClient session.
url = "124.243.226.40:17022"
class AudioRecorderClient(QObject):
    """Stream audio to the gRPC IAT speech-recognition service and publish results.

    Every start_speech() call opens a fresh gRPC channel and a streaming
    ``createRec`` call. Requests are fed from a per-session queue by
    _request_generator(), and responses are consumed on a daemon thread by
    _listen_for_responses(). Results are emitted through ``trigger`` as
    ``{"code": ..., "data": ...}`` dicts (see gen_q_data):

    * ``constants.AITEST_AIUI_RESULT`` - the final recognised text;
    * ``constants.AITEST_GPRC_NLP`` - a JSON string describing the action to
      perform. ``'{}'`` is sent for an empty result and, in VAD mode, after
      an RPC error, so that the microphone starts listening again.

    Args:
        vad_flag: truthy to let the engine's VAD detect the end of speech;
            otherwise the caller ends each utterance with stop_speech().
    """

    # Emitted with every result dict (see the class docstring).
    trigger = pyqtSignal(object)

    # A dialog id must be re-requested once it is older than 24 hours.
    DIALOG_TTL_SECONDS = 24 * 3600

    # Recognised text starting with this prefix ("search") becomes a search action.
    SEARCH_PREFIX = "搜索"

    # action_type reported for search actions.
    SEARCH_ACTION_TYPE = "9"

    def __init__(self, vad_flag):
        super().__init__()
        self.hibernate = False
        self.stub = None
        self.channel = None
        self.vad_flag = vad_flag
        self.sid = get_auth_id()
        self.server_address = url
        self.is_engine_end = False
        # The channel/stub are (re)created in start_speech() rather than here.
        # A single long-lived channel occasionally failed in wake mode after
        # two wake-ups followed by a long silence, with
        #   StatusCode.UNAVAILABLE, details = "Connection reset"
        # TODO: check whether the presentation pen shows the same problem.
        self.request_queue = queue.Queue()  # requests (audio/control) for the active session
        self.response_iterator = None
        self.running = False
        # Creation time of the dialog id, used to renew it once it expires.
        self.init_timestamp = time.time()
        self._apply_dialog(lib_nlp_grpc.create_dialog())
        print('重新开启引擎连接' + str(self.did))

    def _apply_dialog(self, dialog_data):
        """Store the dialog id and hot words from a ``create_dialog()`` response."""
        self.did = dialog_data["did"]
        self.hotwords = dialog_data["hotwords"]

    def _ensure_dialog(self):
        """Re-request the dialog once it is older than DIALOG_TTL_SECONDS."""
        now = time.time()
        if now - self.init_timestamp > self.DIALOG_TTL_SECONDS:
            self.init_timestamp = now
            # create_dialog() returns a dict; keep only its "did" (and the
            # matching hot words), exactly as __init__ does.
            self._apply_dialog(lib_nlp_grpc.create_dialog())

    def _request_generator(self, request_queue=None):
        """Yield queued requests to the gRPC stream until ``None`` is dequeued.

        ``None`` is the end-of-stream sentinel. stop_speech() queues it when
        the presentation pen is released, stop_running() queues it on a
        wake-mode timeout, and in VAD mode the listener queues it once the
        engine reports endFlag.

        Args:
            request_queue: queue to drain; defaults to ``self.request_queue``.
                start_speech() passes the session's own queue, so a generator
                left over from an earlier session can never consume requests
                that belong to a newer one.
        """
        if request_queue is None:
            request_queue = self.request_queue
        while True:
            request = request_queue.get()
            if request is None:
                print("Received None, closing the request generator.")
                break
            # No pacing sleep between chunks. The interface doc (section 3.3)
            # asks for send intervals, presumably for replayed audio; here the
            # audio already arrives in real time from the speaker.
            yield request

    def _listen_for_responses(self, response_iterator=None, request_queue=None):
        """Consume one session's responses and publish its final result.

        Runs on the daemon thread started by start_speech(). Partial
        ``ansStr`` values are concatenated. The response carrying ``endFlag``
        (without an ``errCode``) completes the utterance and is handed to
        _finish_session().

        Args:
            response_iterator: the session's ``createRec`` response stream;
                defaults to ``self.response_iterator``.
            request_queue: the session's request queue; defaults to
                ``self.request_queue``.
        """
        if response_iterator is None:
            response_iterator = self.response_iterator
        if request_queue is None:
            request_queue = self.request_queue
        ans_str_cache = ""  # partial results of the current utterance
        try:
            for response in response_iterator:
                print(response)
                print("----")
                # With VAD on, the microphone may still be streaming after the
                # engine ended the session, so the engine answers e.g.
                #   errStr: "session already stop", errCode: 30019, endFlag: true
                # Such error responses carry no result and are skipped. endFlag
                # is checked per response (not latched), so a stray response
                # arriving later is never dispatched as another final result.
                if response.endFlag and response.errCode:
                    print("errCode有值,直接跳出")
                    continue
                if response.ansStr:
                    ans_str_cache += response.ansStr
                print("self.hibernate:", self.hibernate)
                if self.hibernate:
                    # stop_running() was called (wake-mode timeout): drop the result.
                    return
                if response.endFlag:
                    self._finish_session(ans_str_cache, request_queue)
                    ans_str_cache = ""
        except grpc.RpcError as e:
            # Seen e.g. as StatusCode.UNAVAILABLE, details = "Connection reset".
            print(f"RPC error: {e}")
            # In VAD mode the microphone must be told explicitly to start
            # recording again after an engine connection error.
            # NOTE(review): this has been observed to fire several times,
            # restarting recording more than once - TODO investigate.
            if self.vad_flag:
                self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, '{}'))

    def _finish_session(self, text, request_queue):
        """Handle the end of an utterance: close the stream and emit results.

        Args:
            text: the full recognised text (may be empty).
            request_queue: the finished session's request queue.
        """
        self.is_engine_end = True
        self.running = False
        # In VAD mode nobody calls stop_speech(), so finish the request stream
        # here: send the final endFlag request and the generator's sentinel.
        if self.vad_flag:
            request_queue.put(iat_pb2.IatRequest(endFlag=True))
            request_queue.put(None)
        self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, text))
        print("最终识别结果:", text)
        nlp_result = self._resolve_nlp(text)
        self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))

    def _resolve_nlp(self, text):
        """Return the NLP action JSON string for the recognised *text*.

        Resolution order:

        1. Empty text gives ``'{}'``. It is still emitted so that, in wake
           mode, the microphone knows to start listening again.
        2. Text starting with SEARCH_PREFIX gives a search action whose
           content is the remainder of the text.
        3. Otherwise, the first ``config.local_commands`` row whose
           comma-separated keywords (column 3) occur in the text gives that
           row's action (type from column 2, content from column 4).
        4. Otherwise, the NLP engine's answer for the current dialog.
        """
        import json

        def action(action_type, content):
            # Same layout as the former hand-concatenated strings, but
            # correctly escaped when the text contains quotes or backslashes.
            return json.dumps(
                {"action_type": str(action_type), "action_content": str(content)},
                ensure_ascii=False,
                separators=(", ", ":"),
            )

        if not text:
            return '{}'
        if text.startswith(self.SEARCH_PREFIX):
            return action(self.SEARCH_ACTION_TYPE, text[len(self.SEARCH_PREFIX):])
        for command in config.local_commands:
            # Empty keywords (e.g. from a trailing comma) would match every text.
            keywords = [word.strip() for word in command[3].split(',') if word.strip()]
            if any(word in text for word in keywords):
                return action(command[2], command[4])
        self._ensure_dialog()
        return lib_nlp_grpc.get_nlp_result(self.did, text)

    def start_speech(self):
        """Open a new recognition session and start listening for its results.

        Creates a new channel and stub, queues the session parameters (with
        engine-side VAD when ``vad_flag`` is set), opens the ``createRec``
        stream, and starts the listener thread.
        """
        print("start_speech:", self.running)
        # Drop incoming audio until the session parameters are queued first.
        self.running = False
        self.is_engine_end = False
        request_queue = queue.Queue()
        self.request_queue = request_queue
        # NOTE(review): the previous session's channel is not closed here or
        # when its stream ends; only close() closes the latest channel.
        self.channel = grpc.insecure_channel(self.server_address)
        self.stub = iat_pb2_grpc.IatStub(self.channel)
        session_param = {"sid": str(uuid.uuid4()), "aue": "raw", "rst": "plain", "rate": "16k",
                         "hotword": self.hotwords,
                         "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": ""}
        if self.vad_flag:
            # Engine-side end-of-speech detection; "eos" is presumably the
            # trailing-silence timeout in ms - TODO confirm with the engine doc.
            session_param["eos"] = "1500"
            session_param["engine_param"] = "srec_param_bVadOn=true;srec_param_nVadOutPut=0"
        request_queue.put(iat_pb2.IatRequest(sessionParam=session_param, endFlag=False))
        self.running = True
        response_iterator = self.stub.createRec(self._request_generator(request_queue))
        self.response_iterator = response_iterator
        threading.Thread(target=self._listen_for_responses,
                         args=(response_iterator, request_queue),
                         daemon=True).start()

    def audio_write(self, audio_chunk):
        """Queue *audio_chunk* for the active session; ignored when not running."""
        if self.running:
            self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False))

    def _close_request_stream(self):
        """Queue the final endFlag request and the generator's ``None`` sentinel."""
        self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
        self.request_queue.put(None)
        self.running = False

    def stop_speech(self):
        """End the current utterance (presentation pen released), if running.

        The engine then returns the final result to the listener thread.
        NOTE(review): an earlier comment said a short sleep (~20 ms) before
        the endFlag was required for correct results, yet it is disabled.
        """
        if self.running:
            self._close_request_stream()

    def stop_running(self):
        """Abort the session after a wake-mode timeout.

        Sets ``hibernate`` so the listener drops any pending result, and clears
        ``running`` so a later wake-up can start a new session.
        """
        print("stop_running")
        self.hibernate = True
        self._close_request_stream()

    def start_running(self):
        """Leave hibernation and reset the request queue for the next session."""
        print("start_running")
        self.hibernate = False
        self.request_queue = queue.Queue()
        self.running = False

    def close(self):
        """Close the most recently opened gRPC channel, if any."""
        if self.channel is not None:
            self.channel.close()

    def _callback(self, data: dict):
        """Emit *data* on ``trigger``; emission errors are printed, not raised."""
        try:
            self.trigger.emit(data)
        except Exception as e:
            print("trigger.emit failed:", e)

    def gen_q_data(self, code, data):
        """Build the ``{"code": code, "data": data}`` payload emitted on ``trigger``."""
        return {"code": code, "data": data}
def get_auth_id():
    """Return an MD5 hex digest derived from this host's MAC address.

    The 48-bit node id from ``uuid.getnode()`` is formatted as six
    colon-separated lowercase hex octets (``aa:bb:cc:dd:ee:ff``), encoded as
    UTF-8 and hashed. If ``uuid.getnode()`` cannot find a hardware address it
    falls back to a random number, so the id is then not stable across runs.
    """
    node_hex = uuid.UUID(int=uuid.getnode()).hex[-12:]
    octets = (node_hex[pos:pos + 2] for pos in range(0, 12, 2))
    colon_mac = ":".join(octets)
    return hashlib.md5(colon_mac.encode("utf-8")).hexdigest()
|