|
import numpy as np |
|
|
|
|
|
class DatasetNormalizer: |
|
""" |
|
Overview: |
|
The `DatasetNormalizer` class provides functionality to normalize and unnormalize data in a dataset. |
|
It takes a dataset as input and applies a normalizer function to each key in the dataset. |
|
|
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def __init__(self, dataset: np.ndarray, normalizer: str, path_lengths: list = None): |
|
""" |
|
Overview: |
|
Initialize the NormalizerHelper object. |
|
|
|
Arguments: |
|
- dataset (:obj:`np.ndarray`): The dataset to be normalized. |
|
- normalizer (:obj:`str`): The type of normalizer to be used. Can be a string representing the name of \ |
|
the normalizer class. |
|
- path_lengths (:obj:`list`): The length of the paths in the dataset. Defaults to None. |
|
""" |
|
dataset = flatten(dataset, path_lengths) |
|
|
|
self.observation_dim = dataset['observations'].shape[1] |
|
self.action_dim = dataset['actions'].shape[1] |
|
|
|
if isinstance(normalizer, str): |
|
normalizer = eval(normalizer) |
|
|
|
self.normalizers = {} |
|
for key, val in dataset.items(): |
|
try: |
|
self.normalizers[key] = normalizer(val) |
|
except: |
|
print(f'[ utils/normalization ] Skipping {key} | {normalizer}') |
|
|
|
|
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Overview: |
|
Returns a string representation of the NormalizerHelper object. \ |
|
The string representation includes the key-value pairs of the normalizers \ |
|
stored in the NormalizerHelper object. |
|
Returns: |
|
- ret (:obj:`str`):A string representation of the NormalizerHelper object. |
|
""" |
|
string = '' |
|
for key, normalizer in self.normalizers.items(): |
|
string += f'{key}: {normalizer}]\n' |
|
return string |
|
|
|
def normalize(self, x: np.ndarray, key: str) -> np.ndarray: |
|
""" |
|
Overview: |
|
Normalize the input data using the specified key. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input data to be normalized. |
|
- key (:obj`str`): The key to identify the normalizer. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The normalized value of the input data. |
|
""" |
|
return self.normalizers[key].normalize(x) |
|
|
|
def unnormalize(self, x: np.ndarray, key: str) -> np.ndarray: |
|
""" |
|
Overview: |
|
Unnormalizes the given value `x` using the specified `key`. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The value to be unnormalized. |
|
- key (:obj`str`): The key to identify the normalizer. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The unnormalized value. |
|
""" |
|
return self.normalizers[key].unnormalize(x) |
|
|
|
|
|
def flatten(dataset: dict, path_lengths: list) -> dict: |
|
""" |
|
Overview: |
|
Flattens dataset of { key: [ n_episodes x max_path_length x dim ] } \ |
|
to { key : [ (n_episodes * sum(path_lengths)) x dim ] } |
|
|
|
Arguments: |
|
- dataset (:obj:`dict`): The dataset to be flattened. |
|
- path_lengths (:obj:`list`): A list of path lengths for each episode. |
|
|
|
Returns: |
|
- flattened (:obj:`dict`): The flattened dataset. |
|
""" |
|
flattened = {} |
|
for key, xs in dataset.items(): |
|
assert len(xs) == len(path_lengths) |
|
if key == 'path_lengths': |
|
continue |
|
flattened[key] = np.concatenate([x[:length] for x, length in zip(xs, path_lengths)], axis=0) |
|
return flattened |
|
|
|
|
|
class Normalizer: |
|
""" |
|
Overview: |
|
Parent class, subclass by defining the `normalize` and `unnormalize` methods |
|
|
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def __init__(self, X): |
|
""" |
|
Overview: |
|
Initialize the Normalizer object. |
|
Arguments: |
|
- X (:obj:`np.ndarray`): The data to be normalized. |
|
""" |
|
|
|
self.X = X.astype(np.float32) |
|
self.mins = X.min(axis=0) |
|
self.maxs = X.max(axis=0) |
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Overview: |
|
Returns a string representation of the Normalizer object. |
|
Returns: |
|
- ret (:obj:`str`): A string representation of the Normalizer object. |
|
""" |
|
|
|
return ( |
|
f"""[ Normalizer ] dim: {self.mins.size}\n -: """ |
|
f"""{np.round(self.mins, 2)}\n +: {np.round(self.maxs, 2)}\n""" |
|
) |
|
|
|
def normalize(self, *args, **kwargs): |
|
""" |
|
Overview: |
|
Normalize the input data. |
|
Arguments: |
|
- args (:obj:`list`): The arguments passed to the ``normalize`` function. |
|
- kwargs (:obj:`dict`): The keyword arguments passed to the ``normalize`` function. |
|
""" |
|
|
|
raise NotImplementedError() |
|
|
|
def unnormalize(self, *args, **kwargs): |
|
""" |
|
Overview: |
|
Unnormalize the input data. |
|
Arguments: |
|
- args (:obj:`list`): The arguments passed to the ``unnormalize`` function. |
|
- kwargs (:obj:`dict`): The keyword arguments passed to the ``unnormalize`` function. |
|
""" |
|
|
|
raise NotImplementedError() |
|
|
|
|
|
class GaussianNormalizer(Normalizer): |
|
""" |
|
Overview: |
|
A class that normalizes data to zero mean and unit variance. |
|
|
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def __init__(self, *args, **kwargs): |
|
""" |
|
Overview: |
|
Initialize the GaussianNormalizer object. |
|
Arguments: |
|
- args (:obj:`list`): The arguments passed to the ``__init__`` function of the parent class, \ |
|
i.e., the Normalizer class. |
|
- kwargs (:obj:`dict`): The keyword arguments passed to the ``__init__`` function of the parent class, \ |
|
i.e., the Normalizer class. |
|
""" |
|
|
|
super().__init__(*args, **kwargs) |
|
self.means = self.X.mean(axis=0) |
|
self.stds = self.X.std(axis=0) |
|
self.z = 1 |
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Overview: |
|
Returns a string representation of the GaussianNormalizer object. |
|
Returns: |
|
- ret (:obj:`str`): A string representation of the GaussianNormalizer object. |
|
""" |
|
|
|
return ( |
|
f"""[ Normalizer ] dim: {self.mins.size}\n """ |
|
f"""means: {np.round(self.means, 2)}\n """ |
|
f"""stds: {np.round(self.z * self.stds, 2)}\n""" |
|
) |
|
|
|
def normalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Normalize the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input data to be normalized. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The normalized data. |
|
""" |
|
return (x - self.means) / self.stds |
|
|
|
def unnormalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Unnormalize the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input data to be unnormalized. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The unnormalized data. |
|
""" |
|
return x * self.stds + self.means |
|
|
|
|
|
class CDFNormalizer(Normalizer): |
|
""" |
|
Overview: |
|
A class that makes training data uniform (over each dimension) by transforming it with marginal CDFs. |
|
|
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def __init__(self, X): |
|
""" |
|
Overview: |
|
Initialize the CDFNormalizer object. |
|
Arguments: |
|
- X (:obj:`np.ndarray`): The data to be normalized. |
|
""" |
|
|
|
super().__init__(atleast_2d(X)) |
|
self.dim = self.X.shape[1] |
|
self.cdfs = [CDFNormalizer1d(self.X[:, i]) for i in range(self.dim)] |
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Overview: |
|
Returns a string representation of the CDFNormalizer object. |
|
Returns: |
|
- ret (:obj:`str`): A string representation of the CDFNormalizer object. |
|
""" |
|
|
|
return f'[ CDFNormalizer ] dim: {self.mins.size}\n' + ' | '.join( |
|
f'{i:3d}: {cdf}' for i, cdf in enumerate(self.cdfs) |
|
) |
|
|
|
def wrap(self, fn_name: str, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Wraps the given function name and applies it to the input data. |
|
|
|
Arguments: |
|
- fn_name (:obj:`str`): The name of the function to be applied. |
|
- x (:obj:`np.ndarray`): The input data. |
|
|
|
Returns: |
|
- ret: The output of the function applied to the input data. |
|
""" |
|
shape = x.shape |
|
|
|
x = x.reshape(-1, self.dim) |
|
out = np.zeros_like(x) |
|
for i, cdf in enumerate(self.cdfs): |
|
fn = getattr(cdf, fn_name) |
|
out[:, i] = fn(x[:, i]) |
|
return out.reshape(shape) |
|
|
|
def normalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Normalizes the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input data. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The normalized data. |
|
""" |
|
return self.wrap('normalize', x) |
|
|
|
def unnormalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Unnormalizes the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input data. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`):: The unnormalized data. |
|
""" |
|
return self.wrap('unnormalize', x) |
|
|
|
|
|
class CDFNormalizer1d: |
|
""" |
|
Overview: |
|
CDF normalizer for a single dimension. This class provides methods to normalize and unnormalize data \ |
|
using the Cumulative Distribution Function (CDF) approach. |
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def __init__(self, X: np.ndarray): |
|
""" |
|
Overview: |
|
Initialize the CDFNormalizer1d object. |
|
Arguments: |
|
- X (:obj:`np.ndarray`): The data to be normalized. |
|
""" |
|
|
|
import scipy.interpolate as interpolate |
|
assert X.ndim == 1 |
|
self.X = X.astype(np.float32) |
|
if self.X.max() == self.X.min(): |
|
self.constant = True |
|
else: |
|
self.constant = False |
|
quantiles, cumprob = empirical_cdf(self.X) |
|
self.fn = interpolate.interp1d(quantiles, cumprob) |
|
self.inv = interpolate.interp1d(cumprob, quantiles) |
|
|
|
self.xmin, self.xmax = quantiles.min(), quantiles.max() |
|
self.ymin, self.ymax = cumprob.min(), cumprob.max() |
|
|
|
def __repr__(self) -> str: |
|
""" |
|
Overview: |
|
Returns a string representation of the CDFNormalizer1d object. |
|
""" |
|
|
|
return (f'[{np.round(self.xmin, 2):.4f}, {np.round(self.xmax, 2):.4f}') |
|
|
|
def normalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Normalize the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The data to be normalized. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The normalized data. |
|
""" |
|
if self.constant: |
|
return x |
|
|
|
x = np.clip(x, self.xmin, self.xmax) |
|
|
|
y = self.fn(x) |
|
|
|
y = 2 * y - 1 |
|
return y |
|
|
|
def unnormalize(self, x: np.ndarray, eps: float = 1e-4) -> np.ndarray: |
|
""" |
|
Overview: |
|
Unnormalize the input data. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The data to be unnormalized. |
|
- eps (:obj:`float`): A small value used for numerical stability. Defaults to 1e-4. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The unnormalized data. |
|
""" |
|
|
|
if self.constant: |
|
return x |
|
|
|
x = (x + 1) / 2. |
|
|
|
if (x < self.ymin - eps).any() or (x > self.ymax + eps).any(): |
|
print( |
|
f"""[ dataset/normalization ] Warning: out of range in unnormalize: """ |
|
f"""[{x.min()}, {x.max()}] | """ |
|
f"""x : [{self.xmin}, {self.xmax}] | """ |
|
f"""y: [{self.ymin}, {self.ymax}]""" |
|
) |
|
|
|
x = np.clip(x, self.ymin, self.ymax) |
|
|
|
y = self.inv(x) |
|
return y |
|
|
|
|
|
def empirical_cdf(sample: np.ndarray) -> (np.ndarray, np.ndarray): |
|
""" |
|
Overview: |
|
Compute the empirical cumulative distribution function (CDF) of a given sample. |
|
|
|
Arguments: |
|
- sample (:obj:`np.ndarray`): The input sample for which to compute the empirical CDF. |
|
|
|
Returns: |
|
- quantiles (:obj:`np.ndarray`): The unique values in the sample. |
|
- cumprob (:obj:`np.ndarray`): The cumulative probabilities corresponding to the quantiles. |
|
|
|
References: |
|
- Stack Overflow: https://stackoverflow.com/a/33346366 |
|
""" |
|
|
|
|
|
quantiles, counts = np.unique(sample, return_counts=True) |
|
|
|
|
|
|
|
cumprob = np.cumsum(counts).astype(np.double) / sample.size |
|
|
|
return quantiles, cumprob |
|
|
|
|
|
def atleast_2d(x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Ensure that the input array has at least two dimensions. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input array. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The input array with at least two dimensions. |
|
""" |
|
if x.ndim < 2: |
|
x = x[:, None] |
|
return x |
|
|
|
|
|
class LimitsNormalizer(Normalizer): |
|
""" |
|
Overview: |
|
A class that normalizes and unnormalizes values within specified limits. \ |
|
This class maps values within the range [xmin, xmax] to the range [-1, 1]. |
|
|
|
Interfaces: |
|
``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. |
|
""" |
|
|
|
def normalize(self, x: np.ndarray) -> np.ndarray: |
|
""" |
|
Overview: |
|
Normalizes the input values. |
|
|
|
Argments: |
|
- x (:obj:`np.ndarray`): The input values to be normalized. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The normalized values. |
|
|
|
""" |
|
|
|
x = (x - self.mins) / (self.maxs - self.mins) |
|
|
|
x = 2 * x - 1 |
|
return x |
|
|
|
def unnormalize(self, x: np.ndarray, eps: float = 1e-4) -> np.ndarray: |
|
""" |
|
Overview: |
|
Unnormalizes the input values. |
|
|
|
Arguments: |
|
- x (:obj:`np.ndarray`): The input values to be unnormalized. |
|
- eps (:obj:`float`): A small value used for clipping. Defaults to 1e-4. |
|
|
|
Returns: |
|
- ret (:obj:`np.ndarray`): The unnormalized values. |
|
|
|
""" |
|
if x.max() > 1 + eps or x.min() < -1 - eps: |
|
|
|
x = np.clip(x, -1, 1) |
|
|
|
|
|
x = (x + 1) / 2. |
|
|
|
return x * (self.maxs - self.mins) + self.mins |
|
|