Merge pull request 'Improving reconnection logic' (#2) from fix-disconnection-bug into master
Some checks failed
release-tag / release-image (push) Failing after 46s
Lint / lint (push) Successful in 19s

Reviewed-on: #2
This commit is contained in:
2025-03-04 22:02:32 -05:00
15 changed files with 380 additions and 256 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
__pycache__*
bot-poc.py
configs*
.env
.env
*.log

View File

@@ -60,6 +60,9 @@ VOLUME ["/configs"]
# Set the working directory in the container
WORKDIR /app
# Copy opus first to break up the build time
COPY ./app/internal/opus /app/internal/opus
# Copy the rest of the directory contents into the container at /app
COPY ./app /app

View File

@@ -1,158 +0,0 @@
import asyncio
from typing import Optional, Dict
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import discord
from discord.ext import commands
from NoiseGatev2 import NoiseGate
import op25_controller
import pulse
# Define FastAPI app
app = FastAPI()
# Discord Bot Setup
intents = discord.Intents.default()
intents.voice_states = True
intents.guilds = True
# Models for API requests
class BotConfig(BaseModel):
token: str # Discord Bot Token
class VoiceChannelRequest(BaseModel):
guild_id: int
channel_id: int
# Discord Bot Manager
class DiscordBotManager:
def __init__(self):
self.bot: Optional[commands.Bot] = None
self.bot_task: Optional[asyncio.Task] = None
self.voice_clients: Dict[int, discord.VoiceClient] = {}
self.token: Optional[str] = None
self.loop = asyncio.get_event_loop()
self.lock = asyncio.Lock()
async def start_bot(self, token: str):
async with self.lock:
if self.bot and self.bot.is_closed():
raise RuntimeError("Bot is already running.")
if self.bot_task and not self.bot_task.done():
raise RuntimeError("Bot is already running.")
self.token = token
self.bot = commands.Bot(command_prefix="!", intents=intents)
@self.bot.event
async def on_ready():
print(f'Logged in as {self.bot.user}')
# Handle graceful shutdown when all voice connections are closed
@self.bot.event
async def on_voice_state_update(member, before, after):
# Check if all voice clients are disconnected
await asyncio.sleep(1) # Give time for the state to update
if not self.voice_clients:
await self.stop_bot()
# Start the bot in the background
self.bot_task = self.loop.create_task(self.bot.start(token))
async def stop_bot(self):
async with self.lock:
if self.bot:
await self.bot.close()
self.bot = None
if self.bot_task:
await self.bot_task
self.bot_task = None
self.voice_clients.clear()
print("Bot has been stopped.")
async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4):
if not self.bot:
raise RuntimeError("Bot is not running.")
guild = self.bot.get_guild(guild_id)
if not guild:
raise ValueError("Guild not found.")
channel = guild.get_channel(channel_id)
if not isinstance(channel, discord.VoiceChannel):
raise ValueError("Channel is not a voice channel.")
if guild_id in self.voice_clients:
raise RuntimeError("Already connected to this guild's voice channel.")
voice_client = await channel.connect()
streamHandler = NoiseGate(
_input_device_index=device_id,
_voice_connection=voice_client,
_noise_gate_threshold=ng_threshold)
# Start the audio stream
streamHandler.run()
self.voice_clients[guild_id] = voice_client
print(f"Joined guild {guild_id} voice channel {channel_id}.")
async def leave_voice_channel(self, guild_id: int):
if not self.bot:
raise RuntimeError("Bot is not running.")
voice_client = self.voice_clients.get(guild_id)
if not voice_client:
raise RuntimeError("Not connected to the specified guild's voice channel.")
await voice_client.disconnect()
del self.voice_clients[guild_id]
print(f"Left guild {guild_id} voice channel.")
# Initialize Discord Bot Manager
bot_manager = DiscordBotManager()
# API Endpoints
@app.post("/start_bot")
async def start_bot(config: BotConfig):
try:
await bot_manager.start_bot(config.token)
return {"status": "Bot started successfully."}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/stop_bot")
async def stop_bot():
try:
await bot_manager.stop_bot()
return {"status": "Bot stopped successfully."}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/join_voice")
async def join_voice_channel(request: VoiceChannelRequest):
try:
await bot_manager.join_voice_channel(request.guild_id, request.channel_id)
return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/leave_voice")
async def leave_voice_channel(request: VoiceChannelRequest):
try:
await bot_manager.leave_voice_channel(request.guild_id)
return {"status": f"Left guild {request.guild_id} voice channel."}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/status")
async def get_status():
status = {
"bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(),
"connected_guilds": list(bot_manager.voice_clients.keys())
}
return status
app.include_router(op25_controller.router, prefix="/op25")
app.include_router(pulse.router, prefix="/pulse")

View File

@@ -1,15 +1,15 @@
import audioop
import logging
import math
import time
import pyaudio
import discord
import numpy
from internal.logger import create_logger
voice_connection = None
LOGGER = logging.getLogger("Discord_Radio_Bot.NoiseGateV2")
LOGGER = create_logger(__name__)
# noinspection PyUnresolvedReferences

139
app/internal/bot_manager.py Normal file
View File

@@ -0,0 +1,139 @@
import asyncio
import platform
import os
from discord import VoiceClient, VoiceChannel, opus, Activity, ActivityType, Intents
from discord.ext import commands
from typing import Optional, Dict
from internal.NoiseGatev2 import NoiseGate
from internal.logger import create_logger
LOGGER = create_logger(__name__)
# Configure discord intents
intents = Intents.default()
intents.voice_states = True
intents.guilds = True
class DiscordBotManager:
def __init__(self):
self.bot: Optional[commands.Bot] = None
self.bot_task: Optional[asyncio.Task] = None
self.voice_clients: Dict[int, VoiceClient] = {}
self.token: Optional[str] = None
self.loop = asyncio.get_event_loop()
self.lock = asyncio.Lock()
async def start_bot(self, token: str):
async with self.lock:
if self.bot and not self.bot.is_closed():
raise RuntimeError("Bot is already running.")
if self.bot_task and not self.bot_task.done():
raise RuntimeError("Bot is already running.")
self.token = token
self.bot = commands.Bot(command_prefix="!", intents=intents)
@self.bot.event
async def on_ready():
LOGGER.info(f'Logged in as {self.bot.user}')
@self.bot.event
async def on_voice_state_update(member, before, after):
# Check if the bot was disconnected
if member == self.bot.user and after.channel is None:
guild_id = before.channel.guild.id
LOGGER.info(f"Bot was disconnected from channel in guild {guild_id}. Attempting to reconnect...")
try:
await self.leave_voice_channel(guild_id)
except Exception as e:
LOGGER.warning(f"Error leaving voice channel: '{e}'")
# Attempt to reconnect to the channel after a brief pause
await asyncio.sleep(2)
await self.join_voice_channel(guild_id, before.channel.id)
# Load Opus for the current CPU
await self.load_opus()
self.bot_task = self.loop.create_task(self.bot.start(token))
async def stop_bot(self):
async with self.lock:
if self.bot:
await self.bot.close()
self.bot = None
if self.bot_task:
await self.bot_task
self.bot_task = None
self.voice_clients.clear()
LOGGER.info("Bot has been stopped.")
async def join_voice_channel(self, guild_id: int, channel_id: int, ng_threshold: int = 50, device_id: int = 4):
if not self.bot:
raise RuntimeError("Bot is not running.")
guild = self.bot.get_guild(guild_id)
if not guild:
raise ValueError("Guild not found.")
if not opus.is_loaded():
raise RuntimeError("Opus is not loaded.")
channel = guild.get_channel(channel_id)
if not isinstance(channel, VoiceChannel):
raise ValueError("Channel is not a voice channel.")
if guild_id in self.voice_clients:
raise RuntimeError("Already connected to this guild's voice channel.")
try:
voice_client = await channel.connect(timeout=60.0, reconnect=True)
LOGGER.debug("Voice Connected.")
streamHandler = NoiseGate(
_input_device_index=device_id,
_voice_connection=voice_client,
_noise_gate_threshold=ng_threshold)
streamHandler.run()
LOGGER.debug("Stream is running.")
self.voice_clients[guild_id] = voice_client
LOGGER.info(f"Joined guild {guild_id} voice channel {channel_id} and stream is running.")
except Exception as e:
LOGGER.error(f"Failed to connect to voice channel: {e}")
async def leave_voice_channel(self, guild_id: int):
if not self.bot:
raise RuntimeError("Bot is not running.")
voice_client = self.voice_clients.get(guild_id)
if not voice_client:
raise RuntimeError("Not connected to the specified guild's voice channel.")
await voice_client.disconnect()
del self.voice_clients[guild_id]
LOGGER.info(f"Left guild {guild_id} voice channel.")
async def load_opus(self):
""" Load the proper OPUS library for the device being used """
processor = platform.machine()
script_dir = os.path.dirname(os.path.abspath(__file__))
LOGGER.debug("Processor: ", processor)
if os.name == 'nt':
if processor == "AMD64":
opus.load_opus(os.path.join(script_dir, './opus/libopus_amd64.dll'))
LOGGER.info("Loaded OPUS library for AMD64")
return "AMD64"
else:
if processor == "aarch64":
opus.load_opus(os.path.join(script_dir, './opus/libopus_aarcch64.so'))
LOGGER.info("Loaded OPUS library for aarch64")
return "aarch64"
elif processor == "armv7l":
opus.load_opus(os.path.join(script_dir, './opus/libopus_armv7l.so'))
LOGGER.info("Loaded OPUS library for armv7l")
return "armv7l"
async def set_presence(self, presence: str):
""" Set the presense (activity) of the bot """
try:
await self.bot.change_presence(activity=Activity(type=ActivityType.listening, name=presence))
except Exception as pe:
LOGGER.error(f"Unable to set presence: '{pe}'")

55
app/internal/logger.py Normal file
View File

@@ -0,0 +1,55 @@
import logging
from logging.handlers import RotatingFileHandler
def create_logger(name, level=logging.DEBUG, max_bytes=10485760, backup_count=2):
"""
Creates a logger with a console and rotating file handlers for both debug and info log levels.
Args:
name (str): The name for the logger.
level (int): The logging level for the logger. Defaults to logging.DEBUG.
max_bytes (int): Maximum size of the log file in bytes before it gets rotated. Defaults to 10 MB.
backup_count (int): Number of backup files to keep. Defaults to 2.
Returns:
logging.Logger: Configured logger.
"""
# Set the log file paths
debug_log_file = "./client.debug.log"
info_log_file = "./client.log"
# Create a logger
logger = logging.getLogger(name)
logger.setLevel(level)
# Check if the logger already has handlers to avoid duplicate logs
if not logger.hasHandlers():
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
# Create rotating file handler for debug level
debug_file_handler = RotatingFileHandler(debug_log_file, maxBytes=max_bytes, backupCount=backup_count)
debug_file_handler.setLevel(logging.DEBUG)
# Create rotating file handler for info level
info_file_handler = RotatingFileHandler(info_log_file, maxBytes=max_bytes, backupCount=backup_count)
info_file_handler.setLevel(logging.INFO)
# Create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
debug_file_handler.setFormatter(formatter)
info_file_handler.setFormatter(formatter)
# Add the handlers to the logger
logger.addHandler(console_handler)
logger.addHandler(debug_file_handler)
logger.addHandler(info_file_handler)
return logger
# Example usage:
# logger = create_logger('my_logger')
# logger.debug('This is a debug message')
# logger.info('This is an info message')

15
app/main.py Normal file
View File

@@ -0,0 +1,15 @@
from fastapi import FastAPI
import routers.op25_controller as op25_controller
import routers.pulse as pulse
import routers.bot as bot
from internal.logger import create_logger
# Initialize logging
LOGGER = create_logger(__name__)
# Define FastAPI app
app = FastAPI()
app.include_router(op25_controller.router, prefix="/op25")
app.include_router(pulse.router, prefix="/pulse")
app.include_router(bot.router, prefix="/bot")

101
app/models.py Normal file
View File

@@ -0,0 +1,101 @@
from pydantic import BaseModel
from typing import List, Optional
from enum import Enum
class BotConfig(BaseModel):
token: str
class VoiceChannelRequest(BaseModel):
guild_id: int
channel_id: int
class DecodeMode(str, Enum):
P25 = "P25"
DMR = "DMR"
ANALOG = "NBFM"
class TalkgroupTag(BaseModel):
talkgroup: str
tagDec: int
class ConfigGenerator(BaseModel):
type: DecodeMode
systemName: str
channels: List[str]
tags: Optional[List[TalkgroupTag]]
whitelist: Optional[List[int]]
class DemodType(str, Enum):
CQPSK = "cqpsk"
FSK4 = "fsk4"
class FilterType(str, Enum):
RC = "rc"
WIDEPULSE = "widepulse"
class ChannelConfig(BaseModel):
name: str
trunking_sysname: Optional[str]
enable_analog: str
demod_type: DemodType
filter_type: FilterType
device: Optional[str] = "sdr"
cqpsk_tracking: Optional[bool] = None
frequency: Optional[float] = None
nbfmSquelch: Optional[float] = None
destination: Optional[str] = "udp://127.0.0.1:23456"
tracking_threshold: Optional[int] = 120
tracking_feedback: Optional[float] = 0.75
excess_bw: Optional[float] = 0.2
if_rate: Optional[int] = 24000
plot: Optional[str] = ""
symbol_rate: Optional[int] = 4800
blacklist: Optional[str] = ""
whitelist: Optional[str] = ""
class DeviceConfig(BaseModel):
args: Optional[str] = "rtl"
gains: Optional[str] = "lna:39"
gain_mode: Optional[bool] = False
name: Optional[str] = "sdr"
offset: Optional[int] = 0
ppm: Optional[float] = 0.0
rate: Optional[int] = 1920000
usable_bw_pct: Optional[float] = 0.85
tunable: Optional[bool] = True
class TrunkingChannelConfig(BaseModel):
sysname: str
control_channel_list: str
tagsFile: Optional[str] = None
whitelist: Optional[str] = None
nac: Optional[str] = ""
wacn: Optional[str] = ""
tdma_cc: Optional[bool] = False
crypt_behavior: Optional[int] = 2
class TrunkingConfig(BaseModel):
module: str
chans: List[TrunkingChannelConfig]
class AudioInstanceConfig(BaseModel):
instance_name: Optional[str] = "audio0"
device_name: Optional[str] = "pulse"
udp_port: Optional[int] = 23456
audio_gain: Optional[float] = 2.5
number_channels: Optional[int] = 1
class AudioConfig(BaseModel):
module: Optional[str] = "sockaudio.py"
instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]
class TerminalConfig(BaseModel):
module: Optional[str] = "terminal.py"
terminal_type: Optional[str] = "http:0.0.0.0:8081"
terminal_timeout: Optional[float] = 5.0
curses_plot_interval: Optional[float] = 0.2
http_plot_interval: Optional[float] = 1.0
http_plot_directory: Optional[str] = "../www/images"
tuning_step_large: Optional[int] = 1200
tuning_step_small: Optional[int] = 100

57
app/routers/bot.py Normal file
View File

@@ -0,0 +1,57 @@
from fastapi import APIRouter, HTTPException
from models import BotConfig, VoiceChannelRequest
from internal.bot_manager import DiscordBotManager
from internal.logger import create_logger
LOGGER = create_logger(__name__)
# Define FastAPI app
router = APIRouter()
# Initialize Discord Bot Manager
bot_manager = DiscordBotManager()
# API Endpoints
@router.post("/start_bot")
async def start_bot(config: BotConfig):
try:
await bot_manager.start_bot(config.token)
return {"status": "Bot started successfully."}
except Exception as e:
LOGGER.error(f"Error starting bot: {e}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/stop_bot")
async def stop_bot():
try:
await bot_manager.stop_bot()
return {"status": "Bot stopped successfully."}
except Exception as e:
LOGGER.error(f"Error stopping bot: {e}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/join_voice")
async def join_voice_channel(request: VoiceChannelRequest):
try:
await bot_manager.join_voice_channel(request.guild_id, request.channel_id)
return {"status": f"Joined guild {request.guild_id} voice channel {request.channel_id}."}
except Exception as e:
LOGGER.error(f"Error joining voice channel: {e}")
raise HTTPException(status_code=400, detail=str(e))
@router.post("/leave_voice")
async def leave_voice_channel(request: VoiceChannelRequest):
try:
await bot_manager.leave_voice_channel(request.guild_id)
return {"status": f"Left guild {request.guild_id} voice channel."}
except Exception as e:
LOGGER.error(f"Error leaving voice channel: {e}")
raise HTTPException(status_code=400, detail=str(e))
@router.get("/status")
async def get_status():
status = {
"bot_running": bot_manager.bot is not None and not bot_manager.bot.is_closed(),
"connected_guilds": list(bot_manager.voice_clients.keys())
}
return status

View File

@@ -1,14 +1,15 @@
from fastapi import HTTPException, APIRouter
from pydantic import BaseModel
from enum import Enum
import subprocess
import os
import signal
import json
import csv
from typing import List, Optional
from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig, TalkgroupTag
from internal.logger import create_logger
from typing import List
router = APIRouter()
LOGGER = create_logger(__name__)
op25_process = None
OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
@@ -20,7 +21,7 @@ async def start_op25():
if op25_process is None:
try:
op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
print(op25_process)
LOGGER.debug(op25_process)
return {"status": "OP25 started"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@@ -44,96 +45,6 @@ async def stop_op25():
async def get_status():
return {"status": "running" if op25_process else "stopped"}
class DecodeMode(str, Enum):
P25 = "P25"
DMR = "DMR"
ANALOG = "NBFM"
class TalkgroupTag(BaseModel):
talkgroup: str
tagDec: int
class ConfigGenerator(BaseModel):
type: DecodeMode
systemName: str
channels: List[str]
tags: List[TalkgroupTag]
whitelist: List[int]
class DemodType(str, Enum):
CQPSK = "cqpsk"
FSK4 = "fsk4"
class FilterType(str, Enum):
RC = "rc"
WIDEPULSE = "widepulse"
class ChannelConfig(BaseModel):
name: str
trunking_sysname: Optional[str]
enable_analog: str
demod_type: DemodType
filter_type: FilterType
device: Optional[str] = "sdr"
cqpsk_tracking: Optional[bool] = None
frequency: Optional[float] = None
nbfmSquelch: Optional[float] = None
destination: Optional[str] = "udp://127.0.0.1:23456"
tracking_threshold: Optional[int] = 120
tracking_feedback: Optional[float] = 0.75
excess_bw: Optional[float] = 0.2
if_rate: Optional[int] = 24000
plot: Optional[str] = ""
symbol_rate: Optional[int] = 4800
blacklist: Optional[str] = ""
whitelist: Optional[str] = ""
class DeviceConfig(BaseModel):
args: Optional[str] = "rtl"
gains: Optional[str] = "lna:39"
gain_mode: Optional[bool] = False
name: Optional[str] = "sdr"
offset: Optional[int] = 0
ppm: Optional[float] = 0.0
rate: Optional[int] = 1920000
usable_bw_pct: Optional[float] = 0.85
tunable: Optional[bool] = True
class TrunkingChannelConfig(BaseModel):
sysname: str
control_channel_list: str
tagsFile: Optional[str] = None
whitelist: Optional[str] = None
nac: Optional[str] = ""
wacn: Optional[str] = ""
tdma_cc: Optional[bool] = False
crypt_behavior: Optional[int] = 2
class TrunkingConfig(BaseModel):
module: str
chans: List[TrunkingChannelConfig]
class AudioInstanceConfig(BaseModel):
instance_name: Optional[str] = "audio0"
device_name: Optional[str] = "pulse"
udp_port: Optional[int] = 23456
audio_gain: Optional[float] = 2.5
number_channels: Optional[int] = 1
class AudioConfig(BaseModel):
module: Optional[str] = "sockaudio.py"
instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]
class TerminalConfig(BaseModel):
module: Optional[str] = "terminal.py"
terminal_type: Optional[str] = "http:0.0.0.0:8081"
terminal_timeout: Optional[float] = 5.0
curses_plot_interval: Optional[float] = 0.2
http_plot_interval: Optional[float] = 1.0
http_plot_directory: Optional[str] = "../www/images"
tuning_step_large: Optional[int] = 1200
tuning_step_small: Optional[int] = 100
@router.post("/generate-config")
async def generate_config(generator: ConfigGenerator):
try:
@@ -231,7 +142,7 @@ def del_none_in_dict(d):
This alters the input so you may wish to ``copy`` the dict first.
"""
for key, value in list(d.items()):
print(f"Key: '{key}'\nValue: '{value}'")
LOGGER.info(f"Key: '{key}'\nValue: '{value}'")
if value is None:
del d[key]
elif isinstance(value, dict):