import os import time import argparse import wave import pyaudio from pydub import AudioSegment from datetime import datetime from NoiseGatev2 import NoiseGateStream, AudioStream, math, audioop from debugger import setup_logger # Configure logging log = setup_logger('NoiseGatev2') # Parameters for the audio stream NOISE_GATE_THRESHOLD = -50 # Adjust the threshold as needed INPUT_DEVICE_INDEX = None # Set your input device index here # Create the recordings directory if it doesn't exist RECORDINGS_DIR = 'recordings' if not os.path.exists(RECORDINGS_DIR): os.makedirs(RECORDINGS_DIR) # Create an instance of NoiseGate class NoiseGate(AudioStream): def __init__(self, _noise_gate_threshold: int, _input_device_index: int = None, **kwargs): super(NoiseGate, self).__init__(_input_device_index=_input_device_index, **kwargs) self.THRESHOLD = _noise_gate_threshold self.stream.start_stream() def read(self): try: curr_buffer = self.stream.read(960, exception_on_overflow=False) # Read audio data from the stream buffer_rms = audioop.rms(curr_buffer, 2) # Calculate RMS of the audio data buffer_decibel = 20 * math.log10(buffer_rms) if buffer_rms > 0 else -float('inf') if buffer_decibel >= self.THRESHOLD: log.debug(f"[Noisegate Open] {buffer_decibel} dB") return bytes(curr_buffer) # Return audio data if above threshold log.debug(f"[Noisegate Closed] {buffer_decibel} dB") return None # Return None if below threshold except Exception as e: log.warning(e) return None def close(self): log.debug(f"Closing") self.close_if_open() self.paInstance.terminate() def get_filename(): """Generate a filename based on the current date and time.""" return os.path.join(RECORDINGS_DIR, datetime.now().strftime("%Y-%m-%d_%H-%M-%S.wav")) def save_wave_file(frames, filename, ng): audio_segment = AudioSegment(b''.join(frames), sample_width=ng.paInstance.get_sample_size(pyaudio.paInt16), frame_rate=48000, channels=2) """Save the given audio segment to a WAV file in the recordings directory.""" audio_segment.export(filename, format="mp3") print(f"Saved recording: {filename}") def record_transmissions(): noise_gate = NoiseGate(_noise_gate_threshold=-50, _input_device_index=INPUT_DEVICE_INDEX) frames = [] recording = False print("Listening for transmissions...") while True: audio_data = noise_gate.read() # Get audio data from the NoiseGateStream if audio_data: # If there's audio data (not silent) if not recording: print("Transmission detected, starting recording...") recording = True frames = [] # Reset frames for the new recording frames.append(audio_data) # Collect audio data elif recording: # Silence detected, stop the recording print("Silence detected, stopping recording...") filename = get_filename() save_wave_file(frames, filename, noise_gate) recording = False # Reset recording state frames.clear() # Clear frames for the next transmission print("Recording stopped, waiting for the next transmission...") time.sleep(0.1) # Short delay to avoid busy waiting def parse_arguments(): parser = argparse.ArgumentParser() parser.add_argument("device_id", type=int, help="The ID of the audio device to use") return parser.parse_args() if __name__ == "__main__": try: args = parse_arguments() print("Arguments: %s", args) INPUT_DEVICE_INDEX = args.device_id record_transmissions() except KeyboardInterrupt: print("Stopping...")