"""Video routes: directory scanning, voting and on-the-fly streaming.

NOTE(review): the Firestore calls and directory walks in this module are
synchronous and are invoked directly from ``async def`` handlers, so they
run on the event loop while they execute.
"""
import asyncio
import contextlib
import os
import random
from typing import List

from dotenv import load_dotenv
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import FileResponse, StreamingResponse

from ..models import Vote, VoteCreate, Video
from ..security import is_admin, is_user
from ..firebase_config import get_db, auth

load_dotenv()

router = APIRouter()

VIDEO_DIRECTORY = os.environ.get("VIDEO_DIRECTORY")

# File extensions (lower-case) that the scanner treats as video clips.
_CLIP_EXTENSIONS = ('.mp4', '.mkv')
# Number of unvoted videos fetched per /vote-next request.
_VOTE_BATCH_SIZE = 5
# Size of each chunk read from FFmpeg's stdout and sent to the client.
_STREAM_CHUNK_SIZE = 8192
# Seconds to wait for FFmpeg to exit before escalating to SIGKILL.
_FFMPEG_SHUTDOWN_TIMEOUT = 5


def _is_clip(path: str, name: str) -> bool:
    """Return True when *path* is a regular file with a clip extension."""
    return os.path.isfile(path) and name.lower().endswith(_CLIP_EXTENSIONS)


def _add_video_if_new(videos_collection, relative_path, person, game) -> bool:
    """Create a video document for *relative_path* unless one already exists.

    Returns True when a new document was written, False when a document with
    the same ``file_path`` was already present.
    """
    existing = videos_collection.where('file_path', '==', relative_path).limit(1).get()
    if existing:
        return False

    new_video_ref = videos_collection.document()
    new_video_ref.set({
        "id": new_video_ref.id,
        "file_path": relative_path,
        "person": person,
        "game": game,
        "has_been_voted": False,
    })
    return True


@router.post("/scan", status_code=status.HTTP_201_CREATED)
async def scan_videos_directory(current_user: dict = Depends(is_admin)):
    """
    Scans the video directory for new clips and adds them to the database.

    Handles structures:
    - /videos/person/game/clip[.mp4|.mkv]
    - /videos/person/clip[.mp4|.mkv]

    Returns a message with the number of newly added videos.
    Raises a 500 HTTPException when VIDEO_DIRECTORY is unset or is not a
    directory.
    """
    if not VIDEO_DIRECTORY or not os.path.isdir(VIDEO_DIRECTORY):
        raise HTTPException(status_code=500, detail="Video directory not found or not configured on server")

    db = get_db()
    videos_collection = db.collection('videos')
    new_videos_count = 0

    # Top-level items should be 'person' directories
    for person_name in os.listdir(VIDEO_DIRECTORY):
        person_path = os.path.join(VIDEO_DIRECTORY, person_name)
        if not os.path.isdir(person_path):
            continue

        # Second-level items can be 'game' directories or clips
        for item_name in os.listdir(person_path):
            item_path = os.path.join(person_path, item_name)

            if os.path.isdir(item_path):
                # Structure 1: /person/game/clip.mkv
                game_name = item_name
                for clip_name in os.listdir(item_path):
                    clip_path = os.path.join(item_path, clip_name)
                    if not _is_clip(clip_path, clip_name):
                        continue
                    # Relative path stored in the DB: person/game/clip.mkv
                    relative_path = os.path.join(person_name, game_name, clip_name)
                    if _add_video_if_new(videos_collection, relative_path, person_name, game_name):
                        new_videos_count += 1
            elif _is_clip(item_path, item_name):
                # Structure 2: /person/clip.mp4 (no game in this structure)
                relative_path = os.path.join(person_name, item_name)
                if _add_video_if_new(videos_collection, relative_path, person_name, None):
                    new_videos_count += 1

    return {"message": f"Scan complete. Added {new_videos_count} new videos."}


@router.get("/vote-next", response_model=Video)
async def get_random_unvoted_video(current_user: dict = Depends(is_user)):
    """
    Retrieves an unvoted video document from Firestore.

    Fetches a small batch of unvoted videos and picks one of them at random,
    so the choice is random only within that batch.
    Raises a 404 HTTPException when no unvoted video is left.
    """
    db = get_db()
    unvoted_videos_query = (
        db.collection('videos')
        .where('has_been_voted', '==', False)
        .limit(_VOTE_BATCH_SIZE)
    )
    unvoted_videos = [doc.to_dict() for doc in unvoted_videos_query.stream()]

    if not unvoted_videos:
        raise HTTPException(status_code=404, detail="No more videos to vote on!")

    # Randomly select one video from the fetched batch
    random_video_data = random.choice(unvoted_videos)
    return Video(**random_video_data)


@router.post("/{video_id}/vote", status_code=status.HTTP_201_CREATED)
async def submit_vote(video_id: str, vote_data: VoteCreate, current_user: dict = Depends(is_user)):
    """
    Submits a vote, creating a 'vote' document and updating the video document
    in Firestore.

    Raises a 404 HTTPException when the video does not exist and a 400 when
    it has already been voted on.
    """
    db = get_db()
    video_ref = db.collection('videos').document(video_id)
    video_doc = video_ref.get()

    if not video_doc.exists:
        raise HTTPException(status_code=404, detail="Video not found")
    if video_doc.to_dict().get("has_been_voted"):
        raise HTTPException(status_code=400, detail="This video has already been voted on")

    new_vote_ref = db.collection('votes').document()

    update_data = {"has_been_voted": True}
    if vote_data.recommended_game:
        update_data["game"] = vote_data.recommended_game

    # Commit both writes in one batch so a vote is never stored without the
    # video also being marked as voted (or the other way round).
    # NOTE(review): the has_been_voted check above is still a separate read,
    # so two concurrent requests can both pass it; a transaction would be
    # needed to close that window.
    batch = db.batch()
    batch.set(new_vote_ref, {
        "id": new_vote_ref.id,
        "video_id": video_id,
        "user_id": current_user['uid'],
        "decision": vote_data.decision,
        "reason": vote_data.reason,
        "recommended_game": vote_data.recommended_game,
    })
    batch.update(video_ref, update_data)
    batch.commit()

    return {"message": "Vote submitted successfully"}


@router.get("/{video_id}/stream")
async def stream_video(video_id: str, current_user: dict = Depends(is_user)):
    """
    Streams a video file from the server with on-the-fly transcoding and
    chunking using FFmpeg.

    Raises a 404 HTTPException when the video document or the file on disk is
    missing, and a 500 when VIDEO_DIRECTORY is not configured. Errors that
    happen once streaming has begun cannot change the HTTP status any more;
    they are logged and re-raised so the connection is aborted.
    """
    db = get_db()
    video_doc = db.collection('videos').document(video_id).get()
    if not video_doc.exists:
        raise HTTPException(status_code=404, detail="Video not found")

    # Without this guard os.path.join(None, ...) fails with a TypeError.
    if not VIDEO_DIRECTORY:
        raise HTTPException(status_code=500, detail="Video directory not found or not configured on server")

    video_data = video_doc.to_dict()
    full_path = os.path.join(VIDEO_DIRECTORY, video_data["file_path"])
    if not os.path.exists(full_path):
        raise HTTPException(status_code=404, detail="Video file not found on disk")

    async def generate_video_chunks():
        """Yield fragmented-MP4 chunks produced by an FFmpeg subprocess."""
        # NOTE(review): amerge defaults to two inputs - confirm this works
        # for clips with a single audio stream or none.
        ffmpeg_command = [
            'ffmpeg',
            '-i', full_path,
            '-map', '0:v:0',
            '-map', '0:a?',
            '-movflags', 'frag_keyframe+empty_moov+omit_tfhd_offset+frag_discont+default_base_moof',
            '-f', 'mp4',
            '-codec:v', 'libx264',
            '-preset', 'ultrafast',
            '-crf', '28',
            '-codec:a', 'aac',
            '-filter_complex', 'amerge',
            '-ac', '2',
            '-b:a', '128k',
            '-loglevel', 'warning',
            '-hide_banner',
            'pipe:1',
        ]

        process = None
        stderr_task = None
        reached_eof = False   # True once FFmpeg closed stdout on its own
        terminated = False    # True when we had to stop FFmpeg ourselves
        try:
            process = await asyncio.create_subprocess_exec(
                *ffmpeg_command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Drain stderr concurrently; an unread stderr pipe can fill up
            # and block FFmpeg, which would stall the stdout stream as well.
            stderr_task = asyncio.create_task(process.stderr.read())

            while True:
                chunk = await process.stdout.read(_STREAM_CHUNK_SIZE)
                if not chunk:
                    break
                yield chunk
            reached_eof = True
        except asyncio.CancelledError:
            # Raised if the client disconnects before the stream is complete.
            print(f"[{video_id}] Streaming cancelled by client.")
            raise  # never swallow cancellation
        except Exception as e:
            print(f"[{video_id}] Error during video streaming: {e}")
            # The response has already started, so an HTTPException could not
            # reach the client; re-raise the original error instead.
            raise
        finally:
            if process is not None:
                if not reached_eof and process.returncode is None:
                    print(f"[{video_id}] Terminating FFmpeg process.")
                    terminated = True
                    with contextlib.suppress(ProcessLookupError):
                        process.terminate()  # Send SIGTERM
                try:
                    await asyncio.wait_for(process.wait(), timeout=_FFMPEG_SHUTDOWN_TIMEOUT)
                except asyncio.TimeoutError:
                    # FFmpeg did not exit in time; force it.
                    terminated = True
                    with contextlib.suppress(ProcessLookupError):
                        process.kill()
                    await process.wait()

                # The process has exited, so stderr is at EOF and this
                # returns promptly.
                stderr_output = await stderr_task
                if not terminated and process.returncode != 0:
                    print(f"[{video_id}] FFmpeg exited with error code {process.returncode}: {stderr_output.decode()}")
                elif stderr_output and not reached_eof:
                    print(f"[{video_id}] FFmpeg stderr: {stderr_output.decode()}")
            print(f"[{video_id}] Video streaming finished or terminated.")

    return StreamingResponse(generate_video_chunks(), media_type="video/mp4")


@router.get("/votes", response_model=List[Vote])
async def get_all_votes(current_user: dict = Depends(is_admin)):
    """
    Admin endpoint to retrieve all vote documents from Firestore.
    """
    db = get_db()
    votes_stream = db.collection('votes').stream()
    return [Vote(**doc.to_dict()) for doc in votes_stream]