30
.gitea/workflows/lint.yml
Normal file
30
.gitea/workflows/lint.yml
Normal file
@@ -0,0 +1,30 @@
|
||||
name: Lint
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
branches:
|
||||
- "*"
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.13'
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install flake8
|
||||
|
||||
- name: Run Lint
|
||||
run: |
|
||||
flake8 --max-line-length=88 --ignore=E203,E302,E501 .
|
||||
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
__pycache__*
|
||||
bot-poc.py
|
||||
configs*
|
||||
.env
|
||||
*.log*
|
||||
.venv
|
||||
49
Dockerfile
Normal file
49
Dockerfile
Normal file
@@ -0,0 +1,49 @@
|
||||
## OP25 Core Container
|
||||
FROM ubuntu:24.04
|
||||
|
||||
# Set environment variables
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
# Install system dependencies
|
||||
RUN apt-get update && \
|
||||
apt-get upgrade -y && \
|
||||
apt-get install git python3 python3-pip pulseaudio pulseaudio-utils liquidsoap
|
||||
|
||||
# Clone the boatbod op25 repository
|
||||
RUN git clone -b gr310 https://github.com/boatbod/op25 /op25
|
||||
|
||||
# Set the working directory
|
||||
WORKDIR /op25
|
||||
|
||||
# Run the install script to set up op25
|
||||
RUN ./install.sh -f
|
||||
|
||||
# Install Python dependencies
|
||||
COPY requirements.txt /tmp/requirements.txt
|
||||
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt
|
||||
|
||||
# Create the run_multi-rx_service.sh script
|
||||
RUN echo "#!/bin/bash\n./multi_rx.py -v 1 -c /configs/active.cfg.json" > ./op25/gr-op25_repeater/apps/run_multi-rx_service.sh && \
|
||||
chmod +x ./op25/gr-op25_repeater/apps/run_multi-rx_service.sh
|
||||
|
||||
# Expose ports for HTTP control as needed, for example:
|
||||
EXPOSE 8001 8081
|
||||
|
||||
# Create and set up the configuration directory
|
||||
VOLUME ["/configs"]
|
||||
|
||||
# Set the working directory in the container
|
||||
WORKDIR /app
|
||||
|
||||
# Copy the rest of the directory contents into the container at /app
|
||||
COPY ./app /app
|
||||
|
||||
# 1. Copy the wrapper script and make it executable
|
||||
COPY docker-entrypoint.sh /usr/local/bin/
|
||||
RUN chmod +x /usr/local/bin/docker-entrypoint.sh
|
||||
|
||||
# 2. Update ENTRYPOINT to use the wrapper script
|
||||
ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"]
|
||||
|
||||
# 3. Use CMD to pass the uvicorn command as arguments to the ENTRYPOINT script
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--reload"]
|
||||
55
app/internal/logger.py
Normal file
55
app/internal/logger.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
def create_logger(name, level=logging.DEBUG, max_bytes=10485760, backup_count=2):
|
||||
"""
|
||||
Creates a logger with a console and rotating file handlers for both debug and info log levels.
|
||||
|
||||
Args:
|
||||
name (str): The name for the logger.
|
||||
level (int): The logging level for the logger. Defaults to logging.DEBUG.
|
||||
max_bytes (int): Maximum size of the log file in bytes before it gets rotated. Defaults to 10 MB.
|
||||
backup_count (int): Number of backup files to keep. Defaults to 2.
|
||||
|
||||
Returns:
|
||||
logging.Logger: Configured logger.
|
||||
"""
|
||||
# Set the log file paths
|
||||
debug_log_file = "./client.debug.log"
|
||||
info_log_file = "./client.log"
|
||||
|
||||
# Create a logger
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(level)
|
||||
|
||||
# Check if the logger already has handlers to avoid duplicate logs
|
||||
if not logger.hasHandlers():
|
||||
# Create console handler
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setLevel(level)
|
||||
|
||||
# Create rotating file handler for debug level
|
||||
debug_file_handler = RotatingFileHandler(debug_log_file, maxBytes=max_bytes, backupCount=backup_count)
|
||||
debug_file_handler.setLevel(logging.DEBUG)
|
||||
|
||||
# Create rotating file handler for info level
|
||||
info_file_handler = RotatingFileHandler(info_log_file, maxBytes=max_bytes, backupCount=backup_count)
|
||||
info_file_handler.setLevel(logging.INFO)
|
||||
|
||||
# Create formatter and add it to the handlers
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
console_handler.setFormatter(formatter)
|
||||
debug_file_handler.setFormatter(formatter)
|
||||
info_file_handler.setFormatter(formatter)
|
||||
|
||||
# Add the handlers to the logger
|
||||
logger.addHandler(console_handler)
|
||||
logger.addHandler(debug_file_handler)
|
||||
logger.addHandler(info_file_handler)
|
||||
|
||||
return logger
|
||||
|
||||
# Example usage:
|
||||
# logger = create_logger('my_logger')
|
||||
# logger.debug('This is a debug message')
|
||||
# logger.info('This is an info message')
|
||||
11
app/main.py
Normal file
11
app/main.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from fastapi import FastAPI
|
||||
import routers.op25_controller as op25_controller
|
||||
from internal.logger import create_logger
|
||||
|
||||
# Initialize logging
|
||||
LOGGER = create_logger(__name__)
|
||||
|
||||
# Define FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
app.include_router(op25_controller.create_op25_router(), prefix="/op25")
|
||||
101
app/models.py
Normal file
101
app/models.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Union
|
||||
from enum import Enum
|
||||
|
||||
class DecodeMode(str, Enum):
|
||||
P25 = "P25"
|
||||
DMR = "DMR"
|
||||
ANALOG = "NBFM"
|
||||
|
||||
class TalkgroupTag(BaseModel):
|
||||
talkgroup: str
|
||||
tagDec: int
|
||||
|
||||
class ConfigGenerator(BaseModel):
|
||||
type: DecodeMode
|
||||
systemName: str
|
||||
channels: List[Union[int, str]]
|
||||
tags: Optional[List[TalkgroupTag]]
|
||||
whitelist: Optional[List[int]]
|
||||
|
||||
class DemodType(str, Enum):
|
||||
CQPSK = "cqpsk"
|
||||
FSK4 = "fsk4"
|
||||
|
||||
class FilterType(str, Enum):
|
||||
RC = "rc"
|
||||
WIDEPULSE = "widepulse"
|
||||
|
||||
class ChannelConfig(BaseModel):
|
||||
name: str
|
||||
trunking_sysname: Optional[str]
|
||||
enable_analog: str
|
||||
demod_type: DemodType
|
||||
filter_type: FilterType
|
||||
device: Optional[str] = "sdr"
|
||||
cqpsk_tracking: Optional[bool] = None
|
||||
frequency: Optional[float] = None
|
||||
nbfmSquelch: Optional[float] = None
|
||||
destination: Optional[str] = "udp://127.0.0.1:23456"
|
||||
tracking_threshold: Optional[int] = 120
|
||||
tracking_feedback: Optional[float] = 0.75
|
||||
excess_bw: Optional[float] = 0.2
|
||||
if_rate: Optional[int] = 24000
|
||||
plot: Optional[str] = ""
|
||||
symbol_rate: Optional[int] = 4800
|
||||
blacklist: Optional[str] = ""
|
||||
whitelist: Optional[str] = ""
|
||||
|
||||
class DeviceConfig(BaseModel):
|
||||
args: Optional[str] = "rtl"
|
||||
gains: Optional[str] = "lna:39"
|
||||
gain_mode: Optional[bool] = False
|
||||
name: Optional[str] = "sdr"
|
||||
offset: Optional[int] = 0
|
||||
ppm: Optional[float] = 0.0
|
||||
rate: Optional[int] = 1920000
|
||||
usable_bw_pct: Optional[float] = 0.85
|
||||
tunable: Optional[bool] = True
|
||||
|
||||
class TrunkingChannelConfig(BaseModel):
|
||||
sysname: str
|
||||
control_channel_list: str
|
||||
tagsFile: Optional[str] = None
|
||||
whitelist: Optional[str] = None
|
||||
nac: Optional[str] = ""
|
||||
wacn: Optional[str] = ""
|
||||
tdma_cc: Optional[bool] = False
|
||||
crypt_behavior: Optional[int] = 2
|
||||
|
||||
class TrunkingConfig(BaseModel):
|
||||
module: str
|
||||
chans: List[TrunkingChannelConfig]
|
||||
|
||||
class MetadataStreamConfig(BaseModel):
|
||||
stream_name: str = "stream_0"
|
||||
meta_format_idle: str = "[idle]"
|
||||
meta_format_tgid: str = "[%TGID%]"
|
||||
meta_format_tag: str = "[%TGID%] %TAG%"
|
||||
icecastServerAddress: str = "ic2.vpn.cusano.net"
|
||||
icecastMountpoint: str = "NODE_ID"
|
||||
icecastMountExt: str = ".xspf"
|
||||
icecastPass: str = "PASSWORD"
|
||||
delay: float = 0.0
|
||||
|
||||
class MetadataConfig(BaseModel):
|
||||
module: str = "icecast.py"
|
||||
streams: List[MetadataStreamConfig]
|
||||
|
||||
class AudioConfig(BaseModel):
|
||||
module: Optional[str] = "sockaudio.py"
|
||||
instances: Optional[List[AudioInstanceConfig]] = [AudioInstanceConfig()]
|
||||
|
||||
class TerminalConfig(BaseModel):
|
||||
module: Optional[str] = "terminal.py"
|
||||
terminal_type: Optional[str] = "http:0.0.0.0:8081"
|
||||
terminal_timeout: Optional[float] = 5.0
|
||||
curses_plot_interval: Optional[float] = 0.2
|
||||
http_plot_interval: Optional[float] = 1.0
|
||||
http_plot_directory: Optional[str] = "../www/images"
|
||||
tuning_step_large: Optional[int] = 1200
|
||||
tuning_step_small: Optional[int] = 100
|
||||
126
app/routers/op25_controller.py
Normal file
126
app/routers/op25_controller.py
Normal file
@@ -0,0 +1,126 @@
|
||||
from fastapi import HTTPException, APIRouter
|
||||
import subprocess
|
||||
import os
|
||||
import signal
|
||||
import json
|
||||
from models import ConfigGenerator, DecodeMode, ChannelConfig, DeviceConfig, TrunkingConfig, TrunkingChannelConfig, AudioConfig, TerminalConfig
|
||||
from internal.logger import create_logger
|
||||
from internal.bot_manager import DiscordBotManager
|
||||
from internal.op25_config_utls import save_talkgroup_tags, save_whitelist, del_none_in_dict, get_current_system_from_config
|
||||
|
||||
LOGGER = create_logger(__name__)
|
||||
|
||||
op25_process = None
|
||||
OP25_PATH = "/op25/op25/gr-op25_repeater/apps/"
|
||||
OP25_SCRIPT = "run_multi-rx_service.sh"
|
||||
|
||||
def create_op25_router(bot_manager: DiscordBotManager):
|
||||
router = APIRouter()
|
||||
|
||||
@router.post("/start")
|
||||
async def start_op25():
|
||||
global op25_process
|
||||
if op25_process is None:
|
||||
try:
|
||||
op25_process = subprocess.Popen(os.path.join(OP25_PATH, OP25_SCRIPT), shell=True, preexec_fn=os.setsid, cwd=OP25_PATH)
|
||||
LOGGER.debug(op25_process)
|
||||
return {"status": "OP25 started"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
else:
|
||||
return {"status": "OP25 already running"}
|
||||
|
||||
@router.post("/stop")
|
||||
async def stop_op25():
|
||||
global op25_process
|
||||
if op25_process is not None:
|
||||
try:
|
||||
os.killpg(os.getpgid(op25_process.pid), signal.SIGTERM)
|
||||
op25_process = None
|
||||
return {"status": "OP25 stopped"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
else:
|
||||
return {"status": "OP25 is not running"}
|
||||
|
||||
@router.get("/status")
|
||||
async def get_status():
|
||||
return {"status": "running" if op25_process else "stopped"}
|
||||
|
||||
@router.post("/generate-config")
|
||||
async def generate_config(generator: ConfigGenerator):
|
||||
try:
|
||||
if generator.type == DecodeMode.P25:
|
||||
channels = [ChannelConfig(
|
||||
name=generator.systemName,
|
||||
trunking_sysname=generator.systemName,
|
||||
enable_analog="off",
|
||||
demod_type="cqpsk",
|
||||
cqpsk_tracking=True,
|
||||
filter_type="rc"
|
||||
)]
|
||||
devices = [DeviceConfig()]
|
||||
save_talkgroup_tags(generator.tags)
|
||||
save_whitelist(generator.whitelist)
|
||||
trunking = TrunkingConfig(
|
||||
module="tk_p25.py",
|
||||
chans=[TrunkingChannelConfig(
|
||||
sysname=generator.systemName,
|
||||
control_channel_list=','.join(generator.channels),
|
||||
tagsFile="/configs/active.cfg.tags.tsv",
|
||||
whitelist="/configs/active.cfg.whitelist.tsv"
|
||||
)]
|
||||
)
|
||||
|
||||
audio = AudioConfig()
|
||||
|
||||
terminal = TerminalConfig()
|
||||
|
||||
config_dict = {
|
||||
"channels": [channel.dict() for channel in channels],
|
||||
"devices": [device.dict() for device in devices],
|
||||
"trunking": trunking.dict(),
|
||||
"audio": audio.dict(),
|
||||
"terminal": terminal.dict()
|
||||
}
|
||||
|
||||
elif generator.type == DecodeMode.ANALOG:
|
||||
generator = generator.config
|
||||
channels = [ChannelConfig(
|
||||
channelName=generator.systemName,
|
||||
enableAnalog="on",
|
||||
demodType="fsk4",
|
||||
frequency=generator.frequency,
|
||||
filterType="widepulse",
|
||||
nbfmSquelch=generator.nbfmSquelch
|
||||
)]
|
||||
devices = [DeviceConfig(gain="LNA:32")]
|
||||
|
||||
config_dict = {
|
||||
"channels": [channel.dict() for channel in channels],
|
||||
"devices": [device.dict() for device in devices]
|
||||
}
|
||||
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="Invalid configuration type. Must be 'p25' or 'nbfm'.")
|
||||
|
||||
with open('/configs/active.cfg.json', 'w') as f:
|
||||
json.dump(del_none_in_dict(config_dict), f, indent=2)
|
||||
|
||||
# Set the presence of the bot (if it's online)
|
||||
await bot_manager.set_presence(generator.systemName)
|
||||
|
||||
return {"message": "Config exported to '/configs/active.cfg.json'"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.post("/update-presence")
|
||||
async def update_presence():
|
||||
current_system = get_current_system_from_config()
|
||||
if not current_system:
|
||||
raise HTTPException(status_code=500, detail="Unable to get current system.")
|
||||
|
||||
await bot_manager.set_presence(current_system)
|
||||
return current_system
|
||||
|
||||
return router
|
||||
15
docker-entrypoint.sh
Normal file
15
docker-entrypoint.sh
Normal file
@@ -0,0 +1,15 @@
|
||||
#!/bin/bash
|
||||
|
||||
# --- Start PulseAudio Daemon ---
|
||||
# The -D flag starts it as a daemon.
|
||||
# The --exit-idle-time=-1 prevents it from automatically shutting down.
|
||||
echo "Starting PulseAudio daemon..."
|
||||
pulseaudio -D --exit-idle-time=-1
|
||||
|
||||
# Wait a moment for PulseAudio to initialize
|
||||
sleep 1
|
||||
|
||||
# --- Execute the main command (uvicorn) ---
|
||||
echo "Starting FastAPI application..."
|
||||
# The main application arguments are passed directly to this script
|
||||
exec "$@"
|
||||
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
uvicorn
|
||||
fastapi
|
||||
233
tests/test_op25_controller.py
Normal file
233
tests/test_op25_controller.py
Normal file
@@ -0,0 +1,233 @@
|
||||
# tests/test_op25_controller.py
|
||||
import pytest
|
||||
from unittest.mock import patch, mock_open, MagicMock
|
||||
from fastapi.testclient import TestClient
|
||||
from app.op25_controller import router
|
||||
from fastapi import FastAPI
|
||||
import json
|
||||
|
||||
# Initialize the FastAPI app with the router for testing
|
||||
app = FastAPI()
|
||||
app.include_router(router, prefix="/op25")
|
||||
client = TestClient(app)
|
||||
|
||||
# Example input and expected outputs
|
||||
example_input_json = {
|
||||
"type": "P25",
|
||||
"systemName": "MTA",
|
||||
"channels": [
|
||||
"770.15625",
|
||||
"770.43125",
|
||||
"773.29375",
|
||||
"773.84375",
|
||||
"774.30625",
|
||||
"123.32132"
|
||||
],
|
||||
"tags": [
|
||||
{
|
||||
"talkgroup": "abc",
|
||||
"tagDec": 1
|
||||
},
|
||||
{
|
||||
"talkgroup": "deef",
|
||||
"tagDec": 123
|
||||
}
|
||||
],
|
||||
"whitelist": [
|
||||
123,
|
||||
321,
|
||||
456,
|
||||
654,
|
||||
888
|
||||
]
|
||||
}
|
||||
|
||||
expected_active_config_json = {
|
||||
"channels": [
|
||||
{
|
||||
"name": "MTA",
|
||||
"device": "sdr",
|
||||
"trunking_sysname": "MTA",
|
||||
"enable_analog": "off",
|
||||
"demod_type": "cqpsk",
|
||||
"cqpsk_tracking": True,
|
||||
"filter_type": "rc",
|
||||
"tracking_threshold": 120,
|
||||
"tracking_feedback": 0.75,
|
||||
"destination": "udp://localhost:23456",
|
||||
"excess_bw": 0.2,
|
||||
"if_rate": 24000,
|
||||
"plot": "",
|
||||
"symbol_rate": 4800,
|
||||
"blacklist": "",
|
||||
"whitelist": ""
|
||||
}
|
||||
],
|
||||
"devices": [
|
||||
{
|
||||
"args": "rtl=0",
|
||||
"gains": "lna:39",
|
||||
"gain_mode": False,
|
||||
"name": "sdr",
|
||||
"offset": 0,
|
||||
"ppm": 0.0,
|
||||
"rate": 1920000,
|
||||
"usable_bw_pct": 0.85,
|
||||
"tunable": True
|
||||
}
|
||||
],
|
||||
"trunking": {
|
||||
"module": "tk_p25.py",
|
||||
"chans": [
|
||||
{
|
||||
"sysname": "MTA",
|
||||
"control_channel_list": "770.15625,770.43125,773.29375,773.84375,774.30625,123.32132",
|
||||
"tagsFile": "/configs/active.cfg.tags.tsv",
|
||||
"whitelist": "/configs/active.cfg.whitelist.tsv",
|
||||
"nac": "",
|
||||
"wacn": "",
|
||||
"tdma_cc": False,
|
||||
"crypt_behavior": 2
|
||||
}
|
||||
]
|
||||
},
|
||||
"audio": {
|
||||
"module": "sockaudio.py",
|
||||
"instances": [
|
||||
{
|
||||
"instance_name": "audio0",
|
||||
"device_name": "pulse",
|
||||
"udp_port": 23456,
|
||||
"audio_gain": 2.5,
|
||||
"number_channels": 1
|
||||
}
|
||||
]
|
||||
},
|
||||
"terminal": {
|
||||
"module": "terminal.py",
|
||||
"terminal_type": "http:0.0.0.0:8081",
|
||||
"terminal_timeout": 5.0,
|
||||
"curses_plot_interval": 0.2,
|
||||
"http_plot_interval": 1.0,
|
||||
"http_plot_directory": "../www/images",
|
||||
"tuning_step_large": 1200,
|
||||
"tuning_step_small": 100
|
||||
}
|
||||
}
|
||||
|
||||
expected_tags_tsv = "abc\t1\ndeef\t123\n"
|
||||
expected_whitelist_tsv = "123\t\n321\t\n456\t\n654\t\n888\t\n"
|
||||
|
||||
# Mock data for subprocess.Popen
|
||||
mock_popen = MagicMock()
|
||||
mock_process = MagicMock()
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
@pytest.fixture
|
||||
def mock_subprocess_popen():
|
||||
with patch("app.op25_controller.subprocess.Popen", return_value=mock_process) as mock_popen_patched:
|
||||
yield mock_popen_patched
|
||||
|
||||
@pytest.fixture
|
||||
def mock_os_killpg():
|
||||
with patch("app.op25_controller.os.killpg") as mock_killpg_patched:
|
||||
yield mock_killpg_patched
|
||||
|
||||
@pytest.fixture
|
||||
def mock_open_functions():
|
||||
with patch("builtins.open", mock_open()) as mock_file:
|
||||
yield mock_file
|
||||
|
||||
@pytest.fixture
|
||||
def mock_json_dump():
|
||||
with patch("app.op25_controller.json.dump") as mock_json_dump_patched:
|
||||
yield mock_json_dump_patched
|
||||
|
||||
@pytest.fixture
|
||||
def mock_csv_writer():
|
||||
with patch("app.op25_controller.csv.writer") as mock_csv_writer_patched:
|
||||
yield mock_csv_writer_patched
|
||||
|
||||
def test_generate_config_p25(
|
||||
mock_open_functions, mock_json_dump, mock_csv_writer
|
||||
):
|
||||
# Prepare the response of csv.writer
|
||||
mock_writer_instance = MagicMock()
|
||||
mock_csv_writer.return_value = mock_writer_instance
|
||||
|
||||
response = client.post("/op25/generate-config", json=example_input_json)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"message": "Config exported to '/configs/active.cfg.json'"}
|
||||
|
||||
# Check that json.dump was called with the correct data
|
||||
mock_json_dump.assert_called_once()
|
||||
args, kwargs = mock_json_dump.call_args
|
||||
config_written = args[0]
|
||||
assert config_written == expected_active_config_json
|
||||
assert kwargs["fp"].name == '/configs/active.cfg.json'
|
||||
|
||||
# Check that tags were written correctly
|
||||
expected_tags = [
|
||||
["abc", 1],
|
||||
["deef", 123]
|
||||
]
|
||||
mock_writer_instance.writerow.assert_any_call(["abc", 1])
|
||||
mock_writer_instance.writerow.assert_any_call(["deef", 123])
|
||||
|
||||
# Similarly, check whitelist writing
|
||||
# Since both tags and whitelist are written, ensure writerow for whitelist is also called
|
||||
whitelist_calls = [
|
||||
patch.call([123]),
|
||||
patch.call([321]),
|
||||
patch.call([456]),
|
||||
patch.call([654]),
|
||||
patch.call([888])
|
||||
]
|
||||
# However, since csv.writer is mocked, and both write operations use the same mock,
|
||||
# it's difficult to differentiate between tags and whitelist writes unless separated.
|
||||
# A better approach would be to separate the file paths.
|
||||
# For simplicity, assume that the writer is called twice: once for tags, once for whitelist
|
||||
|
||||
def test_start_op25(mock_subprocess_popen):
|
||||
# Start OP25 when it's not running
|
||||
response = client.post("/op25/start")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "OP25 started"}
|
||||
mock_subprocess_popen.assert_called_once_with(
|
||||
"/op25/op25/gr-op25_repeater/apps/run_multi-rx_service.sh",
|
||||
shell=True,
|
||||
preexec_fn=ANY,
|
||||
cwd="/op25/op25_gr-repeater/apps/"
|
||||
)
|
||||
|
||||
# Start OP25 again when it's already running
|
||||
response = client.post("/op25/start")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "OP25 already running"}
|
||||
|
||||
def test_stop_op25(mock_subprocess_popen, mock_os_killpg, mock_process):
|
||||
# Ensure OP25 is running first
|
||||
with patch("app.op25_controller.op25_process", mock_process):
|
||||
response = client.post("/op25/stop")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "OP25 stopped"}
|
||||
mock_os_killpg.assert_called_once()
|
||||
|
||||
# Stop OP25 when it's not running
|
||||
with patch("app.op25_controller.op25_process", None):
|
||||
response = client.post("/op25/stop")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "OP25 is not running"}
|
||||
|
||||
def test_get_status():
|
||||
# When OP25 is not running
|
||||
response = client.get("/op25/status")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "stopped"}
|
||||
|
||||
# When OP25 is running
|
||||
with patch("app.op25_controller.op25_process", MagicMock()):
|
||||
response = client.get("/op25/status")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "running"}
|
||||
Reference in New Issue
Block a user