Spaces:
Build error
Build error
from typing import Dict, List, Tuple | |
import numpy as np | |
import pandas as pd | |
import streamlit as st | |
from pandas.core.frame import DataFrame | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.preprocessing import LabelEncoder | |
from sklearn.utils import resample | |
from .configs import InputTransformConfigs, ModelConfigs | |
def input_transform(
    text: pd.Series, labels: pd.Series, configs=InputTransformConfigs
) -> Dict[str, np.ndarray]:
    """
    Turn raw text and labels into numeric inputs for the training algorithm.

    The text is vectorised with TF-IDF and the labels are integer-encoded.

    Returns a dict with four entries:
      - "X": the TF-IDF matrix fitted on ``text``
      - "y": the integer-encoded ``labels``
      - "X_names": array of feature names produced by the vectorizer
      - "y_names": the original classes seen by the label encoder
    """
    # Every option is spelled out so the absence of any text pre-processing
    # (no accent stripping, no lowercasing, no stop words) is explicit.
    vectorizer_options = {
        "input": "content",  # documents are passed directly, already in memory
        "encoding": "utf-8",
        "decode_error": "strict",
        "strip_accents": None,  # keep accents untouched
        "lowercase": False,  # keep the original casing
        "preprocessor": None,
        "tokenizer": None,
        "stop_words": None,  # no stop-word removal
        "analyzer": "word",
        # the remaining knobs are read from the configs object
        "ngram_range": configs.NGRAM_RANGE.value,
        "min_df": configs.MIN_DF.value,
        "max_df": configs.MAX_DF.value,
        "sublinear_tf": configs.SUBLINEAR.value,
    }
    vectorizer = TfidfVectorizer(**vectorizer_options)
    encoder = LabelEncoder()

    features = vectorizer.fit_transform(text.values)
    targets = encoder.fit_transform(labels.values)
    feature_names = np.array(vectorizer.get_feature_names_out())

    return {
        "X": features,
        "y": targets,
        "X_names": feature_names,
        "y_names": encoder.classes_,
    }
def wordifier(
    X: np.ndarray,
    y: np.ndarray,
    X_names: List[str],
    y_names: List[str],
    configs=ModelConfigs,
) -> Tuple[List[Tuple[str, float, str]], List[Tuple[str, float, str]]]:
    """
    Score how consistently each feature is tied to each class.

    Repeatedly fits an L1-penalised logistic regression on a stratified
    bootstrap subsample of the rows of ``X`` and counts, per class and per
    feature, how often the fitted coefficient is positive or negative.
    Counts are divided by the configured number of iterations and only
    features whose score reaches ``SELECTION_THRESHOLD`` are reported.

    Args:
        X: feature matrix; ``X.shape`` must be ``(n_instances, n_features)``
            and it must support row selection with an index array.
        y: labels aligned with the rows of ``X``.
        X_names: feature names, indexed by column position of ``X``.
        y_names: class names, indexed by class position.
        configs: object exposing ``MAX_SELECTION``, ``MIN_SELECTION``,
            ``NUM_ITERS``, ``PENALTIES`` and ``SELECTION_THRESHOLD``, each
            with a ``.value`` attribute.

    Returns:
        A ``(pos, neg)`` pair. Each element is a list of
        ``(feature name, score, class name)`` tuples for the features with
        a positive (respectively negative) association.
    """
    n_instances, n_features = X.shape
    n_classes = len(y_names)

    # configuration values used repeatedly below
    n_iters = configs.NUM_ITERS.value
    penalties = configs.PENALTIES.value
    threshold = configs.SELECTION_THRESHOLD.value

    # NOTE: the * 10 / 10 trick is to have "nice" round-ups
    sample_fraction = np.ceil((n_features / n_instances) * 10) / 10
    sample_size = min(
        # this is the maximum supported
        configs.MAX_SELECTION.value,
        # at minimum you want MIN_SELECTION but in general you want
        # n_instances * sample_fraction
        max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)),
        # however if the previous one is bigger than the available instances
        # take the number of available instances
        n_instances,
    )

    # TODO: might want to try out something to subsample features at each iteration

    # per-class, per-feature counters of positive / negative coefficients
    pos_scores = np.zeros((n_classes, n_features), dtype=int)
    neg_scores = np.zeros((n_classes, n_features), dtype=int)

    pbar = st.progress(0)
    for i in range(n_iters):
        # run randomized regression: the penalty strength is drawn at random
        clf = LogisticRegression(
            penalty="l1",
            C=penalties[np.random.randint(len(penalties))],
            solver="liblinear",
            multi_class="auto",
            max_iter=500,
            class_weight="balanced",
            random_state=42,
        )

        # sample indices to subsample matrix
        selection = resample(
            np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size
        )

        try:
            clf.fit(X[selection], y[selection])
        except ValueError:
            # Best effort: a subsample the solver rejects is skipped and
            # simply contributes nothing to the counters.
            pass
        else:
            positive = clf.coef_ > 0
            negative = clf.coef_ < 0
            if n_classes == 2:
                # Binary problems yield a single coefficient row, which is
                # recorded for class 1; class 0 gets the mirrored signs.
                pos_scores[1] += positive.ravel()
                neg_scores[1] += negative.ravel()
                pos_scores[0] += negative.ravel()
                neg_scores[0] += positive.ravel()
            else:
                pos_scores += positive
                neg_scores += negative

        # Update on every iteration (including skipped ones) and use i + 1
        # so the bar ends at 100%.
        pbar.progress(round((i + 1) / n_iters, 1))

    # normalize; NOTE: skipped iterations still count in the denominator
    pos_scores = pos_scores / n_iters
    neg_scores = neg_scores / n_iters

    # get only active features: scores below the threshold are zeroed out
    pos_positions = np.where(pos_scores >= threshold, pos_scores, 0)
    neg_positions = np.where(neg_scores >= threshold, neg_scores, 0)

    # one (word, score, label) tuple per surviving (class, feature) cell
    pos = [
        (X_names[i], pos_scores[c, i], y_names[c])
        for c, i in zip(*pos_positions.nonzero())
    ]
    neg = [
        (X_names[i], neg_scores[c, i], y_names[c])
        for c, i in zip(*neg_positions.nonzero())
    ]

    return pos, neg
def output_transform(
    pos: List[Tuple[str, float, str]], neg: List[Tuple[str, float, str]]
) -> DataFrame:
    """
    Combine positive and negative (word, score, label) tuples into one table.

    Each list becomes a frame sorted by label and score in descending order
    and tagged with a "correlation" column ("positive" or "negative"). The
    two frames are stacked, positive rows first, each keeping its own row
    index, and the column names are title-cased.
    """
    column_names = "word score label".split()
    sort_keys = ["label", "score"]

    frames = []
    for rows, direction in ((pos, "positive"), (neg, "negative")):
        frame = pd.DataFrame(rows, columns=column_names)
        frame = frame.sort_values(sort_keys, ascending=False)
        frame["correlation"] = direction
        frames.append(frame)

    # ignore_index=False: the per-frame indices are kept, so labels may repeat
    output = pd.concat(frames, ignore_index=False, axis=0)
    output.columns = output.columns.str.title()
    return output