# Summarizer/extractive_summarizer/cluster_features.py
# Repository viewer metadata (kept for provenance): uploaded by piecurus,
# commit "extractive" (7215c40), file size 4.83 kB.
from typing import Dict, List
import numpy as np
from numpy import ndarray
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.mixture import GaussianMixture
class ClusterFeatures(object):
    """
    Basic handling of clustering features.

    Holds a feature matrix (optionally reduced with PCA), fits a KMeans or
    Gaussian-mixture model on it, and reports, for every cluster centre, the
    index of the closest feature row.
    """

    def __init__(
        self,
        features: ndarray,
        algorithm: str = 'kmeans',
        pca_k: int = None,
        random_state: int = 12345,
    ):
        """
        :param features: the embedding matrix created by bert parent.
        :param algorithm: Which clustering algorithm to use ('gmm' selects a
            Gaussian mixture; any other value selects KMeans).
        :param pca_k: If you want the features to be ran through pca, this is the components number.
        :param random_state: Random state, used for the clustering model and for PCA.
        """
        if pca_k:
            # Seed PCA as well so the randomized solver path is reproducible.
            self.features = PCA(
                n_components=pca_k, random_state=random_state
            ).fit_transform(features)
        else:
            self.features = features
        self.algorithm = algorithm
        self.pca_k = pca_k
        self.random_state = random_state

    def __get_model(self, k: int):
        """
        Retrieve clustering model.

        :param k: amount of clusters.
        :return: Unfitted clustering model (GaussianMixture for 'gmm', else KMeans).
        """
        if self.algorithm == 'gmm':
            return GaussianMixture(n_components=k, random_state=self.random_state)
        return KMeans(n_clusters=k, random_state=self.random_state)

    def __get_centroids(self, model):
        """
        Retrieve centroids of model.

        :param model: Fitted clustering model.
        :return: Centroids (``means_`` for 'gmm', ``cluster_centers_`` otherwise).
        """
        if self.algorithm == 'gmm':
            return model.means_
        return model.cluster_centers_

    def __get_inertia(self, model) -> float:
        """
        Retrieve the within-cluster sum of squared distances of a fitted model.

        :param model: Fitted clustering model.
        :return: Inertia of the model on ``self.features``.
        """
        if self.algorithm == 'gmm':
            # GaussianMixture exposes no ``inertia_``; compute the equivalent
            # quantity from the hard assignment of every row to a component.
            features = np.asarray(self.features)
            assigned_means = model.means_[model.predict(features)]
            return float(np.sum((features - assigned_means) ** 2))
        return model.inertia_

    def __find_closest_args(self, centroids: np.ndarray) -> Dict:
        """
        Find the closest arguments to centroid.

        Centroids are processed in order and each feature row can be claimed
        by one centroid only; ties go to the lowest row index. If there are
        more centroids than feature rows, the surplus centroids get no entry.

        :param centroids: Centroids to find closest.
        :return: Mapping of centroid position to the index of its closest
            unused feature row.
        """
        features = np.asarray(self.features)
        available = np.ones(len(features), dtype=bool)
        args = {}
        for j, centroid in enumerate(centroids):
            candidates = np.flatnonzero(available)
            if candidates.size == 0:
                # Every row is already taken; nothing valid left to assign.
                break
            distances = np.linalg.norm(features[candidates] - centroid, axis=1)
            closest = int(candidates[np.argmin(distances)])
            args[j] = closest
            available[closest] = False
        return args

    def calculate_elbow(self, k_max: int) -> List[float]:
        """
        Calculates elbow up to the provided k_max.

        :param k_max: K_max to calculate elbow for.
        :return: The inertias for k = 1 .. min(k_max, number of features) - 1.
        """
        inertias = []
        for k in range(1, min(k_max, len(self.features))):
            model = self.__get_model(k).fit(self.features)
            inertias.append(self.__get_inertia(model))
        return inertias

    def calculate_optimal_cluster(self, k_max: int):
        """
        Calculates the optimal cluster based on Elbow.

        :param k_max: The max k to search elbow for.
        :return: The optimal cluster size (1 when no elbow is found).
        """
        inertias = self.calculate_elbow(k_max)
        count = len(inertias)

        # First and second order differences of the inertia curve.
        delta_1 = []
        delta_2 = []
        for i in range(count):
            delta_1.append(inertias[i] - inertias[i - 1] if i > 0 else 0.0)
            delta_2.append(delta_1[i] - delta_1[i - 1] if i > 1 else 0.0)

        max_strength = 0
        k = 1
        # Positions 0, 1 and the last one have no defined strength, so only
        # the interior positions can beat the initial strength of zero.
        for j in range(2, count - 1):
            strength = delta_2[j + 1] - delta_1[j + 1]
            if strength > max_strength:
                max_strength = strength
                k = j + 1
        return k

    def cluster(self, ratio: float = 0.1, num_sentences: int = None) -> List[int]:
        """
        Clusters sentences based on the ratio.

        :param ratio: Ratio to use for clustering.
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Sentences index that qualify for summary, in ascending order.
        """
        n_features = len(self.features)
        if n_features == 0:
            # Nothing to cluster; fitting a model on no samples would fail.
            return []

        if num_sentences is not None:
            if num_sentences == 0:
                return []
            k = min(num_sentences, n_features)
        else:
            # At least one cluster, never more clusters than samples.
            k = min(max(int(n_features * ratio), 1), n_features)

        model = self.__get_model(k).fit(self.features)
        centroids = self.__get_centroids(model)
        cluster_args = self.__find_closest_args(centroids)
        return sorted(cluster_args.values())

    def __call__(self, ratio: float = 0.1, num_sentences: int = None) -> List[int]:
        """
        Clusters sentences based on the ratio.

        :param ratio: Ratio to use for clustering.
        :param num_sentences: Number of sentences. Overrides ratio.
        :return: Sentences index that qualify for summary.
        """
        return self.cluster(ratio, num_sentences)