import re
import string
from collections import OrderedDict
from typing import Callable, List, Optional, Union

import spacy
import vaex
from pandas.core.frame import DataFrame
from textacy.preprocessing import make_pipeline, normalize, remove, replace

from .configs import Languages

# more [here](https://github.com/fastai/fastai/blob/master/fastai/text/core.py#L42)
# and [here](https://textacy.readthedocs.io/en/latest/api_reference/preprocessing.html)
# fmt: off
_re_normalize_acronyms = re.compile(r"(?:[a-zA-Z]\.){2,}")
def normalize_acronyms(t: str) -> str:
    "Replaces acronyms like `u.s.a.` with their de-punctuated, uppercased form `USA`"
    return _re_normalize_acronyms.sub(
        lambda m: m.group(0).translate(str.maketrans("", "", string.punctuation)).upper(), t
    )


_re_non_word = re.compile(r"[^A-Za-z]+")
def remove_non_word(t: str) -> str:
    "Removes non-word characters and digits from the text using the regex `[^A-Za-z]+`"
    return _re_non_word.sub(" ", t)


_re_space = re.compile(r" {2,}")
def normalize_useless_spaces(t: str) -> str:
    "Collapses two or more consecutive spaces into one"
    return _re_space.sub(" ", t)


_re_rep = re.compile(r"(\S)(\1{2,})")
def normalize_repeating_chars(t: str) -> str:
    "Truncates characters repeated three or more times to a single occurrence"
    def _replace_rep(m):
        c, cc = m.groups()
        return c
    return _re_rep.sub(_replace_rep, t)


_re_wrep = re.compile(r"(?:(?<=\s)|^)(\w+)\s+((?:\1\s+)+)\1(\s|\W|$)")
def normalize_repeating_words(t: str) -> str:
    "Truncates words repeated three or more times to a single occurrence"
    def _replace_wrep(m):
        c, cc, e = m.groups()
        # keep the trailing separator so surrounding words are not glued together
        return c + e
    return _re_wrep.sub(_replace_wrep, t)


_re_remove_numbers = re.compile(r"\d+")
def remove_numbers(t: str) -> str:
    "Replaces digit runs with a single space"
    return _re_remove_numbers.sub(" ", t)


def lowercase(t: str) -> str:
    "Lowercases the text"
    return t.lower()


def strip(t: str) -> str:
    "Strips leading and trailing whitespace"
    return t.strip()


def lemmatize_remove_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    "Lemmatizes the doc and removes stopwords"
    return " ".join(
        [t.lemma_ for t in doc if t.lemma_ != "-PRON-" and not t.is_stop]
    )


def remove_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    "Removes stopwords without lemmatizing"
    return " ".join([t.text for t in doc if not t.is_stop])


def lemmatize_keep_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    "Lemmatizes the doc, keeping stopwords"
    return " ".join([t.lemma_ for t in doc if t.lemma_ != "-PRON-"])


def identity(t):
    "No-op placeholder for disabled steps"
    return t
# fmt: on


class PreprocessingPipeline:
    def __init__(
        self,
        language: str,
        pre_steps: Optional[List[str]],
        lemmatization_step: Optional[str],
        post_steps: Optional[List[str]],
    ):
        self.language = language
        self.pre_steps = pre_steps
        self.lemmatization_step = lemmatization_step
        self.post_steps = post_steps

        self.pre = self.make_pipe_component(self.pre_steps, self.language)
        self.post = self.make_pipe_component(self.post_steps, self.language)
        self.nlp = self.make_nlp(self.lemmatization_step, self.language)
        self.lemma = self.make_lemma(self.lemmatization_step, self.language)

    # def apply_multiproc(fn, series):
    #     with mp.Pool(mp.cpu_count()) as pool:
    #         new_series = pool.map(fn, series)
    #     return new_series

    def vaex_process(self, df: DataFrame, text_column: str) -> DataFrame:
        # pipeline order: string pre-steps -> spacy doc -> lemmatization -> string post-steps
        def fn(t):
            return self.post(self.lemma(self.nlp(self.pre(t))))

        vdf = vaex.from_pandas(df)
        vdf["processed_text"] = vdf.apply(
            fn, arguments=[vdf[text_column]], vectorize=False
        )
        return vdf.to_pandas_df()

    @classmethod
    def make_pipe_component(cls, steps: Optional[List[str]], language: str) -> Callable:
        if not steps:
            return identity
        elif language in ("MultiLanguage", "Chinese") and "remove_non_words" in steps:
            # `remove_non_words` strips everything outside [A-Za-z], which would
            # delete all CJK text; fall back to removing numbers and punctuation
            idx = steps.index("remove_non_words")
            steps = (
                steps[:idx] + ["remove_numbers", "remove_punctuation"] + steps[idx + 1 :]
            )

        components = [cls.pipeline_components()[step] for step in steps]

        return make_pipeline(*components)

    @staticmethod
    def make_nlp(
        lemmatization_step: Optional[str], language: str
    ) -> Union[spacy.language.Language, Callable]:
        if (
            lemmatization_step is None
            or lemmatization_step == "Disable lemmatizer"
            or (
                lemmatization_step == "Spacy lemmatizer (keep stopwords)"
                and language in ("MultiLanguage", "Chinese")
            )
        ):
            return identity
        return spacy.load(Languages[language].value, disable=["parser", "ner"])

    @classmethod
    def make_lemma(cls, lemmatization_step: Optional[str], language: str) -> Callable:
        if (
            lemmatization_step is None
            or lemmatization_step == "Disable lemmatizer"
            or (
                lemmatization_step == "Spacy lemmatizer (keep stopwords)"
                and language in ("MultiLanguage", "Chinese")
            )
        ):
            return identity
        elif (
            lemmatization_step == "Spacy lemmatizer (remove stopwords)"
            and language in ("MultiLanguage", "Chinese")
        ):
            # these models have no lemmatizer, so only stopword removal is applied
            return cls.lemmatization_component().get("Remove stopwords")

        return cls.lemmatization_component().get(lemmatization_step)

    @staticmethod
    def pipeline_components() -> "OrderedDict[str, Callable]":
        """Returns available cleaning steps in order"""
        return OrderedDict(
            [
                ("lowercase", lowercase),
                ("normalize_unicode", normalize.unicode),
                ("normalize_bullet_points", normalize.bullet_points),
                ("normalize_hyphenated_words", normalize.hyphenated_words),
                ("normalize_quotation_marks", normalize.quotation_marks),
                ("normalize_whitespaces", normalize.whitespace),
                ("replace_urls", replace.urls),
                ("replace_currency_symbols", replace.currency_symbols),
                ("replace_emails", replace.emails),
                ("replace_emojis", replace.emojis),
                ("replace_hashtags", replace.hashtags),
                ("replace_numbers", replace.numbers),
                ("replace_phone_numbers", replace.phone_numbers),
                ("replace_user_handles", replace.user_handles),
                ("normalize_acronyms", normalize_acronyms),
                ("remove_accents", remove.accents),
                ("remove_brackets", remove.brackets),
                ("remove_html_tags", remove.html_tags),
                ("remove_punctuation", remove.punctuation),
                ("remove_non_words", remove_non_word),
                ("remove_numbers", remove_numbers),
                ("normalize_useless_spaces", normalize_useless_spaces),
                ("normalize_repeating_chars", normalize_repeating_chars),
                ("normalize_repeating_words", normalize_repeating_words),
                ("strip", strip),
            ]
        )

    @staticmethod
    def lemmatization_component() -> "OrderedDict[str, Optional[Callable]]":
        """Returns available lemmatization options"""
        return OrderedDict(
            [
                ("Spacy lemmatizer (keep stopwords)", lemmatize_keep_stopwords),
                ("Spacy lemmatizer (remove stopwords)", lemmatize_remove_stopwords),
                ("Disable lemmatizer", identity),
                ("Remove stopwords", remove_stopwords),
            ]
        )
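

# Minimal usage sketch (an illustration, not part of the module's public API):
# it assumes the `Languages` enum in `.configs` has an `English` member that
# resolves to an installed spaCy model (e.g. "en_core_web_sm") and that pandas
# is installed. Because of the relative import above, run it as a module,
# e.g. `python -m <package>.<this_module>`.
if __name__ == "__main__":
    import pandas as pd

    pipe = PreprocessingPipeline(
        language="English",
        pre_steps=["lowercase", "replace_urls", "normalize_acronyms"],
        lemmatization_step="Spacy lemmatizer (remove stopwords)",
        post_steps=["remove_punctuation", "normalize_useless_spaces", "strip"],
    )
    df = pd.DataFrame({"text": ["Visit u.s.a. sites at https://example.com   today!!!"]})
    out = pipe.vaex_process(df, text_column="text")
    print(out["processed_text"].tolist())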