from __future__ import annotations import warnings from typing import TYPE_CHECKING, Literal, Sequence import numpy as np from joblib import Memory from sklearn.exceptions import ConvergenceWarning from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer, TfidfVectorizer from sklearn.linear_model import LogisticRegression from sklearn.model_selection import RandomizedSearchCV, cross_val_score, train_test_split from sklearn.pipeline import Pipeline from tqdm import tqdm from app.constants import CACHE_DIR from app.data import tokenize if TYPE_CHECKING: from sklearn.base import BaseEstimator, TransformerMixin __all__ = ["train_model", "evaluate_model", "infer_model"] def _identity(x: list[str]) -> list[str]: """Identity function for use in TfidfVectorizer. Args: x: Input data Returns: Unchanged input data """ return x def _get_vectorizer( name: Literal["tfidf", "count", "hashing"], n_features: int, min_df: float = 0.1, ngram: tuple[int, int] = (1, 2), ) -> TransformerMixin: """Get the appropriate vectorizer. Args: name: Type of vectorizer n_features: Maximum number of features min_df: Minimum document frequency (ignored for hashing) ngram: N-gram range [min_n, max_n] Returns: Vectorizer instance Raises: ValueError: If the vectorizer is not recognized """ shared_params = { "ngram_range": ngram, # disable text processing "tokenizer": _identity, "preprocessor": _identity, "lowercase": False, "token_pattern": None, } match name: case "tfidf": return TfidfVectorizer( max_features=n_features, min_df=min_df, **shared_params, ) case "count": return CountVectorizer( max_features=n_features, min_df=min_df, **shared_params, ) case "hashing": if n_features < 2**15: warnings.warn( "HashingVectorizer may perform poorly with small n_features, default is 2^20.", stacklevel=2, ) return HashingVectorizer( n_features=n_features, **shared_params, ) case _: msg = f"Unknown vectorizer: {name}" raise ValueError(msg) def train_model( token_data: Sequence[Sequence[str]], label_data: list[int], vectorizer: Literal["tfidf", "count", "hashing"], max_features: int, min_df: float = 0.1, folds: int = 5, n_jobs: int = 4, seed: int = 42, ) -> tuple[BaseEstimator, float]: """Train the sentiment analysis model. Args: token_data: Tokenized text data label_data: Label data vectorizer: Which vectorizer to use max_features: Maximum number of features min_df: Minimum document frequency (ignored for hashing) folds: Number of cross-validation folds n_jobs: Number of parallel jobs seed: Random seed (None for random seed) Returns: Trained model and accuracy Raises: ValueError: If the vectorizer is not recognized """ rs = None if seed == -1 else seed text_train, text_test, label_train, label_test = train_test_split( token_data, label_data, test_size=0.2, random_state=rs, ) vectorizer = _get_vectorizer(vectorizer, max_features, min_df) classifiers = [ (LogisticRegression(max_iter=1000, random_state=rs), {"C": np.logspace(-4, 4, 20)}), # (LinearSVC(max_iter=10000, random_state=rs), {"C": np.logspace(-4, 4, 20)}), # (KNeighborsClassifier(), {"n_neighbors": np.arange(1, 10)}), # (RandomForestClassifier(random_state=rs), {"n_estimators": np.arange(50, 500, 50)}), # ( # VotingClassifier( # estimators=[ # ("lr", LogisticRegression(max_iter=1000, random_state=rs)), # ("knn", KNeighborsClassifier()), # ("rf", RandomForestClassifier(random_state=rs)), # ], # ), # { # "lr__C": np.logspace(-4, 4, 20), # "knn__n_neighbors": np.arange(1, 10), # "rf__n_estimators": np.arange(50, 500, 50), # }, # ), ] models = [] for clf, param_dist in (pbar := tqdm(classifiers, unit="clf")): param_dist = {f"classifier__{k}": v for k, v in param_dist.items()} model = Pipeline( [("vectorizer", vectorizer), ("classifier", clf)], memory=Memory(CACHE_DIR, verbose=0), ) search = RandomizedSearchCV( model, param_dist, cv=folds, random_state=rs, n_jobs=n_jobs, # verbose=2, scoring="accuracy", n_iter=10, ) pbar.set_description(f"Searching for {clf.__class__.__name__}") with warnings.catch_warnings(): warnings.filterwarnings("once", category=ConvergenceWarning) warnings.filterwarnings("ignore", category=UserWarning, message="Persisting input arguments took") search.fit(text_train, label_train) best_model = search.best_estimator_ acc = best_model.score(text_test, label_test) models.append((best_model, acc)) print("Final results:") print("--------------") print("\n".join(f"{model.named_steps['classifier'].__class__.__name__}: {acc:.2%}" for model, acc in models)) best_model, best_acc = max(models, key=lambda x: x[1]) print(f"Settled on {best_model.named_steps['classifier'].__class__.__name__}") return best_model, best_acc def evaluate_model( model: BaseEstimator, token_data: Sequence[Sequence[str]], label_data: list[int], folds: int = 5, n_jobs: int = 4, ) -> tuple[float, float]: """Evaluate the model using cross-validation. Args: model: Trained model token_data: Tokenized text data label_data: Label data folds: Number of cross-validation folds n_jobs: Number of parallel jobs Returns: Mean accuracy and standard deviation """ with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) scores = cross_val_score( model, token_data, label_data, cv=folds, scoring="accuracy", n_jobs=n_jobs, verbose=2, ) return scores.mean(), scores.std() def infer_model( model: BaseEstimator, text_data: list[str], batch_size: int = 32, n_jobs: int = 4, ) -> list[int]: """Predict the sentiment of the provided text documents. Args: model: Trained model text_data: Text data batch_size: Batch size for tokenization n_jobs: Number of parallel jobs Returns: Predicted sentiments """ tokens = tokenize( text_data, batch_size=batch_size, n_jobs=n_jobs, show_progress=False, ) return model.predict(tokens)