Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import bz2 | |
from typing import TYPE_CHECKING, Literal | |
import pandas as pd | |
import spacy | |
from tqdm import tqdm | |
from app.constants import ( | |
AMAZONREVIEWS_PATH, | |
AMAZONREVIEWS_URL, | |
IMDB50K_PATH, | |
IMDB50K_URL, | |
SENTIMENT140_PATH, | |
SENTIMENT140_URL, | |
TEST_DATASET_PATH, | |
TEST_DATASET_URL, | |
) | |
if TYPE_CHECKING: | |
from spacy.tokens import Doc | |
__all__ = ["load_data", "tokenize"] | |
try: | |
nlp = spacy.load("en_core_web_sm", disable=["tok2vec", "parser", "ner"]) | |
except OSError: | |
print("Downloading spaCy model...") | |
from spacy.cli import download as spacy_download | |
spacy_download("en_core_web_sm") | |
nlp = spacy.load("en_core_web_sm", disable=["tok2vec", "parser", "ner"]) | |
def _lemmatize(doc: Doc, threshold: int = 2) -> list[str]: | |
"""Lemmatize the provided text using spaCy. | |
Args: | |
doc: spaCy document | |
threshold: Minimum character length of tokens | |
Returns: | |
Lemmatized text | |
""" | |
return [ | |
token.lemma_.lower().strip() | |
for token in doc | |
if not token.is_stop | |
and not token.is_punct | |
and not token.like_email | |
and not token.like_url | |
and not token.like_num | |
and not (len(token.lemma_) < threshold) | |
] | |
def tokenize( | |
text_data: list[str], | |
batch_size: int = 512, | |
n_jobs: int = 4, | |
character_threshold: int = 2, | |
show_progress: bool = True, | |
) -> list[list[str]]: | |
"""Tokenize the provided text using spaCy. | |
Args: | |
text_data: Text data to tokenize | |
batch_size: Batch size for tokenization | |
n_jobs: Number of parallel jobs | |
character_threshold: Minimum character length of tokens | |
show_progress: Whether to show a progress bar | |
Returns: | |
Tokenized text data | |
""" | |
return [ | |
_lemmatize(doc, character_threshold) | |
for doc in tqdm( | |
nlp.pipe(text_data, batch_size=batch_size, n_process=n_jobs), | |
total=len(text_data), | |
disable=not show_progress, | |
) | |
] | |
def load_sentiment140(include_neutral: bool = False) -> tuple[list[str], list[int]]: | |
"""Load the sentiment140 dataset and make it suitable for use. | |
Args: | |
include_neutral: Whether to include neutral sentiment | |
Returns: | |
Text and label data | |
Raises: | |
FileNotFoundError: If the dataset is not found | |
""" | |
# Check if the dataset exists | |
if not SENTIMENT140_PATH.exists(): | |
msg = ( | |
f"Sentiment140 dataset not found at: '{SENTIMENT140_PATH}'\n" | |
"Please download the dataset from:\n" | |
f"{SENTIMENT140_URL}" | |
) | |
raise FileNotFoundError(msg) | |
# Load the dataset | |
data = pd.read_csv( | |
SENTIMENT140_PATH, | |
encoding="ISO-8859-1", | |
names=[ | |
"target", # 0 = negative, 2 = neutral, 4 = positive | |
"id", # The id of the tweet | |
"date", # The date of the tweet | |
"flag", # The query, NO_QUERY if not present | |
"user", # The user that tweeted | |
"text", # The text of the tweet | |
], | |
) | |
# Ignore rows with neutral sentiment | |
if not include_neutral: | |
data = data[data["target"] != 2] | |
# Map sentiment values | |
data["sentiment"] = data["target"].map( | |
{ | |
0: 0, # Negative | |
4: 1, # Positive | |
2: 2, # Neutral | |
}, | |
) | |
# Return as lists | |
return data["text"].tolist(), data["sentiment"].tolist() | |
def load_amazonreviews(merge: bool = True) -> tuple[list[str], list[int]]: | |
"""Load the amazonreviews dataset and make it suitable for use. | |
Args: | |
merge: Whether to merge the test and train datasets (otherwise ignore test) | |
Returns: | |
Text and label data | |
Raises: | |
FileNotFoundError: If the dataset is not found | |
""" | |
# Check if the dataset exists | |
test_exists = AMAZONREVIEWS_PATH[0].exists() or not merge | |
train_exists = AMAZONREVIEWS_PATH[1].exists() | |
if not (test_exists and train_exists): | |
msg = ( | |
f"Amazonreviews dataset not found at: '{AMAZONREVIEWS_PATH[0]}' and '{AMAZONREVIEWS_PATH[1]}'\n" | |
"Please download the dataset from:\n" | |
f"{AMAZONREVIEWS_URL}" | |
) | |
raise FileNotFoundError(msg) | |
# Load the datasets | |
dataset = [] | |
with bz2.BZ2File(AMAZONREVIEWS_PATH[1]) as train_file: | |
dataset.extend([line.decode("utf-8") for line in train_file]) | |
if merge: | |
with bz2.BZ2File(AMAZONREVIEWS_PATH[0]) as test_file: | |
dataset.extend([line.decode("utf-8") for line in test_file]) | |
# Split the data into labels and text | |
labels, texts = zip(*(line.split(" ", 1) for line in dataset)) # NOTE: Occasionally OOM | |
# Map sentiment values | |
sentiments = [int(label.split("__label__")[1]) - 1 for label in labels] | |
# Return as lists | |
return texts, sentiments | |
def load_imdb50k() -> tuple[list[str], list[int]]: | |
"""Load the imdb50k dataset and make it suitable for use. | |
Returns: | |
Text and label data | |
Raises: | |
FileNotFoundError: If the dataset is not found | |
""" | |
# Check if the dataset exists | |
if not IMDB50K_PATH.exists(): | |
msg = ( | |
f"IMDB50K dataset not found at: '{IMDB50K_PATH}'\n" | |
"Please download the dataset from:\n" | |
f"{IMDB50K_URL}" | |
) # fmt: off | |
raise FileNotFoundError(msg) | |
# Load the dataset | |
data = pd.read_csv(IMDB50K_PATH) | |
# Map sentiment values | |
data["sentiment"] = data["sentiment"].map( | |
{ | |
"positive": 1, | |
"negative": 0, | |
}, | |
) | |
# Return as lists | |
return data["review"].tolist(), data["sentiment"].tolist() | |
def load_test(include_neutral: bool = False) -> tuple[list[str], list[int]]: | |
"""Load the test dataset and make it suitable for use. | |
Args: | |
include_neutral: Whether to include neutral sentiment | |
Returns: | |
Text and label data | |
Raises: | |
FileNotFoundError: If the dataset is not found | |
""" | |
# Check if the dataset exists | |
if not TEST_DATASET_PATH.exists(): | |
msg = ( | |
f"Test dataset not found at: '{TEST_DATASET_PATH}'\n" | |
"Please download the dataset from:\n" | |
f"{TEST_DATASET_URL}" | |
) | |
raise FileNotFoundError(msg) | |
# Load the dataset | |
data = pd.read_csv(TEST_DATASET_PATH) | |
# Ignore rows with neutral sentiment | |
if not include_neutral: | |
data = data[data["label"] != 1] | |
# Map sentiment values | |
data["label"] = data["label"].map( | |
{ | |
0: 0, # Negative | |
1: 1, # Neutral | |
2: 2, # Positive | |
}, | |
) | |
# Return as lists | |
return data["text"].tolist(), data["label"].tolist() | |
def load_data(dataset: Literal["sentiment140", "amazonreviews", "imdb50k", "test"]) -> tuple[list[str], list[int]]: | |
"""Load and preprocess the specified dataset. | |
Args: | |
dataset: Dataset to load | |
Returns: | |
Text and label data | |
Raises: | |
ValueError: If the dataset is not recognized | |
""" | |
match dataset: | |
case "sentiment140": | |
return load_sentiment140(include_neutral=False) | |
case "amazonreviews": | |
return load_amazonreviews(merge=True) | |
case "imdb50k": | |
return load_imdb50k() | |
case "test": | |
return load_test(include_neutral=False) | |
case _: | |
msg = f"Unknown dataset: {dataset}" | |
raise ValueError(msg) | |