File size: 6,010 Bytes
b748dad
 
02c2d7e
 
 
b748dad
02c2d7e
 
 
 
b748dad
e48d543
02c2d7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c700823
b748dad
 
c700823
02c2d7e
b748dad
 
02c2d7e
 
b748dad
 
 
 
c700823
 
b748dad
 
02c2d7e
 
b748dad
 
02c2d7e
c700823
b748dad
c700823
ab15c62
b748dad
 
 
 
 
ab15c62
b748dad
 
 
 
c700823
b748dad
 
 
 
c700823
b748dad
 
 
 
 
ab15c62
b748dad
 
 
ab15c62
b748dad
e48d543
 
 
c700823
b748dad
c700823
b748dad
 
 
 
 
 
 
 
e48d543
 
 
b748dad
c700823
b748dad
 
c700823
b748dad
c700823
b748dad
 
c700823
b748dad
 
 
 
 
 
 
 
c700823
02c2d7e
c700823
02c2d7e
 
 
b748dad
02c2d7e
 
 
 
b748dad
02c2d7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b748dad
 
 
 
 
 
 
 
 
 
 
 
02c2d7e
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import multiprocessing as mp
import os
import re
import string
from collections import OrderedDict
from typing import Callable, List, Optional

import pandas as pd
import spacy
import streamlit as st
import vaex
from pandas.core.frame import DataFrame
from pandas.core.series import Series
from textacy.preprocessing import make_pipeline, normalize, remove, replace

from .configs import Languages

# more [here](https://github.com/fastai/fastai/blob/master/fastai/text/core.py#L42)
# and [here](https://textacy.readthedocs.io/en/latest/api_reference/preprocessing.html)
# fmt: off
_re_normalize_acronyms = re.compile(r"(?:[a-zA-Z]\.){2,}")
def normalize_acronyms(t):
    return _re_normalize_acronyms.sub(t.translate(str.maketrans("", "", string.punctuation)).upper(), t)


_re_non_word = re.compile(r"\W")
def remove_non_word(t):
    return _re_non_word.sub(" ", t)


_re_space = re.compile(r" {2,}")
def normalize_useless_spaces(t):
    return _re_space.sub(" ", t)


_re_rep = re.compile(r"(\S)(\1{2,})")
def normalize_repeating_chars(t):
    def _replace_rep(m):
        c, cc = m.groups()
        return c

    return _re_rep.sub(_replace_rep, t)


_re_wrep = re.compile(r"(?:\s|^)(\w+)\s+((?:\1\s+)+)\1(\s|\W|$)")
def normalize_repeating_words(t):
    def _replace_wrep(m):
        c, cc, e = m.groups()
        return c

    return _re_wrep.sub(_replace_wrep, t)


def lowercase(t: str) -> str:
    return t.lower()


def strip(t: str) -> str:
    return t.strip()


def lemmatize_remove_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    return " ".join(
        [t.lemma_ for t in doc if t.lemma_ != "-PRON-" and not t.is_stop]
    )


def remove_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    return " ".join([t.text for t in doc if not t.is_stop])


def lemmatize_keep_stopwords(doc: spacy.tokens.doc.Doc) -> str:
    return " ".join([t.lemma_ for t in doc if t.lemma_ != "-PRON-"])


# fmt: on
class PreprocessingPipeline:
    def __init__(
        self,
        language: str,
        pre_steps: Optional[List[str]],
        lemmatization_step: Optional[str],
        post_steps: Optional[List[str]],
    ):
        self.language = language
        self.pre_steps = pre_steps
        self.lemmatization_step = lemmatization_step
        self.post_steps = post_steps

        self.nlp = spacy.load(Languages[language].value, disable=["parser", "ner"])
        self.pre = self.make_pre_post_component(self.pre_steps)
        self.post = self.make_pre_post_component(self.post_steps)
        self.lemma = self.lemmatization_component()[self.lemmatization_step]

    def apply_multiproc(fn, series):
        with mp.Pool(mp.cpu_count()) as pool:
            new_series = pool.map(fn, series)

        return new_series

    def vaex_process(self, df: DataFrame, text_column: str) -> DataFrame:
        def fn(t):
            return self.post(self.lemma(self.nlp(self.pre(t))))

        vdf = vaex.from_pandas(df)
        vdf["processed_text"] = vdf.apply(
            fn, arguments=[vdf[text_column]], vectorize=False
        )

        return vdf.to_pandas_df()

    def __call__(self, series: Series) -> Series:
        if self.pre:
            series = series.map(self.pre)

        if self.lemma:
            total_steps = len(series) // 100
            res = []
            pbar = st.progress(0)
            for i, doc in enumerate(
                self.nlp.pipe(series, batch_size=500, n_process=os.cpu_count())
            ):
                res.append(self.lemma(doc))

                if i % total_steps == 0:
                    pbar.progress(1)

            series = pd.Series(res)

        if self.post:
            series = series.map(self.post)

        return series

    def make_pre_post_component(self, steps: Optional[List[str]]) -> Optional[Callable]:
        if not steps:
            return
        components = [self.pipeline_components()[step] for step in steps]

        return make_pipeline(*components)

    @staticmethod
    def pipeline_components() -> "OrderedDict[str, Callable]":
        """Returns available cleaning steps in order"""
        return OrderedDict(
            [
                ("lowercase", lowercase),
                ("normalize_unicode", normalize.unicode),
                ("normalize_bullet_points", normalize.bullet_points),
                ("normalize_hyphenated_words", normalize.hyphenated_words),
                ("normalize_quotation_marks", normalize.quotation_marks),
                ("normalize_whitespaces", normalize.whitespace),
                ("replace_urls", replace.urls),
                ("replace_currency_symbols", replace.currency_symbols),
                ("replace_emails", replace.emails),
                ("replace_emojis", replace.emojis),
                ("replace_hashtags", replace.hashtags),
                ("replace_numbers", replace.numbers),
                ("replace_phone_numbers", replace.phone_numbers),
                ("replace_user_handles", replace.user_handles),
                ("normalize_acronyms", normalize_acronyms),
                ("remove_accents", remove.accents),
                ("remove_brackets", remove.brackets),
                ("remove_html_tags", remove.html_tags),
                ("remove_punctuation", remove.punctuation),
                ("remove_non_words", remove_non_word),
                ("normalize_useless_spaces", normalize_useless_spaces),
                ("normalize_repeating_chars", normalize_repeating_chars),
                ("normalize_repeating_words", normalize_repeating_words),
                ("strip", strip),
            ]
        )

    @staticmethod
    def lemmatization_component() -> "OrderedDict[str, Optional[Callable]]":
        return OrderedDict(
            [
                ("Spacy lemmatizer (keep stopwords)", lemmatize_keep_stopwords),
                ("Spacy lemmatizer (no stopwords)", lemmatize_remove_stopwords),
                ("Disable lemmatizer", None),
                ("Remove stopwords", remove_stopwords),
            ]
        )