""" Main application module for VoxPop AI Analysis Service. """ import logging import os import sys from typing import Dict, List, Any, Optional import json from datetime import datetime import time from fastapi import FastAPI, BackgroundTasks, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from app.data_fetcher import PerspectiveFetcher from app.analyzer import Analyzer from app.insight_generator import InsightGenerator from app.scheduler import AnalysisScheduler # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) # Create app components perspective_fetcher = PerspectiveFetcher() analyzer = Analyzer() insight_generator = InsightGenerator(output_dir="insights") scheduler = AnalysisScheduler() # Create FastAPI app app = FastAPI( title="VoxPop AI Analysis Service", description="Analyzes perspectives and generates insights", version="0.1.0" ) # Enable CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # In-memory cache for analysis results analysis_cache = { "last_analysis": None, "last_insights": None, "timestamp": None } @app.on_event("startup") async def startup_event(): """Run when the app starts up.""" logger.info("Starting VoxPop AI Analysis Service") # Schedule the analysis job to run hourly scheduler.add_hourly_job( "hourly_analysis", perform_analysis_and_generate_insights, run_immediately=True ) # Also schedule a more frequent job for demo purposes scheduler.add_minutes_job( "demo_analysis", perform_analysis_and_generate_insights, minutes=15, run_immediately=False ) @app.on_event("shutdown") async def shutdown_event(): """Run when the app shuts down.""" logger.info("Shutting down VoxPop AI Analysis Service") scheduler.shutdown() def perform_analysis_and_generate_insights(run_immediately: bool = False) -> Dict[str, Any]: """ Perform analysis and generate insights. Args: run_immediately: Whether this is being run immediately (vs scheduled) Returns: Analysis results and insights """ logger.info(f"Starting analysis run (immediate: {run_immediately})") # Fetch perspectives perspectives = perspective_fetcher.fetch_perspectives() # Analyze perspectives analysis_results = analyzer.analyze_perspectives(perspectives) # Generate insights insights = insight_generator.generate_insights(analysis_results) # Update cache analysis_cache["last_analysis"] = analysis_results analysis_cache["last_insights"] = insights analysis_cache["timestamp"] = datetime.now().isoformat() logger.info(f"Completed analysis run with {len(perspectives)} perspectives and {len(insights['insights'])} insights") return { "analysis": analysis_results, "insights": insights } @app.get("/") async def root(): """Root endpoint.""" return { "message": "VoxPop AI Analysis Service is running", "docs": "/docs", "status": "OK" } @app.get("/analyze") async def analyze_endpoint( hashes: Optional[List[str]] = Query(None, description="List of IPFS hashes to analyze"), background_tasks: BackgroundTasks = None ): """ Analyze perspectives and generate insights. Args: hashes: Optional list of IPFS hashes to analyze background_tasks: FastAPI background tasks Returns: Analysis results """ try: if hashes: # Analyze specific perspectives perspectives = perspective_fetcher.fetch_perspectives(hashes) analysis_results = analyzer.analyze_perspectives(perspectives) # Generate insights in the background if background_tasks: background_tasks.add_task( insight_generator.generate_insights, analysis_results ) return analysis_results else: # Return cached results if available and recent if ( analysis_cache["last_analysis"] and analysis_cache["timestamp"] and (datetime.now() - datetime.fromisoformat(analysis_cache["timestamp"])).total_seconds() < 3600 ): return analysis_cache["last_analysis"] # Otherwise perform new analysis result = perform_analysis_and_generate_insights(run_immediately=True) return result["analysis"] except Exception as e: logger.error(f"Error in analyze endpoint: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/insights") async def insights_endpoint(days: int = Query(1, description="Number of days to look back")): """ Get consolidated insights. Args: days: Number of days to look back Returns: Consolidated insights """ try: return insight_generator.get_consolidated_insights(days=days) except Exception as e: logger.error(f"Error in insights endpoint: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/status") async def status_endpoint(): """ Get service status. Returns: Service status information """ jobs = scheduler.get_jobs() return { "status": "OK", "uptime": "Unknown", # Would track this in a real service "last_analysis": analysis_cache["timestamp"], "scheduled_jobs": jobs, "version": "0.1.0" } @app.post("/run-now") async def run_now_endpoint(): """ Run analysis immediately. Returns: Success message """ try: result = perform_analysis_and_generate_insights(run_immediately=True) return {"message": "Analysis run completed", "timestamp": datetime.now().isoformat()} except Exception as e: logger.error(f"Error in run-now endpoint: {e}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": # When run directly, this will be used for local development import uvicorn # Create insights directory os.makedirs("insights", exist_ok=True) # Run the app uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)