Files
Discord-Radio-Bot/gqrxHandler.py
Logan Cusano 8debd690f1 Better logging
2022-04-01 01:08:54 -04:00

163 lines
4.7 KiB
Python

import shutil
import logging
import threading
import subprocess
import time
from telnetlib import Telnet
from BotResources import *
from time import sleep
class GQRXHandler(threading.Thread):
def __init__(self):
super().__init__()
self.GQRXDir: str = GQRX_BIN_LOCATION
self.GQRXEXE: str = shutil.which(GQRX_BIN)
self.GQRXProc = None
self.logger = logging.getLogger("Discord_Radio_Bot.GQRXHandler")
self.Frequency = None
self.Mode = None
self.Frequency = None
self.Squelch = None
self.Start_GQRX = False
self.Stop_GQRX = False
self.Output_Device_Name = None
self.hostname = "localhost"
self.port = 7356
self.tel_conn = None
def run(self) -> None:
self.open_gqrx()
self.create_telnet_connection()
while True:
if self.Start_GQRX:
self.open_gqrx()
self.Start_GQRX = False
self.Stop_GQRX = False
while not self.Stop_GQRX:
sleep(1)
self.close_gqrx()
sleep(.5)
def set_gqrx_parameters(self, _frequency: str = False, _start: bool = False, _stop: bool = False,
_output_device_name: str = None, _fm_mode: str = None, _squelch: int = None,
_hostname: str = None, _port: int = None):
if _frequency:
self.Frequency = _frequency
if _output_device_name:
self.Output_Device_Name = _output_device_name
if _fm_mode:
self.Mode = _fm_mode
if _squelch:
self.Squelch = _squelch
if _hostname:
self.hostname = _hostname
self.Start_GQRX = True
if _port:
self.port = _port
self.Start_GQRX = True
if _start:
self.Start_GQRX = _start
if _stop:
self.Stop_GQRX = _stop
def open_gqrx(self):
if self.GQRXProc is not None:
self.close_gqrx()
gqrx_kwargs = [f""]
self.logger.info(f"Starting GQRX")
self.logger.debug(f"GQRX Keyword Args: {gqrx_kwargs}")
self.GQRXProc = subprocess.Popen(gqrx_kwargs, executable=self.GQRXEXE, shell=False, cwd=self.GQRXDir)
while not self.tel_conn:
sleep(.5)
self.logger.debug(f"Waiting for GQRX to start")
def close_gqrx(self):
self.logger.info(f"Closing GQRX")
try:
self.GQRXProc.kill()
seconds_waited = 0
while self.GQRXProc.poll() is None:
# Terminate the process every 5 seconds
if seconds_waited % 5 == 0:
self.logger.info("Terminating GQRX")
self.GQRXProc.terminate()
sleep(1)
self.logger.debug(f"Waited {seconds_waited} seconds")
seconds_waited += 1
except Exception as e:
self.logger.error(e)
def create_telnet_connection(self):
self.logger.info("Creating connection")
self.tel_conn = Telnet(self.hostname, self.port)
try:
self.tel_conn.open(self.hostname, self.port)
self.logger.debug(f"GQRX is open")
return True
except ConnectionRefusedError as err:
self.logger.warning(err)
self.tel_conn = None
return False
def change_freq(self, freq):
self.logger.debug(f"Changing freq to {freq}")
self.tel_conn.write(bytes(f"F {int(freq)}", 'utf-8'))
self.tel_conn.read_until(b'RPRT 0')
def check_dsp(self):
self.logger.debug(f"Checking if DSP is running on GQRX")
self.tel_conn.write(bytes(f"u DSP", 'utf-8'))
if self.tel_conn.read_some() == b"1":
return True
else:
return False
def start_dsp(self):
self.logger.debug(f"Starting DSP on GQRX")
self.tel_conn.write(bytes(f"U DSP 1", 'utf-8'))
self.tel_conn.read_until(b'RPRT 0')
def change_squelch(self, squelch):
if not check_negative(squelch):
squelch = float(-abs(squelch))
self.logger.debug(f"Changing squelch to {squelch}")
self.tel_conn.write(bytes(f"L SQL {float(squelch)}", 'utf-8'))
self.tel_conn.read_until(b'RPRT 0')
def change_mode(self, mode):
self.logger.debug(f"Changing mode to {mode}")
self.tel_conn.write(bytes(f"M {str(mode)}", 'utf-8'))
self.tel_conn.read_until(b'RPRT 0')
def set_all_settings(self, mode, squelch, freq):
self.change_squelch(0)
self.change_mode(mode)
self.change_freq(freq)
self.change_squelch(squelch)