176 lines
5.3 KiB
Python
176 lines
5.3 KiB
Python
"""
|
|
Scheduler module for VoxPop AI Analysis Service.
|
|
Manages periodic execution of analysis tasks.
|
|
"""
|
|
|
|
import logging
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from datetime import datetime
|
|
import time
|
|
from typing import Callable, Dict, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AnalysisScheduler:
|
|
"""
|
|
Manages scheduled analysis tasks.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the scheduler."""
|
|
self.scheduler = BackgroundScheduler()
|
|
self.scheduler.start()
|
|
logger.info("Analysis scheduler initialized")
|
|
|
|
# Store job metadata
|
|
self.jobs = {}
|
|
|
|
def add_hourly_job(self, job_id: str, func: Callable, **kwargs) -> str:
|
|
"""
|
|
Add a job to run hourly.
|
|
|
|
Args:
|
|
job_id: Unique identifier for the job
|
|
func: Function to call
|
|
**kwargs: Arguments to pass to the function
|
|
|
|
Returns:
|
|
Job ID
|
|
"""
|
|
trigger = IntervalTrigger(hours=1)
|
|
return self._add_job(job_id, func, trigger, **kwargs)
|
|
|
|
def add_minutes_job(self, job_id: str, func: Callable, minutes: int, **kwargs) -> str:
|
|
"""
|
|
Add a job to run every N minutes.
|
|
|
|
Args:
|
|
job_id: Unique identifier for the job
|
|
func: Function to call
|
|
minutes: Minutes between runs
|
|
**kwargs: Arguments to pass to the function
|
|
|
|
Returns:
|
|
Job ID
|
|
"""
|
|
trigger = IntervalTrigger(minutes=minutes)
|
|
return self._add_job(job_id, func, trigger, **kwargs)
|
|
|
|
def _add_job(self, job_id: str, func: Callable, trigger: Any, **kwargs) -> str:
|
|
"""
|
|
Add a job with a specific trigger.
|
|
|
|
Args:
|
|
job_id: Unique identifier for the job
|
|
func: Function to call
|
|
trigger: APScheduler trigger
|
|
**kwargs: Arguments to pass to the function
|
|
|
|
Returns:
|
|
Job ID
|
|
"""
|
|
# Wrap the function to track execution
|
|
def wrapped_func(*args, **kw):
|
|
start_time = time.time()
|
|
|
|
try:
|
|
logger.info(f"Starting scheduled job {job_id}")
|
|
result = func(*args, **kw)
|
|
|
|
# Update job metadata
|
|
duration = time.time() - start_time
|
|
self.jobs[job_id]["last_run"] = datetime.now().isoformat()
|
|
self.jobs[job_id]["last_duration"] = duration
|
|
self.jobs[job_id]["last_success"] = True
|
|
self.jobs[job_id]["runs"] += 1
|
|
|
|
logger.info(f"Completed scheduled job {job_id} in {duration:.2f} seconds")
|
|
return result
|
|
except Exception as e:
|
|
logger.error(f"Error in scheduled job {job_id}: {e}")
|
|
|
|
# Update job metadata
|
|
duration = time.time() - start_time
|
|
self.jobs[job_id]["last_run"] = datetime.now().isoformat()
|
|
self.jobs[job_id]["last_duration"] = duration
|
|
self.jobs[job_id]["last_success"] = False
|
|
self.jobs[job_id]["runs"] += 1
|
|
self.jobs[job_id]["errors"] += 1
|
|
|
|
# Re-raise the exception
|
|
raise
|
|
|
|
# Store job metadata
|
|
self.jobs[job_id] = {
|
|
"id": job_id,
|
|
"added_at": datetime.now().isoformat(),
|
|
"last_run": None,
|
|
"last_duration": None,
|
|
"last_success": None,
|
|
"runs": 0,
|
|
"errors": 0
|
|
}
|
|
|
|
# Add the job
|
|
job = self.scheduler.add_job(
|
|
wrapped_func,
|
|
trigger=trigger,
|
|
id=job_id,
|
|
replace_existing=True,
|
|
kwargs=kwargs
|
|
)
|
|
|
|
logger.info(f"Added scheduled job {job_id}")
|
|
return job_id
|
|
|
|
def run_job_now(self, job_id: str) -> bool:
|
|
"""
|
|
Run a scheduled job immediately.
|
|
|
|
Args:
|
|
job_id: ID of the job to run
|
|
|
|
Returns:
|
|
True if the job was run, False otherwise
|
|
"""
|
|
try:
|
|
self.scheduler.get_job(job_id).func()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error running job {job_id} immediately: {e}")
|
|
return False
|
|
|
|
def remove_job(self, job_id: str) -> bool:
|
|
"""
|
|
Remove a scheduled job.
|
|
|
|
Args:
|
|
job_id: ID of the job to remove
|
|
|
|
Returns:
|
|
True if the job was removed, False otherwise
|
|
"""
|
|
try:
|
|
self.scheduler.remove_job(job_id)
|
|
if job_id in self.jobs:
|
|
del self.jobs[job_id]
|
|
logger.info(f"Removed scheduled job {job_id}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error removing job {job_id}: {e}")
|
|
return False
|
|
|
|
def get_jobs(self) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Get metadata for all jobs.
|
|
|
|
Returns:
|
|
Dictionary of job metadata
|
|
"""
|
|
return self.jobs
|
|
|
|
def shutdown(self):
|
|
"""Shut down the scheduler."""
|
|
self.scheduler.shutdown()
|
|
logger.info("Scheduler shut down") |