GQRX Changes - Better error handling; GQRX config dupes some options for an unknown reason
240 lines
7.4 KiB
Python
240 lines
7.4 KiB
Python
import configparser
|
|
import shutil
|
|
import logging
|
|
import threading
|
|
import subprocess
|
|
import time
|
|
from pathlib import Path
|
|
from telnetlib import Telnet
|
|
from BotResources import *
|
|
from time import sleep
|
|
|
|
|
|
def reset_crashed(_config_path):
    """Clear the ``crashed`` flag in the ``[General]`` section of a config file.

    The file is parsed with ``configparser``. When ``General/crashed`` is a
    true boolean it is set to ``'false'`` and the whole configuration is
    written back to the same path.

    Args:
        _config_path: Path of the INI-style configuration file to update.

    Returns:
        True when the flag was set and has been reset; False when nothing was
        changed (unreadable/missing file, missing section, missing option, or
        the flag was already false).

    Raises:
        configparser.Error: If the file cannot be parsed, e.g.
            ``DuplicateOptionError`` when an option is repeated.
        ValueError: If the stored ``crashed`` value is not a valid boolean.
    """
    # ConfigParser is the supported name; the SafeConfigParser alias was
    # removed in Python 3.12.
    config = configparser.ConfigParser()
    config.read(_config_path)

    # The fallback covers both a missing [General] section and a missing
    # 'crashed' option, so neither case raises.
    if not config.getboolean('General', 'crashed', fallback=False):
        return False

    config['General']['crashed'] = 'false'
    with open(_config_path, 'w') as config_file:
        config.write(config_file)
    return True
|
|
|
|
|
|
def enable_agc(_config_path):
    """Remove the ``agc_off`` option from the ``[receiver]`` section.

    When the option is present it is deleted and the configuration is written
    back to the same path. Presumably GQRX then falls back to its default of
    AGC enabled -- that default is not visible from this file.

    Args:
        _config_path: Path of the INI-style configuration file to update.

    Returns:
        True when ``agc_off`` was found and removed, False when the file was
        left untouched.

    Raises:
        configparser.Error: If the file cannot be parsed, e.g.
            ``DuplicateOptionError`` when an option is repeated.
    """
    # ConfigParser is the supported name; the SafeConfigParser alias was
    # removed in Python 3.12.
    config = configparser.ConfigParser()
    config.read(_config_path)

    # has_option() is False for a missing section as well as a missing option.
    if not config.has_option('receiver', 'agc_off'):
        return False

    config.remove_option('receiver', 'agc_off')
    with open(_config_path, 'w') as config_file:
        config.write(config_file)
    return True
|
|
|
|
|
|
class GQRXHandler(threading.Thread):
    """Thread that launches a GQRX process and controls it over Telnet.

    Other code requests work by calling :meth:`set_gqrx_parameters`, which
    stores the desired settings and raises the ``Start_GQRX`` / ``Stop_GQRX``
    flags. :meth:`run` polls those flags, starts GQRX, waits for the stop
    request and then shuts the process down again.
    """

    def __init__(self):
        """Set up default radio settings and connection parameters."""
        super().__init__()
        self.GQRX_Config_Path = Path(f"{Path.home()}/.config/gqrx/drb_defaults.conf")
        self.GQRXDir: str = GQRX_BIN_LOCATION
        self.GQRXEXE: str = shutil.which(GQRX_BIN)
        self.GQRXProc = None  # subprocess.Popen handle once GQRX is launched
        self.GQRX_Started = False

        self.logger = logging.getLogger("Discord_Radio_Bot.GQRXHandler")

        # Radio settings applied every time GQRX is (re)started.
        self.Mode = DEFAULT_RADIO_SETTINGS['mode']
        self.Frequency = DEFAULT_RADIO_SETTINGS['freq']
        self.Squelch = DEFAULT_RADIO_SETTINGS['squelch']

        # Request flags polled by run().
        self.Start_GQRX = False
        self.Stop_GQRX = False

        self.Output_Device_Name = None

        # Address the Telnet connection is made to.
        self.hostname = "localhost"
        self.port = 7356

        self.tel_conn = None  # Telnet connection while GQRX is running

    def run(self) -> None:
        """Serve start/stop requests forever.

        Whenever ``Start_GQRX`` is set, GQRX is opened, both request flags are
        cleared, and the loop blocks (polling once per second) until
        ``Stop_GQRX`` is set, at which point GQRX is closed.
        """
        while True:
            if self.Start_GQRX:
                self.open_gqrx()

                self.Start_GQRX = False
                self.Stop_GQRX = False

                self.logger.debug("GQRX is open, waiting for it to close")

                while not self.Stop_GQRX:
                    sleep(1)

                self.logger.debug('Request to close GQRX')

                self.close_gqrx()
            sleep(.5)

    def set_gqrx_parameters(self, _frequency: str = False, _start: bool = False, _stop: bool = False,
                            _output_device_name: str = None, _fm_mode: str = None, _squelch: float = None,
                            _hostname: str = None, _port: int = None, _start_dsp: bool = None):
        """Store new settings and, if GQRX is running, apply them immediately.

        Only truthy arguments are acted on; everything else is left unchanged.

        Args:
            _frequency: New frequency; forwarded to :meth:`change_freq`.
            _start: Request that :meth:`run` starts GQRX.
            _stop: Request that :meth:`run` stops GQRX.
            _output_device_name: Stored in ``Output_Device_Name``.
            _fm_mode: New mode; forwarded to :meth:`change_mode`.
            _squelch: New squelch; forwarded to :meth:`change_squelch`.
            _hostname: New Telnet host; also requests a (re)start.
            _port: New Telnet port; also requests a (re)start.
            _start_dsp: Start the DSP if GQRX is currently running.
        """
        if _frequency:
            self.Frequency = _frequency
            if self.GQRX_Started:
                self.change_freq(_frequency)

        if _output_device_name:
            self.Output_Device_Name = _output_device_name

        if _fm_mode:
            self.Mode = _fm_mode
            if self.GQRX_Started:
                self.change_mode(_fm_mode)

        if _squelch:
            self.Squelch = _squelch
            if self.GQRX_Started:
                self.change_squelch(_squelch)

        if _hostname:
            self.hostname = _hostname
            self.Start_GQRX = True

        if _port:
            self.port = _port
            self.Start_GQRX = True

        if _start_dsp:
            # Same guard as the other live settings: without a running GQRX
            # there is no Telnet connection to send the command over.
            if self.GQRX_Started:
                self.start_dsp()
            else:
                self.logger.warning("Cannot start DSP, GQRX is not running")

        if _start:
            self.Start_GQRX = _start

        if _stop:
            self.Stop_GQRX = _stop

    def open_gqrx(self):
        """Launch GQRX, wait for its Telnet port and push the stored settings.

        A GQRX instance that is already running is closed first. The config
        file is reset or created before the process is started. The method
        blocks, retrying every two seconds, until a Telnet connection exists.
        """
        if self.GQRX_Started:
            self.close_gqrx()

        gqrx_kwargs = ["gqrx", "-c", "drb_defaults.conf"]

        self.logger.info("Resetting 'crashed' option in the GQRX config")

        self.reset_or_create_config()

        self.logger.info("Starting GQRX")

        self.logger.debug(f"GQRX Keyword Args: {gqrx_kwargs}")

        self.GQRXProc = subprocess.Popen(gqrx_kwargs, executable=self.GQRXEXE, shell=False)

        # create_telnet_connection() leaves tel_conn as None on failure, so
        # this keeps retrying until the connection succeeds.
        while not self.tel_conn:
            self.create_telnet_connection()
            sleep(2)
            self.logger.debug("Waiting for GQRX to start")

        self.GQRX_Started = True

        self.start_dsp()

        self.set_all_settings(_squelch=self.Squelch, _mode=self.Mode, _freq=self.Frequency)
        self.logger.debug('Finished opening GQRX')

    def close_gqrx(self):
        """Kill the GQRX process and release the Telnet connection.

        After the initial ``kill()`` the process is polled once per second and
        ``terminate()`` is re-sent every five seconds until it has exited.
        Errors are logged rather than raised.
        """
        self.logger.info("Closing GQRX")

        if self.GQRXProc is None:
            # Nothing was ever launched; state is left untouched, as before.
            self.logger.warning("GQRX process was never started, nothing to close")
            return

        try:
            self.GQRXProc.kill()

            seconds_waited = 0
            while self.GQRXProc.poll() is None:
                # Terminate the process every 5 seconds
                if seconds_waited % 5 == 0:
                    self.logger.info("Terminating GQRX")
                    self.GQRXProc.terminate()
                sleep(1)
                self.logger.debug(f"Waited {seconds_waited} seconds")
                seconds_waited += 1
            self.logger.debug("GQRX Closed")
            self.GQRX_Started = False

            # Drop the reference first so the attribute is cleared even if
            # close() fails, then close the socket instead of leaking it.
            connection, self.tel_conn = self.tel_conn, None
            if connection is not None:
                connection.close()

        except Exception as e:
            self.logger.error(e)

    def create_telnet_connection(self):
        """Try to connect to GQRX at ``hostname:port``.

        Returns:
            True when the connection was made (stored in ``tel_conn``), False
            otherwise (``tel_conn`` is reset to None and the error is logged).
        """
        self.logger.debug("Creating connection")
        try:
            # Telnet(host, port) connects in the constructor; calling open()
            # again would replace the socket and leak the first one.
            self.tel_conn = Telnet(self.hostname, self.port)
            self.logger.debug("GQRX is open")
            return True
        except Exception as err:
            self.logger.warning(err)
            self.tel_conn = None
            return False

    def check_dsp(self):
        """Ask GQRX whether the DSP is running.

        Returns:
            True when the reply, ignoring surrounding whitespace, is ``1``.
        """
        self.logger.debug("Checking if DSP is running on GQRX")
        self.tel_conn.write(bytes("u DSP", 'utf-8'))
        # Strip before comparing so a reply carrying a line terminator
        # (e.g. b"1\n") is still recognised as "running".
        return self.tel_conn.read_some().strip() == b"1"

    def start_dsp(self):
        """Start the DSP unless :meth:`check_dsp` reports it already running."""
        if not self.check_dsp():
            self.logger.debug("Starting DSP on GQRX")
            self.tel_conn.write(bytes("U DSP 1", 'utf-8'))
            # Block until the success report arrives.
            self.tel_conn.read_until(b'RPRT 0')

    def change_freq(self, freq):
        """Send the frequency (converted to ``int``) and wait for ``RPRT 0``."""
        self.logger.debug(f"Changing freq to {freq}")
        self.tel_conn.write(bytes(f"F {int(freq)}", 'utf-8'))
        self.tel_conn.read_until(b'RPRT 0')

    def change_squelch(self, squelch):
        """Send the squelch level and wait for ``RPRT 0``.

        Values that ``check_negative`` rejects are converted to
        ``-abs(squelch)`` before being sent.
        """
        if not check_negative(squelch):
            squelch = float(-abs(squelch))
        self.logger.debug(f"Changing squelch to {squelch}")
        self.tel_conn.write(bytes(f"L SQL {float(squelch)}", 'utf-8'))
        self.tel_conn.read_until(b'RPRT 0')

    def change_mode(self, mode):
        """Send the demodulator mode and wait for ``RPRT 0``."""
        self.logger.debug(f"Changing mode to {mode}")
        self.tel_conn.write(bytes(f"M {str(mode)}", 'utf-8'))
        self.tel_conn.read_until(b'RPRT 0')

    def set_all_settings(self, _mode, _squelch, _freq):
        """Apply mode, frequency and squelch in one go.

        The squelch is first set to 0, then mode and frequency are changed,
        and finally the requested squelch is applied.
        """
        self.change_squelch(0)
        self.change_mode(_mode)
        self.change_freq(_freq)
        self.change_squelch(_squelch)

    def creat_config(self):
        """Write the bundled ``drb_defaults`` template to ``GQRX_Config_Path``.

        The parent directory is created when missing. ``OSError`` is logged,
        not raised.
        """
        from templates.gqrx_config_template import drb_defaults
        config = drb_defaults
        try:
            # On a fresh system the config directory may not exist yet, which
            # would make open() fail and leave no config at all.
            Path(self.GQRX_Config_Path).parent.mkdir(parents=True, exist_ok=True)
            with open(self.GQRX_Config_Path, 'w+') as config_file:
                config_file.write(config)
        except OSError as err:
            self.logger.error(err)

    def reset_or_create_config(self):
        """Prepare the GQRX config file before a launch.

        An existing file has ``agc_off`` removed and its ``crashed`` flag
        reset. If the file cannot be parsed (duplicate options or any other
        ``configparser`` / value error) it is replaced by the template. A
        missing file is created from the template.
        """
        if self.GQRX_Config_Path.is_file():
            try:
                self.logger.debug("Enabling AGC in the GQRX config")
                enable_agc(_config_path=self.GQRX_Config_Path)

                self.logger.debug("GQRX Config exists, resetting 'crashed' setting")
                reset_crashed(_config_path=self.GQRX_Config_Path)
            except (configparser.Error, ValueError) as err:
                # DuplicateOptionError is a configparser.Error; any other
                # parse failure is recovered from the same way instead of
                # terminating the thread.
                self.logger.warning(err)
                self.creat_config()
        else:
            self.logger.debug("GQRX config does not exist, creating it from template")
            self.creat_config()
|