Spaces:
Runtime error
Runtime error
Create nlp_entities.py
Browse files- nlp_entities.py +142 -0
nlp_entities.py
ADDED
@@ -0,0 +1,142 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
#@title NLP Entities code
|
2 |
+
import re
|
3 |
+
def er_data_cleaning(raw: str) -> str:
|
4 |
+
"""
|
5 |
+
Strip off text for html tags and characters.
|
6 |
+
|
7 |
+
:param raw:
|
8 |
+
:return: str: stripped string
|
9 |
+
"""
|
10 |
+
# HTML tags
|
11 |
+
if raw is None:
|
12 |
+
raw = ""
|
13 |
+
|
14 |
+
html_removed = re.sub(r"<[^<]+?>", " ", raw)
|
15 |
+
# Remove /
|
16 |
+
raw_line_removed = str(html_removed).replace("/", " ")
|
17 |
+
# removing special entities like " , & etc.
|
18 |
+
special_entites_removed = re.sub(r"&[\w]+;", "", raw_line_removed)
|
19 |
+
# removing unicode characters like \u200c, \u200E etc.
|
20 |
+
unicode_chars_removed = special_entites_removed.encode("ascii", "ignore").decode("utf-8")
|
21 |
+
unicode_chars_removed = re.sub(r"\\u[\d]{3}[\w]", " ", unicode_chars_removed)
|
22 |
+
|
23 |
+
return unicode_chars_removed.strip()
|
24 |
+
|
25 |
+
|
26 |
+
def get_clean_text_blobs(text_blobs):
|
27 |
+
"""
|
28 |
+
Clean-up text blobs.
|
29 |
+
|
30 |
+
:param text_blobs: list
|
31 |
+
:return:cleaned_text_blobs: list
|
32 |
+
"""
|
33 |
+
cleaned_text_blobs = []
|
34 |
+
for text_blob in text_blobs:
|
35 |
+
cleaned_text_blobs.append(er_data_cleaning(raw=text_blob))
|
36 |
+
return cleaned_text_blobs
|
37 |
+
|
38 |
+
|
39 |
+
def get_phrases_pagerank(text_blobs, limit=1, token_len_min=2, token_len_max=3):
|
40 |
+
"""
|
41 |
+
Return key phrases based on PageRank.
|
42 |
+
|
43 |
+
:param token_length: Length of the token in the key phrases
|
44 |
+
:param text_blobs: List of text
|
45 |
+
# TODO: limit param is redundant because we are returning all the key phrases. Probably get rid of it
|
46 |
+
:param limit: percentage limit on total key phrases returned
|
47 |
+
:return: set(key_phrases)
|
48 |
+
"""
|
49 |
+
try:
|
50 |
+
assert 0 <= limit <= 1
|
51 |
+
text = ". ".join(text_blobs)
|
52 |
+
doc = nlp(text)
|
53 |
+
# doc._.textrank.pos_kept = POS
|
54 |
+
# doc._.textrank.token_lookback = token_lookback
|
55 |
+
|
56 |
+
total_len = len(doc._.phrases)
|
57 |
+
return_phrases = int(total_len * limit)
|
58 |
+
|
59 |
+
# examine the top-ranked phrases in the document
|
60 |
+
out_phrases = dict()
|
61 |
+
|
62 |
+
for p in doc._.phrases[:return_phrases]:
|
63 |
+
|
64 |
+
# adding token_length would reduce total score from 100
|
65 |
+
tokenized_kp = p.text.split()
|
66 |
+
filtered_tokens = [word for word in tokenized_kp if word not in all_stopwords]
|
67 |
+
kp_length = len(filtered_tokens)
|
68 |
+
if p.rank > 0 and kp_length <= token_len_max and kp_length >= token_len_min:
|
69 |
+
joined_kp = " ".join(filtered_tokens)
|
70 |
+
if joined_kp in out_phrases:
|
71 |
+
out_phrases[joined_kp]["weight"] += p.rank
|
72 |
+
out_phrases[joined_kp]["kp_length"] = kp_length
|
73 |
+
else:
|
74 |
+
|
75 |
+
# count is dummy value
|
76 |
+
result_dict = {"weight": p.rank, "kp_length": kp_length, "count": 1}
|
77 |
+
out_phrases[joined_kp] = result_dict
|
78 |
+
|
79 |
+
except AssertionError as err:
|
80 |
+
raise err
|
81 |
+
return out_phrases
|
82 |
+
|
83 |
+
|
84 |
+
def dict_normalization(interest_dictionary, target=1.0):
|
85 |
+
"""
|
86 |
+
Normalize the dictionary weights to target.
|
87 |
+
|
88 |
+
:param interest_dictionary: List of key phrases and scores
|
89 |
+
:param target: normalization score
|
90 |
+
:return: normalized interest dictionary
|
91 |
+
"""
|
92 |
+
curr_score = 0
|
93 |
+
# exclude normalization if no output returned from pagerank
|
94 |
+
if len(interest_dictionary) > 0:
|
95 |
+
for kp_info in interest_dictionary.values():
|
96 |
+
curr_score += kp_info["weight"]
|
97 |
+
factor = target / curr_score
|
98 |
+
for kp, _ in interest_dictionary.items():
|
99 |
+
interest_dictionary[kp]["weight"] = round(interest_dictionary[kp]["weight"] * factor, 4)
|
100 |
+
return interest_dictionary
|
101 |
+
|
102 |
+
|
103 |
+
def get_ners(text_blobs):
|
104 |
+
"""
|
105 |
+
Get named entities.
|
106 |
+
|
107 |
+
:param text_blobs: List of text blobs
|
108 |
+
:return: named_entities
|
109 |
+
"""
|
110 |
+
k_ners = dict()
|
111 |
+
for text_blob in text_blobs:
|
112 |
+
doc = nlp(text_blob)
|
113 |
+
|
114 |
+
for ent in doc.ents:
|
115 |
+
if ent.label_ not in FILT_GROUPS:
|
116 |
+
# increment count associated with named entity
|
117 |
+
if ent.text in k_ners:
|
118 |
+
k_ners[ent.text] += 1
|
119 |
+
else:
|
120 |
+
k_ners[ent.text] = 1
|
121 |
+
return k_ners
|
122 |
+
|
123 |
+
|
124 |
+
def return_ners_and_kp(text_blobs, ret_ne=False):
|
125 |
+
"""
|
126 |
+
Return named entities and key phrases corresponding to text blob.
|
127 |
+
|
128 |
+
:param ret_ne: Boolean to return named entities
|
129 |
+
:param text_blobs: list of text blobs
|
130 |
+
:return: dict(): {NE: {tag1:count, tag2:count},
|
131 |
+
KP: {tag3:{weight: float, kp_length:count, count: int},
|
132 |
+
tag4:{weight: float, kp_length:count, count: int}}
|
133 |
+
"""
|
134 |
+
return_tags = dict()
|
135 |
+
cleaned_text_blobs = get_clean_text_blobs(text_blobs=text_blobs)
|
136 |
+
kps = get_phrases_pagerank(text_blobs=cleaned_text_blobs)
|
137 |
+
kps = dict_normalization(kps)
|
138 |
+
return_tags["KP"] = kps
|
139 |
+
if ret_ne:
|
140 |
+
ners = get_ners(text_blobs=cleaned_text_blobs)
|
141 |
+
return_tags["NE"] = ners
|
142 |
+
return return_tags
|