"""Full-duplex PyAudio pass-through stream with a simple volume monitor.

The PortAudio callback copies every captured chunk to the output and queues
it in ``sound_buffer``; ``NoiseGate`` runs on its own thread, drains that
queue and prints a volume figure for each chunk.
"""
import struct
import time
from threading import Thread
from typing import Optional

import numpy
import pyaudio

# Raw input chunks handed over from the PortAudio callback to NoiseGate.run().
# Kept as a plain list: append() and pop(0) are each a single atomic operation
# under the GIL, which is all the producer/consumer pair below relies on.
sound_buffer = []


class AudioStream(Thread):
    """Thread that owns a PyAudio instance and one callback-driven stream."""

    def __init__(self,
                 _channels: int = 1,
                 _sample_rate: int = 48000,
                 _frames_per_buffer: int = 2048,
                 _input_device_index: Optional[int] = None,
                 _output_device_index: Optional[int] = None,
                 _input: bool = True,
                 _output: bool = True):
        """Collect the stream settings and open the stream when possible.

        The stream is only opened here when BOTH device indices are given;
        otherwise call ``init_stream`` later.

        Args:
            _channels: Number of audio channels.
            _sample_rate: Sample rate in Hz.
            _frames_per_buffer: Frames delivered per callback invocation.
            _input_device_index: Input device index, or None if not chosen yet.
            _output_device_index: Output device index, or None if not chosen yet.
            _input: Whether the stream captures audio.
            _output: Whether the stream plays audio.
        """
        super(AudioStream, self).__init__()

        # Keyword arguments forwarded verbatim to PyAudio.open().
        self.paInstance_kwargs = {
            'format': pyaudio.paFloat32,
            'channels': _channels,
            'rate': _sample_rate,
            'input': _input,
            'output': _output,
            'frames_per_buffer': _frames_per_buffer,
            'stream_callback': callback
        }

        # Compare against None explicitly: 0 is a valid device index and must
        # not be mistaken for "no device given".
        if _input_device_index is not None:
            if _input:
                self.paInstance_kwargs['input_device_index'] = _input_device_index
            else:
                print(f"[AudioStream.__init__]:\tInput was not enabled."
                      f" Reinitialize with '_input=True'")

        if _output_device_index is not None:
            if _output:
                self.paInstance_kwargs['output_device_index'] = _output_device_index
            else:
                print(f"[AudioStream.__init__]:\tOutput was not enabled."
                      f" Reinitialize with '_output=True'")

        # Init PyAudio instance
        self.paInstance = pyaudio.PyAudio()

        # Define and initialize stream object if we have been passed a device ID (pyaudio.open)
        self.stream = None
        if _output_device_index is not None and _input_device_index is not None:
            self.init_stream()

    def init_stream(self,
                    _new_output_device_index: Optional[int] = None,
                    _new_input_device_index: Optional[int] = None):
        """(Re)open the stream, optionally switching devices first.

        Args:
            _new_output_device_index: New output device index; None keeps the
                current setting.
            _new_input_device_index: New input device index; None keeps the
                current setting.
        """
        # Check what device was asked to be changed (or set)
        if _new_input_device_index is not None:
            if self.paInstance_kwargs['input']:
                self.paInstance_kwargs['input_device_index'] = _new_input_device_index
            else:
                print(f"[AudioStream.init_stream]:\tInput was not enabled when initialized."
                      f" Reinitialize with '_input=True'")

        if _new_output_device_index is not None:
            if self.paInstance_kwargs['output']:
                self.paInstance_kwargs['output_device_index'] = _new_output_device_index
            else:
                print(f"[AudioStream.init_stream]:\tOutput was not enabled when initialized."
                      f" Reinitialize with '_output=True'")

        self.close_if_open()

        # Reopen the stream
        self.stream = self.paInstance.open(**self.paInstance_kwargs)

    def close_if_open(self):
        """Stop (if running) and close the current stream, if there is one."""
        if self.stream:
            if self.stream.is_active():
                self.stream.stop_stream()
            self.stream.close()
            # Drop the handle so a second call does not query a closed stream.
            self.stream = None
            print(f"[AudioStream.close_if_open]:\t Stream was open; It was closed.")

    def list_devices(self, _show_input_devices: bool = True, _show_output_devices: bool = True):
        """Print the id and name of every device on host API 0.

        Args:
            _show_input_devices: Print devices that report input channels.
            _show_output_devices: Print devices that report output channels.
        """
        info = self.paInstance.get_host_api_info_by_index(0)
        numdevices = info.get('deviceCount')

        for i in range(numdevices):
            # Query each device once instead of once per field.
            device = self.paInstance.get_device_info_by_host_api_device_index(0, i)
            name = device.get('name')

            if _show_input_devices and device.get('maxInputChannels') > 0:
                print("Input Device id ", i, " - ", name)

            if _show_output_devices and device.get('maxOutputChannels') > 0:
                print("Output Device id ", i, " - ", name)


class NoiseGate(AudioStream):
    """AudioStream whose thread body prints a volume figure per input chunk."""

    # Seconds to wait when no chunk is queued, so the loop does not spin.
    _POLL_INTERVAL = 0.01

    def __init__(self, **kwargs):
        """Forward all keyword arguments to ``AudioStream.__init__``."""
        super().__init__(**kwargs)

    def run(self) -> None:
        """Start the stream and print the volume of each captured chunk.

        Runs until the stream stops being active, then closes the stream and
        terminates the PyAudio instance.

        Raises:
            RuntimeError: If no stream has been opened yet.
        """
        stream = self.stream
        if stream is None:
            raise RuntimeError(
                "[NoiseGate.run]: stream is not initialized; call init_stream() first")

        # Start the audio stream
        stream.start_stream()

        try:
            while stream.is_active():
                if not sound_buffer:
                    time.sleep(self._POLL_INTERVAL)
                    continue

                # Consume each chunk exactly once so the queue cannot grow
                # without bound and no chunk is reported twice.
                buffer = sound_buffer.pop(0)

                # The stream format is paFloat32, so decode as 32-bit floats.
                samples = numpy.frombuffer(buffer, dtype=numpy.float32)
                volume_normalization = numpy.linalg.norm(samples) * 10
                print(str(float(volume_normalization)))
        finally:
            # Release the device even if processing raised.
            stream.stop_stream()
            stream.close()
            self.stream = None
            self.paInstance.terminate()


def callback(in_data, frame_count, time_info, status):
    """PortAudio stream callback: queue the input chunk and echo it to output.

    Args:
        in_data: Captured audio bytes (None when nothing was captured).
        frame_count: Number of frames in this chunk.
        time_info: Timing information supplied by PyAudio (unused).
        status: Status flags supplied by PyAudio (unused).

    Returns:
        Tuple of (data to play, pyaudio.paContinue).
    """
    if in_data is not None:
        sound_buffer.append(in_data)
    return in_data, pyaudio.paContinue


if __name__ == '__main__':
    input_index = int(input("Input:\t"))
    output_index = int(input("Output:\t"))

    ng = NoiseGate(_input_device_index=input_index, _output_device_index=output_index)
    ng.start()