import configparser
import shutil
import logging
import threading
import subprocess
import time
from pathlib import Path
# NOTE: telnetlib is deprecated since Python 3.11 and removed in 3.13
# (PEP 594); this module needs a replacement before moving to 3.13+.
from telnetlib import Telnet
from BotResources import *
from time import sleep


def reset_crashed(_config_path):
    """Clear the ``crashed`` flag in the ``[General]`` section of a config.

    :param _config_path: Path of the INI-style config file to update.
    :return: ``True`` when the flag was set and has been rewritten to
        ``'false'``; ``False`` when there was nothing to reset (no
        ``[General]`` section, no ``crashed`` option, or the flag is
        already false).
    """
    # SafeConfigParser was removed in Python 3.12; ConfigParser is the
    # same class under its current name.
    config = configparser.ConfigParser()
    config.read(_config_path)

    if not config.has_section('General'):
        return False
    # fallback avoids NoOptionError when the section has no 'crashed' key.
    if not config.getboolean('General', 'crashed', fallback=False):
        return False

    config['General']['crashed'] = 'false'
    with open(_config_path, 'w') as config_file:
        config.write(config_file)
    return True


def enable_agc(_config_path):
    """Remove the ``agc_off`` option from the ``[receiver]`` section.

    :param _config_path: Path of the INI-style config file to update.
    :return: ``True`` when the option was present and the file was
        rewritten without it; ``False`` when the option was not present.
    """
    config = configparser.ConfigParser()
    config.read(_config_path)

    if not config.has_option('receiver', 'agc_off'):
        return False

    config.remove_option('receiver', 'agc_off')
    with open(_config_path, 'w') as config_file:
        config.write(config_file)
    return True


class GQRXHandler(threading.Thread):
    """Thread that starts/stops a GQRX process and controls it over telnet.

    Other code requests actions by calling :meth:`set_gqrx_parameters`;
    the thread's :meth:`run` loop reacts to the ``Start_GQRX`` and
    ``Stop_GQRX`` flags that method sets.
    """

    def __init__(self):
        super().__init__()
        self.GQRX_Config_Path = Path.home() / ".config" / "gqrx" / "drb_defaults.conf"

        self.GQRXDir: str = GQRX_BIN_LOCATION
        # shutil.which returns None when the binary is not on PATH; Popen
        # then falls back to the first element of its argument list.
        self.GQRXEXE: str = shutil.which(GQRX_BIN)
        self.GQRXProc = None
        self.GQRX_Started = False

        self.logger = logging.getLogger("Discord_Radio_Bot.GQRXHandler")

        # Radio settings applied once GQRX is up.
        self.Mode = DEFAULT_RADIO_SETTINGS['mode']
        self.Frequency = DEFAULT_RADIO_SETTINGS['freq']
        self.Squelch = DEFAULT_RADIO_SETTINGS['squelch']

        # Request flags polled by run().
        self.Start_GQRX = False
        self.Stop_GQRX = False

        self.Output_Device_Name = None

        # Telnet endpoint used to control GQRX.
        self.hostname = "localhost"
        self.port = 7356
        self.tel_conn = None

    def run(self) -> None:
        """Poll the start flag forever; open GQRX, wait for stop, close it."""
        while True:
            if self.Start_GQRX:
                self.open_gqrx()

                self.Start_GQRX = False
                self.Stop_GQRX = False

                self.logger.debug("GQRX is open, waiting for it to close")

                while not self.Stop_GQRX:
                    sleep(1)

                self.logger.debug('Request to close GQRX')
                self.close_gqrx()

            sleep(.5)

    def set_gqrx_parameters(self, _frequency=False, _start: bool = False, _stop: bool = False,
                            _output_device_name: str = None, _fm_mode: str = None, _squelch: float = None,
                            _hostname: str = None, _port: int = None, _start_dsp: bool = None):
        """Update stored settings and push them to GQRX when it is running.

        Every argument is optional; falsy values are ignored.

        :param _frequency: New frequency; stored and, if GQRX is running,
            sent immediately.
        :param _start: Request that the run loop start GQRX.
        :param _stop: Request that the run loop stop GQRX.
        :param _output_device_name: Stored in ``Output_Device_Name``.
        :param _fm_mode: New mode; stored and sent if GQRX is running.
        :param _squelch: New squelch; stored and sent if GQRX is running.
        :param _hostname: New telnet host; also sets the start flag.
        :param _port: New telnet port; also sets the start flag.
        :param _start_dsp: Start the DSP if GQRX is running.
        """
        if _frequency:
            self.Frequency = _frequency
            if self.GQRX_Started:
                self.change_freq(_frequency)

        if _output_device_name:
            self.Output_Device_Name = _output_device_name

        if _fm_mode:
            self.Mode = _fm_mode
            if self.GQRX_Started:
                self.change_mode(_fm_mode)

        if _squelch:
            self.Squelch = _squelch
            if self.GQRX_Started:
                self.change_squelch(_squelch)

        if _hostname:
            self.hostname = _hostname
            self.Start_GQRX = True

        if _port:
            self.port = _port
            self.Start_GQRX = True

        # Guarded like the other live settings: without a running GQRX
        # there is no telnet connection to send the command on.
        if _start_dsp and self.GQRX_Started:
            self.start_dsp()

        if _start:
            self.Start_GQRX = _start

        if _stop:
            self.Stop_GQRX = _stop

    def open_gqrx(self):
        """Launch GQRX, wait for its telnet port, then apply all settings."""
        if self.GQRX_Started:
            self.close_gqrx()

        gqrx_kwargs = ["gqrx", "-c", "drb_defaults.conf"]

        self.logger.info("Resetting 'crashed' option in the GQRX config")
        self.reset_or_create_config()

        self.logger.info("Starting GQRX")
        self.logger.debug(f"GQRX Keyword Args: {gqrx_kwargs}")

        self.GQRXProc = subprocess.Popen(gqrx_kwargs, executable=self.GQRXEXE, shell=False)

        # Retry until the telnet connection is established.
        while not self.tel_conn:
            self.create_telnet_connection()
            sleep(2)
            self.logger.debug("Waiting for GQRX to start")

        self.GQRX_Started = True

        self.start_dsp()
        self.set_all_settings(_squelch=self.Squelch, _mode=self.Mode, _freq=self.Frequency)

        self.logger.debug('Finished opening GQRX')

    def close_gqrx(self):
        """Stop the GQRX process and release the telnet connection."""
        self.logger.info("Closing GQRX")
        proc = self.GQRXProc
        if proc is None:
            # Nothing was ever launched; just make sure state is clean.
            self.logger.debug("No GQRX process to close")
        else:
            try:
                proc.kill()

                seconds_waited = 0
                while proc.poll() is None:
                    # Terminate the process every 5 seconds
                    if seconds_waited % 5 == 0:
                        self.logger.info("Terminating GQRX")
                        proc.terminate()
                    sleep(1)
                    self.logger.debug(f"Waited {seconds_waited} seconds")
                    seconds_waited += 1
            except Exception as e:
                # Leave the state untouched when the process could not be stopped.
                self.logger.error(e)
                return

        self.logger.debug("GQRX Closed")
        self.GQRX_Started = False
        self._close_telnet_connection()

    def _close_telnet_connection(self):
        """Close the telnet socket, if any, and forget the connection."""
        if self.tel_conn is None:
            return
        try:
            self.tel_conn.close()
        except OSError as err:
            self.logger.warning(err)
        self.tel_conn = None

    def create_telnet_connection(self):
        """Try to connect to GQRX over telnet.

        :return: ``True`` on success (connection stored in ``tel_conn``),
            ``False`` on failure (``tel_conn`` reset to ``None``).
        """
        self.logger.debug("Creating connection")
        try:
            # Passing host/port to the constructor already opens the
            # connection; calling open() again would leak that socket.
            self.tel_conn = Telnet(self.hostname, self.port)
            self.logger.debug("GQRX is open")
            return True
        except Exception as err:
            self.logger.warning(err)
            self.tel_conn = None
            return False

    def _send_and_confirm(self, command: str):
        """Send *command* and block until ``RPRT 0`` is read back."""
        self.tel_conn.write(command.encode('utf-8'))
        self.tel_conn.read_until(b'RPRT 0')

    def check_dsp(self):
        """Return ``True`` when the reply to ``u DSP`` is exactly ``b"1"``."""
        self.logger.debug("Checking if DSP is running on GQRX")
        self.tel_conn.write(b"u DSP")
        # NOTE(review): compares the raw chunk with b"1"; confirm the
        # reply never carries a trailing newline.
        return self.tel_conn.read_some() == b"1"

    def start_dsp(self):
        """Start the DSP unless :meth:`check_dsp` reports it running."""
        if not self.check_dsp():
            self.logger.debug("Starting DSP on GQRX")
            self._send_and_confirm("U DSP 1")

    def change_freq(self, freq):
        """Send the frequency (converted with ``int``) to GQRX."""
        self.logger.debug(f"Changing freq to {freq}")
        self._send_and_confirm(f"F {int(freq)}")

    def change_squelch(self, squelch):
        """Send the squelch level to GQRX, forcing it to be non-positive."""
        # check_negative comes from BotResources; presumably it reports
        # whether the value is already negative - verify there.
        if not check_negative(squelch):
            squelch = float(-abs(squelch))
        self.logger.debug(f"Changing squelch to {squelch}")
        self._send_and_confirm(f"L SQL {float(squelch)}")

    def change_mode(self, mode):
        """Send the demodulator mode to GQRX."""
        self.logger.debug(f"Changing mode to {mode}")
        self._send_and_confirm(f"M {str(mode)}")

    def set_all_settings(self, _mode, _squelch, _freq):
        """Apply mode, frequency and squelch, with squelch at 0 meanwhile."""
        self.change_squelch(0)
        self.change_mode(_mode)
        self.change_freq(_freq)
        self.change_squelch(_squelch)

    def creat_config(self):
        """Write the bundled default config template to the config path."""
        from templates.gqrx_config_template import drb_defaults

        try:
            # The config directory may not exist yet; create it so the
            # write below does not fail.
            self.GQRX_Config_Path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.GQRX_Config_Path, 'w+') as config_file:
                config_file.write(drb_defaults)
        except OSError as err:
            self.logger.error(err)

    def reset_or_create_config(self):
        """Prepare the config: patch an existing file or create it anew.

        An existing file gets ``agc_off`` removed and ``crashed`` reset;
        if it cannot be parsed it is replaced by the template.
        """
        if not self.GQRX_Config_Path.is_file():
            self.logger.debug("GQRX config does not exist, creating it from template")
            self.creat_config()
            return

        try:
            self.logger.debug("Enabling AGC in the GQRX config")
            enable_agc(_config_path=self.GQRX_Config_Path)
            self.logger.debug("GQRX Config exists, resetting 'crashed' setting")
            reset_crashed(_config_path=self.GQRX_Config_Path)
        except configparser.Error as err:
            # Covers DuplicateOptionError and every other parse failure,
            # so a corrupt file is regenerated instead of killing the thread.
            self.logger.warning(err)
            self.creat_config()