227 lines
9.1 KiB
Python
227 lines
9.1 KiB
Python
import os
|
|
import asyncio
|
|
from uuid import uuid4
|
|
from typing import Optional, List, Dict, Any
|
|
from enum import Enum
|
|
from internal.db_handler import MongoHandler #
|
|
from internal.types import System, DiscordId
|
|
|
|
# Init vars
|
|
DB_NAME = os.getenv("DB_NAME", "default_db")
|
|
MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/")
|
|
|
|
SYSTEM_DB_COLLECTION_NAME = "radio_systems"
|
|
DISCORD_ID_DB_COLLECTION_NAME = "discord_bot_ids"
|
|
|
|
# --- System class ---
|
|
class SystemDbController():
|
|
def __init__(self):
|
|
self.db_h = MongoHandler(DB_NAME, SYSTEM_DB_COLLECTION_NAME, MONGO_URL) #
|
|
|
|
async def close_db_connection(self):
|
|
"""Closes the underlying MongoDB connection."""
|
|
if self.db_h:
|
|
await self.db_h.close_client() #
|
|
|
|
async def create_system(self, system_data: Dict[str, Any]) -> Optional[System]:
|
|
print("\n--- Creating a document ---")
|
|
try:
|
|
if not system_data.get("_id"):
|
|
system_data['_id'] = str(uuid4())
|
|
|
|
inserted_id = None
|
|
async with self.db_h as db: #
|
|
insert_result = await db.insert_one(system_data) #
|
|
inserted_id = insert_result.inserted_id
|
|
|
|
if inserted_id:
|
|
print(f"Insert successful with ID: {inserted_id}")
|
|
query = {"_id": inserted_id}
|
|
inserted_doc = None
|
|
async with self.db_h as db: #
|
|
inserted_doc = await db.find_one(query) #
|
|
if inserted_doc:
|
|
return System.from_dict(inserted_doc)
|
|
else:
|
|
print("Insert acknowledged but no ID returned.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Create failed: {e}")
|
|
return None
|
|
|
|
async def find_system(self, query: Dict[str, Any]) -> Optional[System]:
|
|
print("\n--- Finding one document ---")
|
|
try:
|
|
found_doc = None
|
|
async with self.db_h as db: #
|
|
found_doc = await db.find_one(query) #
|
|
if found_doc:
|
|
print("Found document (raw dict):", found_doc)
|
|
return System.from_dict(found_doc)
|
|
else:
|
|
print("Document not found.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Find failed: {e}")
|
|
return None
|
|
|
|
async def find_systems(self, query: Dict[str, Any]) -> Optional[List[System]]:
|
|
print("\n--- Finding documents ---")
|
|
try:
|
|
found_docs = None
|
|
async with self.db_h as db: #
|
|
found_docs = await db.find(query) #
|
|
if found_docs:
|
|
print("Found document (raw dict):", found_docs)
|
|
converted_systems = [System.from_dict(doc) for doc in found_docs]
|
|
print("YURB", found_docs, converted_systems)
|
|
return converted_systems if len(converted_systems) > 0 else None
|
|
else:
|
|
print("Document not found.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Find failed: {e}")
|
|
return None
|
|
|
|
async def find_all_systems(self, query: Dict[str, Any] = {}) -> List[System]:
|
|
print("\n--- Finding multiple documents ---")
|
|
try:
|
|
found_docs = None
|
|
async with self.db_h as db: #
|
|
found_docs = await db.find(query) #
|
|
if found_docs:
|
|
print(f"Found {len(found_docs)} documents (raw dicts).")
|
|
return [System.from_dict(doc) for doc in found_docs]
|
|
else:
|
|
print("No documents found.")
|
|
return []
|
|
except Exception as e:
|
|
print(f"Find all failed: {e}")
|
|
return []
|
|
|
|
async def update_system(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
|
|
print("\n--- Updating a document ---")
|
|
try:
|
|
update_result = None
|
|
async with self.db_h as db: #
|
|
update_result = await db.update_one(query, update_data) #
|
|
print(f"Update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
|
|
return update_result.modified_count
|
|
except Exception as e:
|
|
print(f"Update failed: {e}")
|
|
return None
|
|
|
|
async def delete_system(self, query: Dict[str, Any]) -> Optional[int]:
|
|
print("\n--- Deleting a document ---")
|
|
try:
|
|
delete_result = None
|
|
async with self.db_h as db: #
|
|
delete_result = await db.delete_one(query) #
|
|
print(f"Delete result: Deleted count {delete_result.deleted_count}")
|
|
return delete_result.deleted_count
|
|
except Exception as e:
|
|
print(f"Delete failed: {e}")
|
|
return None
|
|
|
|
|
|
# --- DiscordIdDbController class ---
|
|
class DiscordIdDbController():
|
|
def __init__(self):
|
|
self.db_h = MongoHandler(DB_NAME, DISCORD_ID_DB_COLLECTION_NAME, MONGO_URL) #
|
|
|
|
async def close_db_connection(self):
|
|
"""Closes the underlying MongoDB connection."""
|
|
if self.db_h:
|
|
await self.db_h.close_client() #
|
|
|
|
async def create_discord_id(self, discord_id_data: Dict[str, Any]) -> Optional[DiscordId]:
|
|
print("\n--- Creating a Discord ID document ---")
|
|
try:
|
|
if not discord_id_data.get("_id"):
|
|
discord_id_data['_id'] = str(uuid4())
|
|
|
|
inserted_id = None
|
|
async with self.db_h as db: #
|
|
insert_result = await db.insert_one(discord_id_data) #
|
|
inserted_id = insert_result.inserted_id
|
|
|
|
if inserted_id:
|
|
print(f"Discord ID insert successful with ID: {inserted_id}")
|
|
query = {"_id": inserted_id}
|
|
inserted_doc = None
|
|
async with self.db_h as db: #
|
|
inserted_doc = await db.find_one(query) #
|
|
if inserted_doc:
|
|
return DiscordId.from_dict(inserted_doc)
|
|
else:
|
|
print("Discord ID insert acknowledged but no ID returned.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Discord ID create failed: {e}")
|
|
return None
|
|
|
|
async def find_discord_id(self, query: Dict[str, Any], active_only: bool = False) -> Optional[DiscordId]:
|
|
print("\n--- Finding one Discord ID document ---")
|
|
try:
|
|
if active_only:
|
|
query["active"] = True
|
|
found_doc = None
|
|
async with self.db_h as db: #
|
|
found_doc = await db.find_one(query) #
|
|
if found_doc:
|
|
print("Found Discord ID document (raw dict):", found_doc)
|
|
return DiscordId.from_dict(found_doc)
|
|
else:
|
|
print("Discord ID document not found.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Discord ID find failed: {e}")
|
|
return None
|
|
|
|
async def find_discord_ids(self, query: Dict[str, Any] = {}, guild_id: Optional[str] = None, active_only: bool = False) -> Optional[List[DiscordId]]:
|
|
print("\n--- Finding multiple Discord ID documents ---")
|
|
try:
|
|
if active_only == True:
|
|
print("Searching active IDs")
|
|
query["active"] = True
|
|
if guild_id:
|
|
print(f"Searching for IDs in {guild_id}")
|
|
query["guild_ids"] = {"$in": [guild_id]}
|
|
found_docs = None
|
|
async with self.db_h as db: #
|
|
print(f"Query: {query}")
|
|
found_docs = await db.find(query) #
|
|
if found_docs:
|
|
print(f"Found {len(found_docs)} Discord ID documents (raw dicts).")
|
|
converted_discord_ids = [DiscordId.from_dict(doc) for doc in found_docs]
|
|
return converted_discord_ids if len(converted_discord_ids) > 0 else None
|
|
else:
|
|
print("Discord ID documents not found.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"Discord ID find failed: {e}")
|
|
return None
|
|
|
|
async def update_discord_id(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
|
|
print("\n--- Updating a Discord ID document ---")
|
|
try:
|
|
update_result = None
|
|
async with self.db_h as db: #
|
|
update_result = await db.update_one(query, update_data) #
|
|
print(f"Discord ID update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
|
|
return update_result.modified_count
|
|
except Exception as e:
|
|
print(f"Discord ID update failed: {e}")
|
|
return None
|
|
|
|
async def delete_discord_id(self, query: Dict[str, Any]) -> Optional[int]:
|
|
print("\n--- Deleting a Discord ID document ---")
|
|
try:
|
|
delete_result = None
|
|
async with self.db_h as db: #
|
|
delete_result = await db.delete_one(query) #
|
|
print(f"Discord ID delete result: Deleted count {delete_result.deleted_count}")
|
|
return delete_result.deleted_count
|
|
except Exception as e:
|
|
print(f"Discord ID delete failed: {e}")
|
|
return None |