Compare commits

8 Commits

Author SHA1 Message Date
Logan Cusano
2cbdb8b364 Added work-in-progress recorder
- Will always listen to the audio device using the noisegate
- When the noisegate is triggered it will begin recording, when the noisegate closes, the recording will be saved
- Currently saving corrupt data
2024-09-15 14:01:22 -04:00
Logan Cusano
549a87bed0 Add presense endpoint and client function 2024-05-05 18:28:00 -04:00
Logan Cusano
098dfc9a10 Update PDAB
- Added standard debugger file for logging
- Added fault handler to see what happens if it faults
2024-04-28 03:17:08 -04:00
Logan Cusano
fc6c114473 Update gitignore to ignore logs 2024-04-20 20:10:46 -04:00
Logan Cusano
91e8e90998 Add updated changes from DRB repo 2024-04-15 00:16:39 -04:00
Logan Cusano
792e6d7a55 Updated naming of guildId to match server 2024-04-14 16:36:19 -04:00
Logan Cusano
de763858b6 Update requirements with better versioning 2024-04-14 16:12:55 -04:00
Logan Cusano
e84adaa9c4 Update for DRBv3 2024-04-03 23:25:31 -04:00
7 changed files with 395 additions and 303 deletions

3
.gitignore vendored
View File

@@ -2,4 +2,5 @@
*__pycache__/
*.html
*.exe
LICENSE
LICENSE
*.log

View File

@@ -6,10 +6,12 @@ import time
import pyaudio
import discord
import numpy
from debugger import setup_logger
voice_connection = None
LOGGER = logging.getLogger("Discord_Radio_Bot.NoiseGateV2")
# Configure logging
logger = setup_logger('NoiseGatev2')
# noinspection PyUnresolvedReferences
@@ -30,19 +32,19 @@ class AudioStream:
if _input:
self.paInstance_kwargs['input_device_index'] = _input_device_index
else:
LOGGER.warning(f"[AudioStream.__init__]:\tInput was not enabled."
logger.warning(f"[AudioStream.__init__]:\tInput was not enabled."
f" Reinitialize with '_input=True'")
if _output_device_index:
if _output:
self.paInstance_kwargs['output_device_index'] = _output_device_index
else:
LOGGER.warning(f"[AudioStream.__init__]:\tOutput was not enabled."
logger.warning(f"[AudioStream.__init__]:\tOutput was not enabled."
f" Reinitialize with '_output=True'")
if _init_on_startup:
# Init PyAudio instance
LOGGER.info("Creating PyAudio instance")
logger.info("Creating PyAudio instance")
self.paInstance = pyaudio.PyAudio()
# Define and initialize stream object if we have been passed a device ID (pyaudio.open)
@@ -50,27 +52,30 @@ class AudioStream:
if _output_device_index or _input_device_index:
if _init_on_startup:
LOGGER.info("Init stream")
logger.info("Init stream")
self.init_stream()
def init_stream(self, _new_output_device_index: int = None, _new_input_device_index: int = None):
logger.info("Checking what device has been set if any")
# Check what device was asked to be changed (or set)
if _new_input_device_index:
if self.paInstance_kwargs['input']:
self.paInstance_kwargs['input_device_index'] = _new_input_device_index
else:
LOGGER.warning(f"[AudioStream.init_stream]:\tInput was not enabled when initialized."
logger.warning(f"[AudioStream.init_stream]:\tInput was not enabled when initialized."
f" Reinitialize with '_input=True'")
if _new_output_device_index:
if self.paInstance_kwargs['output']:
self.paInstance_kwargs['output_device_index'] = _new_output_device_index
else:
LOGGER.warning(f"[AudioStream.init_stream]:\tOutput was not enabled when initialized."
logger.warning(f"[AudioStream.init_stream]:\tOutput was not enabled when initialized."
f" Reinitialize with '_output=True'")
logger.info("Close the stream if it's open")
self.close_if_open()
logger.info("Open the audio stream")
# Open the stream
self.stream = self.paInstance.open(**self.paInstance_kwargs)
@@ -80,10 +85,10 @@ class AudioStream:
if self.stream.is_active():
self.stream.stop_stream()
self.stream.close()
LOGGER.debug(f"[ReopenStream.close_if_open]:\t Stream was open; It was closed.")
logger.debug(f"[ReopenStream.close_if_open]:\t Stream was open; It was closed.")
def list_devices(self, _display_input_devices: bool = True, _display_output_devices: bool = True):
LOGGER.info('Getting a list of the devices connected')
logger.info('Getting a list of the devices connected')
info = self.paInstance.get_host_api_info_by_index(0)
numdevices = info.get('deviceCount')
@@ -96,13 +101,13 @@ class AudioStream:
input_device = self.paInstance.get_device_info_by_host_api_device_index(0, i).get('name')
devices['Input'][i] = input_device
if _display_input_devices:
LOGGER.debug(f"Input Device id {i} - {input_device}")
logger.debug(f"Input Device id {i} - {input_device}")
if (self.paInstance.get_device_info_by_host_api_device_index(0, i).get('maxOutputChannels')) > 0:
output_device = self.paInstance.get_device_info_by_host_api_device_index(0, i).get('name')
devices['Output'][i] = output_device
if _display_output_devices:
LOGGER.debug(f"Output Device id {i} - {output_device}")
logger.debug(f"Output Device id {i} - {output_device}")
return devices
@@ -126,28 +131,31 @@ class NoiseGate(AudioStream):
def run(self) -> None:
global voice_connection
# Start the audio stream
LOGGER.debug(f"Starting stream")
self.stream.start_stream()
# Start the stream to discord
self.core()
logger.debug(f"Starting stream")
try:
self.stream.start_stream()
# Start the stream to discord
self.core()
except Exception as e:
logger.error(err)
def core(self, error=None):
def core(self, error=None):
if error:
LOGGER.warning(error)
logger.warning(error)
while not voice_connection.is_connected():
time.sleep(.2)
if not voice_connection.is_playing():
LOGGER.debug(f"Playing stream to discord")
logger.debug(f"Playing stream to discord")
voice_connection.play(self.NGStream, after=self.core)
async def close(self):
LOGGER.debug(f"Closing")
logger.debug(f"Closing")
await voice_connection.disconnect()
if self.stream.is_active:
self.stream.stop_stream()
LOGGER.debug(f"Stopping stream")
logger.debug(f"Stopping stream")
# noinspection PyUnresolvedReferences
@@ -169,9 +177,9 @@ class NoiseGateStream(discord.AudioSource):
if self.process_set_count % 10 == 0:
if buffer_decibel >= self.stream.THRESHOLD:
LOGGER.debug(f"[Noisegate Open] {buffer_decibel} db")
logger.debug(f"[Noisegate Open] {buffer_decibel} db")
else:
LOGGER.debug(f"[Noisegate Closed] {buffer_decibel} db")
logger.debug(f"[Noisegate Closed] {buffer_decibel} db")
if buffer_decibel >= self.stream.THRESHOLD:
self.NG_fadeout_count = self.NG_fadeout
@@ -182,13 +190,13 @@ class NoiseGateStream(discord.AudioSource):
else:
if self.NG_fadeout_count > 0:
self.NG_fadeout_count -= 1
LOGGER.debug(f"Frames in fadeout remaining: {self.NG_fadeout_count}")
logger.debug(f"Frames in fadeout remaining: {self.NG_fadeout_count}")
self.process_set_count += 1
if curr_buffer:
return bytes(curr_buffer)
except OSError as e:
LOGGER.warning(e)
logger.warning(e)
pass
def audio_datalist_set_volume(self, datalist, volume):

28
debugger.py Normal file
View File

@@ -0,0 +1,28 @@
import logging
import os
running_dir = os.path.dirname(__file__)
# Ensure the directory exists
log_dir = f"{running_dir}/logs"
os.makedirs(log_dir, exist_ok=True)
def setup_logger(namespace):
# Create the file if it doesn't exist
log_file = f"{running_dir}/logs/pdab.log"
open(log_file, 'a').close()
# Configure logging
logFormatter = logging.Formatter("%(asctime)s [%(threadName)-16.16s] [%(levelname)-7.7s] - %(message)s", "%Y-%m-%d %H:%M:%S")
logger = logging.getLogger(namespace)
logger.setLevel(logging.INFO)
fileHandler = logging.FileHandler(log_file)
fileHandler.setFormatter(logFormatter)
logger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
logger.addHandler(consoleHandler)
return logger

View File

@@ -3,6 +3,7 @@ from NoiseGatev2 import AudioStream
print('Getting a list of devices')
list_of_devices = AudioStream().list_devices()
print("----- INPUT DEVICES -----")
print("----- *You will likely want to pick from one of these devices* -----")
for inputDevice in list_of_devices['Input']:
print(f"{inputDevice}\t-\t{list_of_devices['Input'][inputDevice]}")

491
main.py
View File

@@ -1,301 +1,246 @@
import argparse, platform, os
from discord import Intents, Client, Member, opus
# Python client file (client.py)
import argparse
import os
import platform
import socketio
import asyncio
from discord import Intents, opus, Activity, ActivityType
from discord.ext import commands
from NoiseGatev2 import NoiseGate
from debugger import setup_logger, running_dir
import faulthandler
faulthandler.enable()
# Load the proper OPUS library for the device being used
async def load_opus():
# Check the system type and load the correct library
# Linux ARM AARCH64 running 32bit OS
logger = setup_logger('main')
# Example usage
logger.info("Logging initialized successfully.")
sio = socketio.AsyncClient()
client = None
device_id = None
ng_threshold = None
### Core functions
def load_opus():
logger.info(f"Running dir: '{running_dir}'")
processor = platform.machine()
print("Processor: ", processor)
if os.name == 'nt':
if processor == "AMD64":
print(f"Loaded OPUS library for AMD64")
opus.load_opus('./opus/libopus_amd64.dll')
return "AMD64"
logger.info(f"Processor: {processor}")
logger.info(f'OS: {os.name}')
if os.name == 'nt':
opus_path = f'{running_dir}/opus/libopus_amd64.dll' if processor == "AMD64" else None
else:
if processor == "aarch64":
print(f"Loaded OPUS library for aarch64")
opus.load_opus('./opus/libopus_aarcch64.so')
return "aarch64"
elif processor == "armv7l":
print(f"Loaded OPUS library for armv7l")
opus.load_opus('./opus/libopus_armv7l.so')
return "armv7l"
opus_path = f'{running_dir}/opus/libopus_aarcch64.so' if processor == "aarch64" else f'{running_dir}/opus/libopus_armv7l.so'
logger.debug(f"Opus path: '{opus_path}'")
logger.info(f"Opus path: '{opus_path}'")
if opus_path:
opus.load_opus(opus_path)
logger.info(f"Loaded OPUS library from {opus_path}")
return True
else:
logger.error("Unsupported architecture or OS.")
return False
def main(clientId='OTQzNzQyMDQwMjU1MTE1MzA0.Yg3eRA.ZxEbRr55xahjfaUmPY8pmS-RHTY', channelId=367396189529833476, NGThreshold=50, deviceId=1):
intents = Intents.default()
async def set_discord_presense(presense):
# Set the presence of the bot (what it's listening to)
await client.change_presence(activity=Activity(type=ActivityType.listening, name=presense))
client = commands.Bot(command_prefix='!', intents=intents)
@client.event
async def on_ready():
print(f'We have logged in as {client.user}')
channelIdToJoin = client.get_channel(channelId)
print("Channel", channelIdToJoin)
print("Loading opus")
await load_opus()
async def join_voice_channel(channel_id):
global device_id, ng_threshold
channel = client.get_channel(int(channel_id))
logger.info(f"Joining voice channel {channel}")
try:
voice_connection = await channel.connect(timeout=60.0, reconnect=True)
if opus.is_loaded():
print("Joining voice")
channelConnection = await channelIdToJoin.connect(timeout=60.0, reconnect=True)
print("Voice Connected")
streamHandler = NoiseGate(
_input_device_index=deviceId,
_voice_connection=channelConnection,
_noise_gate_threshold=NGThreshold)
# Start the audio stream
streamHandler.run()
print("stream running")
logger.info("OPUS library loaded successfully")
logger.info(f"Input index: {device_id}, Voice Connection: {voice_connection}, Channel: {channel}")
stream_handler = NoiseGate(
_input_device_index=device_id,
_voice_connection=voice_connection,
_noise_gate_threshold=ng_threshold
)
stream_handler.run()
logger.info("Audio stream started")
return True
else:
logger.error("Failed to load OPUS library")
return False
except Exception as e:
logging.error(e)
logging.info('error encountered')
client.run(clientId)
async def leave_voice_channel(guild_id):
guild = await client.fetch_guild(guild_id)
logger.info(f"Leaving voice channel in guild {guild}")
voice_client = guild.voice_client
if voice_client:
await voice_client.disconnect()
logger.info("Disconnected from voice channel")
else:
logger.info("Not connected to any voice channel in the guild")
# Check if the client needs to open
if len(check_for_open_vc_connections()) <= 0:
# Tell the server the client is going to close
return False
parser = argparse.ArgumentParser()
parser.add_argument("deviceId", type=int, help="The ID of the audio device to use")
parser.add_argument("channelId", type=int, help="The ID of the voice channel to use")
parser.add_argument("clientId", type=str, help="The discord client ID")
parser.add_argument("-n", "--NGThreshold", type=int, help="Change the noisegate threshold. This defaults to 50")
args = parser.parse_args()
if (not args.NGThreshold):
args.NGThreshold = 50
print("Arguments:", args)
main(
clientId=args.clientId,
channelId=args.channelId,
NGThreshold=args.NGThreshold,
deviceId=args.deviceId
)
return True
def check_for_open_vc_connections():
return client.voice_clients
def check_if_discord_vc_connected(guild_id):
if client:
return any(int(vc.guild.id) == int(guild_id) for vc in client.voice_clients)
return False
async def get_discord_username(guild_id):
try:
guild = await client.fetch_guild(guild_id)
member = await guild.fetch_member(get_discord_id())
if member.nick:
print(f"Username: {member.nick}")
return member.nick
print(f"Username: {client.user.name if client.user else None}")
return client.user.name if client.user else None
except Exception as e:
logging.warning(e)
return None
def check_if_client_is_open():
if client:
return client.is_ready()
return False
def get_discord_id():
print(f"ID: {client.user.id if client.user else None}")
return int(client.user.id) if client.user else None
async def on_connect():
logger.info("Connected to WebSocket server")
#import asyncio
#import functools
#import itertools
#import math
#import random
#
#import discord
#from async_timeout import timeout
#from discord.ext import commands
#
#
#class VoiceError(Exception):
# pass
#
#
#class VoiceState:
# def __init__(self, bot: commands.Bot, ctx: commands.Context):
# self.bot = bot
# self._ctx = ctx
#
# self.current = None
# self.voice = None
# self.next = asyncio.Event()
# self.songs = SongQueue()
#
# self._loop = False
# self._volume = 0.5
# self.skip_votes = set()
#
# self.audio_player = bot.loop.create_task(self.audio_player_task())
#
# def __del__(self):
# self.audio_player.cancel()
#
# @property
# def loop(self):
# return self._loop
#
# @loop.setter
# def loop(self, value: bool):
# self._loop = value
#
# @property
# def volume(self):
# return self._volume
#
# @volume.setter
# def volume(self, value: float):
# self._volume = value
#
# @property
# def is_playing(self):
# return self.voice and self.current
#
# async def audio_player_task(self):
# while True:
# self.next.clear()
#
# if not self.loop:
# # Try to get the next song within 3 minutes.
# # If no song will be added to the queue in time,
# # the player will disconnect due to performance
# # reasons.
# try:
# async with timeout(180): # 3 minutes
# self.current = await self.songs.get()
# except asyncio.TimeoutError:
# self.bot.loop.create_task(self.stop())
# return
#
# self.current.source.volume = self._volume
# self.voice.play(self.current.source, after=self.play_next_song)
# await self.current.source.channel.send(embed=self.current.create_embed())
#
# await self.next.wait()
#
# def play_next_song(self, error=None):
# if error:
# raise VoiceError(str(error))
#
# self.next.set()
#
# def skip(self):
# self.skip_votes.clear()
#
# if self.is_playing:
# self.voice.stop()
#
# async def stop(self):
# self.songs.clear()
#
# if self.voice:
# await self.voice.disconnect()
# self.voice = None
#
#
#class Music(commands.Cog):
# def __init__(self, bot: commands.Bot):
# self.bot = bot
# self.voice_states = {}
#
# def get_voice_state(self, ctx: commands.Context):
# state = self.voice_states.get(ctx.guild.id)
# if not state:
# state = VoiceState(self.bot, ctx)
# self.voice_states[ctx.guild.id] = state
#
# return state
#
# def cog_unload(self):
# for state in self.voice_states.values():
# self.bot.loop.create_task(state.stop())
#
# def cog_check(self, ctx: commands.Context):
# if not ctx.guild:
# raise commands.NoPrivateMessage('This command can\'t be used in DM channels.')
#
# return True
#
# async def cog_before_invoke(self, ctx: commands.Context):
# ctx.voice_state = self.get_voice_state(ctx)
#
# async def cog_command_error(self, ctx: commands.Context, error: commands.CommandError):
# await ctx.send('An error occurred: {}'.format(str(error)))
#
# @commands.command(name='join', invoke_without_subcommand=True)
# async def _join(self, ctx: commands.Context):
# """Joins a voice channel."""
#
# destination = ctx.author.voice.channel
# if ctx.voice_state.voice:
# await ctx.voice_state.voice.move_to(destination)
# return
#
# ctx.voice_state.voice = await destination.connect()
#
# @commands.command(name='summon')
# @commands.has_permissions(manage_guild=True)
# async def _summon(self, ctx: commands.Context, *, channel: discord.VoiceChannel = None):
# """Summons the bot to a voice channel.
#
# If no channel was specified, it joins your channel.
# """
#
# if not channel and not ctx.author.voice:
# raise VoiceError('You are neither connected to a voice channel nor specified a channel to join.')
#
# destination = channel or ctx.author.voice.channel
# if ctx.voice_state.voice:
# await ctx.voice_state.voice.move_to(destination)
# return
#
# ctx.voice_state.voice = await destination.connect()
#
# @commands.command(name='leave', aliases=['disconnect'])
# @commands.has_permissions(manage_guild=True)
# async def _leave(self, ctx: commands.Context):
# """Clears the queue and leaves the voice channel."""
#
# if not ctx.voice_state.voice:
# return await ctx.send('Not connected to any voice channel.')
#
# await ctx.voice_state.stop()
# del self.voice_states[ctx.guild.id]
#
# @commands.command(name='play')
# async def _play(self, ctx: commands.Context, *, search: str):
# """Plays a song.
#
# If there are songs in the queue, this will be queued until the
# other songs finished playing.
#
# This command automatically searches from various sites if no URL is provided.
# A list of these sites can be found here: https://rg3.github.io/youtube-dl/supportedsites.html
# """
#
# if not ctx.voice_state.voice:
# await ctx.invoke(self._join)
#
# async with ctx.typing():
# try:
# source = await YTDLSource.create_source(ctx, search, loop=self.bot.loop)
# except YTDLError as e:
# await ctx.send('An error occurred while processing this request: {}'.format(str(e)))
# else:
# song = Song(source)
#
# await ctx.voice_state.songs.put(song)
# await ctx.send('Enqueued {}'.format(str(source)))
#
# @_join.before_invoke
# @_play.before_invoke
# async def ensure_voice_state(self, ctx: commands.Context):
# if not ctx.author.voice or not ctx.author.voice.channel:
# raise commands.CommandError('You are not connected to any voice channel.')
#
# if ctx.voice_client:
# if ctx.voice_client.channel != ctx.author.voice.channel:
# raise commands.CommandError('Bot is already in a voice channel.')
#
#intents = discord.Intents.default()
#intents.message_content = True
#
#bot = commands.Bot('music.', description="Brent's Revenge", intents=intents)
#bot.add_cog(Music(bot))
#
#
#@bot.event
#async def on_ready():
# print('Logged in as:\n{0.user.name}\n{0.user.id}'.format(bot))
#
#bot.run('OTQzNzQyMDQwMjU1MTE1MzA0.Yg3eRA.ZxEbRr55xahjfaUmPY8pmS-RHTY')
async def on_disconnect():
logger.info("Disconnected from WebSocket server")
### Socket Events
@sio.event
async def connect_error():
logger.error("Connection to WebSocket server failed")
@sio.event
async def set_system(data):
logger.info(f"Received command to set system name (presense): {data['system']}")
return await set_discord_presense(data['system'])
@sio.event
async def join_server(data):
logger.info(f"Received command to join server: {data['channelId']}")
return await join_voice_channel(data['channelId'])
@sio.event
async def leave_server(data):
logger.info(f"Received command to leave server: {data['guildId']}")
return await leave_voice_channel(data['guildId'])
@sio.event
async def check_discord_vc_connected(data):
return check_if_discord_vc_connected(data['guildId'])
@sio.event
async def request_discord_username(data):
return await get_discord_username(data['guildId'])
@sio.event
async def check_client_is_open():
return check_if_client_is_open()
@sio.event
async def request_discord_id():
return get_discord_id()
@sio.event
async def request_client_close():
exit()
async def on_ready():
logger.info(f"We have logged in as {client.user}")
logger.info("Loading OPUS library")
load_opus()
logger.info(opus.is_loaded())
if opus.is_loaded():
logger.info('Emitting to the server')
await sio.emit('discord_ready', {'message': 'Discord bot is ready'})
async def start_bot(args):
global client, device_id, ng_threshold
# Connect to the WebSocket server
await sio.connect('http://127.0.0.1:{}'.format(args.websocket_port), namespaces=['/'])
logger.info("Connecting to WebSocket server...")
intents = Intents.default()
client = commands.Bot(command_prefix='!', intents=intents)
device_id = args.device_id
ng_threshold = args.ng_threshold
client.add_listener(on_connect)
client.add_listener(on_disconnect)
client.add_listener(on_ready)
await client.start(args.client_id)
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("device_id", type=int, help="The ID of the audio device to use")
parser.add_argument("client_id", type=str, help="The Discord client ID")
parser.add_argument("websocket_port", type=int, help="The port of the WebSocket server")
parser.add_argument("-n", "--ng_threshold", type=int, default=50,
help="Change the noise gate threshold. Defaults to 50")
return parser.parse_args()
if __name__ == "__main__":
try:
args = parse_arguments()
logger.info("Arguments: %s", args)
# Create an event loop
loop = asyncio.get_event_loop()
# Run the start_bot function within the event loop
loop.run_until_complete(start_bot(args))
except Exception as e:
logger.error(e)
logger.warning("Exiting now...")

108
pdab-recorder.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import time
import argparse
import wave
import pyaudio
from pydub import AudioSegment
from datetime import datetime
from NoiseGatev2 import NoiseGateStream, AudioStream, math, audioop
from debugger import setup_logger
# Configure logging
log = setup_logger('NoiseGatev2')
# Parameters for the audio stream
NOISE_GATE_THRESHOLD = -50 # Adjust the threshold as needed
INPUT_DEVICE_INDEX = None # Set your input device index here
# Create the recordings directory if it doesn't exist
RECORDINGS_DIR = 'recordings'
if not os.path.exists(RECORDINGS_DIR):
os.makedirs(RECORDINGS_DIR)
# Create an instance of NoiseGate
class NoiseGate(AudioStream):
def __init__(self, _noise_gate_threshold: int, _input_device_index: int = None, **kwargs):
super(NoiseGate, self).__init__(_input_device_index=_input_device_index, **kwargs)
self.THRESHOLD = _noise_gate_threshold
self.stream.start_stream()
def read(self):
try:
curr_buffer = self.stream.read(960, exception_on_overflow=False) # Read audio data from the stream
buffer_rms = audioop.rms(curr_buffer, 2) # Calculate RMS of the audio data
buffer_decibel = 20 * math.log10(buffer_rms) if buffer_rms > 0 else -float('inf')
if buffer_decibel >= self.THRESHOLD:
log.debug(f"[Noisegate Open] {buffer_decibel} dB")
return bytes(curr_buffer) # Return audio data if above threshold
log.debug(f"[Noisegate Closed] {buffer_decibel} dB")
return None # Return None if below threshold
except Exception as e:
log.warning(e)
return None
def close(self):
log.debug(f"Closing")
self.close_if_open()
self.paInstance.terminate()
def get_filename():
"""Generate a filename based on the current date and time."""
return os.path.join(RECORDINGS_DIR, datetime.now().strftime("%Y-%m-%d_%H-%M-%S.wav"))
def save_wave_file(frames, filename, ng):
audio_segment = AudioSegment(b''.join(frames), sample_width=ng.paInstance.get_sample_size(pyaudio.paInt16), frame_rate=48000, channels=2)
"""Save the given audio segment to a WAV file in the recordings directory."""
audio_segment.export(filename, format="mp3")
print(f"Saved recording: {filename}")
def record_transmissions():
noise_gate = NoiseGate(_noise_gate_threshold=-50, _input_device_index=INPUT_DEVICE_INDEX)
frames = []
recording = False
print("Listening for transmissions...")
while True:
audio_data = noise_gate.read() # Get audio data from the NoiseGateStream
if audio_data: # If there's audio data (not silent)
if not recording:
print("Transmission detected, starting recording...")
recording = True
frames = [] # Reset frames for the new recording
frames.append(audio_data) # Collect audio data
elif recording:
# Silence detected, stop the recording
print("Silence detected, stopping recording...")
filename = get_filename()
save_wave_file(frames, filename, noise_gate)
recording = False # Reset recording state
frames.clear() # Clear frames for the next transmission
print("Recording stopped, waiting for the next transmission...")
time.sleep(0.1) # Short delay to avoid busy waiting
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("device_id", type=int, help="The ID of the audio device to use")
return parser.parse_args()
if __name__ == "__main__":
try:
args = parse_arguments()
print("Arguments: %s", args)
INPUT_DEVICE_INDEX = args.device_id
record_transmissions()
except KeyboardInterrupt:
print("Stopping...")

View File

@@ -1,5 +1,6 @@
discord^=2.2.3
PyNaCl^=1.5.0
pyaudio^=0.2.13
numpy^=1.24.3
argparse
discord>=2.3.2
PyNaCl>=1.5.0
pyaudio>=0.2.14
numpy>=1.26.4
argparse
python-socketio[client]