123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
import hashlib
import json
import queue
import threading
import time
import uuid

import grpc
from PyQt5.QtCore import QObject, pyqtSignal

import config
import iat_pb2
import iat_pb2_grpc
import lib_nlp_grpc
from util import constants
# Address ("host:port") of the IAT speech-recognition gRPC server; the client
# class in this module opens an insecure gRPC channel to it.
url = "124.243.226.40:17022"
class AudioRecorderClient(QObject):
    """Streaming speech-recognition client for the IAT gRPC service.

    ``start_speech()`` opens a ``createRec`` stream, ``audio_write()`` queues
    audio chunks onto it and ``stop_speech()`` ends it. Responses are read on a
    daemon thread. When an utterance ends, its recognized text is emitted as
    ``constants.AITEST_AIUI_RESULT``. Non-empty text is then routed to one of
    three destinations: a search action, a local command from
    ``config.local_commands``, or the remote NLP engine (``lib_nlp_grpc``).
    That result is emitted as ``constants.AITEST_GPRC_NLP``.

    Every payload emitted on ``trigger`` is a dict built by ``gen_q_data``:
    ``{"code": <code>, "data": <payload>}``.
    """

    # Emitted (from the response-listener thread) with a gen_q_data() dict.
    trigger = pyqtSignal(object)

    # A dialog id is re-requested once it is older than this (24 hours).
    _DIALOG_TTL_SECONDS = 24 * 3600

    # Recognized text starting with this prefix ("search") becomes a search
    # action; the remainder of the text is the search content.
    _SEARCH_PREFIX = "搜索"
    # action_type emitted for search requests.
    _SEARCH_ACTION_TYPE = "9"

    def __init__(self):
        super().__init__()
        self.sid = get_auth_id()
        self.server_address = url
        self.channel = grpc.insecure_channel(self.server_address)
        self.stub = iat_pb2_grpc.IatStub(self.channel)
        # Outgoing IatRequest queue of the current session; a None item ends
        # the request stream. start_speech() installs a fresh queue each time.
        self.request_queue = queue.Queue()
        self.running = False
        self.response_iterator = None
        # Sets self.init_timestamp, self.did and self.hotwords.
        self._refresh_dialog()

    def _refresh_dialog(self):
        """Create a new NLP dialog and store its id, hotwords and creation time.

        ``lib_nlp_grpc.create_dialog()`` returns a dict with ``"did"`` and
        ``"hotwords"`` keys. ``init_timestamp`` is what the 24-hour expiry
        check compares against.
        """
        self.init_timestamp = time.time()
        dialog_data = lib_nlp_grpc.create_dialog()
        self.did = dialog_data["did"]
        self.hotwords = dialog_data["hotwords"]

    def _request_generator(self, request_queue=None):
        """Return a generator yielding queued requests until a ``None`` sentinel.

        :param request_queue: queue to drain; defaults to ``self.request_queue``.
            The queue is resolved here, when the generator is created, rather
            than lazily when gRPC first pulls from it. A stream can therefore
            never end up reading a later session's queue.
        """
        if request_queue is None:
            request_queue = self.request_queue
        return self._drain_requests(request_queue)

    @staticmethod
    def _drain_requests(request_queue):
        """Yield items from *request_queue* until ``None`` is received."""
        while True:
            request = request_queue.get()
            if request is None:
                print("Received None, closing the request generator.")
                return
            # The interface spec (section 3.3) asks for a pause between audio
            # sends. That targets replayed/simulated audio; live speech input
            # already arrives in real time, so no delay is added here.
            yield request

    def _listen_for_responses(self, response_iterator=None):
        """Consume recognition responses and dispatch each final result.

        Partial ``ansStr`` values are accumulated until a response with
        ``endFlag`` set arrives. The accumulated text is then emitted as
        ``constants.AITEST_AIUI_RESULT`` and, if non-empty, routed by
        ``_dispatch_text``.

        :param response_iterator: response stream to read; defaults to
            ``self.response_iterator``.
        """
        if response_iterator is None:
            response_iterator = self.response_iterator
        parts = []  # partial results of the utterance in progress
        try:
            for response in response_iterator:
                if not response.endFlag:
                    if response.ansStr:
                        parts.append(response.ansStr)
                    continue
                # NOTE(review): the ansStr carried by the endFlag response itself
                # is not included (unchanged behaviour); confirm with the
                # service docs whether it can contain text.
                text = "".join(parts)
                parts = []
                self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, text))
                print("最终识别结果:", text)
                if not text:
                    continue
                try:
                    self._dispatch_text(text)
                except grpc.RpcError as e:
                    # An NLP failure must not tear down the recognition stream.
                    print(f"NLP RPC error: {e}")
        except grpc.RpcError as e:
            print(f"RPC error: {e}")

    def _dispatch_text(self, text):
        """Route a final recognition result to search, a local command, or NLP.

        Emits exactly one ``constants.AITEST_GPRC_NLP`` payload.
        """
        if text.startswith(self._SEARCH_PREFIX):
            self._emit_nlp_action(self._SEARCH_ACTION_TYPE, text[len(self._SEARCH_PREFIX):])
            return
        command = self._match_local_command(text)
        if command is not None:
            self._emit_nlp_action(str(command[2]), command[4])
            return
        # No local match: ask the remote NLP engine, renewing the dialog once
        # it is older than the TTL.
        if time.time() - self.init_timestamp > self._DIALOG_TTL_SECONDS:
            self._refresh_dialog()
        nlp_result = lib_nlp_grpc.get_nlp_result(self.did, text)
        self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))

    @staticmethod
    def _match_local_command(text):
        """Return the first ``config.local_commands`` row whose keywords occur in *text*.

        Column 3 of a row holds comma-separated trigger keywords. Columns 2 and
        4 are emitted as action_type / action_content on a match. Blank keywords
        (e.g. from a trailing comma) are skipped: the empty string is a
        substring of everything and would match every utterance. Returns None
        when nothing matches.
        """
        for command in config.local_commands:
            keywords = [keyword.strip() for keyword in command[3].split(",")]
            if any(keyword and keyword in text for keyword in keywords):
                return command
        return None

    def _emit_nlp_action(self, action_type, action_content):
        """Emit a locally built NLP action as a JSON string.

        ``json.dumps`` escapes quotes, backslashes and control characters in
        the content, which plain string concatenation did not. The separators
        reproduce the previous ``{"k":"v", "k":"v"}`` layout.
        """
        nlp_result = json.dumps(
            {"action_type": action_type, "action_content": action_content},
            ensure_ascii=False,
            separators=(", ", ":"),
        )
        self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))

    def start_speech(self):
        """Open a new recognition stream; no-op while one is already running."""
        if self.running:
            return
        # Fresh queue per session: leftovers of a previous session (its end
        # request and None sentinel) can never be consumed by this stream.
        request_queue = queue.Queue()
        # Session parameters must be the first request on the stream. They are
        # queued before `running` is set, so audio_write() cannot get ahead.
        request_queue.put(iat_pb2.IatRequest(
            sessionParam={"sid": self.sid, "aue": "raw", "rst": "plain", "rate": "16k",
                          "hotword": self.hotwords,
                          "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": ""},
            endFlag=False))
        self.request_queue = request_queue
        response_iterator = self.stub.createRec(self._request_generator(request_queue))
        self.response_iterator = response_iterator
        threading.Thread(
            target=self._listen_for_responses, args=(response_iterator,), daemon=True
        ).start()
        self.running = True

    def audio_write(self, audio_chunk):
        """Queue one audio chunk for the active stream; dropped when not running."""
        if self.running:
            self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False))

    def stop_speech(self):
        """Send the end-of-audio request and close the request stream."""
        if not self.running:
            return
        # Clear the flag first so audio_write() stops queueing audio behind
        # the end-of-stream request.
        self.running = False
        # NOTE(review): an earlier comment reported wrong recognition results
        # without a short (~20 ms) pause before the end request, yet that sleep
        # was disabled; re-check if final results look truncated.
        self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
        self.request_queue.put(None)  # sentinel: ends the request generator

    def close(self):
        """End any active session, then close the gRPC channel."""
        # Without this, an active session's request generator would block
        # forever on its queue after the channel is gone.
        self.stop_speech()
        self.channel.close()

    def _callback(self, data: dict):
        """Emit *data* on ``trigger``, printing (not raising) on failure."""
        try:
            self.trigger.emit(data)
        except Exception as e:
            print(f"Failed to emit trigger signal: {e}")

    def gen_q_data(self, code, data):
        """Wrap *data* in the ``{"code": ..., "data": ...}`` envelope emitted on ``trigger``."""
        return {"code": code, "data": data}
def get_auth_id():
    """Return a device identifier: the MD5 hex digest of this host's MAC address.

    The 48-bit node from ``uuid.getnode()`` is formatted as six lowercase,
    colon-separated hex octets (``"aa:bb:cc:dd:ee:ff"``) and that text is
    hashed. Per the ``uuid`` docs, ``getnode()`` falls back to a random 48-bit
    number when no hardware address is found. In that case the id is not
    stable across process runs.
    """
    node_hex = format(uuid.getnode(), "012x")
    octets = [node_hex[pos:pos + 2] for pos in range(0, 12, 2)]
    return hashlib.md5(":".join(octets).encode("utf-8")).hexdigest()
|