import multiprocessing as mp import os import re import string from collections import OrderedDict from typing import Callable, List, Optional import pandas as pd import spacy import streamlit as st import vaex from pandas.core.frame import DataFrame from pandas.core.series import Series from textacy.preprocessing import make_pipeline, normalize, remove, replace from .configs import Languages # more [here](https://github.com/fastai/fastai/blob/master/fastai/text/core.py#L42) # and [here](https://textacy.readthedocs.io/en/latest/api_reference/preprocessing.html) # fmt: off _re_normalize_acronyms = re.compile(r"(?:[a-zA-Z]\.){2,}") def normalize_acronyms(t): return _re_normalize_acronyms.sub(t.translate(str.maketrans("", "", string.punctuation)).upper(), t) _re_non_word = re.compile(r"\W") def remove_non_word(t): return _re_non_word.sub(" ", t) _re_space = re.compile(r" {2,}") def normalize_useless_spaces(t): return _re_space.sub(" ", t) _re_rep = re.compile(r"(\S)(\1{2,})") def normalize_repeating_chars(t): def _replace_rep(m): c, cc = m.groups() return c return _re_rep.sub(_replace_rep, t) _re_wrep = re.compile(r"(?:\s|^)(\w+)\s+((?:\1\s+)+)\1(\s|\W|$)") def normalize_repeating_words(t): def _replace_wrep(m): c, cc, e = m.groups() return c return _re_wrep.sub(_replace_wrep, t) def lowercase(t: str) -> str: return t.lower() def strip(t: str) -> str: return t.strip() def lemmatize_remove_stopwords(doc: spacy.tokens.doc.Doc) -> str: return " ".join( [t.lemma_ for t in doc if t.lemma_ != "-PRON-" and not t.is_stop] ) def remove_stopwords(doc: spacy.tokens.doc.Doc) -> str: return " ".join([t.text for t in doc if not t.is_stop]) def lemmatize_keep_stopwords(doc: spacy.tokens.doc.Doc) -> str: return " ".join([t.lemma_ for t in doc if t.lemma_ != "-PRON-"]) # fmt: on class PreprocessingPipeline: def __init__( self, language: str, pre_steps: Optional[List[str]], lemmatization_step: Optional[str], post_steps: Optional[List[str]], ): self.language = language self.pre_steps = pre_steps self.lemmatization_step = lemmatization_step self.post_steps = post_steps self.nlp = spacy.load(Languages[language].value, disable=["parser", "ner"]) self.pre = self.make_pre_post_component(self.pre_steps) self.post = self.make_pre_post_component(self.post_steps) self.lemma = self.lemmatization_component()[self.lemmatization_step] def apply_multiproc(fn, series): with mp.Pool(mp.cpu_count()) as pool: new_series = pool.map(fn, series) return new_series def vaex_process(self, df: DataFrame, text_column: str) -> DataFrame: def fn(t): return self.post(self.lemma(self.nlp(self.pre(t)))) vdf = vaex.from_pandas(df) vdf["processed_text"] = vdf.apply( fn, arguments=[vdf[text_column]], vectorize=False ) return vdf.to_pandas_df() def __call__(self, series: Series) -> Series: if self.pre: series = series.map(self.pre) if self.lemma: total_steps = len(series) // 100 res = [] pbar = st.progress(0) for i, doc in enumerate( self.nlp.pipe(series, batch_size=500, n_process=os.cpu_count()) ): res.append(self.lemma(doc)) if i % total_steps == 0: pbar.progress(1) series = pd.Series(res) if self.post: series = series.map(self.post) return series def make_pre_post_component(self, steps: Optional[List[str]]) -> Optional[Callable]: if not steps: return components = [self.pipeline_components()[step] for step in steps] return make_pipeline(*components) @staticmethod def pipeline_components() -> "OrderedDict[str, Callable]": """Returns available cleaning steps in order""" return OrderedDict( [ ("lowercase", lowercase), ("normalize_unicode", normalize.unicode), ("normalize_bullet_points", normalize.bullet_points), ("normalize_hyphenated_words", normalize.hyphenated_words), ("normalize_quotation_marks", normalize.quotation_marks), ("normalize_whitespaces", normalize.whitespace), ("replace_urls", replace.urls), ("replace_currency_symbols", replace.currency_symbols), ("replace_emails", replace.emails), ("replace_emojis", replace.emojis), ("replace_hashtags", replace.hashtags), ("replace_numbers", replace.numbers), ("replace_phone_numbers", replace.phone_numbers), ("replace_user_handles", replace.user_handles), ("normalize_acronyms", normalize_acronyms), ("remove_accents", remove.accents), ("remove_brackets", remove.brackets), ("remove_html_tags", remove.html_tags), ("remove_punctuation", remove.punctuation), ("remove_non_words", remove_non_word), ("normalize_useless_spaces", normalize_useless_spaces), ("normalize_repeating_chars", normalize_repeating_chars), ("normalize_repeating_words", normalize_repeating_words), ("strip", strip), ] ) @staticmethod def lemmatization_component() -> "OrderedDict[str, Optional[Callable]]": return OrderedDict( [ ("Spacy lemmatizer (keep stopwords)", lemmatize_keep_stopwords), ("Spacy lemmatizer (no stopwords)", lemmatize_remove_stopwords), ("Disable lemmatizer", None), ("Remove stopwords", remove_stopwords), ] )