Files
drb-core-server/app/internal/db_wrappers.py
2025-05-24 14:56:56 -04:00

192 lines
6.6 KiB
Python

import os
import asyncio
from uuid import uuid4
from typing import Optional, List, Dict, Any
from enum import Enum
from internal.db_handler import MongoHandler
from internal.types import System
# Init vars
DB_NAME = os.getenv("DB_NAME", "default_db")
MONGO_URL = os.getenv("MONGO_URL", "mongodb://10.10.202.4:27017/")
SYSTEM_DB_COLLECTION_NAME = "radio_systems"
# --- System class ---
class SystemDbController():
def __init__(self):
# Init the handler
self.db_h = MongoHandler(DB_NAME, SYSTEM_DB_COLLECTION_NAME, MONGO_URL)
async def create_system(self, system_data: Dict[str, Any]) -> Optional[System]:
"""
Creates a new system entry in the database.
Args:
system_data: A dictionary containing the data for the new system.
Returns:
The created System object if successful, None otherwise.
"""
print("\n--- Creating a document ---")
try:
# Check if the data to be inserted has an ID
if not system_data.get("_id"):
system_data['_id'] = uuid4()
inserted_result = None
inserted_id = None
async with self.db_h as db:
insert_result = await self.db_h.insert_one(system_data)
inserted_id = insert_result.inserted_id
if inserted_id:
print(f"Insert successful with ID: {inserted_id}")
# Fetch the inserted document to get the complete data including the generated _id
query = {"_id": inserted_id}
inserted_doc = None
async with self.db_h as db:
inserted_doc = await db.find_one(query)
if inserted_doc:
# Convert the fetched dictionary back to a System object
return System.from_dict(inserted_doc)
else:
print("Insert acknowledged but no ID returned.")
return None
except Exception as e:
print(f"Create failed: {e}")
return None
async def find_system(self, query: Dict[str, Any]) -> Optional[System]:
"""
Finds a single system entry in the database.
Args:
query: A dictionary representing the query criteria.
Returns:
A System object if found, None otherwise.
"""
print("\n--- Finding one document ---")
try:
found_doc = None
async with self.db_h as db:
found_doc = await db.find_one(query)
if found_doc:
print("Found document (raw dict):", found_doc)
# Convert the dictionary result to a System object
return System.from_dict(found_doc)
else:
print("Document not found.")
return None
except Exception as e:
print(f"Find failed: {e}")
return None
async def find_systems(self, query: Dict[str, Any]) -> Optional[List[System]]:
"""
Finds one or more system entries in the database.
Args:
query: A dictionary representing the query criteria.
Returns:
A list of System object(s) if found, None otherwise.
"""
print("\n--- Finding documents ---")
try:
found_docs = None
async with self.db_h as db:
found_docs = await db.find(query)
if found_docs:
print("Found document (raw dict):", found_docs)
# Convert the dictionary results to a System object
converted_systems = []
for doc in found_docs:
converted_systems.append(System.from_dict(doc))
print("YURB", found_docs, converted_systems)
return converted_systems if len(converted_systems) > 0 else None
else:
print("Document not found.")
return None
except Exception as e:
print(f"Find failed: {e}")
return None
async def find_all_systems(self, query: Dict[str, Any] = {}) -> List[System]:
"""
Finds multiple system entries in the database.
Args:
query: A dictionary representing the query criteria (default is empty to find all).
Returns:
A list of System objects.
"""
print("\n--- Finding multiple documents ---")
try:
found_docs = None
async with self.db_h as db:
found_docs = await db.find(query)
if found_docs:
print(f"Found {len(found_docs)} documents (raw dicts).")
# Convert the list of dictionaries to a list of System objects
return [System.from_dict(doc) for doc in found_docs]
else:
print("No documents found.")
return []
except Exception as e:
print(f"Find all failed: {e}")
return []
async def update_system(self, query: Dict[str, Any], update_data: Dict[str, Any]) -> Optional[int]:
"""
Updates a single system entry in the database.
Args:
query: A dictionary representing the query criteria to find the document.
update_data: A dictionary representing the update operations (e.g., using $set).
Returns:
The number of modified documents if successful, None otherwise.
"""
print("\n--- Updating a document ---")
try:
update_result = None
async with self.db_h as db:
update_result = await db.update_one(query, update_data)
print(f"Update result: Matched {update_result.matched_count}, Modified {update_result.modified_count}")
return update_result.modified_count
except Exception as e:
print(f"Update failed: {e}")
return None
async def delete_system(self, query: Dict[str, Any]) -> Optional[int]:
"""
Deletes a single system entry from the database.
Args:
query: A dictionary representing the query criteria to find the document to delete.
Returns:
The number of deleted documents if successful, None otherwise.
"""
print("\n--- Deleting a document ---")
try:
delete_result = None
async with self.db_h as db:
delete_result = await self.db_h.delete_one(query)
print(f"Delete result: Deleted count {delete_result.deleted_count}")
return delete_result.deleted_count
except Exception as e:
print(f"Delete failed: {e}")
return None