Files
drb-client-discord-bot/app/internal/NoiseGatev2.py
Logan Cusano 0a0d8d3af9 fix typo
2025-07-14 21:52:08 -04:00

116 lines
4.1 KiB
Python

import audioop
import math
import pyaudio
import asyncio
from discord import VoiceClient
from internal.logger import create_logger
from discord.opus import OpusNotLoaded
LOGGER = create_logger(__name__)
# Audio format constants: 48 kHz, stereo, 16-bit PCM, 20 ms frames
SAMPLE_RATE = 48000  # Hz
CHANNELS = 2
SAMPLES_PER_FRAME = SAMPLE_RATE // 50  # one 20 ms frame -> 960 samples per channel
FRAME_SIZE = CHANNELS * SAMPLES_PER_FRAME * 2  # bytes per frame; 16-bit PCM is 2 bytes per sample
class AudioTransmitter:
    """Stream audio from a PyAudio input device to a Discord voice connection.

    Frames of ``SAMPLES_PER_FRAME`` samples are read from the input device,
    passed through an RMS-based noise gate and, while the gate is open,
    handed to the voice client for Opus encoding and transmission.
    """

    def __init__(self, voice_client: VoiceClient, noise_gate_threshold: int,
                 loop: asyncio.AbstractEventLoop, input_device_index: int):
        """Open the input stream and prepare the transmitter.

        Args:
            voice_client: Connected voice client whose Opus encoder is ready.
            noise_gate_threshold: Level a frame must reach to open the gate;
                compared against ``20 * log10(rms)`` of the 16-bit samples.
            loop: Event loop whose default executor runs the blocking reads.
            input_device_index: PyAudio index of the capture device.

        Raises:
            ValueError: If ``voice_client`` is not connected.
            OpusNotLoaded: If the voice client has no usable encoder.
        """
        if not voice_client.is_connected():
            raise ValueError("VoiceClient is not connected.")
        if not getattr(voice_client, 'encoder', None):
            raise OpusNotLoaded("VoiceClient has not initialized its Opus encoder.")

        self.voice_client = voice_client
        self.threshold = noise_gate_threshold
        self.loop = loop
        self.input_device_index = input_device_index

        self.pa = pyaudio.PyAudio()
        try:
            self.stream = self.pa.open(
                format=pyaudio.paInt16,
                channels=CHANNELS,
                rate=SAMPLE_RATE,
                input=True,
                frames_per_buffer=SAMPLES_PER_FRAME,
                input_device_index=self.input_device_index,
            )
        except Exception:
            # Do not leak the PortAudio handle when the device cannot be opened.
            self.pa.terminate()
            raise

        self._is_running = False
        self._is_speaking = False
        # Remaining frames for which the gate stays open after the level drops.
        self.ng_fadeout_count = 0
        self.NG_FADEOUT_FRAMES = 12  # 12 frames of 20 ms = 240 ms fadeout time

    async def _set_speaking(self, speaking: bool):
        """Send the speaking state to the voice websocket if it has changed."""
        if self._is_speaking != speaking:
            self._is_speaking = speaking
            await self.voice_client.ws.speak(speaking)

    def _read_frame(self) -> bytes:
        """Blocking read of one frame; intended to run in an executor thread."""
        # Dropping overflowed input is preferable to killing a live
        # transmission if the event loop was briefly stalled.
        return self.stream.read(SAMPLES_PER_FRAME, exception_on_overflow=False)

    async def start(self):
        """Run the transmission loop until stop() is called or the task is cancelled.

        Resources are always released through _cleanup() on exit.
        """
        self._is_running = True
        self.stream.start_stream()
        LOGGER.info("Audio transmitter started.")
        try:
            while self._is_running:
                # The blocking read runs in a worker thread so the event loop
                # stays responsive. It also paces the loop at one frame per
                # 20 ms; an extra sleep here would consume audio slower than
                # it is captured and back up the input buffer.
                pcm_data = await self.loop.run_in_executor(None, self._read_frame)
                if not self._is_running:
                    break

                if self._check_noise_gate(pcm_data):
                    await self._set_speaking(True)
                    # Encoder.encode() returns a single bytes packet and
                    # send_audio_packet() encodes by default, so hand the raw
                    # PCM frame over exactly once and let it do the encoding.
                    self.voice_client.send_audio_packet(pcm_data, encode=True)
                else:
                    await self._set_speaking(False)
        except asyncio.CancelledError:
            LOGGER.info("Audio transmitter task cancelled.")
        except Exception as e:
            LOGGER.error(f"Error in audio transmitter loop: {e}", exc_info=True)
        finally:
            await self._cleanup()

    def _check_noise_gate(self, pcm_data: bytes) -> bool:
        """Return True if the gate is open for this frame of 16-bit PCM.

        The gate opens when the frame level reaches the threshold and then
        stays open for NG_FADEOUT_FRAMES further frames below it.
        """
        rms = audioop.rms(pcm_data, 2)
        # Digital silence has no finite dB value; treat it as below any
        # threshold so it still runs down the fadeout counter instead of
        # leaving a stale count that could reopen the gate later.
        level_db = 20 * math.log10(rms) if rms > 0 else float("-inf")

        if level_db >= self.threshold:
            self.ng_fadeout_count = self.NG_FADEOUT_FRAMES
            return True
        if self.ng_fadeout_count > 0:
            self.ng_fadeout_count -= 1
            return True
        return False

    async def stop(self):
        """Ask the transmission loop to exit after the frame in progress."""
        self._is_running = False

    async def _cleanup(self):
        """Clear the speaking state and release the stream and PyAudio handle."""
        LOGGER.info("Cleaning up transmitter resources.")
        try:
            if self._is_speaking:
                await self._set_speaking(False)
        except Exception as e:
            # The websocket may already be gone; the audio device must still
            # be released below.
            LOGGER.error(f"Failed to clear speaking state during cleanup: {e}", exc_info=True)
        finally:
            try:
                if self.stream.is_active():
                    self.stream.stop_stream()
                self.stream.close()
            finally:
                self.pa.terminate()