|
import gradio as gr |
|
import torch |
|
import torchaudio |
|
import tempfile |
|
import logging |
|
from audioseal import AudioSeal |
|
import random |
|
import string |
|
from pathlib import Path |
|
from datetime import datetime |
|
import json |
|
import os |
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
metadata_file = 'audio_metadata.json' |
|
if not os.path.exists(metadata_file): |
|
with open(metadata_file, 'w') as f: |
|
json.dump({}, f) |
|
|
|
|
|
def generate_unique_message(length=16): |
|
characters = string.ascii_letters + string.digits |
|
return ''.join(random.choice(characters) for _ in range(length)) |
|
|
|
|
|
def message_to_binary(message, bit_length=16): |
|
binary_message = ''.join(format(ord(c), '08b') for c in message) |
|
return binary_message[:bit_length].ljust(bit_length, '0') |
|
|
|
|
|
def binary_to_hex(binary_str): |
|
return hex(int(binary_str, 2))[2:].zfill(4) |
|
|
|
|
|
def load_and_resample_audio(audio_file_path, target_sample_rate=16000): |
|
waveform, sample_rate = torchaudio.load(audio_file_path) |
|
if sample_rate != target_sample_rate: |
|
resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sample_rate) |
|
waveform = resampler(waveform) |
|
return waveform, target_sample_rate |
|
|
|
|
|
def generate_enhanced_identifier(): |
|
timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f') |
|
sequential_number = str(get_next_sequential_number()).zfill(6) |
|
return f"{timestamp}-{sequential_number}" |
|
|
|
|
|
def get_next_sequential_number(): |
|
with open(metadata_file, 'r+') as f: |
|
data = json.load(f) |
|
next_number = data.get('next_sequential_number', 1) |
|
data['next_sequential_number'] = next_number + 1 |
|
f.seek(0) |
|
json.dump(data, f, indent=4) |
|
f.truncate() |
|
return next_number |
|
|
|
|
|
def save_audio_metadata(unique_id, original_hex, enhanced_id): |
|
with open(metadata_file, 'r+') as f: |
|
data = json.load(f) |
|
data['audio_files'] = data.get('audio_files', {}) |
|
data['audio_files'][unique_id] = {'original_hex': original_hex, 'enhanced_id': enhanced_id} |
|
f.seek(0) |
|
json.dump(data, f, indent=4) |
|
f.truncate() |
|
|
|
|
|
def watermark_audio(audio_file_path, unique_message): |
|
|
|
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000) |
|
waveform = torch.clamp(waveform, min=-1.0, max=1.0) |
|
generator = AudioSeal.load_generator("audioseal_wm_16bits") |
|
|
|
binary_message = message_to_binary(unique_message, bit_length=16) |
|
hex_message = binary_to_hex(binary_message) |
|
message_tensor = torch.tensor([int(bit) for bit in binary_message], dtype=torch.int32).unsqueeze(0) |
|
|
|
watermarked_audio = generator(waveform.unsqueeze(0), sample_rate=sample_rate, message=message_tensor).squeeze(0) |
|
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav') |
|
torchaudio.save(temp_file.name, watermarked_audio, sample_rate) |
|
|
|
|
|
enhanced_id = generate_enhanced_identifier() |
|
save_audio_metadata(unique_message, hex_message, enhanced_id) |
|
|
|
return temp_file.name, hex_message, enhanced_id |
|
|
|
|
|
def detect_watermark(audio_file_path, original_hex_message=None): |
|
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000) |
|
detector = AudioSeal.load_detector("audioseal_detector_16bits") |
|
|
|
result, message_tensor = detector.detect_watermark(waveform.unsqueeze(0), sample_rate=sample_rate) |
|
binary_message = ''.join(str(bit) for bit in message_tensor[0].tolist()) |
|
detected_hex_message = binary_to_hex(binary_message) |
|
|
|
|
|
match_result = "Not compared" |
|
if original_hex_message: |
|
match_result = "Match" if detected_hex_message == original_hex_message.upper() else "No Match" |
|
|
|
return result, detected_hex_message, match_result |
|
|
|
|
|
style_path = Path("style.css") |
|
if style_path.exists(): |
|
style = style_path.read_text() |
|
else: |
|
style = "" |
|
|
|
|
|
def gradio_interface(): |
|
with gr.Blocks(css=style) as demo: |
|
with gr.Tab("Watermark Audio"): |
|
with gr.Column(scale=6): |
|
gr.Markdown("""**How to Watermark Your Audio** |
|
This tool embeds a unique, invisible watermark into your audio files to mark them as yours. Follow these steps: |
|
1. **Upload Your Audio**: Choose the audio file you want to watermark. |
|
2. **Generate Unique Message**: Click this button to generate a unique code that will serve as your audio's watermark. |
|
3. **Apply Watermark**: Embed the watermark into your audio file. This process does not alter the audio's quality. |
|
4. **Download Watermarked Audio**: After the watermark is applied, you can download the watermarked audio. It will sound identical to the original but now contains your unique watermark. |
|
5. **View Enhanced ID**: Along with the watermarked audio, you'll get an enhanced ID for additional tracking and verification purposes. |
|
""") |
|
audio_input_watermark = gr.Audio(label="Upload Audio File for Watermarking", type="filepath") |
|
unique_message_output = gr.Textbox(label="Unique Message") |
|
watermarked_audio_output = gr.Audio(label="Watermarked Audio") |
|
message_output = gr.Textbox(label="Message Used for Watermarking") |
|
enhanced_id_output = gr.Textbox(label="Enhanced ID") |
|
generate_message_button = gr.Button("Generate Unique Message") |
|
watermark_button = gr.Button("Apply Watermark") |
|
generate_message_button.click(fn=generate_unique_message, inputs=None, outputs=unique_message_output) |
|
watermark_button.click(fn=watermark_audio, inputs=[audio_input_watermark, unique_message_output], outputs=[watermarked_audio_output, message_output, enhanced_id_output]) |
|
|
|
with gr.Tab("Detect Watermark"): |
|
with gr.Column(scale=6): |
|
gr.Markdown("""**How to Detect a Watermark in Your Audio** |
|
Use this feature to check if an audio file contains a specific watermark. Here's how: |
|
1. **Upload the Audio File |
|
**: Select the audio file you want to check. |
|
2. **Enter Original Hex Message**: If you know the hexadecimal code of the watermark, enter it here for a precise search. |
|
3. **Detect Watermark**: Click to analyze the audio for your watermark. |
|
4. **Review Results**: Find out whether your watermark was detected and if the detected code matches your input. |
|
""") |
|
audio_input_detect_watermark = gr.Audio(label="Upload Audio File for Watermark Detection", type="filepath") |
|
original_hex_input = gr.Textbox(label="Original Hex Message for Comparison", placeholder="Enter the original hex message here") |
|
detect_watermark_button = gr.Button("Detect Watermark") |
|
watermark_detection_output = gr.Textbox(label="Watermark Detection Result") |
|
detected_message_output = gr.Textbox(label="Detected Hex Message") |
|
match_result_output = gr.Textbox(label="Match Result") |
|
detect_watermark_button.click(fn=detect_watermark, inputs=[audio_input_detect_watermark, original_hex_input], outputs=[watermark_detection_output, detected_message_output, match_result_output]) |
|
|
|
return demo |
|
|
|
if __name__ == "__main__": |
|
demo = gradio_interface() |
|
demo.launch() |
|
|