"""A Gradio app for anonymizing text data using FHE."""
import base64
import os
import re
import subprocess
import time
import uuid
from typing import Dict, List
import gradio as gr
import numpy
import pandas as pd
import requests
from fhe_anonymizer import FHEAnonymizer
from openai import OpenAI
from utils_demo import *
from concrete.ml.deployment import FHEModelClient
# Ensure the directory is clean before starting processes or reading files
clean_directory()
anonymizer = FHEAnonymizer()
client = OpenAI(api_key=os.environ.get("openaikey"))
# Start the Uvicorn server hosting the FastAPI app
subprocess.Popen(["uvicorn", "server:app"], cwd=CURRENT_DIR)
time.sleep(3)
# Load data from files required for the application
UUID_MAP = read_json(MAPPING_UUID_PATH)
ANONYMIZED_DOCUMENT = read_txt(ANONYMIZED_FILE_PATH)
MAPPING_SENTENCES = read_pickle(MAPPING_SENTENCES_PATH)
ORIGINAL_DOCUMENT = read_txt(ORIGINAL_FILE_PATH).split("\n\n")
# Generate a random user ID for this session
USER_ID = numpy.random.randint(0, 2**32)
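# The user ID namespaces this session's keys and encrypted files under KEYS_DIR and CLIENT_DIR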
def select_static_sentences_fn(selected_sentences: List):
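    """Rebuild the anonymized document from the sentences selected in the checkbox group.

    MAPPING_SENTENCES is assumed to map each original sentence to a
    (position, anonymized sentence) pair, so the selected sentences can be
    re-ordered before being joined back into a single document.
    """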
selected_sentences = [MAPPING_SENTENCES[sentence] for sentence in selected_sentences]
anonymized_selected_sentence = sorted(selected_sentences, key=lambda x: x[0])
anonymized_selected_sentence = [sentence for _, sentence in anonymized_selected_sentence]
return {anonymized_doc_box: gr.update(value="\n\n".join(anonymized_selected_sentence))}
def key_gen_fn() -> Dict:
"""Generate keys for a given user."""
print("------------ Step 1: Key Generation:")
print(f"Your user ID is: {USER_ID}....")
client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}")
client.load()
# Creates the private and evaluation keys on the client side
client.generate_private_and_evaluation_keys()
# Get the serialized evaluation keys
serialized_evaluation_keys = client.get_serialized_evaluation_keys()
assert isinstance(serialized_evaluation_keys, bytes)
# Save the evaluation key
evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key"
write_bytes(evaluation_key_path, serialized_evaluation_keys)
# anonymizer.generate_key()
if not evaluation_key_path.is_file():
error_message = (
f"Error Encountered While generating the evaluation {evaluation_key_path.is_file()=}"
)
print(error_message)
return {gen_key_btn: gr.update(value=error_message)}
else:
print("Keys have been generated ✅")
return {gen_key_btn: gr.update(value="Keys have been generated ✅")}
def encrypt_query_fn(query):
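    """Encrypt the user query token by token on the client side."""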
print(f"\n------------ Step 2: Query encryption: {query=}")
if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file():
return {output_encrypted_box: gr.update(value="Error ❌: Please generate the key first!")}
if is_user_query_valid(query):
return {
query_box: gr.update(
value=(
"Unable to process ❌: The request exceeds the length limit or falls "
"outside the scope of this document. Please refine your query."
)
)
}
# Retrieve the client API
client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}")
client.load()
encrypted_tokens = []
# Pattern to identify words and non-words (including punctuation, spaces, etc.)
tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+)", query)
for token in tokens:
        # 1- Skip whitespace-only tokens
if bool(re.match(r"^\s+$", token)):
continue
        # 2- Compute the embedding of each remaining token and encrypt it
emb_x = get_batch_text_representation([token], EMBEDDINGS_MODEL, TOKENIZER)
encrypted_x = client.quantize_encrypt_serialize(emb_x)
assert isinstance(encrypted_x, bytes)
encrypted_tokens.append(encrypted_x)
print("Data encrypted ✅ on Client Side")
assert len({len(token) for token in encrypted_tokens}) == 1
write_bytes(KEYS_DIR / f"{USER_ID}/encrypted_input", b"".join(encrypted_tokens))
write_bytes(
KEYS_DIR / f"{USER_ID}/encrypted_input_len", len(encrypted_tokens[0]).to_bytes(10, "big")
)
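    # Only a short slice of each ciphertext's hex representation is shown in the UI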
encrypted_quant_tokens_hex = [token.hex()[500:675] for token in encrypted_tokens]
return {
output_encrypted_box: gr.update(value=" ".join(encrypted_quant_tokens_hex)),
anonymized_text_output: gr.update(visible=True, value=None),
identified_words_output_df: gr.update(visible=False, value=None),
}
def send_input_fn(query) -> Dict:
"""Send the encrypted data and the evaluation key to the server."""
print("------------ Step 3.1: Send encrypted_data to the Server")
evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key"
encrypted_input_path = KEYS_DIR / f"{USER_ID}/encrypted_input"
encrypted_input_len_path = KEYS_DIR / f"{USER_ID}/encrypted_input_len"
if not evaluation_key_path.is_file():
        error_message = (
            "Error Encountered While Sending Data to the Server: "
            f"The key has not been generated correctly - {evaluation_key_path.is_file()=}"
        )
return {anonymized_text_output: gr.update(value=error_message)}
if not encrypted_input_path.is_file():
error_message = (
"Error Encountered While Sending Data to the Server: The data has not been encrypted "
f"correctly on the client side - {encrypted_input_path.is_file()=}"
)
return {anonymized_text_output: gr.update(value=error_message)}
# Define the data and files to post
data = {"user_id": USER_ID, "input": query}
files = [
("files", open(evaluation_key_path, "rb")),
("files", open(encrypted_input_path, "rb")),
("files", open(encrypted_input_len_path, "rb")),
]
# Send the encrypted input and evaluation key to the server
url = SERVER_URL + "send_input"
with requests.post(
url=url,
data=data,
files=files,
) as resp:
print("Data sent to the server ✅" if resp.ok else "Error ❌ in sending data to the server")
def run_fhe_in_server_fn() -> Dict:
"""Run in FHE the anonymization of the query"""
print("------------ Step 3.2: Run in FHE on the Server Side")
evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key"
encrypted_input_path = KEYS_DIR / f"{USER_ID}/encrypted_input"
if not evaluation_key_path.is_file():
        error_message = (
            "Error Encountered While Sending Data to the Server: "
            f"The key has not been generated correctly - {evaluation_key_path.is_file()=}"
        )
return {anonymized_text_output: gr.update(value=error_message)}
if not encrypted_input_path.is_file():
error_message = (
"Error Encountered While Sending Data to the Server: The data has not been encrypted "
f"correctly on the client side - {encrypted_input_path.is_file()=}"
)
return {anonymized_text_output: gr.update(value=error_message)}
data = {
"user_id": USER_ID,
}
url = SERVER_URL + "run_fhe"
with requests.post(
url=url,
data=data,
) as response:
if not response.ok:
return {
anonymized_text_output: gr.update(
value=(
"⚠️ An error occurred on the Server Side. "
"Please check connectivity and data transmission."
),
),
}
else:
time.sleep(1)
print(f"The query anonymization was computed in {response.json():.2f} s per token.")
def get_output_fn() -> Dict:
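    """Retrieve the encrypted anonymized output from the server."""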
print("------------ Step 3.3: Get the output from the Server Side")
if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file():
error_message = (
"Error Encountered While Sending Data to the Server: "
"The key has not been generated correctly"
)
return {anonymized_text_output: gr.update(value=error_message)}
if not (KEYS_DIR / f"{USER_ID}/encrypted_input").is_file():
error_message = (
"Error Encountered While Sending Data to the Server: "
"The data has not been encrypted correctly on the client side"
)
return {anonymized_text_output: gr.update(value=error_message)}
data = {
"user_id": USER_ID,
}
# Retrieve the encrypted output
url = SERVER_URL + "get_output"
with requests.post(
url=url,
data=data,
) as response:
if response.ok:
print("Data received ✅ from the remote Server")
response_data = response.json()
encrypted_output_base64 = response_data["encrypted_output"]
length_encrypted_output_base64 = response_data["length"]
# Decode the base64 encoded data
encrypted_output = base64.b64decode(encrypted_output_base64)
length_encrypted_output = base64.b64decode(length_encrypted_output_base64)
# Save the encrypted output to bytes in a file as it is too large to pass through
# regular Gradio buttons (see https://github.com/gradio-app/gradio/issues/1877)
write_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output", encrypted_output)
write_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output_len", length_encrypted_output)
else:
print("Error ❌ in getting data to the server")
def decrypt_fn(text) -> Dict:
"""Dencrypt the data on the `Client Side`."""
print("------------ Step 4: Dencrypt the data on the `Client Side`")
# Get the encrypted output path
encrypted_output_path = CLIENT_DIR / f"{USER_ID}_encrypted_output"
if not encrypted_output_path.is_file():
error_message = """⚠️ Please ensure that: \n
- the connectivity \n
- the query has been submitted \n
- the evaluation key has been generated \n
- the server processed the encrypted data \n
- the Client received the data from the Server before decrypting the prediction
"""
print(error_message)
return error_message, None
# Retrieve the client API
client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}")
client.load()
# Load the encrypted output as bytes
encrypted_output = read_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output")
length = int.from_bytes(read_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output_len"), "big")
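    # The same tokenization as in `encrypt_query_fn` is used below, so each word token
    # lines up with one fixed-size chunk of `length` bytes in the encrypted output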
tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+)", text)
decrypted_output, identified_words_with_prob = [], []
i = 0
for token in tokens:
# Directly append non-word tokens or whitespace to processed_tokens
if bool(re.match(r"^\s+$", token)):
continue
else:
encrypted_token = encrypted_output[i : i + length]
prediction_proba = client.deserialize_decrypt_dequantize(encrypted_token)
probability = prediction_proba[0][1]
i += length
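            # Tokens whose PII probability exceeds the 0.77 threshold are replaced by a short UUID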
if probability >= 0.77:
identified_words_with_prob.append((token, probability))
# Use the existing UUID if available, otherwise generate a new one
tmp_uuid = UUID_MAP.get(token, str(uuid.uuid4())[:8])
decrypted_output.append(tmp_uuid)
UUID_MAP[token] = tmp_uuid
else:
decrypted_output.append(token)
    # Update the UUID map with any new mappings found in this query
write_json(MAPPING_UUID_PATH, UUID_MAP)
    # Remove spaces before punctuation
anonymized_text = re.sub(r"\s([,.!?;:])", r"\1", " ".join(decrypted_output))
# Convert the list of identified words and probabilities into a DataFrame
if identified_words_with_prob:
identified_df = pd.DataFrame(
identified_words_with_prob, columns=["Identified Words", "Probability"]
)
else:
identified_df = pd.DataFrame(columns=["Identified Words", "Probability"])
print("Decryption done ✅ on Client Side")
return anonymized_text, identified_df
def anonymization_with_fn(query):
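    """Run the full pipeline: encrypt the query, send it, run FHE on the server, then fetch and decrypt the result."""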
encrypt_query_fn(query)
send_input_fn(query)
run_fhe_in_server_fn()
get_output_fn()
anonymized_text, identified_df = decrypt_fn(query)
return {
anonymized_text_output: gr.update(value=anonymized_text),
identified_words_output_df: gr.update(value=identified_df, visible=True),
}
def query_chatgpt_fn(anonymized_query, anonymized_document):
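    """Send the anonymized query and document to ChatGPT, then de-anonymize the response."""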
print("------------ Step 5: ChatGPT communication")
if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file():
error_message = "Error ❌: Please generate the key first!"
return {chatgpt_response_anonymized: gr.update(value=error_message)}
if not (CLIENT_DIR / f"{USER_ID}_encrypted_output").is_file():
error_message = "Error ❌: Please encrypt your query first!"
return {chatgpt_response_anonymized: gr.update(value=error_message)}
prompt = read_txt(PROMPT_PATH)
# Prepare prompt
initial_prompt = prompt + "\n"
query = (
"Document content:\n```\n"
+ anonymized_document
+ "\n\n```"
+ "Query:\n```\n"
+ anonymized_query
+ "\n```"
)
print(f'initial_prompt:\n{initial_prompt}')
completion = client.chat.completions.create(
model="gpt-4-1106-preview", # Replace with "gpt-4" if available
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": query},
],
)
anonymized_response = completion.choices[0].message.content
uuid_map = read_json(MAPPING_UUID_PATH)
inverse_uuid_map = {
v: k for k, v in uuid_map.items()
} # TODO load the inverse mapping from disk for efficiency
# Pattern to identify words and non-words (including punctuation, spaces, etc.)
tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+)", anonymized_response)
processed_tokens = []
for token in tokens:
# Directly append non-word tokens or whitespace to processed_tokens
if not token.strip() or not re.match(r"\w+", token):
processed_tokens.append(token)
continue
if token in inverse_uuid_map:
processed_tokens.append(inverse_uuid_map[token])
else:
processed_tokens.append(token)
deanonymized_response = "".join(processed_tokens)
return {chatgpt_response_anonymized: gr.update(value=anonymized_response),
chatgpt_response_deanonymized: gr.update(value=deanonymized_response)}
demo = gr.Blocks(css=".markdown-body { font-size: 18px; }")
with demo:
gr.Markdown(
"""
<p align="center">
<img width=200 src="file/images/logos/zama.jpg">
</p>
<h1 style="text-align: center;">Encrypted Anonymization Using Fully Homomorphic Encryption</h1>
<p align="center">
<a href="https://github.com/zama-ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="file/images/logos/github.png">Concrete-ML</a>
<a href="https://docs.zama.ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="file/images/logos/documentation.png">Documentation</a>
<a href=" https://community.zama.ai/c/concrete-ml/8"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="file/images/logos/community.png">Community</a>
<a href="https://twitter.com/zama_fhe"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="file/images/logos/x.png">@zama_fhe</a>
</p>
"""
)
# gr.Markdown(
# """
# <p align="center">
# <img width="15%" height="15%" src="./encrypted_anonymization_diagram.jpg">
# </p>
# """
# )
with gr.Accordion("What is encrypted anonymization?", open=False):
gr.Markdown(
"""Anonymization is the process of removing personally identifiable information (PII)
from data to protect individual privacy.
To resolve trust issues when deploying anonymization as a cloud service, Fully Homomorphic
Encryption (FHE) can be used to preserve the privacy of the original data using
encryption.
The data remains encrypted throughout the anonymization process, eliminating the need for
third-party access to the raw data. Once the data is anonymized, it can safely be sent
to GenAI services such as ChatGPT.
"""
)
########################## Key Gen Part ##########################
gr.Markdown(
"## Step 1: Key generation\n\n"
"""In FHE schemes, two sets of keys are generated. First, the secret keys which are used for
encrypting and decrypting data owned by the client. Second, the evaluation keys that allow
a server to blindly process the encrypted data.
"""
)
gen_key_btn = gr.Button("Generate the secret and evaluation keys")
gen_key_btn.click(
key_gen_fn,
inputs=[],
outputs=[gen_key_btn],
)
########################## Main document Part ##########################
gr.Markdown("<hr />")
gr.Markdown("## Step 2: Private document")
with gr.Row():
with gr.Column():
gr.Markdown("**Original document:**")
gr.Markdown(
"""This document was retrieved from the
[Microsoft Presidio](https://huggingface.co/spaces/presidio/presidio_demo) demo.
You can select and deselect sentences to customize the document that will be used
as the initial prompt for ChatGPT in step 5.
"""
)
with gr.Column():
gr.Markdown("**Anonymized document:**")
gr.Markdown(
"""You can see below the anonymized text, replaced with hexademical strings, that
will be sent to ChatGPT.
ChatGPT will then be able to answer any queries about the document.
"""
)
with gr.Row():
with gr.Column():
original_sentences_box = gr.CheckboxGroup(
ORIGINAL_DOCUMENT,
value=ORIGINAL_DOCUMENT,
show_label=False,
)
with gr.Column():
anonymized_doc_box = gr.Textbox(
show_label=False, value=ANONYMIZED_DOCUMENT, interactive=False, lines=11
)
original_sentences_box.change(
fn=select_static_sentences_fn,
inputs=[original_sentences_box],
outputs=[anonymized_doc_box],
)
########################## User Query Part ##########################
gr.Markdown("<hr />")
gr.Markdown("## Step 3: Private query")
gr.Markdown(
"""Now, you can formulate a query. Please choose from the predefined options in
<span style='color:grey'>“Queries examples”</span>" or craft a custom question in
the <span style='color:grey'>“Customized query”</span>" text box.
Remain concise and relevant to the context. Any off-topic query will not be processed.
"""
)
with gr.Row():
with gr.Column(scale=5):
with gr.Column(scale=5):
default_query_box = gr.Dropdown(
                    list(DEFAULT_QUERIES.values()), label="Query examples:"
)
gr.Markdown("Or")
query_box = gr.Textbox(
value="Who lives in Maine?", label="Customized query:", interactive=True
)
default_query_box.change(
fn=lambda default_query_box: default_query_box,
inputs=[default_query_box],
outputs=[query_box],
)
with gr.Column(scale=1, min_width=6):
gr.HTML("<div style='height: 77px;'></div>")
encrypt_btn = gr.Button("Encrypt query")
# gr.HTML("<div style='height: 50px;'></div>")
with gr.Column(scale=5):
output_encrypted_box = gr.Textbox(
label="Encrypted anonymized query that will be sent to the anonymization server:",
lines=8,
)
########################## FHE processing Part ##########################
gr.Markdown("<hr />")
gr.Markdown("## Step 4: Secure anonymization with FHE")
gr.Markdown(
""" Once the client encrypts the private query locally, it will be sent to a remote server
to perform the anonymization on encrypted data. When the computation is done, the server
will return the result to the client for decryption.
"""
)
run_fhe_btn = gr.Button("Anonymize with FHE")
anonymized_text_output = gr.Textbox(
label="Decrypted anonymized query that will be sent to ChatGPT:", lines=1, interactive=True
)
identified_words_output_df = gr.Dataframe(label="Identified words:", visible=False)
encrypt_btn.click(
fn=encrypt_query_fn,
inputs=[query_box],
outputs=[
query_box,
output_encrypted_box,
anonymized_text_output,
identified_words_output_df,
],
)
run_fhe_btn.click(
anonymization_with_fn,
inputs=[query_box],
outputs=[anonymized_text_output, identified_words_output_df],
)
########################## ChatGpt Part ##########################
gr.Markdown("<hr />")
gr.Markdown("## Spet 5: Secure your communication on ChatGPT with anonymized queries")
gr.Markdown(
"""After securely anonymizing the query with FHE,
you can forward it to ChatGPT without having any concern about information leakage."""
)
chatgpt_button = gr.Button("Query ChatGPT")
with gr.Row():
chatgpt_response_anonymized = gr.Textbox(label="ChatGPT's anonymized response:", lines=13)
chatgpt_response_deanonymized = gr.Textbox(
label="ChatGPT's non-anonymized response:", lines=13
)
chatgpt_button.click(
query_chatgpt_fn,
inputs=[anonymized_text_output, anonymized_doc_box],
outputs=[chatgpt_response_anonymized, chatgpt_response_deanonymized],
)
gr.Markdown(
"""**Please note**: As this space is intended solely for demonstration purposes, some
private information may be missed during by the anonymization algorithm. Please validate the
following query before sending it to ChatGPT."""
)
# Launch the app
demo.launch(share=False)