""" AI Analyzer module for VoxPop. Performs sentiment analysis and clustering on perspective data. """ import logging import numpy as np from typing import Dict, List, Tuple, Any from sklearn.cluster import KMeans from sklearn.metrics import silhouette_score from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline from sklearn.decomposition import PCA import plotly.express as px import pandas as pd logger = logging.getLogger(__name__) class Analyzer: """ Analyzes perspectives using NLP techniques. """ def __init__(self): """Initialize models and tokenizers.""" logger.info("Initializing AI Analyzer") # Load sentiment analysis model self.sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english" try: self.sentiment_tokenizer = AutoTokenizer.from_pretrained(self.sentiment_model_name) self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(self.sentiment_model_name) self.sentiment_pipeline = pipeline( "sentiment-analysis", model=self.sentiment_model, tokenizer=self.sentiment_tokenizer ) logger.info("Sentiment analysis model loaded successfully") except Exception as e: logger.error(f"Error loading sentiment model: {e}") # Fallback to simpler method if model loading fails self.sentiment_pipeline = None # Load embedding model for clustering self.embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2" try: self.embedding_pipeline = pipeline( "feature-extraction", model=self.embedding_model_name ) logger.info("Embedding model loaded successfully") except Exception as e: logger.error(f"Error loading embedding model: {e}") self.embedding_pipeline = None def analyze_perspectives(self, perspectives: List[Dict[str, str]]) -> Dict[str, Any]: """ Analyze a list of perspectives. Args: perspectives: List of perspective dictionaries with 'text' field Returns: Dictionary with analysis results """ logger.info(f"Analyzing {len(perspectives)} perspectives") # Extract text from perspectives texts = [p["text"] for p in perspectives] # Perform sentiment analysis sentiments = self.analyze_sentiment(texts) for i, sentiment in enumerate(sentiments): perspectives[i]["sentiment"] = sentiment # Generate embeddings and cluster if len(perspectives) >= 3: # Need at least 3 perspectives for meaningful clustering clusters, cluster_summaries, confidence_scores = self.cluster_perspectives(perspectives) # Generate PCA visualization for the embeddings visualization_data = None if len(perspectives) >= 2: visualization_data = self.generate_visualization(perspectives) else: clusters = [0] * len(perspectives) cluster_summaries = ["Sample too small for clustering"] confidence_scores = [1.0] visualization_data = None # Generate insights insights = [] for i, summary in enumerate(cluster_summaries): insights.append({ "summary": summary, "confidence": confidence_scores[i] }) return { "perspectives": perspectives, "clusters": clusters, "insights": insights, "visualization": visualization_data } def analyze_sentiment(self, texts: List[str]) -> List[str]: """ Perform sentiment analysis on a list of texts. 
    def analyze_sentiment(self, texts: List[str]) -> List[str]:
        """
        Perform sentiment analysis on a list of texts.

        Args:
            texts: List of text strings

        Returns:
            List of sentiment labels ("positive", "negative", or "neutral")
        """
        logger.info(f"Performing sentiment analysis on {len(texts)} texts")

        if self.sentiment_pipeline:
            try:
                # Use Hugging Face pipeline for sentiment analysis
                results = []
                for text in texts:
                    # Truncate to the model's maximum length so long texts don't raise
                    sentiment_result = self.sentiment_pipeline(text, truncation=True)[0]
                    label = sentiment_result["label"].lower()
                    # Convert LABEL_0/LABEL_1 to negative/positive if needed
                    if label in ["label_0", "label_1"]:
                        label = "negative" if label == "label_0" else "positive"
                    results.append(label)
                return results
            except Exception as e:
                logger.error(f"Error in sentiment analysis: {e}")
                # Fall back to lexicon-based method
                return self._lexicon_sentiment(texts)
        else:
            # Use simple lexicon-based method as fallback
            return self._lexicon_sentiment(texts)

    def _lexicon_sentiment(self, texts: List[str]) -> List[str]:
        """
        Simple lexicon-based sentiment analysis fallback.

        Args:
            texts: List of text strings

        Returns:
            List of sentiment labels
        """
        positive_words = [
            "good", "great", "excellent", "better", "positive", "best", "happy",
            "improved", "improvement", "benefit", "success", "successful",
            "support", "well", "advantage"
        ]
        negative_words = [
            "bad", "worse", "worst", "poor", "negative", "problem", "issue",
            "concern", "terrible", "horrible", "awful", "failure", "fail",
            "inadequate", "disappointed"
        ]

        results = []
        for text in texts:
            text_lower = text.lower()
            pos_count = sum(1 for word in positive_words if word in text_lower)
            neg_count = sum(1 for word in negative_words if word in text_lower)

            if pos_count > neg_count:
                results.append("positive")
            elif neg_count > pos_count:
                results.append("negative")
            else:
                results.append("neutral")

        return results
    def cluster_perspectives(self, perspectives: List[Dict[str, str]]) -> Tuple[List[int], List[str], List[float]]:
        """
        Cluster perspectives based on their text content.

        Args:
            perspectives: List of perspective dictionaries

        Returns:
            Tuple of (cluster assignments, cluster summaries, confidence scores)
        """
        logger.info(f"Clustering {len(perspectives)} perspectives")
        texts = [p["text"] for p in perspectives]

        # Generate embeddings
        if self.embedding_pipeline:
            try:
                embeddings = []
                for text in texts:
                    # Get token-level embeddings from the model
                    features = self.embedding_pipeline(text)
                    # Mean-pool the token embeddings to get a single vector per text
                    embedding = np.mean(features[0], axis=0)
                    embeddings.append(embedding)
                embeddings = np.array(embeddings)
                logger.info(f"Generated embeddings with shape {embeddings.shape}")
            except Exception as e:
                logger.error(f"Error generating embeddings: {e}")
                # Return simple clusters if embedding fails
                return self._fallback_clustering(perspectives)
        else:
            # Return simple clusters if embedding model is not available
            return self._fallback_clustering(perspectives)

        # Determine the optimal number of clusters (between 2 and 5)
        # using the silhouette score
        max_clusters = max(2, min(5, len(perspectives) - 1))
        best_n_clusters = 2  # Safe default if every silhouette attempt fails
        best_score = -1

        for n_clusters in range(2, max_clusters + 1):
            try:
                kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
                labels = kmeans.fit_predict(embeddings)
                if len(set(labels)) > 1:  # Silhouette needs at least 2 distinct clusters
                    score = silhouette_score(embeddings, labels)
                    if score > best_score:
                        best_score = score
                        best_n_clusters = n_clusters
            except Exception as e:
                logger.warning(f"Error during clustering with {n_clusters} clusters: {e}")

        # Perform clustering with the optimal number of clusters
        kmeans = KMeans(n_clusters=best_n_clusters, random_state=42, n_init=10)
        cluster_labels = kmeans.fit_predict(embeddings)

        # Generate summaries and confidence scores
        summaries, confidence_scores = self._generate_cluster_summaries(perspectives, cluster_labels)

        return cluster_labels.tolist(), summaries, confidence_scores

    def _fallback_clustering(self, perspectives: List[Dict[str, str]]) -> Tuple[List[int], List[str], List[float]]:
        """
        Simple fallback clustering based on sentiment.

        Args:
            perspectives: List of perspective dictionaries

        Returns:
            Tuple of (cluster assignments, cluster summaries, confidence scores)
        """
        # Group by sentiment if available, otherwise return a single cluster
        if "sentiment" in perspectives[0]:
            sentiment_map = {"positive": 0, "neutral": 1, "negative": 2}
            labels = [sentiment_map.get(p.get("sentiment", "neutral"), 1) for p in perspectives]

            # Generate summaries
            sentiments = ["positive", "neutral", "negative"]
            summaries = []
            confidence_scores = []
            for i, sentiment in enumerate(sentiments):
                count = labels.count(i)
                if count > 0:
                    percentage = (count / len(perspectives)) * 100
                    summaries.append(
                        f"Cluster {i+1}: {sentiment.capitalize()} perspectives about the topic, "
                        f"{percentage:.0f}% of total"
                    )
                    confidence_scores.append(percentage / 100)
                else:
                    summaries.append(f"Cluster {i+1}: No {sentiment} perspectives")
                    confidence_scores.append(0.0)

            return labels, summaries, confidence_scores
        else:
            # Single cluster
            return [0] * len(perspectives), ["All perspectives"], [1.0]
    def _generate_cluster_summaries(self, perspectives: List[Dict[str, str]], cluster_labels: List[int]) -> Tuple[List[str], List[float]]:
        """
        Generate summaries and confidence scores for each cluster.

        Args:
            perspectives: List of perspective dictionaries
            cluster_labels: Cluster assignments

        Returns:
            Tuple of (summaries, confidence scores)
        """
        unique_clusters = sorted(set(cluster_labels))
        summaries = []
        confidence_scores = []

        for cluster_id in unique_clusters:
            # Get perspectives in this cluster
            cluster_perspectives = [p for i, p in enumerate(perspectives) if cluster_labels[i] == cluster_id]

            # Get the most common words/topics in this cluster (simple approach)
            all_text = " ".join([p["text"] for p in cluster_perspectives])
            words = all_text.lower().split()

            # Remove common words
            stopwords = ["the", "a", "an", "in", "on", "at", "to", "for", "of", "and", "is", "are", "we", "our", "i", "my"]
            words = [w for w in words if w not in stopwords and len(w) > 3]

            # Count word frequencies
            word_counts = {}
            for word in words:
                word_counts[word] = word_counts.get(word, 0) + 1

            # Get top words
            top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:3]
            topic_words = [word for word, _ in top_words]

            # Count sentiments in this cluster
            sentiments = [p.get("sentiment", "neutral") for p in cluster_perspectives]
            pos_count = sentiments.count("positive")
            neg_count = sentiments.count("negative")
            neu_count = sentiments.count("neutral")

            # Determine the dominant sentiment
            total = pos_count + neg_count + neu_count
            if pos_count > neg_count and pos_count > neu_count:
                dominant = "positive"
                percentage = (pos_count / total) * 100
            elif neg_count > pos_count and neg_count > neu_count:
                dominant = "negative"
                percentage = (neg_count / total) * 100
            else:
                dominant = "neutral/mixed"
                percentage = (neu_count / total) * 100

            # Generate summary
            if topic_words:
                topic_str = ", ".join(topic_words)
                summary = f"Cluster {cluster_id+1}: Perspectives about {topic_str}, {percentage:.0f}% {dominant}"
            else:
                summary = f"Cluster {cluster_id+1}: {dominant.capitalize()} perspectives, {len(cluster_perspectives)} items"

            # Confidence score based on cluster size
            confidence = len(cluster_perspectives) / len(perspectives)

            summaries.append(summary)
            confidence_scores.append(confidence)

        return summaries, confidence_scores

    def generate_visualization(self, perspectives: List[Dict[str, str]]) -> Optional[Dict[str, Any]]:
        """
        Generate a visualization of the clustered perspectives.

        Args:
            perspectives: List of perspective dictionaries with clusters

        Returns:
            Visualization data, or None if embeddings are unavailable or an error occurs
        """
        try:
            if not self.embedding_pipeline:
                logger.warning("Embedding model unavailable; skipping visualization")
                return None

            texts = [p["text"] for p in perspectives]

            # Generate embeddings
            embeddings = []
            for text in texts:
                features = self.embedding_pipeline(text)
                embedding = np.mean(features[0], axis=0)
                embeddings.append(embedding)
            embeddings = np.array(embeddings)

            # Reduce to 2D using PCA
            pca = PCA(n_components=2)
            reduced_embeddings = pca.fit_transform(embeddings)

            # Create a DataFrame for plotting
            df = pd.DataFrame({
                'x': reduced_embeddings[:, 0],
                'y': reduced_embeddings[:, 1],
                'text': [p["text"][:50] + "..." if len(p["text"]) > 50 else p["text"] for p in perspectives],
                'sentiment': [p.get("sentiment", "neutral") for p in perspectives],
                'cluster': [f"Cluster {p.get('cluster', 0) + 1}" for p in perspectives]
            })

            # Create an interactive plot
            fig = px.scatter(
                df,
                x='x',
                y='y',
                color='sentiment',
                symbol='cluster',
                hover_data=['text'],
                title='Perspective Clusters'
            )

            # Convert to JSON
            visualization_data = {
                'plotly_json': fig.to_json(),
                'data': df.to_dict(orient='records')
            }

            return visualization_data
        except Exception as e:
            logger.error(f"Error generating visualization: {e}")
            return None
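
# A minimal usage sketch, not part of the original module: the sample texts
# below are hypothetical, and running it assumes the transformers, sklearn,
# plotly, and pandas dependencies are installed (the Hugging Face models are
# downloaded on first run, so network access is required).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Four perspectives so the clustering path (>= 3 items) is exercised
    sample_perspectives = [
        {"text": "The new transit plan is a great improvement for commuters."},
        {"text": "I am concerned the budget is inadequate and the rollout will fail."},
        {"text": "Service has been terrible; this proposal looks like more of the same."},
        {"text": "Happy to see better bike lanes included, a clear benefit."},
    ]

    analyzer = Analyzer()
    results = analyzer.analyze_perspectives(sample_perspectives)

    # Print one line per generated insight
    for insight in results["insights"]:
        print(f"{insight['summary']} (confidence: {insight['confidence']:.2f})")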