Kabatubare
commited on
Commit
•
e0a67a5
1
Parent(s):
d49aedb
Update app.py
Browse files
app.py
CHANGED
@@ -1,33 +1,37 @@
|
|
1 |
import gradio as gr
|
2 |
-
import librosa
|
3 |
-
import numpy as np
|
4 |
import torch
|
5 |
import torchaudio
|
6 |
-
import
|
7 |
import logging
|
8 |
from audioseal import AudioSeal
|
9 |
import random
|
10 |
import string
|
11 |
from pathlib import Path
|
12 |
|
|
|
13 |
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
14 |
logger = logging.getLogger(__name__)
|
15 |
|
|
|
16 |
def generate_unique_message(length=16):
|
17 |
-
"""Generates a unique alphanumeric message of the given length."""
|
18 |
characters = string.ascii_letters + string.digits
|
19 |
return ''.join(random.choice(characters) for _ in range(length))
|
20 |
|
|
|
21 |
def message_to_binary(message, bit_length=16):
|
22 |
-
"""Converts a message to binary format, truncating or padding to `bit_length`."""
|
23 |
binary_message = ''.join(format(ord(c), '08b') for c in message)
|
24 |
return binary_message[:bit_length].ljust(bit_length, '0')
|
25 |
|
|
|
26 |
def binary_to_message(binary_str):
|
27 |
-
"""Converts a binary string back to its ASCII message representation."""
|
28 |
chars = [chr(int(binary_str[i:i+8], 2)) for i in range(0, len(binary_str), 8)]
|
29 |
return ''.join(chars).rstrip('\x00')
|
30 |
|
|
|
|
|
|
|
|
|
|
|
31 |
def load_and_resample_audio(audio_file_path, target_sample_rate=16000):
|
32 |
waveform, sample_rate = torchaudio.load(audio_file_path)
|
33 |
if sample_rate != target_sample_rate:
|
@@ -35,30 +39,33 @@ def load_and_resample_audio(audio_file_path, target_sample_rate=16000):
|
|
35 |
waveform = resampler(waveform)
|
36 |
return waveform, target_sample_rate
|
37 |
|
|
|
38 |
def watermark_audio(audio_file_path, unique_message):
|
39 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
40 |
waveform = torch.clamp(waveform, min=-1.0, max=1.0)
|
41 |
generator = AudioSeal.load_generator("audioseal_wm_16bits")
|
42 |
|
43 |
-
|
44 |
-
|
45 |
message_tensor = torch.tensor([int(bit) for bit in binary_message], dtype=torch.int32).unsqueeze(0)
|
46 |
|
47 |
watermarked_audio = generator(waveform.unsqueeze(0), sample_rate=sample_rate, message=message_tensor).squeeze(0)
|
48 |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
|
49 |
torchaudio.save(temp_file.name, watermarked_audio, sample_rate)
|
50 |
-
return temp_file.name,
|
51 |
|
|
|
52 |
def detect_watermark(audio_file_path):
|
53 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
54 |
detector = AudioSeal.load_detector("audioseal_detector_16bits")
|
55 |
|
56 |
result, message_tensor = detector.detect_watermark(waveform.unsqueeze(0), sample_rate=sample_rate)
|
57 |
binary_message = ''.join(str(bit) for bit in message_tensor[0].tolist())
|
58 |
-
detected_message =
|
59 |
|
60 |
return result, detected_message
|
61 |
|
|
|
62 |
style_path = Path("style.css")
|
63 |
style = style_path.read_text()
|
64 |
|
@@ -82,4 +89,4 @@ with gr.Blocks(css=style) as demo:
|
|
82 |
detected_message_output = gr.Textbox(label="Detected Unique Message")
|
83 |
detect_watermark_button.click(fn=detect_watermark, inputs=audio_input_detect_watermark, outputs=[watermark_detection_output, detected_message_output])
|
84 |
|
85 |
-
demo.launch()
|
|
|
1 |
import gradio as gr
|
|
|
|
|
2 |
import torch
|
3 |
import torchaudio
|
4 |
+
import tempfile
|
5 |
import logging
|
6 |
from audioseal import AudioSeal
|
7 |
import random
|
8 |
import string
|
9 |
from pathlib import Path
|
10 |
|
11 |
+
# Initialize logging
|
12 |
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
|
13 |
logger = logging.getLogger(__name__)
|
14 |
|
15 |
+
# Helper function for generating a unique alphanumeric message
|
16 |
def generate_unique_message(length=16):
|
|
|
17 |
characters = string.ascii_letters + string.digits
|
18 |
return ''.join(random.choice(characters) for _ in range(length))
|
19 |
|
20 |
+
# Converts message to binary, ensuring it fits within the specified bit length
|
21 |
def message_to_binary(message, bit_length=16):
|
|
|
22 |
binary_message = ''.join(format(ord(c), '08b') for c in message)
|
23 |
return binary_message[:bit_length].ljust(bit_length, '0')
|
24 |
|
25 |
+
# Converts binary string back to ASCII message
|
26 |
def binary_to_message(binary_str):
|
|
|
27 |
chars = [chr(int(binary_str[i:i+8], 2)) for i in range(0, len(binary_str), 8)]
|
28 |
return ''.join(chars).rstrip('\x00')
|
29 |
|
30 |
+
# Converts binary string to hexadecimal
|
31 |
+
def binary_to_hex(binary_str):
|
32 |
+
return hex(int(binary_str, 2))[2:].zfill(4)
|
33 |
+
|
34 |
+
# Load and resample audio file to match model's expected sample rate
|
35 |
def load_and_resample_audio(audio_file_path, target_sample_rate=16000):
|
36 |
waveform, sample_rate = torchaudio.load(audio_file_path)
|
37 |
if sample_rate != target_sample_rate:
|
|
|
39 |
waveform = resampler(waveform)
|
40 |
return waveform, target_sample_rate
|
41 |
|
42 |
+
# Main function for applying watermark to audio
|
43 |
def watermark_audio(audio_file_path, unique_message):
|
44 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
45 |
waveform = torch.clamp(waveform, min=-1.0, max=1.0)
|
46 |
generator = AudioSeal.load_generator("audioseal_wm_16bits")
|
47 |
|
48 |
+
binary_message = message_to_binary(unique_message, bit_length=16)
|
49 |
+
hex_message = binary_to_hex(binary_message)
|
50 |
message_tensor = torch.tensor([int(bit) for bit in binary_message], dtype=torch.int32).unsqueeze(0)
|
51 |
|
52 |
watermarked_audio = generator(waveform.unsqueeze(0), sample_rate=sample_rate, message=message_tensor).squeeze(0)
|
53 |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
|
54 |
torchaudio.save(temp_file.name, watermarked_audio, sample_rate)
|
55 |
+
return temp_file.name, hex_message # Display hex message to the user
|
56 |
|
57 |
+
# Function to detect watermark in audio
|
58 |
def detect_watermark(audio_file_path):
|
59 |
waveform, sample_rate = load_and_resample_audio(audio_file_path, target_sample_rate=16000)
|
60 |
detector = AudioSeal.load_detector("audioseal_detector_16bits")
|
61 |
|
62 |
result, message_tensor = detector.detect_watermark(waveform.unsqueeze(0), sample_rate=sample_rate)
|
63 |
binary_message = ''.join(str(bit) for bit in message_tensor[0].tolist())
|
64 |
+
detected_message = binary_to_hex(binary_message) # Convert detected binary message to hex for display
|
65 |
|
66 |
return result, detected_message
|
67 |
|
68 |
+
# Setup for Gradio interface
|
69 |
style_path = Path("style.css")
|
70 |
style = style_path.read_text()
|
71 |
|
|
|
89 |
detected_message_output = gr.Textbox(label="Detected Unique Message")
|
90 |
detect_watermark_button.click(fn=detect_watermark, inputs=audio_input_detect_watermark, outputs=[watermark_detection_output, detected_message_output])
|
91 |
|
92 |
+
demo.launch()
|