import asyncio
from typing import Any, Dict, List, Optional

import motor.motor_asyncio


class MongoHandler:
    """
    An asynchronous handler for MongoDB operations using motor.

    The client connection is established on first use (via ``async with``,
    a direct call to ``connect()``, or implicitly by the first CRUD call)
    and should be explicitly closed by calling ``close_client()`` at
    application shutdown. Leaving an ``async with`` block does NOT close
    the connection.
    """

    def __init__(self, db_name: str, collection_name: str, mongo_uri: str = "mongodb://localhost:27017/"):
        """
        Initializes the MongoDB handler. No network I/O happens here.

        Args:
            db_name (str): The name of the database to connect to.
            collection_name (str): The name of the collection to use.
            mongo_uri (str): The MongoDB connection string URI.
        """
        self.mongo_uri = mongo_uri
        self.db_name = db_name
        self.collection_name = collection_name
        # The three handles below are always published together by connect()
        # and cleared together by close_client().
        self._client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None
        self._db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None
        self._collection: Optional[motor.motor_asyncio.AsyncIOMotorCollection] = None
        self._lock = asyncio.Lock()  # Serializes client creation and shutdown

    def _redacted_uri(self) -> str:
        """Returns the URI with everything up to the last '@' stripped, so credentials are not printed."""
        return self.mongo_uri.split('@')[-1]

    async def connect(self) -> None:
        """
        Establishes an asynchronous connection to MongoDB if not already established.

        This method is idempotent. The new client is only stored on the
        handler after the server has answered a ``ping``; if anything goes
        wrong before that (including cancellation) the half-built client is
        closed so no sockets or monitor tasks are leaked.

        Raises:
            Exception: Whatever the driver raised while creating the client
                or pinging the server is re-raised after being printed.
        """
        # Fast path: NOTE that motor collections do not support truth-value
        # testing, so every check here must be an explicit ``is None``.
        if self._collection is not None:
            return

        async with self._lock:  # Ensure only one coroutine initializes the client
            if self._collection is not None:  # Re-check after acquiring the lock
                return

            if self._client is not None:
                # A client exists but the db/collection handles are missing;
                # rebuild them without creating a second client.
                if self._db is None:
                    self._db = self._client[self.db_name]
                self._collection = self._db[self.collection_name]
                return

            safe_uri = self._redacted_uri()  # Avoid logging credentials
            print(f"Initializing MongoDB client for: DB '{self.db_name}', Collection '{self.collection_name}' URI: {safe_uri}")

            client = None
            collection = None
            try:
                client = motor.motor_asyncio.AsyncIOMotorClient(self.mongo_uri)
                # The ping command is cheap and does not require auth. It
                # replaces the deprecated 'ismaster' command.
                await client.admin.command('ping')
                database = client[self.db_name]
                collection = database[self.collection_name]
            except Exception as e:
                print(f"Failed to initialize MongoDB client at {safe_uri} for {self.db_name}/{self.collection_name}: {e}")
                raise  # Re-raise the exception after printing
            finally:
                # Runs for ordinary failures AND for task cancellation: if we
                # never got as far as a usable collection, release the client.
                if collection is None and client is not None:
                    client.close()

            # Publish all three handles with no await in between, so other
            # coroutines never observe a partially initialized handler.
            self._client = client
            self._db = database
            self._collection = collection
            print(f"MongoDB client initialized and connected: Database '{self.db_name}', Collection '{self.collection_name}'")

    async def close_client(self) -> None:
        """Closes the MongoDB client connection. Should be called on application shutdown.

        Safe to call when no client is open; in that case nothing happens.
        """
        async with self._lock:
            if self._client is not None:
                print(f"Closing MongoDB client for: Database '{self.db_name}', Collection '{self.collection_name}'")
                self._client.close()
                self._client = None
                self._db = None
                self._collection = None
                print(f"MongoDB client closed for: Database '{self.db_name}', Collection '{self.collection_name}'.")

    async def __aenter__(self):
        """Allows using the handler with ``async with``. Ensures connection is active."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Ensures the connection is NOT closed when exiting ``async with``.

        The connection is managed and closed at the application level by
        calling ``close_client()``. Returns None, so exceptions raised inside
        the ``async with`` body propagate unchanged.
        """

    async def _get_collection(self):
        """
        Returns the collection handle, connecting first if necessary.

        Callers use the returned local reference rather than re-reading
        ``self._collection``, so a concurrent ``close_client()`` cannot turn
        the handle into None between the check and the database call.

        Raises:
            RuntimeError: If no collection is available even after connect().
        """
        if self._collection is None:
            await self.connect()  # Try to connect if collection is None
        collection = self._collection
        if collection is None:  # Check again after attempting to connect
            raise RuntimeError("MongoDB collection not available. Call connect() or use async with.")
        return collection

    async def insert_one(self, document: Dict[str, Any]) -> Any:
        """
        Inserts a single document into the collection.

        Args:
            document: The document to insert.

        Returns:
            The driver's insert result (exposes ``inserted_id``).
        """
        collection = await self._get_collection()
        print(f"Inserting document into '{self.collection_name}'...")
        result = await collection.insert_one(document)
        print(f"Inserted document with ID: {result.inserted_id}")
        return result

    async def find_one(self, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Returns the first document matching ``query``, or None if there is none.

        Args:
            query: The MongoDB filter document.
        """
        collection = await self._get_collection()
        print(f"Finding one document in '{self.collection_name}' with query: {query}")
        document = await collection.find_one(query)
        return document

    async def find(self, query: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Returns every document matching ``query`` as a list.

        Args:
            query: The MongoDB filter document. None (the default) means
                "match everything" and is replaced by an empty filter.
        """
        collection = await self._get_collection()
        if query is None:
            query = {}
        print(f"Finding documents in '{self.collection_name}' with query: {query}")
        documents = [doc async for doc in collection.find(query)]
        print(f"Found {len(documents)} documents.")
        return documents

    async def update_one(self, query: Dict[str, Any], update: Dict[str, Any], upsert: bool = False) -> Any:
        """
        Updates the first document matching ``query``.

        Args:
            query: The MongoDB filter document.
            update: The update document passed to the driver unchanged.
            upsert: If True, insert a new document when nothing matches.

        Returns:
            The driver's update result (exposes ``matched_count``,
            ``modified_count`` and ``upserted_id``).
        """
        collection = await self._get_collection()
        print(f"Updating one document in '{self.collection_name}' with query: {query}, update: {update}")
        result = await collection.update_one(query, update, upsert=upsert)
        print(f"Matched {result.matched_count}, Modified {result.modified_count}, Upserted ID: {result.upserted_id}")
        return result

    async def delete_one(self, query: Dict[str, Any]) -> Any:
        """
        Deletes the first document matching ``query``.

        Args:
            query: The MongoDB filter document.

        Returns:
            The driver's delete result (exposes ``deleted_count``).
        """
        collection = await self._get_collection()
        print(f"Deleting one document from '{self.collection_name}' with query: {query}")
        result = await collection.delete_one(query)
        print(f"Deleted count: {result.deleted_count}")
        return result

    async def delete_many(self, query: Dict[str, Any]) -> Any:
        """
        Deletes every document matching ``query``.

        Args:
            query: The MongoDB filter document.

        Returns:
            The driver's delete result (exposes ``deleted_count``).
        """
        collection = await self._get_collection()
        print(f"Deleting many documents from '{self.collection_name}' with query: {query}")
        result = await collection.delete_many(query)
        print(f"Deleted count: {result.deleted_count}")
        return result


# --- Example Usage ---
async def example_mongo_usage():
    """Inserts, reads back and deletes one document against a local MongoDB."""
    db_name = "radio_app_db"
    collection_name = "channels_example"  # Use a different collection for example
    handler = MongoHandler(db_name, collection_name)  # mongo_uri defaults to localhost

    try:
        async with handler:  # Connects on enter (if not already connected)
            print("\n--- Inserting a document ---")
            channel_data = {
                "_id": "example_channel",
                "name": "Example"
            }
            await handler.insert_one(channel_data)
            found = await handler.find_one({"_id": "example_channel"})
            print(f"Found: {found}")
            await handler.delete_one({"_id": "example_channel"})
            print("Example completed.")
    finally:
        await handler.close_client()  # Explicitly close client at the end of usage


if __name__ == "__main__":
    asyncio.run(example_mongo_usage())