# NOTE: the two sections below are the post-patch contents of two separate
# modules touched by the same change. Each section is a complete module.

# =============================================================================
# File: app/data/sentiment_data.py
# =============================================================================
"""
This module is responsible for handling the sentiment analysis data layer.
It provides a unified interface for different sentiment analysis models
and handles data processing, caching, and result formatting.
"""

import logging
import time
from typing import Dict, List, Union, Any, Optional, Tuple
from functools import lru_cache

# Model Layer
from app.models.bertweet_model import BertweetSentiment

# Set up logger
logger = logging.getLogger(__name__)


class _UnsupportedModelError(ValueError, RuntimeError):
    """Raised when the configured model name is not supported.

    It derives from ``ValueError`` (the documented contract) and from
    ``RuntimeError`` (what callers actually received while the error was being
    re-wrapped by the initialization ``try`` block), so both kinds of
    ``except`` clause keep working.
    """


class SentimentDataLayer:
    """
    Data layer for sentiment analysis operations.

    This class provides an abstraction over different sentiment analysis models
    and offers unified interfaces for text analysis, batch processing, and result caching.
    """

    # Model name -> class. To support another model, register it here:
    #     "another_model": AnotherModel,
    _MODEL_FACTORIES = {
        "bertweet": BertweetSentiment,
    }

    def __init__(self, config: Dict[str, Any]) -> None:
        """
        Initialize the Sentiment Data Layer.

        Args:
            config: Configuration dictionary containing model settings and parameters.
                Expected structure:
                - debug: Boolean for debug mode
                - sentiment_analysis.default_model: Model type to use
                - sentiment_analysis.enable_cache: (Optional) Toggle result caching
                - sentiment_analysis.cache_size: (Optional) Max cached results
                - sentiment_analysis.[model_name]: Model-specific configuration

        Raises:
            ValueError: If the configuration is incomplete or the specified
                model is not supported
            RuntimeError: If model initialization fails
        """
        self.debug = config.get('debug', False)
        self._configure_logging()

        # Validate configuration
        if 'sentiment_analysis' not in config:
            raise ValueError("Configuration must contain 'sentiment_analysis' section")

        self.config = config.get('sentiment_analysis')
        self.default_model = self.config.get('default_model')

        if not self.default_model:
            raise ValueError("Default model must be specified in configuration")

        # Cache settings for sentiment results
        self.cache_size = self.config.get('cache_size', 1000)
        self.cache_enabled = self.config.get('enable_cache', True)

        # The cache is created per instance so that it honours the configured
        # size and does not pin instances in a class-level cache.
        self._result_cache = lru_cache(maxsize=self.cache_size)(self._run_model)

        logger.info("Initializing SentimentDataLayer with model: %s", self.default_model)

        # Checked outside the try block so the error is not re-wrapped below.
        factory = self._MODEL_FACTORIES.get(self.default_model)
        if factory is None:
            raise _UnsupportedModelError(
                f"Unsupported sentiment analysis model: {self.default_model}"
            )

        try:
            self.model = factory(config)

            # Store model metadata
            self.model_info = {
                'name': self.default_model,
                'class_labels': getattr(self.model, 'class_labels', None)
            }

            # class_labels may legitimately be absent; do not fail on len(None).
            labels = self.model_info['class_labels'] or []
            logger.info(
                "Model initialized successfully with %d classes: %s",
                len(labels), labels,
            )
        except Exception as e:
            logger.error("Failed to initialize sentiment model: %s", e)
            raise RuntimeError(f"Sentiment model initialization failed: {str(e)}") from e

    def _configure_logging(self) -> None:
        """Configure logging based on debug setting.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so an application that configured logging keeps its setup.
        """
        log_level = logging.DEBUG if self.debug else logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def _run_model(self, text: str) -> Dict[str, Any]:
        """
        Run the model on ``text`` and format the full result (no caching).

        Args:
            text: Text to analyze

        Returns:
            Dictionary with label, confidence, timing, model name, timestamp
            and per-class probabilities (``None`` when unavailable)

        Raises:
            RuntimeError: If the model signals an inference failure, so that
                the failure is never stored in the cache
        """
        started = time.time()
        _outputs, probabilities, predicted_label, confidence = self.model(text)
        processing_time = time.time() - started

        # BertweetSentiment.forward reports failures as (None, None, "error", 0.0).
        if probabilities is None and predicted_label == "error":
            raise RuntimeError("Sentiment model reported an inference failure")

        if probabilities is not None:
            class_probabilities = {
                label: prob.item()
                for label, prob in zip(self.model.class_labels, probabilities[0])
            }
        else:
            class_probabilities = None

        return {
            'label': predicted_label,
            'confidence': confidence,
            'processing_time_ms': round(processing_time * 1000, 2),
            # Optional additional data
            'model': self.default_model,
            'timestamp': time.time(),
            'probabilities': class_probabilities,
        }

    def _cached_analyze(self, text: str) -> Dict[str, Any]:
        """
        Internal method for cached sentiment analysis.

        The returned dictionary is the object stored in the cache and must be
        treated as read-only; ``analyze`` copies it before modifying it.

        Args:
            text: Text to analyze

        Returns:
            Dictionary containing sentiment analysis results
        """
        return self._result_cache(text)

    def analyze(self, text: str, include_probabilities: bool = False) -> Dict[str, Any]:
        """
        Perform sentiment analysis on the given text.

        When the result is served from the cache, ``processing_time_ms`` and
        ``timestamp`` describe the original inference, not the current call.

        Args:
            text: Input text for sentiment analysis
            include_probabilities: Whether to include class probabilities in the result

        Returns:
            Dictionary containing sentiment analysis results including predicted
            label and confidence. On failure the dictionary carries an ``error``
            message, ``label`` set to ``'error'`` and ``confidence`` 0.0.
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for sentiment analysis")
            return {
                'label': 'neutral',
                'confidence': 0.0,
                'error': 'Empty text provided'
            }

        try:
            if self.cache_enabled:
                # Shallow copy: the cached entry must never be mutated.
                result = dict(self._cached_analyze(text))
            else:
                result = self._run_model(text)

            logger.debug(
                "Analyzed text with %d chars: %s (%.4f)",
                len(text), result['label'], result['confidence'],
            )

            if include_probabilities:
                # Copy the nested dict too so callers cannot alter the cache.
                if result.get('probabilities') is not None:
                    result['probabilities'] = dict(result['probabilities'])
            else:
                result.pop('probabilities', None)

            return result

        except Exception as e:
            logger.error("Error analyzing text: %s", e)
            return {
                'error': 'An unexpected error occurred while processing the request.',
                'label': 'error',
                'confidence': 0.0
            }

    def batch_analyze(
        self,
        texts: List[str],
        include_probabilities: bool = False,
        show_progress: bool = False
    ) -> List[Dict[str, Any]]:
        """
        Analyze sentiment for a batch of texts.

        Args:
            texts: List of texts to analyze
            include_probabilities: Whether to include class probabilities in the results
            show_progress: Whether to display a progress bar for processing

        Returns:
            List of dictionaries containing sentiment analysis results, one per
            input text and in the same order
        """
        if not texts:
            return []

        try:
            # Fall back to one-by-one processing when the model has no batch API
            if not hasattr(self.model, 'batch_process'):
                logger.warning("Batch processing not available in the model, processing texts individually")
                return [self.analyze(text, include_probabilities) for text in texts]

            batch_results = self.model.batch_process(texts, show_progress=show_progress)

            formatted_results = []
            for _outputs, probs, label, confidence in batch_results:
                result = {
                    'label': label,
                    'confidence': confidence,
                    'model': self.default_model
                }

                # Add probabilities if requested
                if include_probabilities and probs is not None:
                    result['probabilities'] = {
                        cls: prob.item()
                        for cls, prob in zip(self.model.class_labels, probs)
                    }

                formatted_results.append(result)

            return formatted_results

        except Exception as e:
            logger.error("Error in batch analysis: %s", e)
            # Return error for each text
            return [
                {
                    'error': 'An unexpected error occurred during batch processing.',
                    'label': 'error',
                    'confidence': 0.0
                }
                for _ in texts
            ]

    def get_model_info(self) -> Dict[str, Any]:
        """
        Get information about the currently loaded sentiment model.

        Returns:
            Dictionary with model metadata
        """
        return {
            'model_name': self.default_model,
            'class_labels': getattr(self.model, 'class_labels', []),
            'cache_enabled': self.cache_enabled,
            'cache_size': self.cache_size
        }

    def clear_cache(self) -> None:
        """Clear the sentiment analysis cache."""
        self._result_cache.cache_clear()
        logger.info("Sentiment analysis cache cleared")

    def sample_analysis(self, sample_text: Optional[str] = None) -> Dict[str, Any]:
        """
        Run a sample analysis to verify the model is working.

        Args:
            sample_text: Optional text to use for the sample analysis

        Returns:
            Sentiment analysis results for the sample text
        """
        text = sample_text or "I am feeling quite happy about this new feature!"
        result = self.analyze(text, include_probabilities=True)

        # Add the sample text to the result
        result['sample_text'] = text
        return result


if __name__ == "__main__":
    from pprint import pprint

    # Example configuration
    config = {
        'debug': True,
        'sentiment_analysis': {
            'default_model': "bertweet",
            'enable_cache': True,
            'cache_size': 500,
            'bertweet': {
                'model_name': 'finiteautomata/bertweet-base-sentiment-analysis',
                'device': 'cpu'
            }
        }
    }

    print("\n=== Sentiment Analysis Data Layer Demo ===\n")

    # Initialize the data layer
    sentiment_data = SentimentDataLayer(config)

    # Display model information
    print("Model Information:")
    pprint(sentiment_data.get_model_info())
    print("\n" + "-"*50 + "\n")

    # Individual text analysis examples
    print("Individual Text Analysis Examples:")
    positive_text = "I love this product! It's amazing and exceeded my expectations. 😍"
    negative_text = "I'm really disappointed with this service. It's terrible and frustrating. 😠"
    neutral_text = "The product arrived today. It seems to be working as described."

    # Analyze with probabilities
    print("\nPositive text (with probabilities):")
    pprint(sentiment_data.analyze(positive_text, include_probabilities=True))

    # Analyze without probabilities
    print("\nNegative text:")
    pprint(sentiment_data.analyze(negative_text))

    print("\nNeutral text:")
    pprint(sentiment_data.analyze(neutral_text))

    # Empty text example
    print("\nEmpty text handling:")
    pprint(sentiment_data.analyze(""))

    print("\n" + "-"*50 + "\n")

    # Batch processing example
    print("Batch Processing Example:")
    batch_texts = [
        "I absolutely love this app! The interface is so intuitive.",
        "This is the worst experience I've ever had with customer service.",
        "The package arrived on schedule. Contents were as described online.",
        "I'm not sure if I should upgrade to the premium version or not.",
        "The weather today is quite nice, might go for a walk later."
    ]

    print(f"\nAnalyzing batch of {len(batch_texts)} texts:")
    batch_results = sentiment_data.batch_analyze(batch_texts, show_progress=True)

    for i, result in enumerate(batch_results):
        print(f"\nText {i+1}: \"{batch_texts[i][:50]}...\"")
        print(f"Label: {result['label']}, Confidence: {result['confidence']:.4f}")

    print("\n" + "-"*50 + "\n")

    # Cache demonstration. Clear first so the first call is really uncached
    # (positive_text was already analyzed above).
    sentiment_data.clear_cache()
    print("Cache Demonstration:")
    print("First analysis (uncached):")
    start = time.time()
    sentiment_data.analyze(positive_text)
    time1 = time.time() - start
    print(f"Time: {time1*1000:.2f}ms")

    print("\nSecond analysis (should use cache):")
    start = time.time()
    sentiment_data.analyze(positive_text)
    time2 = time.time() - start
    print(f"Time: {time2*1000:.2f}ms")
    speedup = f"{time1 / time2:.1f}x" if time2 > 0 else "infinite"
    print(f"Cache speedup: {speedup}")

    # Sample analysis
    print("\nSample Analysis:")
    sample_result = sentiment_data.sample_analysis()
    pprint(sample_result)

    # Clean up
    print("\nClearing cache...")
    sentiment_data.clear_cache()


# =============================================================================
# File: app/models/bertweet_model.py
# =============================================================================
"""
This module defines the BertweetSentiment class, which is a PyTorch model for sentiment analysis
using the BERTweet model architecture.
"""

import os
import logging
from typing import Dict, List, Tuple, Union, Optional, Any
from functools import lru_cache

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification, PreTrainedTokenizer
from tqdm import tqdm

logger = logging.getLogger(__name__)


class BertweetSentiment(nn.Module):
    """
    A sentiment analysis model based on BERTweet architecture.

    This class provides methods for sentiment analysis on text data using the BERTweet model,
    which is specifically designed for processing social media text. It handles tokenization,
    inference, batch processing, and model persistence.
    """

    def __init__(self, config: Dict[str, Any]) -> None:
        """
        Initialize the BERTweet model for sentiment analysis.

        Args:
            config: Configuration dictionary containing model settings and device info.
                Expected structure:
                - debug: Boolean for debug mode
                - sentiment_analysis.bertweet.model_name: Model identifier
                - sentiment_analysis.bertweet.device: Device to run the model on ('cpu', 'cuda', etc.)
                - sentiment_analysis.bertweet.cache_size: (Optional) Size for the tokenization LRU cache

        Raises:
            ValueError: If required configuration parameters are missing
            RuntimeError: If model initialization fails
        """
        super().__init__()

        # Initialize logging based on debug setting
        self.debug = config.get('debug', False)
        self._configure_logging()

        # Extract configuration
        if 'sentiment_analysis' not in config or 'bertweet' not in config.get('sentiment_analysis', {}):
            raise ValueError("Configuration must contain 'sentiment_analysis.bertweet' section")

        self.config = config.get('sentiment_analysis').get('bertweet')
        self.model_name = self.config.get('model_name')
        self.device = self.config.get('device', 'cpu')
        self.cache_size = self.config.get('cache_size', 128)

        if not self.model_name:
            raise ValueError("Model name must be specified in configuration")

        # Per-instance tokenization cache that honours the configured size.
        # NOTE: the wrapper is not picklable; persist the model with save_model().
        self._token_cache = lru_cache(maxsize=self.cache_size)(self._encode)

        logger.info(
            "Initializing BertweetSentiment with model: %s on device: %s",
            self.model_name, self.device,
        )

        try:
            # Initialize the tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

            # Initialize the model
            self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name)
            self.model.to(self.device)

            # Extract class labels from model configuration
            self.model_config = self.model.config
            self._extract_class_labels()

            logger.info(
                "Model initialized successfully with %d classes: %s",
                len(self.class_labels), self.class_labels,
            )
        except Exception as e:
            logger.error("Failed to initialize model: %s", e)
            raise RuntimeError(f"Model initialization failed: {str(e)}") from e

    def _configure_logging(self) -> None:
        """Configure logging based on debug setting.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so an application that configured logging keeps its setup.
        """
        log_level = logging.DEBUG if self.debug else logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    def _extract_class_labels(self) -> None:
        """Extract class labels from model configuration, ordered by class id."""
        id2label = getattr(self.model_config, 'id2label', None)
        if id2label:
            # Iterate the actual ids instead of assuming 0..n-1 are all present.
            self.class_labels = [id2label[class_id] for class_id in sorted(id2label)]
        else:
            # Default to negative/neutral/positive if labels aren't found
            logger.warning("No labels found in model config. Using default labels.")
            self.class_labels = ["negative", "neutral", "positive"]

    def _encode(self, text: str) -> Dict[str, torch.Tensor]:
        """Tokenize a single text without caching."""
        return self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)

    def _tokenize(self, text: str) -> Dict[str, torch.Tensor]:
        """
        Tokenize text with caching for repeated inputs.

        Args:
            text: Input text to tokenize

        Returns:
            Dictionary of tokenized inputs (shared with the cache; do not mutate)
        """
        return self._token_cache(text)

    def forward(self, text: Union[str, List[str]]) -> Union[
        Tuple[Any, torch.Tensor, str, float],
        List[Tuple[Any, torch.Tensor, str, float]]
    ]:
        """
        Perform sentiment analysis on the given text(s).

        Args:
            text: Input text or list of texts for sentiment analysis.

        Returns:
            For single text: Tuple of (model_outputs, probabilities, predicted_label, confidence).
                On an inference error the tuple is (None, None, "error", 0.0).
            For multiple texts: List of such tuples
        """
        if isinstance(text, list):
            return self.batch_process(text)

        try:
            # Tokenize the input; .to() builds new tensors, the cache entry is untouched
            inputs = self._tokenize(text)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Run model in evaluation mode
            with torch.no_grad():
                self.model.eval()
                outputs = self.model(**inputs)

            # Convert logits to probabilities
            probabilities = F.softmax(outputs.logits, dim=-1)

            # Get the predicted sentiment class
            predicted_class = torch.argmax(probabilities, dim=1).item()
            predicted_label = self.class_labels[predicted_class]
            confidence = probabilities[0][predicted_class].item()

            if self.debug:
                logger.debug("Text: '%s...' → %s (%.4f)", text[:50], predicted_label, confidence)

            return outputs, probabilities, predicted_label, confidence
        except Exception as e:
            logger.error("Inference error for text '%s...': %s", str(text)[:50], e)
            # Return a default value in case of error
            return None, None, "error", 0.0

    def batch_process(
        self,
        texts: List[str],
        batch_size: int = 16,
        show_progress: bool = False
    ) -> List[Tuple[Any, torch.Tensor, str, float]]:
        """
        Process a batch of texts for sentiment analysis.

        Args:
            texts: List of texts to analyze
            batch_size: Number of texts to process at once (must be >= 1)
            show_progress: Whether to show a progress bar

        Returns:
            List of tuples (model_outputs, probabilities, predicted_label, confidence),
            one per input text and in input order. ``model_outputs`` is the output
            object of the whole mini-batch the text belonged to. Texts in a failed
            mini-batch yield (None, None, "error", 0.0).

        Raises:
            ValueError: If ``batch_size`` is smaller than 1
        """
        if batch_size < 1:
            # A negative step would make the loop below return [] silently.
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        results = []

        # Use tqdm for progress tracking if requested
        starts = range(0, len(texts), batch_size)
        iterator = tqdm(starts) if show_progress else starts

        for start in iterator:
            batch_texts = texts[start:start + batch_size]
            try:
                # Tokenize batch
                batch_inputs = self.tokenizer(batch_texts, return_tensors="pt",
                                              truncation=True, padding=True)
                batch_inputs = {k: v.to(self.device) for k, v in batch_inputs.items()}

                # Process batch
                with torch.no_grad():
                    self.model.eval()
                    outputs = self.model(**batch_inputs)

                probabilities = F.softmax(outputs.logits, dim=-1)

                # Build the mini-batch results first so that a failure half-way
                # through cannot leave a partial mini-batch in `results`.
                batch_results = []
                for probs in probabilities:
                    predicted_class = torch.argmax(probs).item()
                    predicted_label = self.class_labels[predicted_class]
                    confidence = probs[predicted_class].item()
                    batch_results.append((outputs, probs, predicted_label, confidence))
                results.extend(batch_results)
            except Exception as e:
                logger.error("Batch processing error at index %d: %s", start, e)
                # Add None results for the failed batch
                results.extend([(None, None, "error", 0.0)] * len(batch_texts))

        return results

    def evaluate(self, texts: List[str], labels: List[str]) -> Dict[str, float]:
        """
        Evaluate the model on a dataset with ground truth labels.

        Args:
            texts: List of input texts
            labels: List of ground truth labels corresponding to texts

        Returns:
            Dictionary with ``accuracy`` and ``class_metrics`` (precision, recall
            and F1 per class, in the order of ``class_labels``)

        Raises:
            ValueError: If ``texts`` and ``labels`` differ in length
        """
        if len(texts) != len(labels):
            raise ValueError(f"Number of texts ({len(texts)}) must match number of labels ({len(labels)})")

        results = self.batch_process(texts, show_progress=True)
        pairs = [(result[2], true) for result, true in zip(results, labels)]

        # Calculate accuracy
        correct = sum(1 for pred, true in pairs if pred == true)
        accuracy = correct / len(texts) if texts else 0.0

        # Calculate per-class metrics; dict.fromkeys de-duplicates while keeping
        # a deterministic order (a set would reorder between runs).
        class_metrics = {}
        for cls in dict.fromkeys(self.class_labels):
            tp = sum(1 for pred, true in pairs if pred == cls and true == cls)
            fp = sum(1 for pred, true in pairs if pred == cls and true != cls)
            fn = sum(1 for pred, true in pairs if pred != cls and true == cls)

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

            class_metrics[cls] = {"precision": precision, "recall": recall, "f1": f1}

        return {
            "accuracy": accuracy,
            "class_metrics": class_metrics
        }

    def save_model(self, path: str) -> None:
        """
        Save the model, tokenizer and class labels to the specified path.

        Args:
            path: Directory path to save the model to (created if missing)
        """
        # exist_ok avoids the race between checking for and creating the directory
        os.makedirs(path, exist_ok=True)

        try:
            # Save the model
            self.model.save_pretrained(path)
            # Save the tokenizer
            self.tokenizer.save_pretrained(path)
            # Save class labels
            with open(os.path.join(path, "class_labels.txt"), "w", encoding="utf-8") as f:
                f.write("\n".join(self.class_labels))

            logger.info("Model successfully saved to %s", path)
        except Exception as e:
            logger.error("Error saving model to %s: %s", path, e)
            raise

    @classmethod
    def load_model(cls, path: str, device: str = "cpu") -> "BertweetSentiment":
        """
        Load a saved model from the specified path.

        Args:
            path: Directory path to load the model from
            device: Device to load the model on

        Returns:
            Loaded BertweetSentiment model
        """
        config = {
            "debug": False,
            "sentiment_analysis": {
                "bertweet": {
                    "model_name": path,
                    "device": device
                }
            }
        }

        model = cls(config)

        # Load custom class labels if available
        class_labels_path = os.path.join(path, "class_labels.txt")
        if os.path.exists(class_labels_path):
            with open(class_labels_path, "r", encoding="utf-8") as f:
                saved_labels = [line.strip() for line in f if line.strip()]
            # Keep the labels from the model config if the file is empty
            if saved_labels:
                model.class_labels = saved_labels

        return model


if __name__ == "__main__":
    # Example configuration
    config = {
        'debug': True,
        'sentiment_analysis': {
            'default_model': "bertweet",
            'bertweet': {
                'model_name': "finiteautomata/bertweet-base-sentiment-analysis",
                'device': 'cuda' if torch.cuda.is_available() else 'cpu',
                'cache_size': 256
            }
        }
    }

    print(f"Using device: {config['sentiment_analysis']['bertweet']['device']}")

    # Initialize model
    model = BertweetSentiment(config)
    print(f"Model initialized with classes: {model.class_labels}")

    # Single text example
    text = "I love the new features of the app! 😍"
    _, _, sentiment, confidence = model(text)
    print(f"Text: '{text}'\nSentiment: {sentiment}\nConfidence: {confidence:.4f}\n")

    # Batch processing example
    texts = [
        "I hate the new features of the app! 😡",
        "This product is just okay, nothing special",
        "Hi how are u?",
        "The service was excellent and the staff was very friendly!"
    ]

    print("Batch processing example:")
    results = model.batch_process(texts, show_progress=True)
    for i, (_, _, sentiment, confidence) in enumerate(results):
        print(f"Text: '{texts[i]}'\nSentiment: {sentiment}\nConfidence: {confidence:.4f}\n")

    # Evaluation example (with mock data)
    print("Evaluation example:")
    eval_texts = texts
    eval_labels = ["negative", "neutral", "neutral", "positive"]
    eval_results = model.evaluate(eval_texts, eval_labels)
    print(f"Accuracy: {eval_results['accuracy']:.4f}")
    for cls, metrics in eval_results['class_metrics'].items():
        print(f"{cls}: Precision={metrics['precision']:.4f}, Recall={metrics['recall']:.4f}, F1={metrics['f1']:.4f}")

    # Model saving example (commented out to avoid unintended file creation)
    # model.save_model("./saved_sentiment_model")

    # Model loading example
    # loaded_model = BertweetSentiment.load_model("./saved_sentiment_model")