"""
Main application module for VoxPop AI Analysis Service.
"""
import logging
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from app.data_fetcher import PerspectiveFetcher
from app.analyzer import Analyzer
from app.insight_generator import InsightGenerator
from app.scheduler import AnalysisScheduler
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
# Create app components
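# (module-level singletons shared by the endpoints and the scheduled jobs below)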
perspective_fetcher = PerspectiveFetcher()
analyzer = Analyzer()
insight_generator = InsightGenerator(output_dir="insights")
scheduler = AnalysisScheduler()
# Create FastAPI app
app = FastAPI(
    title="VoxPop AI Analysis Service",
    description="Analyzes perspectives and generates insights",
    version="0.1.0"
)
# Enable CORS
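# NOTE: wildcard origins with credentials enabled is convenient for local
# development, but should be restricted to known origins in production.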
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# In-memory cache for analysis results
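# (process-local and unsynchronized: it resets on restart and is not shared
# across worker processes)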
analysis_cache = {
    "last_analysis": None,
    "last_insights": None,
    "timestamp": None
}
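# NOTE: on_event hooks are deprecated in recent FastAPI releases in favor of
# lifespan handlers; they still work here but may warrant migration.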
@app.on_event("startup")
async def startup_event():
"""Run when the app starts up."""
logger.info("Starting VoxPop AI Analysis Service")
# Schedule the analysis job to run hourly
scheduler.add_hourly_job(
"hourly_analysis",
perform_analysis_and_generate_insights,
run_immediately=True
)
# Also schedule a more frequent job for demo purposes
scheduler.add_minutes_job(
"demo_analysis",
perform_analysis_and_generate_insights,
minutes=15,
run_immediately=False
)
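# NOTE: both jobs invoke the same callable; AnalysisScheduler is assumed to
# serialize or skip overlapping runs (its implementation is not shown here).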
@app.on_event("shutdown")
async def shutdown_event():
"""Run when the app shuts down."""
logger.info("Shutting down VoxPop AI Analysis Service")
scheduler.shutdown()
def perform_analysis_and_generate_insights(run_immediately: bool = False) -> Dict[str, Any]:
    """
    Perform analysis and generate insights.

    Args:
        run_immediately: Whether this is being run immediately (vs scheduled)

    Returns:
        Analysis results and insights
    """
    logger.info(f"Starting analysis run (immediate: {run_immediately})")
    # Fetch perspectives
    perspectives = perspective_fetcher.fetch_perspectives()
    # Analyze perspectives
    analysis_results = analyzer.analyze_perspectives(perspectives)
    # Generate insights
    insights = insight_generator.generate_insights(analysis_results)
    # Update cache
    analysis_cache["last_analysis"] = analysis_results
    analysis_cache["last_insights"] = insights
    analysis_cache["timestamp"] = datetime.now().isoformat()
    logger.info(
        f"Completed analysis run with {len(perspectives)} perspectives "
        f"and {len(insights['insights'])} insights"
    )
    return {
        "analysis": analysis_results,
        "insights": insights
    }
@app.get("/")
async def root():
"""Root endpoint."""
return {
"message": "VoxPop AI Analysis Service is running",
"docs": "/docs",
"status": "OK"
}
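# Clients may pass multiple hashes as repeated query parameters,
# e.g. GET /analyze?hashes=<h1>&hashes=<h2>; without hashes, cached results
# are served when they are less than an hour old.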
@app.get("/analyze")
async def analyze_endpoint(
hashes: Optional[List[str]] = Query(None, description="List of IPFS hashes to analyze"),
background_tasks: BackgroundTasks = None
):
"""
Analyze perspectives and generate insights.
Args:
hashes: Optional list of IPFS hashes to analyze
background_tasks: FastAPI background tasks
Returns:
Analysis results
"""
try:
if hashes:
# Analyze specific perspectives
perspectives = perspective_fetcher.fetch_perspectives(hashes)
analysis_results = analyzer.analyze_perspectives(perspectives)
# Generate insights in the background
if background_tasks:
background_tasks.add_task(
insight_generator.generate_insights,
analysis_results
)
return analysis_results
else:
# Return cached results if available and recent
if (
analysis_cache["last_analysis"]
and analysis_cache["timestamp"]
and (datetime.now() - datetime.fromisoformat(analysis_cache["timestamp"])).total_seconds() < 3600
):
return analysis_cache["last_analysis"]
# Otherwise perform new analysis
result = perform_analysis_and_generate_insights(run_immediately=True)
return result["analysis"]
except Exception as e:
logger.error(f"Error in analyze endpoint: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/insights")
async def insights_endpoint(days: int = Query(1, description="Number of days to look back")):
"""
Get consolidated insights.
Args:
days: Number of days to look back
Returns:
Consolidated insights
"""
try:
return insight_generator.get_consolidated_insights(days=days)
except Exception as e:
logger.error(f"Error in insights endpoint: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/status")
async def status_endpoint():
"""
Get service status.
Returns:
Service status information
"""
jobs = scheduler.get_jobs()
return {
"status": "OK",
"uptime": "Unknown", # Would track this in a real service
"last_analysis": analysis_cache["timestamp"],
"scheduled_jobs": jobs,
"version": "0.1.0"
}
@app.post("/run-now")
async def run_now_endpoint():
"""
Run analysis immediately.
Returns:
Success message
"""
try:
result = perform_analysis_and_generate_insights(run_immediately=True)
return {"message": "Analysis run completed", "timestamp": datetime.now().isoformat()}
except Exception as e:
logger.error(f"Error in run-now endpoint: {e}")
raise HTTPException(status_code=500, detail=str(e))
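# Note: reload=True below enables auto-reload for development only; a
# production deployment would typically run uvicorn without it.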
if __name__ == "__main__":
# When run directly, this will be used for local development
import uvicorn
# Create insights directory
os.makedirs("insights", exist_ok=True)
# Run the app
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)