from itertools import combinations

import numpy as np
import pandas as pd

SUPPORTED_TYPES = [".csv", ".json", ".xlsx"]


def hello_world():
    return "hello world!"

def load_file(file):
    """
    Takes a file uploaded through Streamlit and loads it into a DataFrame.
    Returns the DataFrame, metadata, and a result string.
    @param file: File uploaded into Streamlit.
    @rtype: tuple
    @return: A tuple of format (pd.DataFrame, (str, str), str).
    """
    df = None
    if file is None:
        return df, ("", ""), ""
    filename = file.name
    extension = filename.split(".")[-1]
    metadata = (filename, extension)
    import_functions = {
        "csv": pd.read_csv,
        "json": pd.read_json,
        "xlsx": pd.read_excel
    }
    try:
        reader = import_functions.get(extension, None)
        if reader is None:
            return df, metadata, f"Error: Invalid extension '{extension}'"
        df = reader(file)
        rows, columns = df.shape
        return df, metadata, f"File '{filename}' loaded successfully.\nFound {rows} rows, {columns} columns."
    except Exception as error:
        return df, metadata, f"Error: Unable to read file '{filename}' ({type(error).__name__}: {error})"

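# Usage sketch (illustrative; the uploader call below is an assumption about
# the surrounding Streamlit app, assuming `import streamlit as st`, and is
# not part of this module):
#   file = st.file_uploader("Upload a file", type=SUPPORTED_TYPES)
#   df, (filename, extension), message = load_file(file)
#   st.text(message)
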
def data_cleaner(df, drop_missing=False, remove_duplicates=True):
    """
    Takes a DataFrame and removes empty and duplicate entries.
    @type df: pd.DataFrame
    @param df: A DataFrame of uncleaned data.
    @type drop_missing: bool
    @param drop_missing: If True, drop rows with any missing values ("any");
        otherwise drop only fully empty rows ("all").
    @type remove_duplicates: bool
    @param remove_duplicates: Determines if duplicate rows are removed.
    @rtype: pd.DataFrame
    @return: A DataFrame with the requested cleaning applied.
    """
    df = df.dropna(how="any" if drop_missing else "all")
    if remove_duplicates:
        df = df.drop_duplicates()
    return df

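# Example (illustrative values): the all-empty row and the exact duplicate
# are removed; the partially empty row survives because drop_missing=False.
#   raw = pd.DataFrame({"a": [1, 1, None, 2], "b": ["x", "x", None, None]})
#   data_cleaner(raw)  # keeps rows (1, "x") and (2, None)
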
def column_combinations(df, k):
    """Return every k-column combination of the DataFrame's columns."""
    return list(combinations(df.columns, k))

def k_redact(df, k):
    """
    Redact any k-column combination of values that uniquely identifies a
    single row, so every remaining combination appears at least twice.
    """
    for columns in column_combinations(df, k):
        counts = df.loc[:, list(columns)].value_counts()
        # Value tuples occurring exactly once uniquely identify a row.
        sensitive_data = [
            (columns, key)
            for key, value in counts.items()
            if value == 1
        ]
        for columns, values in sensitive_data:
            # Rebuild the row mask from scratch for each sensitive tuple.
            mask = pd.Series(True, index=df.index)
            for column, value in zip(columns, values):
                # isin() treats a tuple as a single value; == would try to
                # broadcast it across the series.
                mask &= df[column].isin([value])
            if mask.sum() == 1:
                # Write through df.loc so the original frame is modified.
                df.loc[mask, list(columns)] = None
    return df

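# Example (illustrative): with k=2 every (zip, age) pair below occurs exactly
# once, so each pair uniquely identifies its row and both fields are blanked.
#   toy = pd.DataFrame({"zip": ["111", "111", "222"], "age": [30, 40, 40]})
#   k_redact(toy.copy(), 2)  # all zip/age values become None
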
def sensitive_values(series, sensitivity_minimum):
    """Return the set of values occurring fewer than sensitivity_minimum times."""
    return {
        key
        for key, value in series.value_counts().items()
        if value < sensitivity_minimum
    }

def drop_sensitive(series, sensitivity_minimum):
    """Return a copy of the series with rare (sensitive) values set to None."""
    return series.mask(series.isin(sensitive_values(series, sensitivity_minimum)), None)

def bin_numeric(df, to_process, bin_size, sensitivity_minimum):
    """Replace numeric columns with (min, max) bins of roughly bin_size rows each."""
    processed = set()
    rows, _ = df.shape
    num_bins = max(rows // bin_size, 1)
    for column_name in to_process:
        column = df[column_name]
        # Skip non-numeric columns (dtype kinds: bool, int, uint, float, complex).
        if column.dtype.kind not in "biufc":
            continue
        array = np.sort(column.dropna().to_numpy())
        # Split the sorted values into num_bins chunks; each chunk's first and
        # last elements (its min and max) become one bin's bounds.
        bins = [
            (split[0], split[-1])
            for split in np.array_split(array, num_bins)
            if len(split) > 0
        ]
        result = [None] * rows
        for bin_min, bin_max in bins:
            for i, value in enumerate(column):
                if bin_min <= value <= bin_max:
                    result[i] = (bin_min, bin_max)
        binned = pd.Series(result, index=df.index)
        df[column_name] = drop_sensitive(binned, sensitivity_minimum)
        processed.add(column_name)
    return df, to_process - processed

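# Example (illustrative): with bin_size=2 the six values split into three
# bins, so each exact salary is replaced by a (min, max) range tuple.
#   toy = pd.DataFrame({"salary": [30, 35, 50, 55, 80, 90]})
#   bin_numeric(toy, {"salary"}, bin_size=2, sensitivity_minimum=1)
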
def find_categorical(df, to_process, max_categorical_size, sensitivity_minimum):
    """Treat low-cardinality columns as categorical and redact rare categories."""
    processed = set()
    for column_name in to_process:
        column = df[column_name]
        if column.nunique() <= max_categorical_size:
            df[column_name] = drop_sensitive(column, sensitivity_minimum)
            processed.add(column_name)
    return df, to_process - processed

def redact(df, to_process, sensitivity_minimum):
    """Redact rare values in free-text (object-dtype) columns."""
    processed = set()
    for column_name in to_process:
        column = df[column_name]
        if column.dtype != object:
            continue
        # Replace values appearing fewer than sensitivity_minimum times.
        df[column_name] = drop_sensitive(column, sensitivity_minimum)
        processed.add(column_name)
    return df, to_process - processed

def anonymize(df, max_categorical_size, bin_size, sensitivity_minimum):
    """Run the column-level anonymization passes: text, categorical, numeric."""
    to_process = set(df.columns)
    df, to_process = redact(df, to_process, sensitivity_minimum)
    df, to_process = find_categorical(df, to_process, max_categorical_size, sensitivity_minimum)
    df, to_process = bin_numeric(df, to_process, bin_size, sensitivity_minimum)
    return df, to_process

def data_anonymizer(df, k, max_categorical_size, bin_size, sensitivity_minimum):
    """Anonymize column-by-column, then redact k-wise identifying combinations."""
    start_dtypes = df.dtypes.to_dict()
    df, unprocessed = anonymize(df, max_categorical_size, bin_size, sensitivity_minimum)
    df = k_redact(df, k)
    end_dtypes = df.dtypes.to_dict()
    # Type correction: integer columns that picked up missing values during
    # redaction were upcast to float, so restore them as nullable Int64.
    for column in df.columns:
        start_type, end_type = start_dtypes[column], end_dtypes[column]
        if start_type == end_type:
            continue
        if start_type.kind == "i" and end_type.kind == "f":
            df[column] = df[column].astype("Int64")
    return df, unprocessed

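# Minimal smoke-test sketch (not part of the original app; the toy data and
# parameter values below are assumptions chosen only for illustration).
if __name__ == "__main__":
    toy = pd.DataFrame({
        "name": ["Alice", "Bob", "Bob", "Cara", "Bob", "Bob"],
        "city": ["Rome", "Rome", "Oslo", "Oslo", "Rome", "Rome"],
        "age": [30, 35, 50, 55, 80, 90],
    })
    cleaned = data_cleaner(toy)
    anonymized, unprocessed = data_anonymizer(
        cleaned, k=2, max_categorical_size=3, bin_size=2, sensitivity_minimum=2
    )
    print(anonymized)
    print(f"Unprocessed columns: {unprocessed}")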