|
import queue |
|
import pyaudio |
|
|
|
|
|
# Audio recording parameters.
# Sampling rate passed to PyAudio as ``rate`` (samples per second).
RATE = 16000

# Frames per buffer delivered to the stream callback: one tenth of a second
# of audio at RATE (integer division keeps the value an int without a float
# round-trip).
CHUNK = RATE // 10
|
|
|
class MicrophoneStream:
    """Opens a recording stream as a generator yielding the audio chunks.

    Use as a context manager: entering opens a PyAudio input stream whose
    callback pushes raw audio into an internal queue; ``generator()`` drains
    that queue; exiting stops the stream and posts a ``None`` sentinel so a
    consumer blocked on the queue wakes up and stops.
    """

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        input_device_index: "int | None" = 0,
    ) -> None:
        """Store the stream settings; no audio device is opened here.

        Args:
            rate: Sampling rate handed to PyAudio as ``rate``.
            chunk: Frames per buffer handed to PyAudio as
                ``frames_per_buffer``.
            input_device_index: Index of the input device to record from.
                Defaults to 0, the previously hard-coded value. Per the
                PyAudio documentation, ``None`` selects the default device.
        """
        self._rate = rate
        self._chunk = chunk
        self._input_device_index = input_device_index

        # Buffer of audio chunks filled by the stream callback and drained
        # by generator(). queue.Queue is safe to share between threads.
        self._buff = queue.Queue()
        # True until __enter__ succeeds and again after __exit__.
        self.closed = True

    def __enter__(self) -> "MicrophoneStream":
        """Open the PyAudio input stream and mark this object as open.

        Returns:
            This MicrophoneStream instance.

        Raises:
            Whatever ``PyAudio.open`` raises; the PyAudio instance is
            terminated before the exception propagates.
        """
        self._audio_interface = pyaudio.PyAudio()
        try:
            self._audio_stream = self._audio_interface.open(
                format=pyaudio.paInt16,
                # Single-channel (mono) capture.
                channels=1,
                rate=self._rate,
                input=True,
                frames_per_buffer=self._chunk,
                # Callback mode: PyAudio hands each captured buffer to
                # _fill_buffer instead of this code calling read(), so audio
                # keeps accumulating in the queue while the consumer is busy.
                stream_callback=self._fill_buffer,
                input_device_index=self._input_device_index,
            )
        except Exception:
            # Do not leak the PortAudio handle when the stream cannot open.
            self._audio_interface.terminate()
            raise

        self.closed = False

        return self

    def __exit__(
        self,
        type: object,
        value: object,
        traceback: object,
    ) -> None:
        """Closes the stream, regardless of whether the connection was lost or not.

        Args:
            type: Exception type from the ``with`` body, if any (unused).
            value: Exception instance from the ``with`` body, if any (unused).
            traceback: Traceback from the ``with`` body, if any (unused).
        """
        try:
            self._audio_stream.stop_stream()
            self._audio_stream.close()
        finally:
            # Always run the rest of the shutdown, even if stopping/closing
            # the stream failed, so consumers are never left blocked.
            self.closed = True
            # Sentinel: wakes a generator() blocked in get() and tells it to
            # stop.
            self._buff.put(None)
            self._audio_interface.terminate()

    def _fill_buffer(
        self,
        in_data: bytes,
        frame_count: int,
        time_info: object,
        status_flags: object,
    ) -> tuple:
        """Continuously collect data from the audio stream, into the buffer.

        Registered as the PyAudio ``stream_callback``.

        Args:
            in_data: The audio data as a bytes object
            frame_count: The number of frames captured (unused)
            time_info: The time information (unused)
            status_flags: The status flags (unused)

        Returns:
            The tuple ``(None, pyaudio.paContinue)``: no output data, and a
            flag asking PyAudio to keep the stream running.
        """
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self) -> object:
        """Generates audio chunks from the stream of audio data in chunks.

        Each yielded item is the concatenation of every chunk that was
        available in the buffer at that moment (at least one).

        Returns:
            A generator that outputs audio chunks as ``bytes``. It finishes
            when the stream is closed or the ``None`` sentinel is read.
        """
        while not self.closed:
            # Blocking get() guarantees at least one chunk per yielded item.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Drain whatever else is already buffered, without blocking.
            while True:
                try:
                    chunk = self._buff.get(block=False)
                except queue.Empty:
                    break
                if chunk is None:
                    # Sentinel seen mid-drain: stop immediately. Chunks
                    # collected in this pass are intentionally not yielded
                    # (behavior preserved from the original).
                    return
                data.append(chunk)

            yield b"".join(data)