# lib_grpc.py
import hashlib
import json
import queue
import threading
import time
import uuid

import grpc
from PyQt5.QtCore import QObject, pyqtSignal

import config
import iat_pb2
import iat_pb2_grpc
import lib_nlp_grpc
from logger_config import logger
from util import constants
  14. url = '124.243.226.40:17022'
  15. class AudioRecorderClient(QObject):
  16. trigger = pyqtSignal(object) # 新信号,用于在收到响应时发射
  17. def __init__(self, vad_flag):
  18. super().__init__()
  19. self.hibernate = False
  20. self.stub = None
  21. self.channel = None
  22. self.vad_flag = vad_flag
  23. self.sid = get_auth_id()
  24. self.server_address = url
  25. self.is_engine_end = False
  26. '''
  27. RPC error: <_MultiThreadedRendezvous of RPC that terminated with:
  28. status = StatusCode.UNAVAILABLE
  29. details = "Connection reset"
  30. debug_error_string = "UNKNOWN:Error received from peer {created_time:"2024-06-04T02:43:28.2852678+00:00", grpc_status:14, grpc_message:"Connection reset"}"
  31. >
  32. '''
  33. # TODO,下面2行代码是不是应该每次识别的时候,都重新做一次,上面的报错在唤醒模式下偶现
  34. # 2次唤醒后,长时间不说话,可能造成grpc连接问题,需要测试演示笔是否也有问题? 放到start_speech里面是否会更好?
  35. # self.channel = grpc.insecure_channel(self.server_address)
  36. # self.stub = iat_pb2_grpc.IatStub(self.channel)
  37. self.request_queue = queue.Queue() # 请求队列,用于管理音频数据和控制消息
  38. self.running = False
  39. # 获取当前系统时间戳,用于判断对话id是否过期,过期需要重新申请
  40. self.init_timestamp = time.time()
  41. # 调用引擎初始化对话,返回的对话id
  42. dialog_data = lib_nlp_grpc.create_dialog()
  43. self.did = dialog_data["did"]
  44. # self.did = str(uuid.uuid4())
  45. print('重新开启引擎连接' + str(self.did))
  46. self.hotwords = dialog_data["hotwords"]
  47. def _request_generator(self):
  48. # 生成器函数,用于发送音频数据和接收响应
  49. while True:
  50. request = self.request_queue.get() # 从队列中获取请求
  51. # logger.info(request)
  52. # 演示器松开是,调用stop_speech,将self.request_queue.put(None)置为None时,代表语音完成,跳出while循环
  53. # 麦克风模式,当收到引擎返回的endFlag标记时,也会清空request_queue
  54. if request is None:
  55. # None 作为结束信号
  56. print("Received None, closing the request generator.")
  57. break
  58. # 如果已经收到引擎返回结束标识了,也不需要再发送音频了
  59. # if self.is_engine_end:
  60. # print("引擎已结束,跳出循环发送音频数据")
  61. # break
  62. # grpc接口文档中3.3章节要求,发送音频有间隔,应该是针对模拟音频数据,此处已经是用户说话场景了,应该不需要发送间隔了
  63. # if request.samples: # 如果请求包含音频样本
  64. # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms
  65. # 打印请求信息,如是否为结束标志和样本数据长度
  66. # print(
  67. # f"Sending request: sessionParam={request.sessionParam}, endFlag={request.endFlag}, samples length={len(request.samples)}")
  68. yield request
  69. # def _listen_for_responses(self):
  70. # # 监听响应
  71. # try:
  72. # for response in self.response_iterator:
  73. # self.trigger.emit(response) # 发射响应信号
  74. # except grpc.RpcError as e:
  75. # print(f"RPC error: {e}")
  76. def _listen_for_responses(self):
  77. ans_str_cache = "" # 缓存中间结果的字符串
  78. try:
  79. endFlag = False
  80. for response in self.response_iterator:
  81. print(response)
  82. print("----")
  83. # 检查是否存在中间结果,如果存在则缓存
  84. if response.ansStr:
  85. ans_str_cache += response.ansStr
  86. if response.endFlag:
  87. endFlag = True
  88. '''
  89. 使用VAD后端点模式,引擎已经返回识别结束标识了,因为麦克风一直监听中,程序可能还在发送音频,会收到此响应,需要过滤掉此类响应
  90. errStr: "session already stop"
  91. errCode: 30019
  92. endFlag: true
  93. '''
  94. # if response.endFlag and response.errCode == 30019:
  95. # continue
  96. if response.endFlag and response.errCode:
  97. print("errCode有值,直接跳出")
  98. continue
  99. print("self.hibernate:", self.hibernate)
  100. if self.hibernate:
  101. return
  102. # 检查是否为最后一个响应,如果是,则发射信号
  103. if endFlag:
  104. self.is_engine_end = True
  105. self.running = False
  106. # 发射信号,将缓存的中间结果和最后一次的结果一起发送
  107. # self.trigger.emit(ans_str_cache + response.ansStr)
  108. # 如果使用后断点识别,因为没有主动调用stop_speech发送endFlag=True,需要在接收到引擎的endFlag标记时,清空request_queue
  109. if self.vad_flag:
  110. self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
  111. self.request_queue.put(None)
  112. self.running = False
  113. # 关闭grpc连接
  114. # self.channel.close()
  115. # print("关闭grpc")
  116. self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, ans_str_cache))
  117. print("最终识别结果:", ans_str_cache)
  118. if ans_str_cache != "":
  119. if ans_str_cache.startswith("搜索"):
  120. nlp_result = '{"action_type":"9", "action_content":"' + ans_str_cache[2:] + '"}'
  121. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  122. else:
  123. local_command = False
  124. # 新判断本地命令集合是否有满足的命令
  125. local_commands = config.local_commands
  126. for row_index, command in enumerate(local_commands):
  127. # 将action_content按逗号分割为列表
  128. action_list = command[3].split(',')
  129. # 检查列表中的任何一个字符串是否存在于ans_str_cache中
  130. local_command = any(action in ans_str_cache for action in action_list)
  131. if local_command:
  132. nlp_result = '{"action_type":"' + str(
  133. command[2]) + '", "action_content":"' + command[4] + '"}'
  134. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  135. break
  136. if not local_command:
  137. # 调用语义引擎
  138. curr_timestamp = time.time()
  139. # 判断是否超过24小时,如果超过,需要调用接口重新申请
  140. # 计算时间戳之间的差值(单位为秒)
  141. time_difference = curr_timestamp - self.init_timestamp
  142. # 将差值转换为小时
  143. hours_difference = time_difference / 3600 # 1小时 = 3600秒
  144. # 如果差值超过24小时
  145. if hours_difference > 24:
  146. self.init_timestamp = curr_timestamp
  147. self.did = lib_nlp_grpc.create_dialog()
  148. nlp_result = lib_nlp_grpc.get_nlp_result(self.did, ans_str_cache)
  149. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  150. else:
  151. # 识别文本为空时,也要通知,麦克风唤醒模式,需要通知麦克风重新开始收音
  152. nlp_result = '{}'
  153. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  154. # 重置缓存
  155. ans_str_cache = ""
  156. except grpc.RpcError as e:
  157. print(f"RPC error: {e}")
  158. '''
  159. <_MultiThreadedRendezvous of RPC that terminated with:
  160. status = StatusCode.UNAVAILABLE
  161. details = "Connection reset"
  162. debug_error_string = "UNKNOWN:Error received from peer {created_time:"2024-07-01T11:32:19.6209787+00:00", grpc_status:14, grpc_message:"Connection reset"}"
  163. >
  164. '''
  165. # 后端点模式,如果引擎连接报错,需要手动通知麦克风重新开启录音
  166. # 会收到多次响应,都是这个Exception,这样写会造成多次重新开启录音
  167. if self.vad_flag:
  168. nlp_result = '{}'
  169. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  170. def start_speech(self):
  171. print("start_speech:", self.running)
  172. self.is_engine_end = False
  173. self.request_queue = queue.Queue() # 请求队列,用于管理音频数据和控制消息
  174. self.running = False
  175. if not self.running:
  176. self.channel = grpc.insecure_channel(self.server_address)
  177. self.stub = iat_pb2_grpc.IatStub(self.channel)
  178. self.running = True
  179. print("self.running被修改为True了")
  180. sid = str(uuid.uuid4())
  181. # 发送初始请求,例如会话参数
  182. if self.vad_flag:
  183. self.request_queue.put(iat_pb2.IatRequest(
  184. sessionParam={"sid": sid, "aue": "raw", "rst": "plain", "rate": "16k",
  185. "hotword": self.hotwords,
  186. "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": "",
  187. "eos": "1500",
  188. "engine_param": "srec_param_bVadOn=true;srec_param_nVadOutPut=0"},
  189. endFlag=False))
  190. print("111111111111111111111")
  191. else:
  192. self.request_queue.put(iat_pb2.IatRequest(
  193. sessionParam={"sid": sid, "aue": "raw", "rst": "plain", "rate": "16k",
  194. "hotword": self.hotwords,
  195. "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": ""},
  196. endFlag=False))
  197. print("222222222222222222222")
  198. # 初始化请求生成器并发送初始化请求
  199. self.response_iterator = self.stub.createRec(self._request_generator())
  200. threading.Thread(target=self._listen_for_responses, daemon=True).start()
  201. def audio_write(self, audio_chunk):
  202. if self.running:
  203. # 向队列中添加音频数据
  204. # print(audio_chunk)
  205. self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False))
  206. def stop_speech(self):
  207. if self.running:
  208. # 可选,发送最后一个音频块后等待
  209. # 实际测试时,这个sleep必须有,不然识别结果不正确
  210. # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms
  211. # 发送结束信号
  212. self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
  213. # 发送None以关闭生成器
  214. self.request_queue.put(None)
  215. self.running = False
  216. # 唤醒模式下,超时后,需要主动修改self.running,防止未置为False, 导致唤醒后无法启动
  217. def stop_running(self):
  218. print("stop_running")
  219. self.hibernate = True
  220. # 发送结束信号
  221. self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
  222. # 发送None以关闭生成器
  223. self.request_queue.put(None)
  224. self.running = False
  225. def start_running(self):
  226. print("start_running")
  227. self.hibernate = False
  228. self.request_queue = queue.Queue()
  229. self.running = False
  230. def close(self):
  231. self.channel.close()
  232. def _callback(self, data: dict):
  233. try:
  234. self.trigger.emit(data)
  235. except Exception as e:
  236. print(2)
  237. print(e)
  238. # 生成json数据模版
  239. def gen_q_data(self, code, data):
  240. return {"code": code, "data": data}
  241. def get_auth_id():
  242. mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
  243. return hashlib.md5(":".join([mac[e:e + 2] for e in range(0, 11, 2)]).encode("utf-8")).hexdigest()