Files
drb-core-server/app/internal/db_handler.py
2025-05-26 02:36:21 -04:00

164 lines
8.5 KiB
Python

import motor.motor_asyncio
import asyncio
from typing import Optional, Dict, Any, List
class MongoHandler:
"""
An asynchronous handler for MongoDB operations using motor.
The client connection is established on first use (via async with or direct call to connect())
and should be explicitly closed by calling close_client() at application shutdown.
"""
def __init__(self, db_name: str, collection_name: str, mongo_uri: str = "mongodb://localhost:27017/"):
"""
Initializes the MongoDB handler.
Args:
db_name (str): The name of the database to connect to.
collection_name (str): The name of the collection to use.
mongo_uri (str): The MongoDB connection string URI.
"""
self.mongo_uri = mongo_uri
self.db_name = db_name
self.collection_name = collection_name
self._client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None
self._db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None
self._collection: Optional[motor.motor_asyncio.AsyncIOMotorCollection] = None
self._lock = asyncio.Lock() # Lock for serializing client creation
async def connect(self):
"""
Establishes an asynchronous connection to MongoDB if not already established.
This method is idempotent.
"""
if self._client is None:
async with self._lock: # Ensure only one coroutine attempts to initialize the client
if self._client is None: # Double-check after acquiring lock
try:
print(f"Initializing MongoDB client for: DB '{self.db_name}', Collection '{self.collection_name}' URI: {self.mongo_uri.split('@')[-1]}") # Avoid logging credentials
self._client = motor.motor_asyncio.AsyncIOMotorClient(self.mongo_uri)
# The ismaster command is cheap and does not require auth.
await self._client.admin.command('ismaster')
self._db = self._client[self.db_name]
self._collection = self._db[self.collection_name]
print(f"MongoDB client initialized and connected: Database '{self.db_name}', Collection '{self.collection_name}'")
except Exception as e:
print(f"Failed to initialize MongoDB client at {self.mongo_uri.split('@')[-1]} for {self.db_name}/{self.collection_name}: {e}")
self._client = None # Ensure client is None if connection fails
self._db = None
self._collection = None
raise # Re-raise the exception after printing
if self._collection is None and self._client is not None:
# This can happen if connect was called, client was set, but then an error occurred before collection was set
# Or if connect logic needs to re-establish db/collection objects without re-creating client (less common with motor)
if self._db is None:
self._db = self._client[self.db_name]
self._collection = self._db[self.collection_name]
if self._collection is None:
raise RuntimeError(f"MongoDB collection '{self.collection_name}' could not be established even though client exists.")
async def close_client(self):
"""Closes the MongoDB client connection. Should be called on application shutdown."""
async with self._lock:
if self._client:
print(f"Closing MongoDB client for: Database '{self.db_name}', Collection '{self.collection_name}'")
self._client.close()
self._client = None
self._db = None
self._collection = None
print(f"MongoDB client closed for: Database '{self.db_name}', Collection '{self.collection_name}'.")
async def __aenter__(self):
"""Allows using the handler with async with. Ensures connection is active."""
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Ensures the connection is NOT closed when exiting async with."""
# The connection is managed and closed at the application level by calling close_client()
pass
async def insert_one(self, document: Dict[str, Any]) -> Any:
if self._collection is None:
await self.connect() # Try to connect if collection is None
if self._collection is None: # Check again after attempting to connect
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
print(f"Inserting document into '{self.collection_name}'...")
result = await self._collection.insert_one(document)
print(f"Inserted document with ID: {result.inserted_id}")
return result
async def find_one(self, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if self._collection is None:
await self.connect()
if self._collection is None:
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
print(f"Finding one document in '{self.collection_name}' with query: {query}")
document = await self._collection.find_one(query)
return document
async def find(self, query: Dict[str, Any] = None) -> List[Dict[str, Any]]:
if self._collection is None:
await self.connect()
if self._collection is None:
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
if query is None:
query = {}
print(f"Finding documents in '{self.collection_name}' with query: {query}")
documents = [doc async for doc in self._collection.find(query)]
print(f"Found {len(documents)} documents.")
return documents
async def update_one(self, query: Dict[str, Any], update: Dict[str, Any], upsert: bool = False) -> Any:
if self._collection is None:
await self.connect()
if self._collection is None:
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
print(f"Updating one document in '{self.collection_name}' with query: {query}, update: {update}")
result = await self._collection.update_one(query, update, upsert=upsert)
print(f"Matched {result.matched_count}, Modified {result.modified_count}, Upserted ID: {result.upserted_id}")
return result
async def delete_one(self, query: Dict[str, Any]) -> Any:
if self._collection is None:
await self.connect()
if self._collection is None:
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
print(f"Deleting one document from '{self.collection_name}' with query: {query}")
result = await self._collection.delete_one(query)
print(f"Deleted count: {result.deleted_count}")
return result
async def delete_many(self, query: Dict[str, Any]) -> Any:
if self._collection is None:
await self.connect()
if self._collection is None:
raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
print(f"Deleting many documents from '{self.collection_name}' with query: {query}")
result = await self._collection.delete_many(query)
print(f"Deleted count: {result.deleted_count}")
return result
# --- Example Usage (no change needed here, but behavior of MongoHandler is different) ---
async def example_mongo_usage():
db_name = "radio_app_db"
collection_name = "channels_example" # Use a different collection for example
handler = MongoHandler(db_name, collection_name) # MONGO_URL defaults to localhost
try:
async with handler: # Connects on enter (if not already connected)
print("\n--- Inserting a document ---")
# ... (rest of example usage)
channel_data = { "_id": "example_channel", "name": "Example" }
await handler.insert_one(channel_data)
found = await handler.find_one({"_id": "example_channel"})
print(f"Found: {found}")
await handler.delete_one({"_id": "example_channel"})
print("Example completed.")
finally:
await handler.close_client() # Explicitly close client at the end of usage
if __name__ == "__main__":
asyncio.run(example_mongo_usage())