""" 调用讯飞演示笔获取录音 QObject::connect: Cannot queue arguments of type 'QTextCursor' (Make sure 'QTextCursor' is registered using qRegisterMetaType().) """ import os import sys from functools import partial import json import sounddevice as sd import numpy as np import threading import queue import time import pyautogui import win32gui from PyQt5 import uic, QtCore from PyQt5.QtCore import QSize, Qt, QUrl from PyQt5.QtGui import QIcon, QPixmap, QDesktopServices from PyQt5.QtWidgets import QMainWindow, QPushButton, QTextBrowser, QLabel, QApplication, QSystemTrayIcon, QMenu, \ QAction, QVBoxLayout, QHBoxLayout, QWidget, QListWidget, QStackedWidget, QListWidgetItem, QScrollArea import mspeech_ui_thr as mspeech # from libs import lib_aiui, lib_asr_dipai, lib_player, mock_button from libs import lib_player, mock_button import lib_grpc import lib_to_ws from util import constants from pynput import keyboard from windows.DraggableWindow import DraggableWindow from windows.SettingsPage import SettingsPage from windows.AboutPage import AboutPage from FileSettingPage import FileSettingPage from logger_config import logger import config from EverythingResultWin import ResultWindow # from BusinessHadler import BusinessHadler # socketHadler = None # 全局变量 TTS_FLAG = False class MainUI(QMainWindow): def __init__(self): super().__init__(flags=Qt.WindowType.Window) self._run_flag = False self.tts_player = None # 读取配置文件中的引擎类型 self.engine_type = config.config["ENGINE_TYPE"] # 读取配置文件中的客户端模式 self.mode_type = config.config["MODE_TYPE"] # 系统麦克风所需变量 self.samplerate = 16000 self.channels = 1 self.recording = False self._audio_queue = queue.Queue() self.buffer = b"" # 初始化监听快捷键,当客户端模式配置为系统麦克风时才需要初始化 if self.mode_type == "1": self._init_keys_listener() # 初始化演示笔,当客户端模式配置为演示笔时才需要初始化 if self.mode_type == "0": self.init_thr() # 初始话客户端页面 self.init_ui() def init_thr(self): # 子线程 self._mspeech = mspeech.Mspeech() self._mspeech.daemon = True self._mspeech.start() # 处理子线程发送的数据 self._mspeech.sign_thread_send.connect(self._proc) # 业务处理(处理子进程和子线程发送的数据) def _proc(self, result): try: code = result["code"] if code != constants.MSPEECH_AIUI_SEND_DATA: print("UI_main_proc_real", code) if code == constants.MSPEECH_AIUI_SEND_DATA: # aiui QtCore.QTimer.singleShot(0, partial(self._ist.audio_write, result["data"])) elif code == constants.MSPEECH_AIUI_RESET_DICTATION: # aiui self._start_record_pen() elif code == constants.MSPEECH_AIUI_STOP_WS: # aiui self._stop_record_pen() except Exception as e: print(e) pass def init_ui(self): self.resize(800, 600) self.center() self.setWindowTitle('语音智控') self.setWindowIcon(QIcon("images/logo.ico")) self.init_system_tray() self.init_main_ui() self.show() # 初始化识别文本组件 self.init_d_ui() asr_show = config.config["ASR_TXT_SHOW"] if asr_show == "0": self.d_window.hide() else: self.d_window.show() self._init_ist() def init_main_ui(self): '''加载界面ui''' with open('css/QListWidgetQSS.qss', 'r') as f: # 导入QListWidget的qss样式 self.list_style = f.read() # 中心部件 self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.main_layout = QHBoxLayout(self.central_widget) # 窗口的整体布局 self.main_layout.setContentsMargins(0, 0, 0, 0) self.left_widget = QListWidget() # 左侧选项列表 self.left_widget.setStyleSheet(self.list_style) self.main_layout.addWidget(self.left_widget) self.right_widget = QStackedWidget() self.main_layout.addWidget(self.right_widget) self.left_widget.currentRowChanged.connect(self.right_widget.setCurrentIndex) # list和右侧窗口的index对应绑定 self.left_widget.setFrameShape(QListWidget.NoFrame) # 去掉边框 
        self.left_widget.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOff)  # hide the scroll bars
        self.left_widget.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
        # Add the left-hand items
        self.item = QListWidgetItem('基础设置', self.left_widget)
        self.item.setSizeHint(QSize(30, 60))
        self.item.setTextAlignment(Qt.AlignCenter)  # center the text
        # Settings page
        self.settingsPage = SettingsPage()
        self.right_widget.addWidget(self.settingsPage)
        # Signals emitted when the configuration changes
        self.settingsPage.floatingWindowEnableCheckBox.stateChanged.connect(self.handleFloatingWindowStateChanged)
        self.settingsPage.colorLineEdit.textChanged.connect(self.handleColorTextChanged)
        self.item = QListWidgetItem('个人配置', self.left_widget)
        self.item.setSizeHint(QSize(30, 60))
        self.item.setTextAlignment(Qt.AlignCenter)  # center the text
        # Personal configuration page
        self.fileSettingPage = FileSettingPage(config.local_commands)
        self.right_widget.addWidget(self.fileSettingPage)
        self.item = QListWidgetItem('关于', self.left_widget)
        self.item.setSizeHint(QSize(30, 60))
        self.item.setTextAlignment(Qt.AlignCenter)  # center the text
        # About page
        self.aboutPage = AboutPage()
        self.right_widget.addWidget(self.aboutPage)
        # Select the first item by default
        self.left_widget.setCurrentRow(0)

    # Initialize the draggable floating window that shows recognized speech
    def init_d_ui(self):
        # Create the floating window
        self.d_window = DraggableWindow()
        # Main layout
        mainLayout = QVBoxLayout()
        # Use a horizontal layout to center the voice icon
        iconLayout = QHBoxLayout()
        labelIcon = QLabel()
        labelIcon.setPixmap(QPixmap("images/123.png").scaled(80, 80, Qt.KeepAspectRatio))
        iconLayout.addStretch()  # stretch before the icon
        iconLayout.addWidget(labelIcon)
        iconLayout.addStretch()  # stretch after the icon
        # Label that shows the recognized text
        self.text_label = QLabel()
        self.text_label.setWordWrap(True)  # wrap long lines
        self.text_label.setAlignment(Qt.AlignCenter)  # center the text
        asr_color = config.config["ASR_TXT_COLOR"]
        self.text_label.setStyleSheet(f"color: {asr_color};")  # text color
        # Put the label inside a scroll area so long text is not clipped at the top and bottom
        scrollArea = QScrollArea()
        scrollArea.setWidgetResizable(True)  # let the scroll area resize its widget
        scrollArea.setWidget(self.text_label)  # place the label inside the scroll area
        # Make the scroll area background and border transparent
        scrollArea.setStyleSheet("background: transparent; border: none;")
        mainLayout.addLayout(iconLayout)  # add the icon layout to the main layout
        # mainLayout.addWidget(self.text_label)
        mainLayout.addWidget(scrollArea)  # add the scroll area to the main layout
        self.d_window.setLayout(mainLayout)
        self.d_window.adjustSize()
        # Get the screen width and height
        screen = app.primaryScreen()
        screen_geometry = screen.geometry()
        screen_width = screen_geometry.width()
        screen_height = screen_geometry.height()
        # Compute the floating window position
        window_x = screen_width - self.d_window.width()
        window_y = screen_height - self.d_window.height() - 80
        # Move the floating window to the bottom-right corner
        self.d_window.move(window_x, window_y)

    # Initialize the system tray icon
    def init_system_tray(self):
        # Create the system tray icon
        self.tray_icon = QSystemTrayIcon(self)
        self.tray_icon.setIcon(QIcon("images/logo.ico"))
        # Create the tray menu
        self.tray_menu = QMenu(self)
        self.quit_action = QAction("退出", self)
        # Add the action to the menu
        self.tray_menu.addAction(self.quit_action)
        # Attach the menu to the tray icon
        self.tray_icon.setContextMenu(self.tray_menu)
        # Clicking the tray icon restores the window
        self.tray_icon.activated.connect(self.restore_window)
        self.tray_icon.show()  # show the tray icon

    # Restore the window when the tray icon is clicked
    def restore_window(self, reason):
        if reason == QSystemTrayIcon.ActivationReason.Trigger:
            self.showNormal()  # restore the window to its normal size

    # Override closeEvent so the window is hidden instead of closed
    def closeEvent(self, event):
        event.ignore()  # ignore the close event
        self.hide()  # hide the window

    # Initialize the hotkey listener
    def _init_keys_listener(self):
        # OK key
        # key = keyboard.Key.enter
        # The < key on the remote control, i.e. ESC
        # key = keyboard.Key.esc
        # Left and right Ctrl keys on the keyboard
        key2 = keyboard.Key.ctrl_r
        key3 = keyboard.Key.ctrl_l
        self._btn = mock_button.MockButton([key2, key3], self._hotkey_callback)

    # Hotkey callback
    def _hotkey_callback(self, result):
        if result == 0:
            # key down
{"scene": "main", "cloud_vad_eos": "1500", "aue": "opus-wb"} # 命令超时1.5s # _param = {"cloud_vad_eos": "5000"} # self._ist.update_params(_param) # print('key down') QtCore.QTimer.singleShot(0, self._start_record_mic) else: # key up # print('key up') QtCore.QTimer.singleShot(0, self._stop_record_mic) def _open_dir(self): _dir = self.gen_home_doc_path() if sys.platform == "win32": filepath = _dir.replace("\\", "/") QDesktopServices.openUrl(QUrl(filepath, QUrl.StrictMode)) else: os.popen("open %s" % _dir) def gen_home_doc_path(self): dir_name = "VoiceAssistant" if sys.platform == "win32": import ctypes.wintypes path_id = 5 # 5:文档 0:桌面 buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) ctypes.windll.shell32.SHGetFolderPathW(None, path_id, None, 0, buf) path = buf.value elif sys.platform == "darwin": path = os.path.join(os.environ[self.get_home_env()], dir_name) elif sys.platform == "linux": path = os.path.join(os.environ[self.get_home_env()], dir_name) else: path = os.path.join(os.environ[self.get_home_env()], dir_name) dir_path = os.path.join(path, dir_name) if not os.path.exists(dir_path): os.makedirs(dir_path) return dir_path @staticmethod def get_home_env(): if sys.platform == "win32": return 'APPDATA' return 'HOME' # 初始化ist引擎 def _init_ist(self): # AIUI websocket # self._ist = lib_aiui.AiuiManager() # 帝派 websocket # self._ist = lib_asr_dipai.DiPaiASRWebSocket() # grpc服务 # self._ist = lib_grpc.AudioRecorderClient() if self.engine_type == "1": pass # self._ist = lib_aiui.AiuiManager() elif self.engine_type == "2": pass # self._ist = lib_asr_dipai.DiPaiASRWebSocket() else: logger.info(f"引擎类型:{self.engine_type}") self._ist = lib_grpc.AudioRecorderClient() self._ist.trigger.connect(self._proc_aiui) logger.info("ISR引擎初始化成功!") def _tts_player_callback(self, msg): print(msg) if msg == "--end--": # self._tts_play_end = True # self._send_data(my_util.gen_q_data(constants.TTS_STEAM_END, None)) self.tts_player.quit() elif msg == "--stop--": self._tts_play_end = False def tts_play(self, text, vcn=0, speed_mode=0): _duration = lib_player.calculate_duration(text, speed_mode) # self._send_data(my_util.gen_q_data(constants.TTS_STEAM_DURATION, _duration)) # _info = { # "text": text, # "vcn": vcn, # "speed": speed_mode # } # _info = { # "text": text, # "vcn": "50000", # "speed": speed_mode # } _info = { "text": text, "vcn": "bingjie", "speed": "0" } # 上一个播放器处于暂停状态 if self.tts_player and self.tts_player.is_paused(): self.tts_player.resume() return self.tts_player = lib_player.TTSSteamPlayer(self._tts_player_callback) self.tts_player.play(_info) def tts_pause(self): if self.tts_player: self.tts_player.pause() def tts_stop(self): if self.tts_player: self.tts_player.stop() self.tts_player.quit() # 开启录音---演示笔 def _start_record_pen(self): print("开始录音") if not self._run_flag: self._run_flag = True self._start_ist_pen() # 停止录音---演示笔 def _stop_record_pen(self): print("结束录音") self._run_flag = False self._start_ist_pen(stop=True) # 启动识别引擎---演示笔 def _start_ist_pen(self, stop=False): if stop: print("stop") self._ist.stop_speech() else: print("start") self._ist.start_speech() # 开启录音---系统麦克风 def _start_record_mic(self): if not self.recording: print('开启录音') logger.info('开启录音') self.recording = True self.init_record_device() self._recorder.start() threading.Thread(target=self._read_buf_from_cffi_backend, name="read_record_buf").start() self._start_ist_mic() # 停止录音---系统麦克风 def _stop_record_mic(self): print("_stop_record结束录音") self.recording = False if self._recorder: self._recorder.stop() self._recorder.close() 
        self._start_ist_mic(stop=True)
        self._recorder = None

    def _start_ist_mic(self, stop=False):
        if stop:
            self._ist.stop_speech()
        else:
            self._ist.start_speech()
            threading.Thread(target=self.send_buf_to_engine, name="send_buf_to_engine").start()

    # Audio sending loop
    def send_buf_to_engine(self):
        while self.recording:
            if not self._audio_queue.empty():
                buf = self._audio_queue.get()
                # print("发送音频数据长度")
                # print(len(buf))
                # self.ist.send_message(buf)
                QtCore.QTimer.singleShot(0, partial(self._ist.audio_write, buf))
            else:
                time.sleep(0.02)

    # Initialize the recording device
    def init_record_device(self):
        try:
            self._recorder = sd.RawInputStream(samplerate=self.samplerate, channels=self.channels,
                                               blocksize=640, device=sd.default.device, dtype=np.int16)
        except Exception as e:
            self._recorder = None
            logger.info(f"初始化错误:{e}")
        else:
            logger.info("设备初始化成功!现在可以开始录音了!当前设置采样率为:16000,声道数为:1")

    # Read recorded audio data from the sounddevice (cffi) buffer
    def _read_buf_from_cffi_backend(self):
        while self.recording:
            if self._recorder:
                data: tuple = self._recorder.read(2)
                try:
                    buf = bytes(data[0])
                    # Each chunk is 4 bytes (2 int16 mono frames); accumulate to 640 bytes
                    # (320 samples, i.e. 20 ms at 16 kHz) to match what the engine expects
                    self.buffer += buf
                    if len(self.buffer) == 640:
                        self._audio_queue.put(self.buffer)
                        self.buffer = b""
                except Exception as e:
                    print('----------')
                    print(e)
            else:
                print('睡眠1秒')
                time.sleep(1.0)

    def _proc_aiui(self, msg):
        code = msg.get("code")
        data = msg.get("data")
        # IST engine started
        if code == constants.AITEST_AIUI_START:
            if data:
                logger.info("IST引擎启动成功!")
        # IST engine error
        elif code == constants.AITEST_AIUI_ERROR:
            logger.info(f"IST错误:{data}")
        # IST engine result
        elif code == constants.AITEST_AIUI_RESULT:
            logger.info(f"IST转写结果:{data}")
            self.text_label.setText(data)
        elif code == constants.AITEST_AIUI_LOG:
            logger.info(data)
        # elif code == constants.AITEST_AIUI_NLP:
        #     logger.info(data)
        #     print('开始业务逻辑')
        #     nlp_date = json.loads(str(data))
        #     intent = nlp_date['intent']
        #     if intent != {} and intent['rc'] == 0:
        #         intent_action = intent['semantic'][0]['intent']
        #         print("intent_action: ", intent_action)
        #         # Iterate over the slots
        #         intent_solts = intent['semantic'][0]['slots']
        #         print("intent_solts: ", intent_solts)
        #         # Approach 1: when the voice command opens a browser, webbrowser.open() blocks _proc,
        #         # AIUI re-sends the message to the websocket, _proc runs again, and the browser opens twice (bug).
        #         # tts_text = socketHadler.handler(intent_action, intent_solts)
        #         # Approach 2: to work around the bug in approach 1, disconnect the signal before
        #         # calling the business handler and reconnect it afterwards.
        #         # (The problem does not occur in PyQt6 tests, where the disconnect/reconnect code is commented out.)
        #         self._ist.trigger.disconnect(self._proc_aiui)
        #         tts_text = socketHadler.handler(intent_action, intent_solts)
        #         # Reconnect the signal
        #         self._ist.trigger.connect(self._proc_aiui)
        #         # Approach 3: opening the page inside socketHadler as follows also avoids opening it twice
        #         # import os
        #         # os.system('start http://101.37.148.192:8080/')
        #         if len(tts_text) != 0:
        #             self.tts_stop()
        #             self.tts_play(tts_text)
        #         else:
        #             self.tts_stop()
        #             self.tts_play("我没有理解您说的话")
        #             print("我没有理解你说的话啊")
        # elif code == constants.AITEST_DIPAI_NLP:
        #     # DiPai semantic result
        #     print('开始业务逻辑' + data)
        #     # Strip all pipe characters
        #     json_data = data.replace("|", "")
        #     nlp_date = json.loads(str(json_data), strict=False)
        #     results = nlp_date['result']['results']
        #     print(results)
        #     if results:
        #         intent_action = results[0][0]['mind_name']
        #         print("intent_action: ", intent_action)
        #         # Iterate over the slots
        #         intent_solts = results[0][0]['slots']
        #         print("intent_solts: ", intent_solts)
        #         # Approach 1: when the voice command opens a browser, webbrowser.open() blocks _proc,
        #         # AIUI re-sends the message to the websocket, _proc runs again, and the browser opens twice (bug).
        #         # tts_text = socketHadler.handler(intent_action, intent_solts)
        #         # Approach 2: to work around the bug in approach 1, disconnect the signal before
        #         # calling the business handler and reconnect it afterwards.
        #         # (The problem does not occur in PyQt6 tests, so the disconnect/reconnect code is commented out.)
        #         # self.ist.signal.disconnect(self._proc)
        #         tts_text = socketHadler.handler_dipai(intent_action, intent_solts)
        #         # Reconnect the signal
        #         # self.ist.signal.connect(self._proc)
        #         # Approach 3: opening the page inside socketHadler as follows also avoids opening it twice
        #         # import os
        #         # os.system('start http://101.37.148.192:8080/')
        #         if len(tts_text) != 0:
        #             self.tts_stop()
        #             self.tts_play(tts_text)
        #         else:
        #             self.tts_play("我没有理解您说的话")
        #             print("我没有理解你说的话啊")
        elif code == constants.AITEST_GPRC_NLP:
            print(f'开始业务逻辑AITEST_GPRC_NLP: {data}')
            nlp_date = json.loads(str(data))
            # If the JSON contains 'data', the result went through semantic understanding
            if 'data' in nlp_date:
                output = nlp_date['data']['output']
                action_tag = output[:2]  # first two characters
                if action_tag == "##":
                    # Workflow node
                    action_type = output[2]  # third character: action type
                    action_content = output[3:]  # everything from the fourth character on: action content
                    # 1: TTS playback  2: play recording  3: open web page  4: page navigation
                    # 5: large-screen switch  6: custom command
                    if action_type == "1":
                        self.text_label.setText(action_content)
                        self.tts_stop()
                        self.tts_play(action_content)
                    elif action_type == "2":
                        pass
                    elif action_type == "3":
                        os.system(f'start {action_content}')
                        time.sleep(2)  # adjust the wait time as needed
                        # Send an F11 key press (full screen)
                        pyautogui.press('f11')
                    elif action_type == "5":
                        pass
                    else:
                        lib_to_ws.send_action_ws(action_type, action_content)
                else:
                    # Q&A knowledge-base answer: speak it directly
                    if output:
                        self.text_label.setText(output)
                        self.tts_stop()
                        self.tts_play(output)
            elif 'action_type' in nlp_date:
                action_type = nlp_date['action_type']
                action_content = nlp_date['action_content']
                if action_type == "0":
                    os.system(f'start {action_content}')
                elif action_type == "1":
                    os.system(f'start {action_content}')
                elif action_type == "2":
                    if action_content == "上一页":
                        pyautogui.press('pageup')
                    elif action_content == "下一页":
                        pyautogui.press('pagedown')
                    else:
                        pass
                elif action_type == "3":
                    if action_content == "previous_window":
                        # Simulate pressing Alt+Tab
                        pyautogui.keyDown('alt')
                        pyautogui.press('tab')
                        pyautogui.keyUp('alt')
                    else:
                        self.switch_to_window(action_content)
                elif action_type == "9":
                    self.e_window = ResultWindow([])  # create a ResultWindow with an empty data list
                    self.e_window.searchIatFiles(action_content)  # search for matching files
                    self.e_window.setWindowFlags(Qt.WindowStaysOnTopHint)
                    self.e_window.show()  # show the window
                else:
                    pass
            else:
                self.tts_stop()
                self.tts_play("我没有理解您说的话")
                print("我没有理解你说的话啊")
        else:
            pass

    def center(self):
        qr = self.frameGeometry()
        cp = self.screen().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def handleFloatingWindowStateChanged(self, state):
        if state == Qt.Checked:
            self.d_window.show()
        else:
            self.d_window.hide()

    def handleColorTextChanged(self, text):
        self.text_label.setStyleSheet(f"color: {text};")  # text color

    def switch_to_window(self, window_title):
        # Collect handles and titles of all visible top-level windows
        def enum_handler(hwnd, window_titles):
            if win32gui.IsWindowVisible(hwnd):
                title = win32gui.GetWindowText(hwnd)
                if title:
                    window_titles.append((hwnd, title))

        window_titles = []
        win32gui.EnumWindows(enum_handler, window_titles)
        print(window_titles)
        # Find the first window whose title contains the target
        for hwnd, title in window_titles:
            if window_title in title:
                print(title)
                # Switch to the matched window
                # win32gui.ShowWindow(hwnd, 9)  # restore the window (SW_RESTORE)
                win32gui.SetForegroundWindow(hwnd)  # bring the window to the foreground
                time.sleep(0.1)  # give the window a moment to become active
                # Maximize the window (SW_MAXIMIZE)
                win32gui.ShowWindow(hwnd, 3)
                return True
        return False


# def create_socket_handler():
#     print('create_socket_handler')
#     global socketHadler
#     if socketHadler is None:
#         socketHadler = BusinessHadler()
#         app.aboutToQuit.connect(socketHadler.cleanup)  # clean up when the application quits


if __name__ == '__main__':
    app = QApplication(sys.argv)
    win = MainUI()
    # Connect the tray menu action so it quits the application
    win.quit_action.triggered.connect(app.quit)
    # create_socket_handler()
    sys.exit(app.exec_())