"""Database controllers for radio systems and Discord bot IDs.

Each controller wraps a ``MongoHandler`` bound to one collection and exposes
async CRUD helpers. Every helper catches all exceptions, prints a diagnostic
message, and returns a "failure" value (``None`` or an empty list) instead of
raising.
"""

import asyncio
import os
from enum import Enum
from typing import Any, Dict, List, Optional
from uuid import uuid4

from internal.db_handler import MongoHandler
from internal.types import System, DiscordId

# Init vars
DB_NAME = os.getenv("DB_NAME", "default_db")
MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/")
SYSTEM_DB_COLLECTION_NAME = "radio_systems"
DISCORD_ID_DB_COLLECTION_NAME = "discord_bot_ids"


# --- System class ---
class SystemDbController():
    """Async CRUD helpers for the ``radio_systems`` collection."""

    def __init__(self):
        # Init the handler
        self.db_h = MongoHandler(DB_NAME, SYSTEM_DB_COLLECTION_NAME, MONGO_URL)

    async def create_system(self, system_data: Dict[str, Any]) -> Optional[System]:
        """
        Creates a new system entry in the database.

        If ``system_data`` has no truthy ``_id``, a UUID4 string is generated
        and stored in ``system_data`` itself (the caller's dict is updated).

        Args:
            system_data: A dictionary containing the data for the new system.

        Returns:
            The created System object if successful, None otherwise.
        """
        print("\n--- Creating a document ---")
        try:
            # Check if the data to be inserted has an ID
            if not system_data.get("_id"):
                system_data['_id'] = str(uuid4())

            # NOTE(review): insert_one is called on the handler itself rather
            # than on the object bound by ``async with`` -- kept as in the
            # original; confirm MongoHandler.__aenter__ returns the handler.
            async with self.db_h:
                insert_result = await self.db_h.insert_one(system_data)
            inserted_id = insert_result.inserted_id

            if not inserted_id:
                print("Insert acknowledged but no ID returned.")
                return None

            print(f"Insert successful with ID: {inserted_id}")

            # Fetch the inserted document to get the complete stored data
            async with self.db_h as db:
                inserted_doc = await db.find_one({"_id": inserted_id})

            if inserted_doc:
                # Convert the fetched dictionary back to a System object
                return System.from_dict(inserted_doc)

            # Insert succeeded but the document could not be read back.
            return None
        except Exception as e:
            print(f"Create failed: {e}")
            return None

    async def find_system(self, query: Dict[str, Any]) -> Optional[System]:
        """
        Finds a single system entry in the database.

        Args:
            query: A dictionary representing the query criteria.

        Returns:
            A System object if found, None otherwise.
        """
        print("\n--- Finding one document ---")
        try:
            async with self.db_h as db:
                found_doc = await db.find_one(query)

            if not found_doc:
                print("Document not found.")
                return None

            print("Found document (raw dict):", found_doc)
            # Convert the dictionary result to a System object
            return System.from_dict(found_doc)
        except Exception as e:
            print(f"Find failed: {e}")
            return None

    async def find_systems(self, query: Dict[str, Any]) -> Optional[List[System]]:
        """
        Finds one or more system entries in the database.

        Args:
            query: A dictionary representing the query criteria.

        Returns:
            A list of System object(s) if found, None otherwise.
        """
        print("\n--- Finding documents ---")
        try:
            async with self.db_h as db:
                found_docs = await db.find(query)

            if not found_docs:
                print("Document not found.")
                return None

            print("Found document (raw dict):", found_docs)
            # Convert the dictionary results to System objects
            converted_systems = [System.from_dict(doc) for doc in found_docs]
            # An empty conversion result is reported as None, like "not found".
            return converted_systems or None
        except Exception as e:
            print(f"Find failed: {e}")
            return None

    async def find_all_systems(self, query: Optional[Dict[str, Any]] = None) -> List[System]:
        """
        Finds multiple system entries in the database.

        Args:
            query: A dictionary representing the query criteria. ``None``
                (the default) is treated as an empty query, i.e. find all.

        Returns:
            A list of System objects (empty when nothing matches or on error).
        """
        print("\n--- Finding multiple documents ---")
        try:
            # A fresh dict per call avoids sharing one mutable default object.
            effective_query = {} if query is None else query

            async with self.db_h as db:
                found_docs = await db.find(effective_query)

            if not found_docs:
                print("No documents found.")
                return []

            print(f"Found {len(found_docs)} documents (raw dicts).")
            # Convert the list of dictionaries to a list of System objects
            return [System.from_dict(doc) for doc in found_docs]
        except Exception as e:
            print(f"Find all failed: {e}")
            return []

    async def update_system(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
        """
        Updates a single system entry in the database.

        Args:
            query: A dictionary representing the query criteria to find the document.
            update_data: A dictionary representing the update operations (e.g., using $set).

        Returns:
            The number of modified documents if successful, None otherwise.
        """
        print("\n--- Updating a document ---")
        try:
            async with self.db_h as db:
                update_result = await db.update_one(query, update_data)

            print(f"Update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
            return update_result.modified_count
        except Exception as e:
            print(f"Update failed: {e}")
            return None

    async def delete_system(self, query: Dict[str, Any]) -> Optional[int]:
        """
        Deletes a single system entry from the database.

        Args:
            query: A dictionary representing the query criteria to find the document to delete.

        Returns:
            The number of deleted documents if successful, None otherwise.
        """
        print("\n--- Deleting a document ---")
        try:
            # NOTE(review): delete_one is called on the handler itself, as in
            # the original; confirm it is equivalent to the bound context object.
            async with self.db_h:
                delete_result = await self.db_h.delete_one(query)

            print(f"Delete result: Deleted count {delete_result.deleted_count}")
            return delete_result.deleted_count
        except Exception as e:
            print(f"Delete failed: {e}")
            return None


# --- DiscordIdDbController class ---
class DiscordIdDbController():
    """Async CRUD helpers for the ``discord_bot_ids`` collection."""

    def __init__(self):
        # Init the handler for Discord IDs
        self.db_h = MongoHandler(DB_NAME, DISCORD_ID_DB_COLLECTION_NAME, MONGO_URL)

    async def create_discord_id(self, discord_id_data: Dict[str, Any]) -> Optional[DiscordId]:
        """
        Creates a new Discord ID entry in the database.

        If ``discord_id_data`` has no truthy ``_id``, a UUID4 string is
        generated and stored in ``discord_id_data`` itself (the caller's dict
        is updated).

        Args:
            discord_id_data: A dictionary containing the data for the new Discord ID.

        Returns:
            The created DiscordId object if successful, None otherwise.
        """
        print("\n--- Creating a Discord ID document ---")
        try:
            if not discord_id_data.get("_id"):
                discord_id_data['_id'] = str(uuid4())  # Ensure _id is a string

            # NOTE(review): insert_one is called on the handler itself rather
            # than on the object bound by ``async with`` -- kept as in the
            # original; confirm MongoHandler.__aenter__ returns the handler.
            async with self.db_h:
                insert_result = await self.db_h.insert_one(discord_id_data)
            inserted_id = insert_result.inserted_id

            if not inserted_id:
                print("Discord ID insert acknowledged but no ID returned.")
                return None

            print(f"Discord ID insert successful with ID: {inserted_id}")

            # Read the stored document back so the result reflects the database.
            async with self.db_h as db:
                inserted_doc = await db.find_one({"_id": inserted_id})

            if inserted_doc:
                return DiscordId.from_dict(inserted_doc)

            # Insert succeeded but the document could not be read back.
            return None
        except Exception as e:
            print(f"Discord ID create failed: {e}")
            return None

    async def find_discord_id(self, query: Dict[str, Any], active_only: bool = False) -> Optional[DiscordId]:
        """
        Finds a single Discord ID entry in the database.

        The caller's ``query`` dict is never modified; filters are added to a copy.

        Args:
            query: A dictionary representing the query criteria.
            active_only: If True, only returns active Discord IDs.

        Returns:
            A DiscordId object if found, None otherwise.
        """
        print("\n--- Finding one Discord ID document ---")
        try:
            # Work on a copy so the "active" filter does not leak to the caller.
            effective_query = dict(query)
            if active_only:
                effective_query["active"] = True

            async with self.db_h as db:
                found_doc = await db.find_one(effective_query)

            if not found_doc:
                print("Discord ID document not found.")
                return None

            print("Found Discord ID document (raw dict):", found_doc)
            return DiscordId.from_dict(found_doc)
        except Exception as e:
            print(f"Discord ID find failed: {e}")
            return None

    async def find_discord_ids(self, query: Optional[Dict[str, Any]] = None, guild_id: Optional[str] = None, active_only: bool = False) -> Optional[List[DiscordId]]:
        """
        Finds one or more Discord ID entries in the database.

        The caller's ``query`` dict is never modified; filters are added to a copy.

        Args:
            query: A dictionary representing the query criteria. ``None``
                (the default) is treated as an empty query.
            guild_id: Optional. If provided, filters Discord IDs that belong to this guild.
            active_only: If True, only returns active Discord IDs.

        Returns:
            A list of DiscordId object(s) if found, None otherwise.
        """
        print("\n--- Finding multiple Discord ID documents ---")
        try:
            # Fresh dict per call: a shared mutable default would otherwise
            # carry "active"/"guild_ids" filters over into later calls.
            effective_query = {} if query is None else dict(query)

            # Add active filter if requested
            if active_only:
                effective_query["active"] = True

            # Add guild_id filter if provided
            if guild_id:
                effective_query["guild_ids"] = {"$in": [guild_id]}

            async with self.db_h as db:
                found_docs = await db.find(effective_query)

            if not found_docs:
                print("Discord ID documents not found.")
                return None

            print(f"Found {len(found_docs)} Discord ID documents (raw dicts).")
            converted_discord_ids = [DiscordId.from_dict(doc) for doc in found_docs]
            # An empty conversion result is reported as None, like "not found".
            return converted_discord_ids or None
        except Exception as e:
            print(f"Discord ID find failed: {e}")
            return None

    async def update_discord_id(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
        """
        Updates a single Discord ID entry in the database.

        Args:
            query: A dictionary representing the query criteria to find the document.
            update_data: A dictionary representing the update operations (e.g., using $set).

        Returns:
            The number of modified documents if successful, None otherwise.
        """
        print("\n--- Updating a Discord ID document ---")
        try:
            async with self.db_h as db:
                update_result = await db.update_one(query, update_data)

            print(f"Discord ID update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
            return update_result.modified_count
        except Exception as e:
            print(f"Discord ID update failed: {e}")
            return None

    async def delete_discord_id(self, query: Dict[str, Any]) -> Optional[int]:
        """
        Deletes a single Discord ID entry from the database.

        Args:
            query: A dictionary representing the query criteria to find the document to delete.

        Returns:
            The number of deleted documents if successful, None otherwise.
        """
        print("\n--- Deleting a Discord ID document ---")
        try:
            # NOTE(review): delete_one is called on the handler itself, as in
            # the original; confirm it is equivalent to the bound context object.
            async with self.db_h:
                delete_result = await self.db_h.delete_one(query)

            print(f"Discord ID delete result: Deleted count {delete_result.deleted_count}")
            return delete_result.deleted_count
        except Exception as e:
            print(f"Discord ID delete failed: {e}")
            return None