datastats / datastats.py
yangwang825's picture
Update datastats.py
ac41e13 verified
raw
history blame
11 kB
""" DataStats metric. """
import logging
import functools
from collections import Counter
from multiprocessing import Pool
from contextlib import contextmanager
from typing import List, Any, Dict, Optional
from collections import namedtuple as _namedtuple
import spacy
import datasets
import evaluate
from packaging import version
logger = logging.getLogger(__name__)
try:
_en = spacy.load('en_core_web_sm')
except OSError as stderr:
spacy.cli.download('en_core_web_sm')
_en = spacy.load('en_core_web_sm')
@contextmanager
def filter_logging_context():
def filter_log(record):
return False if "This is expected if you are initialising" in record.msg else True
logger = datasets.utils.logging.get_logger("transformers.modeling_utils")
logger.addFilter(filter_log)
try:
yield
finally:
logger.removeFilter(filter_log)
_CITATION = """\
@article{grusky2018newsroom,
title={Newsroom: A dataset of 1.3 million summaries with diverse extractive strategies},
author={Grusky, Max and Naaman, Mor and Artzi, Yoav},
journal={arXiv preprint arXiv:1804.11283},
year={2018}
}
"""
_DESCRIPTION = """\
DataStats examines summarization strategies using three measures that capture the degree of text overlap between the summary and article, and the rate of compression of the information conveyed.
"""
_KWARGS_DESCRIPTION = """
BERTScore Metrics with the hashcode from a source against one or more references.
Args:
predictions (list of str): Prediction/candidate sentences.
references (list of str or list of list of str): Reference sentences.
Returns:
coverage: Percentage of words in the summary that are from the source article, measuring the extent to which a summary is a derivative of a text.
density: It is defined as the average length of the extractive fragment to which each summary word belongs.
compression: It is defined as the word ratio between the articles and its summaries.
Examples:
>>> predictions = ["hello there", "general kenobi"]
>>> references = ["hello there", "general kenobi"]
>>> bertscore = evaluate.load("datastats")
>>> results = bertscore.compute(predictions=predictions, references=references)
"""
def find_ngrams(input_list: List[Any], n: int):
return zip(*[input_list[i:] for i in range(n)])
def normalize(tokens: List[str], lowercase: bool = False):
"""
Lowercases and turns tokens into distinct words.
"""
return [str(t).lower() if not lowercase else str(t) for t in tokens]
class Fragments:
Match = _namedtuple("Match", ("summary", "text", "length"))
def __init__(self, summary, text, lowercase: bool = False):
if isinstance(summary, str):
self.summary = summary.split()
else:
self.summary = summary
if isinstance(text, str):
self.text = text.split()
else:
self.text = text
self._norm_summary = normalize(self.summary, lowercase)
self._norm_text = normalize(self.text, lowercase)
self._match(self._norm_summary, self._norm_text)
def overlaps(self):
"""
Return a list of Fragments.Match objects between summary and text.
This is a list of named tuples of the form (summary, text, length):
"""
return self._matches
def strings(self, min_length=0, summary_base=True):
# Compute the strings against the summary or the text?
base = self.summary if summary_base else self.text
# Generate strings, filtering out strings below the minimum length.
strings = [base[i : i + length] for i, j, length in self.overlaps() if length > min_length]
return strings
def coverage(self, summary_base=True):
"""
Return the COVERAGE score of the summary and text.
"""
numerator = sum(o.length for o in self.overlaps())
if summary_base:
denominator = len(self.summary)
else:
denominator = len(self.text)
if denominator == 0:
return 0
else:
return numerator / denominator
def density(self, summary_base=True):
"""
Return the DENSITY score of summary and text.
"""
numerator = sum(o.length ** 2 for o in self.overlaps())
if summary_base:
denominator = len(self.summary)
else:
denominator = len(self.text)
if denominator == 0:
return 0
else:
return numerator / denominator
def compression(self, text_to_summary=True):
"""
Return compression ratio between summary and text.
"""
ratio = [len(self.text), len(self.summary)]
try:
if text_to_summary:
return ratio[0] / ratio[1]
else:
return ratio[1] / ratio[0]
except ZeroDivisionError:
return 0
def _match(self, a, b):
"""
Raw procedure for matching summary in text, described in paper.
"""
self._matches = []
a_start = b_start = 0
while a_start < len(a):
best_match = None
best_match_length = 0
while b_start < len(b):
if a[a_start] == b[b_start]:
a_end = a_start
b_end = b_start
while a_end < len(a) and b_end < len(b) \
and b[b_end] == a[a_end]:
b_end += 1
a_end += 1
length = a_end - a_start
if length > best_match_length:
best_match = Fragments.Match(a_start, b_start, length)
best_match_length = length
b_start = b_end
else:
b_start += 1
b_start = 0
if best_match:
if best_match_length > 0:
self._matches.append(best_match)
a_start += best_match_length
else:
a_start += 1
class DataStatsMetric(object):
def __init__(
self,
n_gram: int = 3,
n_workers: int = 24,
lowercase: bool = False,
tokenize: bool = True
):
"""
Data Statistics metric
Args:
n_gram (int): Compute statistics for n-grams up to and including this length.
n_workers (int): Number of processes to use if using multiprocessing.
case (bool): Whether to lowercase input before calculating statistics.
tokenize (bool): Whether to tokenize the input.
"""
self.n_gram = n_gram
self.n_workers = n_workers
self.lowercase = lowercase
self.tokenize = tokenize
def evaluate_example(self, summary, input_text):
if self.tokenize:
input_text = _en(input_text, disable=["tagger", "parser", "ner", "textcat"])
input_text = [tok.text for tok in input_text]
summary = _en(summary, disable=["tagger", "parser", "ner", "textcat"])
summary = [tok.text for tok in summary]
fragments = Fragments(summary, input_text, lowercase=self.lowercase)
coverage = fragments.coverage()
density = fragments.density()
compression = fragments.compression()
score_dict = {"coverage": coverage, "density": density, "compression": compression}
tokenized_summary = fragments._norm_summary
tokenized_text = fragments._norm_text
score_dict["summary_length"] = len(tokenized_summary)
for i in range(1, self.n_gram + 1):
input_ngrams = list(find_ngrams(tokenized_text, i))
summ_ngrams = list(find_ngrams(tokenized_summary, i))
input_ngrams_set = set(input_ngrams)
summ_ngrams_set = set(summ_ngrams)
intersect = summ_ngrams_set.intersection(input_ngrams_set)
try:
score_dict[f"percentage_novel_{i}-gram"] = (len(summ_ngrams_set) \
- len(intersect))/float(len(summ_ngrams_set))
ngramCounter = Counter()
ngramCounter.update(summ_ngrams)
repeated = [key for key, val in ngramCounter.items() if val > 1]
score_dict[f"percentage_repeated_{i}-gram_in_summ"] = len(repeated)/float(len(summ_ngrams_set))
except ZeroDivisionError:
continue
return score_dict
def evaluate_batch(self, summaries, input_texts, aggregate=True):
corpus_score_dict = Counter()
p = Pool(processes=self.n_workers)
results = p.starmap(self.evaluate_example, zip(summaries, input_texts))
p.close()
if aggregate:
[corpus_score_dict.update(x) for x in results]
for key in corpus_score_dict.keys():
corpus_score_dict[key] /= float(len(input_texts))
return corpus_score_dict
else:
return results
@property
def supports_multi_ref(self):
return False
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class DataStats(evaluate.Metric):
name = 'DataStats'
def _info(self):
return evaluate.MetricInfo(
description=_DESCRIPTION,
citation=_CITATION,
homepage="",
inputs_description=_KWARGS_DESCRIPTION,
features=[
datasets.Features(
{
"predictions": datasets.Value("string", id="sequence"),
"references": datasets.Sequence(datasets.Value("string", id="sequence"), id="references"),
}
),
datasets.Features(
{
"predictions": datasets.Value("string", id="sequence"),
"references": datasets.Value("string", id="sequence"),
}
),
],
codebase_urls=["https://github.com/Tiiiger/bert_score"],
reference_urls=[
"https://github.com/lil-lab/newsroom",
"https://arxiv.org/pdf/2007.12626",
],
)
def _compute(
self,
predictions,
references,
n_gram: int = 3,
n_workers: int = 4,
lowercase: bool = False,
tokenize: bool = True
):
logger.info(predictions)
logger.info(references)
datastats = DataStatsMetric(n_gram, n_workers, lowercase, tokenize)
results = datastats.evaluate_batch(predictions, references)
coverage = float(results['coverage'])
density = float(results['density'])
compression = float(results['compression'])
logger.info(coverage, density, compression)
return {
"coverage": coverage,
"density": density,
"compression": compression
}