详解python的webrtc库实现语音端点检测

时间:2021-10-06 06:01:55

引言

语音端点检测最早应用于电话传输和检测系统当中,用于通信信道的时间分配,提高传输线路的利用效率.端点检测属于语音处理系统的前端操作,在语音检测领域意义重大.

但是目前的语音端点检测,尤其是检测 人声 开始和结束的端点始终是属于技术难点,各家公司始终处于 能判断,但是不敢保证 判别准确性 的阶段.

详解python的webrtc库实现语音端点检测

现在基于云端语义库的聊天机器人层出不穷,其中最著名的当属amazon的 Alexa/Echo 智能音箱.

详解python的webrtc库实现语音端点检测

国内如雨后春笋般出现了各种搭载语音聊天的智能音箱(如前几天在知乎上广告的若琪机器人)和各类智能机器人产品.国内语音服务提供商主要面对中文语音服务,由于语音不像图像有分辨率等等较为客观的指标,很多时候凭主观判断,所以较难判断各家语音识别和合成技术的好坏.但是我个人认为,国内的中文语音服务和国外的英文语音服务,在某些方面已经有超越的趋势.

详解python的webrtc库实现语音端点检测

通常搭建机器人聊天系统主要包括以下三个方面: 

  1.  语音转文字(ASR/STT)
  2.  语义内容(NLU/NLP)
  3.  文字转语音(TTS)

语音转文字(ASR/STT)

在将语音传给云端API之前,是本地前端的语音采集,这部分主要包括如下几个方面: 

  1.  麦克风降噪
  2.  声源定位
  3.  回声消除
  4.  唤醒词
  5.  语音端点检测
  6.  音频格式压缩

python 端点检测

由于实际应用中,单纯依靠能量检测特征检测等方法很难判断人声说话的起始点,所以市面上大多数的语音产品都是使用唤醒词判断语音起始.另外加上声音回路,还可以做语音打断.这样的交互方式可能有些傻,每次必须喊一下 唤醒词 才能继续聊天.这种方式聊多了,个人感觉会嘴巴疼:-O .现在github上有snowboy唤醒词的开源库,大家可以登录snowboy官网训练自己的唤醒词模型. 

  1.  Kitt-AI : Snowboy 
  2.  Sensory : Sensory

考虑到用唤醒词嘴巴会累,所以大致调研了一下,Python拥有丰富的库,直接import就能食用.这种方式容易受强噪声干扰,适合一个人在家玩玩. 

  1.  pyaudio: pip install pyaudio 可以从设备节点读取原始音频流数据,音频编码是PCM格式;
  2.  webrtcvad: pip install webrtcvad 检测判断一组语音数据是否为空语音;

当检测到持续时间长度 T1 vad检测都有语音活动,可以判定为语音起始;

当检测到持续时间长度 T2 vad检测都没有有语音活动,可以判定为语音结束;

完整程序代码可以从我的github下载

程序很简单,相信看一会儿就明白了

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
'''
Requirements:
+ pyaudio - `pip install pyaudio`
+ py-webrtcvad - `pip install webrtcvad`
'''
import webrtcvad
import collections
import sys
import signal
import pyaudio
 
from array import array
from struct import pack
import wave
import time
 
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK_DURATION_MS = 30    # supports 10, 20 and 30 (ms)
PADDING_DURATION_MS = 1500  # 1 sec jugement
CHUNK_SIZE = int(RATE CHUNK_DURATION_MS / 1000) # chunk to read
CHUNK_BYTES = CHUNK_SIZE 2 # 16bit = 2 bytes, PCM
NUM_PADDING_CHUNKS = int(PADDING_DURATION_MS / CHUNK_DURATION_MS)
# NUM_WINDOW_CHUNKS = int(240 / CHUNK_DURATION_MS)
NUM_WINDOW_CHUNKS = int(400 / CHUNK_DURATION_MS) # 400 ms/ 30ms ge
NUM_WINDOW_CHUNKS_END = NUM_WINDOW_CHUNKS 2
 
START_OFFSET = int(NUM_WINDOW_CHUNKS CHUNK_DURATION_MS 0.5 RATE)
 
vad = webrtcvad.Vad(1)
 
pa = pyaudio.PyAudio()
stream = pa.open(format=FORMAT,
         channels=CHANNELS,
         rate=RATE,
         input=True,
         start=False,
         # input_device_index=2,
         frames_per_buffer=CHUNK_SIZE)
 
 
got_a_sentence = False
leave = False
 
 
def handle_int(sig, chunk):
  global leave, got_a_sentence
  leave = True
  got_a_sentence = True
 
 
def record_to_file(path, data, sample_width):
  "Records from the microphone and outputs the resulting data to 'path'"
  # sample_width, data = record()
  data = pack('<' + ('h' len(data)), data)
  wf = wave.open(path, 'wb')
  wf.setnchannels(1)
  wf.setsampwidth(sample_width)
  wf.setframerate(RATE)
  wf.writeframes(data)
  wf.close()
 
 
def normalize(snd_data):
  "Average the volume out"
  MAXIMUM = 32767 # 16384
  times = float(MAXIMUM) / max(abs(i) for i in snd_data)
  r = array('h')
  for i in snd_data:
    r.append(int(i times))
  return r
 
signal.signal(signal.SIGINT, handle_int)
 
while not leave:
  ring_buffer = collections.deque(maxlen=NUM_PADDING_CHUNKS)
  triggered = False
  voiced_frames = []
  ring_buffer_flags = [0] NUM_WINDOW_CHUNKS
  ring_buffer_index = 0
 
  ring_buffer_flags_end = [0] NUM_WINDOW_CHUNKS_END
  ring_buffer_index_end = 0
  buffer_in = ''
  # WangS
  raw_data = array('h')
  index = 0
  start_point = 0
  StartTime = time.time()
  print(" recording: ")
  stream.start_stream()
 
  while not got_a_sentence and not leave:
    chunk = stream.read(CHUNK_SIZE)
    # add WangS
    raw_data.extend(array('h', chunk))
    index += CHUNK_SIZE
    TimeUse = time.time() - StartTime
 
    active = vad.is_speech(chunk, RATE)
 
    sys.stdout.write('1' if active else '_')
    ring_buffer_flags[ring_buffer_index] = 1 if active else 0
    ring_buffer_index += 1
    ring_buffer_index %= NUM_WINDOW_CHUNKS
 
    ring_buffer_flags_end[ring_buffer_index_end] = 1 if active else 0
    ring_buffer_index_end += 1
    ring_buffer_index_end %= NUM_WINDOW_CHUNKS_END
 
    # start point detection
    if not triggered:
      ring_buffer.append(chunk)
      num_voiced = sum(ring_buffer_flags)
      if num_voiced > 0.8 NUM_WINDOW_CHUNKS:
        sys.stdout.write(' Open ')
        triggered = True
        start_point = index - CHUNK_SIZE 20 # start point
        # voiced_frames.extend(ring_buffer)
        ring_buffer.clear()
    # end point detection
    else:
      # voiced_frames.append(chunk)
      ring_buffer.append(chunk)
      num_unvoiced = NUM_WINDOW_CHUNKS_END - sum(ring_buffer_flags_end)
      if num_unvoiced > 0.90 NUM_WINDOW_CHUNKS_END or TimeUse > 10:
        sys.stdout.write(' Close ')
        triggered = False
        got_a_sentence = True
 
    sys.stdout.flush()
 
  sys.stdout.write('\n')
  # data = b''.join(voiced_frames)
 
  stream.stop_stream()
  print(" done recording")
  got_a_sentence = False
 
  # write to file
  raw_data.reverse()
  for index in range(start_point):
    raw_data.pop()
  raw_data.reverse()
  raw_data = normalize(raw_data)
  record_to_file("recording.wav", raw_data, 2)
  leave = True
 
stream.close()

程序运行方式sudo python vad.py

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://blog.csdn.net/u012123989/article/details/72771667