# lib_grpc.py
import hashlib
import json
import queue
import threading
import time
import uuid

import grpc
from PyQt5.QtCore import QObject, pyqtSignal

import config
import iat_pb2
import iat_pb2_grpc
import lib_nlp_grpc
from util import constants
  13. url = '124.243.226.40:17022'
  14. class AudioRecorderClient(QObject):
  15. trigger = pyqtSignal(object) # 新信号,用于在收到响应时发射
  16. def __init__(self):
  17. super().__init__()
  18. self.sid = get_auth_id()
  19. self.server_address = url
  20. self.channel = grpc.insecure_channel(self.server_address)
  21. self.stub = iat_pb2_grpc.IatStub(self.channel)
  22. self.request_queue = queue.Queue() # 请求队列,用于管理音频数据和控制消息
  23. self.running = False
  24. # 获取当前系统时间戳,用于判断对话id是否过期,过期需要重新申请
  25. self.init_timestamp = time.time()
  26. # 调用引擎初始化对话,返回的对话id
  27. dialog_data = lib_nlp_grpc.create_dialog()
  28. self.did = dialog_data["did"]
  29. self.hotwords = dialog_data["hotwords"]
  30. def _request_generator(self):
  31. # 生成器函数,用于发送音频数据和接收响应
  32. while True:
  33. request = self.request_queue.get() # 从队列中获取请求
  34. if request is None:
  35. # None 作为结束信号
  36. print("Received None, closing the request generator.")
  37. break
  38. # grpc接口文档中3.3章节要求,发送音频有间隔,应该是针对模拟音频数据,此处已经是用户说话场景了,应该不需要发送间隔了
  39. # if request.samples: # 如果请求包含音频样本
  40. # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms
  41. # 打印请求信息,如是否为结束标志和样本数据长度
  42. # print(
  43. # f"Sending request: sessionParam={request.sessionParam}, endFlag={request.endFlag}, samples length={len(request.samples)}")
  44. yield request
  45. # def _listen_for_responses(self):
  46. # # 监听响应
  47. # try:
  48. # for response in self.response_iterator:
  49. # self.trigger.emit(response) # 发射响应信号
  50. # except grpc.RpcError as e:
  51. # print(f"RPC error: {e}")
  52. def _listen_for_responses(self):
  53. ans_str_cache = "" # 缓存中间结果的字符串
  54. try:
  55. for response in self.response_iterator:
  56. # 检查是否存在中间结果,如果存在则缓存
  57. if response.ansStr and not response.endFlag:
  58. ans_str_cache += response.ansStr
  59. # 检查是否为最后一个响应,如果是,则发射信号
  60. if response.endFlag:
  61. # 发射信号,将缓存的中间结果和最后一次的结果一起发送
  62. # self.trigger.emit(ans_str_cache + response.ansStr)
  63. self._callback(self.gen_q_data(constants.AITEST_AIUI_RESULT, ans_str_cache))
  64. print("最终识别结果:", ans_str_cache)
  65. if ans_str_cache != "":
  66. if ans_str_cache.startswith("搜索"):
  67. nlp_result = '{"action_type":"9", "action_content":"' + ans_str_cache[2:] + '"}'
  68. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  69. else:
  70. local_command = False
  71. # 新判断本地命令集合是否有满足的命令
  72. local_commands = config.local_commands
  73. for row_index, command in enumerate(local_commands):
  74. # 将action_content按逗号分割为列表
  75. action_list = command[3].split(',')
  76. # 检查列表中的任何一个字符串是否存在于ans_str_cache中
  77. local_command = any(action in ans_str_cache for action in action_list)
  78. if local_command:
  79. nlp_result = '{"action_type":"' + str(
  80. command[2]) + '", "action_content":"' + command[4] + '"}'
  81. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  82. break
  83. if not local_command:
  84. # 调用语义引擎
  85. curr_timestamp = time.time()
  86. # 判断是否超过24小时,如果超过,需要调用接口重新申请
  87. # 计算时间戳之间的差值(单位为秒)
  88. time_difference = curr_timestamp - self.init_timestamp
  89. # 将差值转换为小时
  90. hours_difference = time_difference / 3600 # 1小时 = 3600秒
  91. # 如果差值超过24小时
  92. if hours_difference > 24:
  93. self.init_timestamp = curr_timestamp
  94. self.did = lib_nlp_grpc.create_dialog()
  95. nlp_result = lib_nlp_grpc.get_nlp_result(self.did, ans_str_cache)
  96. self._callback(self.gen_q_data(constants.AITEST_GPRC_NLP, nlp_result))
  97. # 重置缓存
  98. ans_str_cache = ""
  99. except grpc.RpcError as e:
  100. print(f"RPC error: {e}")
  101. def start_speech(self):
  102. if not self.running:
  103. self.running = True
  104. # 初始化请求生成器并发送初始化请求
  105. self.response_iterator = self.stub.createRec(self._request_generator())
  106. threading.Thread(target=self._listen_for_responses, daemon=True).start()
  107. # 发送初始请求,例如会话参数
  108. self.request_queue.put(iat_pb2.IatRequest(
  109. sessionParam={"sid": self.sid, "aue": "raw", "rst": "plain", "rate": "16k",
  110. "hotword": self.hotwords,
  111. "domain": "", "appId": "", "bizId": "", "resIdList": "", "personal": ""}, endFlag=False))
  112. def audio_write(self, audio_chunk):
  113. if self.running:
  114. # 向队列中添加音频数据
  115. self.request_queue.put(iat_pb2.IatRequest(samples=audio_chunk, endFlag=False))
  116. def stop_speech(self):
  117. if self.running:
  118. # 可选,发送最后一个音频块后等待
  119. # 实际测试时,这个sleep必须有,不然识别结果不正确
  120. # time.sleep(0.02) # 引擎为16k16bit,每次发送1280大小音频段,那么每次应该等待40ms,目前演示笔每段音频大小是320,间隔20ms
  121. # 发送结束信号
  122. self.request_queue.put(iat_pb2.IatRequest(endFlag=True))
  123. # 发送None以关闭生成器
  124. self.request_queue.put(None)
  125. self.running = False
  126. def close(self):
  127. self.channel.close()
  128. def _callback(self, data: dict):
  129. try:
  130. self.trigger.emit(data)
  131. except Exception as e:
  132. print(2)
  133. print(e)
  134. # 生成json数据模版
  135. def gen_q_data(self, code, data):
  136. return {"code": code, "data": data}
  137. def get_auth_id():
  138. mac = uuid.UUID(int=uuid.getnode()).hex[-12:]
  139. return hashlib.md5(":".join([mac[e:e + 2] for e in range(0, 11, 2)]).encode("utf-8")).hexdigest()