# wordify/src/wordifier.py
# Author: Pietro Lesci
# Commit: 11bd087 (reproducibility)
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
import streamlit as st
from pandas.core.frame import DataFrame
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import resample
from .configs import InputTransformConfigs, ModelConfigs
def input_transform(
    text: pd.Series, labels: pd.Series, configs=InputTransformConfigs
) -> Dict[str, np.ndarray]:
    """
    Encode raw text and labels into numeric objects amenable to training.

    The text column is vectorized with word-level TF-IDF (no lowercasing,
    accent stripping, or stop-word removal) and the label column is
    integer-encoded.

    Returns a dict with the feature matrix ``X``, encoded labels ``y``,
    the feature names ``X_names`` and the class names ``y_names``.
    """
    vectorizer = TfidfVectorizer(
        input="content",  # default: file already in memory
        encoding="utf-8",  # default
        decode_error="strict",  # default
        strip_accents=None,  # do nothing
        lowercase=False,  # do nothing
        preprocessor=None,  # do nothing - default
        tokenizer=None,  # default
        stop_words=None,  # do nothing
        analyzer="word",
        ngram_range=configs.NGRAM_RANGE.value,  # maximum 3-ngrams
        min_df=configs.MIN_DF.value,
        max_df=configs.MAX_DF.value,
        sublinear_tf=configs.SUBLINEAR.value,
    )
    encoder = LabelEncoder()

    feature_matrix = vectorizer.fit_transform(text.values)
    encoded_labels = encoder.fit_transform(labels.values)

    return {
        "X": feature_matrix,
        "y": encoded_labels,
        "X_names": np.array(vectorizer.get_feature_names_out()),
        "y_names": encoder.classes_,
    }
def wordifier(
    X: np.ndarray,
    y: np.ndarray,
    X_names: List[str],
    y_names: List[str],
    configs=ModelConfigs,
) -> Tuple[List[Tuple[str, float, str]], List[Tuple[str, float, str]]]:
    """
    Repeatedly fit L1-regularized logistic regressions on stratified
    bootstrap resamples and count how often each feature gets a positive
    or a negative coefficient for each class.

    Args:
        X: document-term matrix, shape (n_instances, n_features);
            may be a scipy sparse matrix as produced by TfidfVectorizer.
        y: integer-encoded labels, shape (n_instances,).
        X_names: feature (term) names aligned with X's columns.
        y_names: class names aligned with the encoded label values.
        configs: enum-style configuration providing NUM_ITERS, PENALTIES,
            MIN_SELECTION, MAX_SELECTION and SELECTION_THRESHOLD.

    Returns:
        A pair ``(pos, neg)`` of lists of (word, selection_frequency,
        class_name) triples for positively / negatively correlated terms.
        (The original annotation said a single list; the function has
        always returned a 2-tuple — the annotation is now accurate.)
    """
    n_instances, n_features = X.shape
    n_classes = len(y_names)

    # NOTE: the * 10 / 10 trick is to have "nice" round-ups
    sample_fraction = np.ceil((n_features / n_instances) * 10) / 10

    sample_size = min(
        # this is the maximum supported
        configs.MAX_SELECTION.value,
        # at minimum you want MIN_SELECTION but in general you want
        # n_instances * sample_fraction
        max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)),
        # however if the previous one is bigger than the available
        # instances, take the number of available instances
        n_instances,
    )

    # TODO: might want to try out something to subsample features at each iteration

    # counts of iterations in which each (class, feature) coefficient was
    # strictly positive / strictly negative
    pos_scores = np.zeros((n_classes, n_features), dtype=int)
    neg_scores = np.zeros((n_classes, n_features), dtype=int)

    pbar = st.progress(0)
    for i in range(configs.NUM_ITERS.value):
        # run randomized regression: the L1 penalty strength is drawn
        # uniformly at random from the configured grid each iteration
        clf = LogisticRegression(
            penalty="l1",
            C=configs.PENALTIES.value[np.random.randint(len(configs.PENALTIES.value))],
            solver="liblinear",
            multi_class="auto",
            max_iter=500,
            class_weight="balanced",
            random_state=42,
        )

        # stratified bootstrap of row indices to subsample the matrix
        selection = resample(
            np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size
        )

        # fit; a degenerate resample can make fitting impossible —
        # skip that iteration instead of aborting the whole run
        try:
            clf.fit(X[selection], y[selection])
        except ValueError:
            continue

        # record coefficient signs
        if n_classes == 2:
            # liblinear produces a single coefficient row for binary
            # problems: a positive weight votes for class 1 and against
            # class 0, and vice versa
            pos_scores[1] = pos_scores[1] + (clf.coef_ > 0.0)
            neg_scores[1] = neg_scores[1] + (clf.coef_ < 0.0)
            pos_scores[0] = pos_scores[0] + (clf.coef_ < 0.0)
            neg_scores[0] = neg_scores[0] + (clf.coef_ > 0.0)
        else:
            pos_scores += clf.coef_ > 0
            neg_scores += clf.coef_ < 0

        # fix: report completed iterations so the bar ends at 100%
        # (previously `round(i / NUM_ITERS, 1)` started at 0 and never
        # reached 1.0)
        pbar.progress((i + 1) / configs.NUM_ITERS.value)

    # normalize counts to selection frequencies in [0, 1]
    pos_scores = pos_scores / configs.NUM_ITERS.value
    neg_scores = neg_scores / configs.NUM_ITERS.value

    # keep only features selected at least SELECTION_THRESHOLD of the time
    pos_positions = np.where(
        pos_scores >= configs.SELECTION_THRESHOLD.value, pos_scores, 0
    )
    neg_positions = np.where(
        neg_scores >= configs.SELECTION_THRESHOLD.value, neg_scores, 0
    )

    # build (word, frequency, class) triples for the surviving features
    pos = [
        (X_names[i], pos_scores[c, i], y_names[c])
        for c, i in zip(*pos_positions.nonzero())
    ]
    neg = [
        (X_names[i], neg_scores[c, i], y_names[c])
        for c, i in zip(*neg_positions.nonzero())
    ]

    return pos, neg
def output_transform(
    pos: List[Tuple[str, float, str]], neg: List[Tuple[str, float, str]]
) -> DataFrame:
    """
    Assemble the positive and negative word lists into one DataFrame.

    Each input is a list of (word, score, label) triples. Rows of each
    list are sorted by label and score (both descending), tagged with
    their correlation sign, stacked vertically, and the column names
    are title-cased for display.
    """

    def build(records: List[Tuple[str, float, str]], sign: str) -> DataFrame:
        # one frame per correlation sign, sorted within and across labels
        frame = pd.DataFrame(records, columns=["word", "score", "label"])
        frame = frame.sort_values(["label", "score"], ascending=False)
        frame["correlation"] = sign
        return frame

    combined = pd.concat(
        [build(pos, "positive"), build(neg, "negative")],
        ignore_index=False,
        axis=0,
    )
    combined.columns = combined.columns.str.title()
    return combined