-
-
Notifications
You must be signed in to change notification settings - Fork 613
Open
Labels
enhancement: New feature or request
Description
🚀 需求描述
在客户端实现的语音打断功能,实现比较粗糙,供大家参考。
打断后最前面的两个字不容易被识别,可以先说“小智”,然后再说需求。
🎯 解决方案
使用了 webrtcvad 人声检测(VAD)和 Resemblyzer 声纹检测。
🛠️ 相关代码
src/audio_processing/vad.py
import numpy as np
import webrtcvad
import sounddevice as sd # 用于实时音频采集
from resemblyzer import VoiceEncoder
from collections import deque
import threading
import time
import librosa
import logging
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
class RealTimeSpeakerDetector:
    """Decide in real time whether the microphone is hearing a target speaker.

    Audio captured through a ``sounddevice`` input stream is kept in a rolling
    buffer. Once at least one second is buffered, the most recent second is
    screened with WebRTC VAD and, if it contains speech, embedded with
    Resemblyzer and compared (cosine similarity) against the target
    voiceprint. The latest verdict is read with :meth:`get_result`.
    """

    def __init__(self, target_voiceprint_path, threshold=0.6, sample_rate=16000, chunk_size=480):
        """Initialise the detector and load the target voiceprint.

        Args:
            target_voiceprint_path: Path to a recording of the target speaker.
            threshold: Cosine-similarity threshold (0-1); a segment matches
                when its similarity is strictly greater than this value.
            sample_rate: Audio sample rate in Hz.
            chunk_size: Samples delivered per stream callback (480 samples is
                30 ms at 16 kHz).
        """
        self.encoder = VoiceEncoder()
        self.threshold = threshold
        # webrtcvad aggressiveness mode is an integer 0-3; higher values
        # filter out non-speech more aggressively.
        self.vad = webrtcvad.Vad(2)
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        # Rolling buffer holding the most recent 3 seconds of samples.
        self.audio_buffer = deque(maxlen=sample_rate * 3)
        self.running = False
        # Latest verdict: None until a segment has been analysed, else a bool.
        self.result = None
        # Guards self.result, which is written by the audio callback and
        # read from other threads through get_result().
        self.lock = threading.Lock()
        self.target_voiceprint = self._load_voiceprint(target_voiceprint_path)
        self.stream = None

    def _load_voiceprint(self, path):
        """Load the reference recording and return its speaker embedding.

        Returns:
            The embedding of the recording, or None when loading or embedding
            fails (the failure is logged, not raised).
        """
        try:
            audio, _ = librosa.load(path, sr=self.sample_rate, mono=True)
            return self.encoder.embed_utterance(audio)
        except Exception as e:
            logger.error(f"加载声纹文件失败: {e}")
            return None

    def _detect_voice_activity(self, audio):
        """Return True when any complete 30 ms frame of ``audio`` is speech.

        Args:
            audio: Float samples, nominally in [-1.0, 1.0].

        Returns:
            False for missing or empty input, for input shorter than one
            frame, and when the VAD raises (the error is logged).
        """
        try:
            if audio is None or len(audio) == 0:
                return False
            frame_length = int(self.sample_rate * 0.03)  # 30 ms frame
            if frame_length <= 0:
                return False
            # Clip before scaling so out-of-range samples saturate instead of
            # wrapping around when cast to int16.
            samples = np.clip(np.asarray(audio, dtype=np.float32), -1.0, 1.0)
            pcm = (samples * 32767).astype(np.int16)
            # Only complete frames are passed on; a trailing partial frame
            # is dropped.
            usable = len(pcm) - len(pcm) % frame_length
            return any(
                self.vad.is_speech(pcm[start:start + frame_length].tobytes(), self.sample_rate)
                for start in range(0, usable, frame_length)
            )
        except Exception as e:
            logger.error(f"语音活动检测失败: {e}")
            return False

    def _set_result(self, value):
        """Publish a new verdict under the lock."""
        with self.lock:
            self.result = value

    def _matches_target(self, audio_segment):
        """Return True when ``audio_segment`` is speech by the target speaker."""
        # Without a reference voiceprint nothing can match, so skip the
        # VAD and embedding work entirely.
        if self.target_voiceprint is None:
            return False
        if not self._detect_voice_activity(audio_segment):
            return False
        current_voiceprint = self.encoder.embed_utterance(audio_segment)
        denominator = np.linalg.norm(current_voiceprint) * np.linalg.norm(self.target_voiceprint)
        if denominator == 0:
            return False
        similarity = np.dot(current_voiceprint, self.target_voiceprint) / denominator
        return bool(similarity > self.threshold)

    def _audio_callback(self, indata, frames, time, status):
        """Stream callback: buffer the new chunk and refresh the verdict.

        The signature is the one the input stream calls with; the ``time``
        parameter shadows the ``time`` module inside this method only.
        """
        # NOTE(review): the speaker embedding is computed inside the audio
        # callback; if input-overflow statuses show up in the log, consider
        # moving the analysis to a worker thread.
        try:
            if status:
                logger.warning(f"音频回调状态: {status}")
            if indata is None or indata.size == 0:
                return
            # Use the first channel when the data is multi-channel.
            audio_chunk = indata[:, 0] if indata.ndim > 1 else indata
            self.audio_buffer.extend(audio_chunk)
            # Wait until at least one second of audio is buffered.
            if len(self.audio_buffer) < self.sample_rate:
                return
            try:
                # Analyse the most recent second. The lock is NOT held here,
                # so get_result() never waits for the embedding.
                audio_segment = np.array(list(self.audio_buffer)[-self.sample_rate:])
                verdict = self._matches_target(audio_segment)
            except Exception as e:
                logger.error(f"处理音频段时出错: {e}")
                verdict = False
            self._set_result(verdict)
        except Exception as e:
            logger.error(f"音频回调出错: {e}")
            self._set_result(False)

    def start_detection(self):
        """Open and start the input stream; failures are logged, not raised."""
        if self.stream is not None:
            # Restart cleanly instead of leaking the already-open stream.
            self.stop_detection()
        try:
            self.running = True
            self._set_result(None)
            self.stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=1,
                blocksize=self.chunk_size,
                callback=self._audio_callback,
                dtype='float32'
            )
            self.stream.start()
            logger.info("声纹检测已启动")
        except Exception as e:
            logger.error(f"启动声纹检测失败: {e}")
            self.running = False

    def stop_detection(self):
        """Stop and close the input stream; failures are logged, not raised."""
        try:
            self.running = False
            if self.stream is not None:
                self.stream.stop()
                self.stream.close()
                self.stream = None
            logger.info("声纹检测已停止")
        except Exception as e:
            logger.error(f"停止声纹检测失败: {e}")

    def pause_detection(self):
        """Pause capture; a no-op when no stream is open."""
        if self.stream is None:
            return
        self.stream.stop()

    def resume_detection(self):
        """Resume capture; a no-op when no stream is open.

        When the stream was actually stopped, the buffered audio and the last
        verdict are discarded first, so a verdict computed before the pause
        cannot be reported after resuming.
        """
        if self.stream is None:
            return
        if self.stream.stopped:
            self.audio_buffer.clear()
            self._set_result(None)
        self.stream.start()

    def get_result(self):
        """Return the latest verdict: True, False, or None if none yet."""
        with self.lock:
            return self.result
src/application.py
def __init__(self):
....
# Voiceprint detection: build the detector from a recording of the target
# speaker. NOTE(review): "audios/xxx.wav" looks like a placeholder path — confirm.
try:
self.audio_detector = RealTimeSpeakerDetector("audios/xxx.wav")
logger.info("声纹检测器初始化成功")
except Exception as e:
logger.error(f"声纹检测器初始化失败: {e}")
# Leave the attribute set to None so the failure can be detected later.
self.audio_detector = None
def run(self):
....
# Start voiceprint detection
logger.debug("启动声纹检测")
# NOTE(review): __init__ sets self.audio_detector to None when the detector
# cannot be built; these two calls would then raise AttributeError — confirm
# whether a guard is needed here.
self.audio_detector.start_detection()
self.audio_detector.pause_detection() # pause immediately; detection only runs while TTS is playing
def _handle_voice_start(self):
    """Resume voiceprint detection and watch for the user talking over TTS.

    A daemon thread polls the detector while ``self.is_tts_playing`` is true.
    When the detector reports the target speaker, detection is paused and
    playback is aborted. Detection is also paused when playback ends without
    an interruption, so it only runs while TTS is playing.
    """
    detector = getattr(self, 'audio_detector', None)
    if detector is None:
        # The detector is set to None when its construction fails.
        logger.warning("音频检测器未初始化,跳过声纹检测")
        return

    detector.resume_detection()
    logger.info("启动声纹监控线程")

    def monitor_audio():
        paused = False
        try:
            # Initial delay before the first check.
            time.sleep(1)
            while self.is_tts_playing:
                time.sleep(0.2)  # poll every 200 ms
                try:
                    if detector.get_result():
                        # TTS is playing and the target speaker is talking.
                        logger.info("检测到人在说话,触发中止")
                        detector.pause_detection()
                        paused = True
                        self.abort_speaking(AbortReason.WAKE_WORD_DETECTED)
                        break
                    # Logged at debug level: this runs on every poll.
                    logger.debug("未检测到人在说话,继续播放")
                except Exception as e:
                    # Keep polling after an error instead of ending the loop.
                    logger.error(f"声纹检测监控出错: {e}")
        except Exception as e:
            logger.error(f"声纹检测线程出错: {e}")
        finally:
            # Playback ended without an interruption (or the thread failed):
            # pause here too, otherwise capture keeps running after TTS.
            if not paused:
                try:
                    detector.pause_detection()
                except Exception as e:
                    logger.error(f"声纹检测线程出错: {e}")

    threading.Thread(target=monitor_audio, daemon=True).start()
def _handle_tts_start(self):
"""Handle the TTS-start event."""
self.aborted = False
self.is_tts_playing = True
# Drop any stale audio data that may still be queued
self.audio_codec.clear_audio_queue()
# Switch to SPEAKING only when currently IDLE or LISTENING
if self.device_state == DeviceState.IDLE or self.device_state == DeviceState.LISTENING:
self.set_device_state(DeviceState.SPEAKING)
# NOTE(review): indentation was lost in this excerpt, so it is unclear whether
# this call belongs inside the `if` above or runs unconditionally — confirm.
self._handle_voice_start()
# # Resuming the VAD detector is commented out
# if hasattr(self, 'vad_detector') and self.vad_detector:
# self.vad_detector.resume()
def shutdown(self):
.....
# Stop voiceprint detection (skipped when the detector is None)
if self.audio_detector:
self.audio_detector.stop_detection()
Metadata
Metadata
Assignees
Labels
enhancement: New feature or request