Tymec's picture
Add min-df option
8b10b79
raw
history blame
7.31 kB
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Literal, Sequence
import numpy as np
from joblib import Memory
from sklearn.exceptions import ConvergenceWarning
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer, TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from tqdm import tqdm
from app.constants import CACHE_DIR
from app.data import tokenize
if TYPE_CHECKING:
from sklearn.base import BaseEstimator, TransformerMixin
__all__ = ["train_model", "evaluate_model", "infer_model"]
def _identity(x: list[str]) -> list[str]:
"""Identity function for use in TfidfVectorizer.
Args:
x: Input data
Returns:
Unchanged input data
"""
return x
def _get_vectorizer(
name: Literal["tfidf", "count", "hashing"],
n_features: int,
min_df: float = 0.1,
ngram: tuple[int, int] = (1, 2),
) -> TransformerMixin:
"""Get the appropriate vectorizer.
Args:
name: Type of vectorizer
n_features: Maximum number of features
min_df: Minimum document frequency (ignored for hashing)
ngram: N-gram range [min_n, max_n]
Returns:
Vectorizer instance
Raises:
ValueError: If the vectorizer is not recognized
"""
shared_params = {
"ngram_range": ngram,
# disable text processing
"tokenizer": _identity,
"preprocessor": _identity,
"lowercase": False,
"token_pattern": None,
}
match name:
case "tfidf":
return TfidfVectorizer(
max_features=n_features,
min_df=min_df,
**shared_params,
)
case "count":
return CountVectorizer(
max_features=n_features,
min_df=min_df,
**shared_params,
)
case "hashing":
if n_features < 2**15:
warnings.warn(
"HashingVectorizer may perform poorly with small n_features, default is 2^20.",
stacklevel=2,
)
return HashingVectorizer(
n_features=n_features,
**shared_params,
)
case _:
msg = f"Unknown vectorizer: {name}"
raise ValueError(msg)
def train_model(
token_data: Sequence[Sequence[str]],
label_data: list[int],
vectorizer: Literal["tfidf", "count", "hashing"],
max_features: int,
min_df: float = 0.1,
folds: int = 5,
n_jobs: int = 4,
seed: int = 42,
) -> tuple[BaseEstimator, float]:
"""Train the sentiment analysis model.
Args:
token_data: Tokenized text data
label_data: Label data
vectorizer: Which vectorizer to use
max_features: Maximum number of features
min_df: Minimum document frequency (ignored for hashing)
folds: Number of cross-validation folds
n_jobs: Number of parallel jobs
seed: Random seed (None for random seed)
Returns:
Trained model and accuracy
Raises:
ValueError: If the vectorizer is not recognized
"""
rs = None if seed == -1 else seed
text_train, text_test, label_train, label_test = train_test_split(
token_data,
label_data,
test_size=0.2,
random_state=rs,
)
vectorizer = _get_vectorizer(vectorizer, max_features, min_df)
classifiers = [
(LogisticRegression(max_iter=1000, random_state=rs), {"C": np.logspace(-4, 4, 20)}),
# (LinearSVC(max_iter=10000, random_state=rs), {"C": np.logspace(-4, 4, 20)}),
# (KNeighborsClassifier(), {"n_neighbors": np.arange(1, 10)}),
# (RandomForestClassifier(random_state=rs), {"n_estimators": np.arange(50, 500, 50)}),
# (
# VotingClassifier(
# estimators=[
# ("lr", LogisticRegression(max_iter=1000, random_state=rs)),
# ("knn", KNeighborsClassifier()),
# ("rf", RandomForestClassifier(random_state=rs)),
# ],
# ),
# {
# "lr__C": np.logspace(-4, 4, 20),
# "knn__n_neighbors": np.arange(1, 10),
# "rf__n_estimators": np.arange(50, 500, 50),
# },
# ),
]
models = []
for clf, param_dist in (pbar := tqdm(classifiers, unit="clf")):
param_dist = {f"classifier__{k}": v for k, v in param_dist.items()}
model = Pipeline(
[("vectorizer", vectorizer), ("classifier", clf)],
memory=Memory(CACHE_DIR, verbose=0),
)
search = RandomizedSearchCV(
model,
param_dist,
cv=folds,
random_state=rs,
n_jobs=n_jobs,
# verbose=2,
scoring="accuracy",
n_iter=10,
)
pbar.set_description(f"Searching for {clf.__class__.__name__}")
with warnings.catch_warnings():
warnings.filterwarnings("once", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=UserWarning, message="Persisting input arguments took")
search.fit(text_train, label_train)
best_model = search.best_estimator_
acc = best_model.score(text_test, label_test)
models.append((best_model, acc))
print("Final results:")
print("--------------")
print("\n".join(f"{model.named_steps['classifier'].__class__.__name__}: {acc:.2%}" for model, acc in models))
best_model, best_acc = max(models, key=lambda x: x[1])
print(f"Settled on {best_model.named_steps['classifier'].__class__.__name__}")
return best_model, best_acc
def evaluate_model(
model: BaseEstimator,
token_data: Sequence[Sequence[str]],
label_data: list[int],
folds: int = 5,
n_jobs: int = 4,
) -> tuple[float, float]:
"""Evaluate the model using cross-validation.
Args:
model: Trained model
token_data: Tokenized text data
label_data: Label data
folds: Number of cross-validation folds
n_jobs: Number of parallel jobs
Returns:
Mean accuracy and standard deviation
"""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning)
scores = cross_val_score(
model,
token_data,
label_data,
cv=folds,
scoring="accuracy",
n_jobs=n_jobs,
verbose=2,
)
return scores.mean(), scores.std()
def infer_model(
model: BaseEstimator,
text_data: list[str],
batch_size: int = 32,
n_jobs: int = 4,
) -> list[int]:
"""Predict the sentiment of the provided text documents.
Args:
model: Trained model
text_data: Text data
batch_size: Batch size for tokenization
n_jobs: Number of parallel jobs
Returns:
Predicted sentiments
"""
tokens = tokenize(
text_data,
batch_size=batch_size,
n_jobs=n_jobs,
show_progress=False,
)
return model.predict(tokens)