import os import typing import asyncio from uuid import uuid4 from typing import Optional, List, Dict, Any from enum import Enum from internal.db_handler import MongoHandler # Init vars DB_NAME = os.getenv("DB_NAME", "default_db") MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/") SYSTEM_DB_COLLECTION_NAME = "radio_systems" # --- Types class DemodTypes(str, Enum): P25 = "P25" DMR = "DMR" ANALOG = "NBFM" class TalkgroupTag: """Represents a talkgroup tag.""" def __init__(self, talkgroup: str, tagDec: int): self.talkgroup = talkgroup self.tagDec = tagDec # Add a method to convert to a dictionary, useful for sending as JSON def to_dict(self) -> Dict[str, Any]: return {"talkgroup": self.talkgroup, "tagDec": self.tagDec} class System: """ A basic data model for a channel/system entry in a radio system. """ def __init__(self, _id: str, _type: DemodTypes, name: str, frequency_khz: List[int], location: str, avail_on_nodes: List[str], description: Optional[str] = "", tags: Optional[List[TalkgroupTag]] = None, whitelist: Optional[List[int]] = None): """ Initializes a System object. Args: _id: A unique identifier for the entry (e.g., MongoDB ObjectId string). _type: The demodulation type (P25, NBFM, etc.). name: The name of the channel/system. frequency_khz: The frequency in kilohertz. location: The geographical location or coverage area. avail_on_nodes: A list of node identifiers where this is available. description: A brief description. """ self._id: str = _id self.type: DemodTypes = _type self.name: str = name self.frequency_khz: List[int] = frequency_khz self.location: str = location self.avail_on_nodes: List[str] = avail_on_nodes self.description: str = description or "" self.tags: List[TalkgroupTag] = tags or None self.whitelist: List[int] = whitelist or None def __repr__(self) -> str: """ Provides a developer-friendly string representation of the object. """ # Use self.type.value for string representation of the enum return (f"System(_id='{self._id}', type='{self.type.value}', name='{self.name}', " f"frequency_khz={self.frequency_khz}, location='{self.location}', " f"avail_on_nodes={self.avail_on_nodes}, description='{self.description}'," f" tags='{self.tags}', whitelist='{self.whitelist}')") def to_dict(self) -> Dict[str, Any]: """ Converts the System object to a dictionary suitable for MongoDB. Converts the DemodTypes enum to its string value. """ return { "_id": self._id, "type": self.type.value, # Store the enum value (string) "name": self.name, "frequency_khz": self.frequency_khz, "location": self.location, "avail_on_nodes": self.avail_on_nodes, "description": self.description, "tags": self.tags, "whitelist": self.whitelist, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "System": """ Creates a System object from a dictionary (e.g., from MongoDB). Converts the 'type' string back to a DemodTypes enum member. """ # Ensure required keys exist and handle potential type mismatches if necessary # Convert the type string back to the DemodTypes enum system_type = DemodTypes(data.get("type")) if data.get("type") else None # Handle missing or invalid type if system_type is None: # Handle error: could raise an exception or return None/default # For this example, let's raise an error if type is missing/invalid raise ValueError(f"Invalid or missing 'type' in document data: {data}") return cls( _id=data.get("_id"), _type=system_type, name=data.get("name", ""), # Provide default empty string if name is missing frequency_khz=data.get("frequency_khz", 0), # Provide default 0 if missing location=data.get("location", ""), avail_on_nodes=data.get("avail_on_nodes", []), # Provide default empty list description=data.get("description", ""), tags=data.get("tags", None), whitelist=data.get("whitelist", None) ) # --- System class --- class SystemDbController(): def __init__(self): # Init the handler self.db_h = MongoHandler(DB_NAME, SYSTEM_DB_COLLECTION_NAME, MONGO_URL) async def create_system(self, system_data: Dict[str, Any]) -> Optional[System]: """ Creates a new system entry in the database. Args: system_data: A dictionary containing the data for the new system. Returns: The created System object if successful, None otherwise. """ print("\n--- Creating a document ---") try: # Check if the data to be inserted has an ID if not system_data.get("_id"): system_data['_id'] = uuid4() inserted_result = None inserted_id = None async with self.db_h as db: insert_result = await self.db_h.insert_one(system_data) inserted_id = insert_result.inserted_id if inserted_id: print(f"Insert successful with ID: {inserted_id}") # Fetch the inserted document to get the complete data including the generated _id query = {"_id": inserted_id} inserted_doc = None async with self.db_h as db: inserted_doc = await db.find_one(query) if inserted_doc: # Convert the fetched dictionary back to a System object return System.from_dict(inserted_doc) else: print("Insert acknowledged but no ID returned.") return None except Exception as e: print(f"Create failed: {e}") return None async def find_system(self, query: Dict[str, Any]) -> Optional[System]: """ Finds a single system entry in the database. Args: query: A dictionary representing the query criteria. Returns: A System object if found, None otherwise. """ print("\n--- Finding one document ---") try: found_doc = None async with self.db_h as db: found_doc = await db.find_one(query) if found_doc: print("Found document (raw dict):", found_doc) # Convert the dictionary result to a System object return System.from_dict(found_doc) else: print("Document not found.") return None except Exception as e: print(f"Find failed: {e}") return None async def find_systems(self, query: Dict[str, Any]) -> Optional[List[System]]: """ Finds one or more system entries in the database. Args: query: A dictionary representing the query criteria. Returns: A list of System object(s) if found, None otherwise. """ print("\n--- Finding documents ---") try: found_docs = None async with self.db_h as db: found_docs = await db.find(query) if found_docs: print("Found document (raw dict):", found_docs) # Convert the dictionary results to a System object converted_systems = [] for doc in found_docs: converted_systems.append(System.from_dict(doc)) print("YURB", found_docs, converted_systems) return converted_systems if len(converted_systems) > 0 else None else: print("Document not found.") return None except Exception as e: print(f"Find failed: {e}") return None async def find_all_systems(self, query: Dict[str, Any] = {}) -> List[System]: """ Finds multiple system entries in the database. Args: query: A dictionary representing the query criteria (default is empty to find all). Returns: A list of System objects. """ print("\n--- Finding multiple documents ---") try: found_docs = None async with self.db_h as db: found_docs = await db.find(query) if found_docs: print(f"Found {len(found_docs)} documents (raw dicts).") # Convert the list of dictionaries to a list of System objects return [System.from_dict(doc) for doc in found_docs] else: print("No documents found.") return [] except Exception as e: print(f"Find all failed: {e}") return [] async def update_system(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]: """ Updates a single system entry in the database. Args: query: A dictionary representing the query criteria to find the document. update_data: A dictionary representing the update operations (e.g., using $set). Returns: The number of modified documents if successful, None otherwise. """ print("\n--- Updating a document ---") try: update_result = None async with self.db_h as db: update_result = await db.update_one(query, update_data) print(f"Update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}") return update_result.modified_count except Exception as e: print(f"Update failed: {e}") return None async def delete_system(self, query: Dict[str, Any]) -> Optional[int]: """ Deletes a single system entry from the database. Args: query: A dictionary representing the query criteria to find the document to delete. Returns: The number of deleted documents if successful, None otherwise. """ print("\n--- Deleting a document ---") try: delete_result = None async with self.db_h as db: delete_result = await self.db_h.delete_one(query) print(f"Delete result: Deleted count {delete_result.deleted_count}") return delete_result.deleted_count except Exception as e: print(f"Delete failed: {e}") return None