# discourse/analysis/app/analyzer.py
"""
AI Analyzer module for VoxPop.
Performs sentiment analysis and clustering on perspective data.
"""
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline

logger = logging.getLogger(__name__)
class Analyzer:
"""
Analyzes perspectives using NLP techniques.
"""
def __init__(self):
"""Initialize models and tokenizers."""
logger.info("Initializing AI Analyzer")
# Load sentiment analysis model
self.sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english"
try:
self.sentiment_tokenizer = AutoTokenizer.from_pretrained(self.sentiment_model_name)
self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(self.sentiment_model_name)
self.sentiment_pipeline = pipeline(
"sentiment-analysis",
model=self.sentiment_model,
tokenizer=self.sentiment_tokenizer
)
logger.info("Sentiment analysis model loaded successfully")
except Exception as e:
logger.error(f"Error loading sentiment model: {e}")
# Fallback to simpler method if model loading fails
self.sentiment_pipeline = None
# Load embedding model for clustering
self.embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
try:
self.embedding_pipeline = pipeline(
"feature-extraction",
model=self.embedding_model_name
)
logger.info("Embedding model loaded successfully")
except Exception as e:
logger.error(f"Error loading embedding model: {e}")
self.embedding_pipeline = None
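
        # Note: either pipeline may be None at this point; the methods below
        # check for this and fall back to model-free heuristics, so the
        # analyzer degrades gracefully when the models cannot be loaded.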
def analyze_perspectives(self, perspectives: List[Dict[str, str]]) -> Dict[str, Any]:
"""
Analyze a list of perspectives.
Args:
perspectives: List of perspective dictionaries with 'text' field
Returns:
Dictionary with analysis results
"""
logger.info(f"Analyzing {len(perspectives)} perspectives")
# Extract text from perspectives
texts = [p["text"] for p in perspectives]
        # Perform sentiment analysis and attach a label to each perspective
        sentiments = self.analyze_sentiment(texts)
        for i, sentiment in enumerate(sentiments):
            perspectives[i]["sentiment"] = sentiment
        # Generate embeddings and cluster
        if len(perspectives) >= 3:  # Need at least 3 perspectives for meaningful clustering
            clusters, cluster_summaries, confidence_scores = self.cluster_perspectives(perspectives)
            # Attach cluster assignments so generate_visualization can read
            # them back from each perspective
            for i, cluster in enumerate(clusters):
                perspectives[i]["cluster"] = cluster
            visualization_data = self.generate_visualization(perspectives)
        else:
            clusters = [0] * len(perspectives)
            cluster_summaries = ["Sample too small for clustering"]
            confidence_scores = [1.0]
            visualization_data = None
# Generate insights
insights = []
for i, summary in enumerate(cluster_summaries):
insights.append({
"summary": summary,
"confidence": confidence_scores[i]
})
return {
"perspectives": perspectives,
"clusters": clusters,
"insights": insights,
"visualization": visualization_data
}
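
    # Illustrative shape of the analyze_perspectives result (values invented
    # for documentation purposes):
    # {
    #     "perspectives": [{"text": "...", "sentiment": "positive", "cluster": 0}, ...],
    #     "clusters": [0, 1, 0],
    #     "insights": [{"summary": "Cluster 1: ...", "confidence": 0.67}, ...],
    #     "visualization": {"plotly_json": "...", "data": [...]}  # or None
    # }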
def analyze_sentiment(self, texts: List[str]) -> List[str]:
"""
Perform sentiment analysis on a list of texts.
Args:
texts: List of text strings
Returns:
List of sentiment labels ("positive", "negative", or "neutral")
"""
logger.info(f"Performing sentiment analysis on {len(texts)} texts")
        if self.sentiment_pipeline:
            try:
                # Use the Hugging Face pipeline, truncating texts that exceed
                # the model's maximum input length
                results = []
                for sentiment_result in self.sentiment_pipeline(texts, truncation=True):
                    label = sentiment_result["label"].lower()
                    # Convert generic LABEL_0/LABEL_1 outputs to negative/positive
                    if label in ("label_0", "label_1"):
                        label = "negative" if label == "label_0" else "positive"
                    results.append(label)
                return results
            except Exception as e:
                logger.error(f"Error in sentiment analysis: {e}")
                # Fall back to the lexicon-based method
                return self._lexicon_sentiment(texts)
        # Use the simple lexicon-based method when no model is available
        return self._lexicon_sentiment(texts)
def _lexicon_sentiment(self, texts: List[str]) -> List[str]:
"""
Simple lexicon-based sentiment analysis fallback.
Args:
texts: List of text strings
Returns:
List of sentiment labels
"""
        positive_words = {
            "good", "great", "excellent", "better", "positive", "best", "happy", "improved",
            "improvement", "benefit", "success", "successful", "support", "well", "advantage",
        }
        negative_words = {
            "bad", "worse", "worst", "poor", "negative", "problem", "issue", "concern",
            "terrible", "horrible", "awful", "failure", "fail", "inadequate", "disappointed",
        }
        results = []
        for text in texts:
            # Match whole words only, so that e.g. "badge" does not count as "bad"
            words = re.findall(r"[a-z']+", text.lower())
            pos_count = sum(1 for word in words if word in positive_words)
            neg_count = sum(1 for word in words if word in negative_words)
            if pos_count > neg_count:
                results.append("positive")
            elif neg_count > pos_count:
                results.append("negative")
            else:
                results.append("neutral")
        return results
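
    # Example (lexicon fallback): "the support was great" matches two positive
    # words ("support", "great") and no negative words, so it is labeled
    # "positive"; a text with no matches on either list comes back "neutral".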
def cluster_perspectives(self, perspectives: List[Dict[str, str]]) -> Tuple[List[int], List[str], List[float]]:
"""
Cluster perspectives based on their text content.
Args:
perspectives: List of perspective dictionaries
Returns:
Tuple of (cluster assignments, cluster summaries, confidence scores)
"""
logger.info(f"Clustering {len(perspectives)} perspectives")
texts = [p["text"] for p in perspectives]
# Generate embeddings
if self.embedding_pipeline:
try:
embeddings = []
for text in texts:
# Get embeddings from the model
features = self.embedding_pipeline(text)
# Average the token embeddings to get a single vector per text
embedding = np.mean(features[0], axis=0)
embeddings.append(embedding)
embeddings = np.array(embeddings)
logger.info(f"Generated embeddings with shape {embeddings.shape}")
except Exception as e:
logger.error(f"Error generating embeddings: {e}")
# Return simple clusters if embedding fails
return self._fallback_clustering(perspectives)
else:
# Return simple clusters if embedding model is not available
return self._fallback_clustering(perspectives)
        # Determine the optimal number of clusters (between 2 and 5)
        max_clusters = min(5, len(perspectives) - 1)
        if max_clusters < 2:
            max_clusters = 2
        best_n_clusters = min(3, max_clusters)  # Default if every candidate fails
        best_score = -1
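        # Silhouette score ranges from -1 to 1; higher values mean points sit
        # closer to their own cluster than to the nearest other cluster, so we
        # keep the cluster count that maximizes it.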
for n_clusters in range(2, max_clusters + 1):
try:
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
labels = kmeans.fit_predict(embeddings)
if len(set(labels)) > 1: # Ensure we have at least 2 clusters
score = silhouette_score(embeddings, labels)
if score > best_score:
best_score = score
best_n_clusters = n_clusters
except Exception as e:
logger.warning(f"Error during clustering with {n_clusters} clusters: {e}")
# Perform clustering with optimal number of clusters
kmeans = KMeans(n_clusters=best_n_clusters, random_state=42, n_init=10)
cluster_labels = kmeans.fit_predict(embeddings)
# Generate summaries and confidence scores
summaries, confidence_scores = self._generate_cluster_summaries(perspectives, cluster_labels)
return cluster_labels.tolist(), summaries, confidence_scores
def _fallback_clustering(self, perspectives: List[Dict[str, str]]) -> Tuple[List[int], List[str], List[float]]:
"""
Simple fallback clustering based on sentiment.
Args:
perspectives: List of perspective dictionaries
Returns:
Tuple of (cluster assignments, cluster summaries, confidence scores)
"""
# Just group by sentiment if available, otherwise return a single cluster
if "sentiment" in perspectives[0]:
sentiment_map = {"positive": 0, "neutral": 1, "negative": 2}
labels = [sentiment_map.get(p.get("sentiment", "neutral"), 1) for p in perspectives]
# Generate summaries
sentiments = ["positive", "neutral", "negative"]
summaries = []
confidence_scores = []
for i, sentiment in enumerate(sentiments):
count = labels.count(i)
if count > 0:
percentage = (count / len(perspectives)) * 100
summaries.append(f"Cluster {i+1}: {sentiment.capitalize()} perspectives about the topic, {percentage:.0f}% of total")
confidence_scores.append(percentage / 100)
else:
summaries.append(f"Cluster {i+1}: No {sentiment} perspectives")
confidence_scores.append(0.0)
return labels, summaries, confidence_scores
else:
# Single cluster
return [0] * len(perspectives), ["All perspectives"], [1.0]
def _generate_cluster_summaries(self, perspectives: List[Dict[str, str]], cluster_labels: List[int]) -> Tuple[List[str], List[float]]:
"""
Generate summaries and confidence scores for each cluster.
Args:
perspectives: List of perspective dictionaries
cluster_labels: Cluster assignments
Returns:
Tuple of (summaries, confidence scores)
"""
unique_clusters = sorted(set(cluster_labels))
summaries = []
confidence_scores = []
for cluster_id in unique_clusters:
# Get perspectives in this cluster
cluster_perspectives = [p for i, p in enumerate(perspectives) if cluster_labels[i] == cluster_id]
# Get most common words/topics in this cluster (simple approach)
all_text = " ".join([p["text"] for p in cluster_perspectives])
words = all_text.lower().split()
# Remove common words
stopwords = ["the", "a", "an", "in", "on", "at", "to", "for", "of", "and", "is", "are", "we", "our", "i", "my"]
words = [w for w in words if w not in stopwords and len(w) > 3]
# Count word frequencies
word_counts = {}
for word in words:
word_counts[word] = word_counts.get(word, 0) + 1
# Get top words
top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:3]
topic_words = [word for word, _ in top_words]
# Count sentiments in this cluster
sentiments = [p.get("sentiment", "neutral") for p in cluster_perspectives]
pos_count = sentiments.count("positive")
neg_count = sentiments.count("negative")
neu_count = sentiments.count("neutral")
# Determine dominant sentiment
total = pos_count + neg_count + neu_count
if pos_count > neg_count and pos_count > neu_count:
dominant = "positive"
percentage = (pos_count / total) * 100
elif neg_count > pos_count and neg_count > neu_count:
dominant = "negative"
percentage = (neg_count / total) * 100
else:
dominant = "neutral/mixed"
percentage = (neu_count / total) * 100
# Generate summary
if topic_words:
topic_str = ", ".join(topic_words)
summary = f"Cluster {cluster_id+1}: Perspectives about {topic_str}, {percentage:.0f}% {dominant}"
else:
summary = f"Cluster {cluster_id+1}: {dominant.capitalize()} perspectives, {len(cluster_perspectives)} items"
# Confidence score based on cluster size
confidence = len(cluster_perspectives) / len(perspectives)
summaries.append(summary)
confidence_scores.append(confidence)
return summaries, confidence_scores
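
    # Example summary string produced above (topic words and percentage
    # invented for illustration):
    # "Cluster 2: Perspectives about transit, budget, lanes, 75% negative"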
    def generate_visualization(self, perspectives: List[Dict[str, str]]) -> Optional[Dict[str, Any]]:
        """
        Generate a 2D visualization of the clustered perspectives.
        Args:
            perspectives: List of perspective dictionaries with 'sentiment' and 'cluster' keys
        Returns:
            Visualization data, or None if no embedding model is available or an error occurs
        """
try:
texts = [p["text"] for p in perspectives]
# Generate embeddings
if self.embedding_pipeline:
embeddings = []
for text in texts:
features = self.embedding_pipeline(text)
embedding = np.mean(features[0], axis=0)
embeddings.append(embedding)
embeddings = np.array(embeddings)
# Reduce to 2D using PCA
pca = PCA(n_components=2)
reduced_embeddings = pca.fit_transform(embeddings)
                # Create a DataFrame for plotting (truncate long hover text)
                df = pd.DataFrame({
                    'x': reduced_embeddings[:, 0],
                    'y': reduced_embeddings[:, 1],
                    'text': [p["text"][:50] + ("..." if len(p["text"]) > 50 else "") for p in perspectives],
                    'sentiment': [p.get("sentiment", "neutral") for p in perspectives],
                    'cluster': [f"Cluster {p.get('cluster', 0) + 1}" for p in perspectives]
                })
# Create an interactive plot
fig = px.scatter(
df,
x='x',
y='y',
color='sentiment',
symbol='cluster',
hover_data=['text'],
title='Perspective Clusters'
)
# Convert to JSON
visualization_data = {
'plotly_json': fig.to_json(),
'data': df.to_dict(orient='records')
}
return visualization_data
except Exception as e:
logger.error(f"Error generating visualization: {e}")
return None
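
# A minimal smoke-test sketch: running this module directly exercises the full
# pipeline on a few invented sample perspectives. The first run downloads both
# Hugging Face models, so network access is assumed; without it, the heuristic
# fallbacks are used instead.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sample_perspectives = [
        {"text": "The new transit plan is a great improvement for commuters."},
        {"text": "I am concerned the budget is inadequate for real change."},
        {"text": "Service has been terrible and keeps getting worse."},
        {"text": "More bike lanes would benefit the whole neighborhood."},
    ]
    analyzer = Analyzer()
    results = analyzer.analyze_perspectives(sample_perspectives)
    for insight in results["insights"]:
        print(f"{insight['summary']} (confidence {insight['confidence']:.2f})")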