File size: 7,703 Bytes
e027770 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
import datetime
import json
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, List
import gradio as gr
import pyarrow as pa
import pyarrow.parquet as pq
from gradio_client import Client
from huggingface_hub import CommitScheduler
#######################
# Parquet scheduler #
# Run in scheduler.py #
#######################
class ParquetScheduler(CommitScheduler):
def append(self, row: Dict[str, Any]) -> None:
with self.lock:
if not hasattr(self, "rows") or self.rows is None:
self.rows = []
self.rows.append(row)
def set_schema(self, schema: Dict[str, Dict[str, str]]) -> None:
"""
Define a schema to help `datasets` load the generated library.
This method is optional and can be called once just after the scheduler had been created. If it is not called,
the schema is automatically inferred before pushing the data to the Hub.
See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value for the list of
possible values.
Example:
```py
scheduler.set_schema({
"prompt": {"_type": "Value", "dtype": "string"},
"negative_prompt": {"_type": "Value", "dtype": "string"},
"guidance_scale": {"_type": "Value", "dtype": "int64"},
"image": {"_type": "Image"},
})
```
"""
self._schema = schema
def push_to_hub(self):
# Check for new rows to push
with self.lock:
rows = getattr(self, "rows", None)
self.rows = None
if not rows:
return
print(f"Got {len(rows)} item(s) to commit.")
# Load images + create 'features' config for datasets library
hf_features: Dict[str, Dict] = getattr(self, "_schema", None) or {}
path_to_cleanup: List[Path] = []
for row in rows:
for key, value in row.items():
# Infer schema (for `datasets` library)
if key not in hf_features:
hf_features[key] = _infer_schema(key, value)
# Load binary files if necessary
if hf_features[key]["_type"] in ("Image", "Audio"):
# It's an image or audio: we load the bytes and remember to cleanup the file
file_path = Path(value)
if file_path.is_file():
row[key] = {
"path": file_path.name,
"bytes": file_path.read_bytes(),
}
path_to_cleanup.append(file_path)
# Complete rows if needed
for row in rows:
for feature in hf_features:
if feature not in row:
row[feature] = None
# Export items to Arrow format
table = pa.Table.from_pylist(rows)
# Add metadata (used by datasets library)
table = table.replace_schema_metadata(
{"huggingface": json.dumps({"info": {"features": hf_features}})}
)
# Write to parquet file
archive_file = tempfile.NamedTemporaryFile()
pq.write_table(table, archive_file.name)
# Upload
self.api.upload_file(
repo_id=self.repo_id,
repo_type=self.repo_type,
revision=self.revision,
path_in_repo=f"{uuid.uuid4()}.parquet",
path_or_fileobj=archive_file.name,
)
print(f"Commit completed.")
# Cleanup
archive_file.close()
for path in path_to_cleanup:
path.unlink(missing_ok=True)
def _infer_schema(key: str, value: Any) -> Dict[str, str]:
"""
Infer schema for the `datasets` library.
See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value.
"""
if "image" in key:
return {"_type": "Image"}
if "audio" in key:
return {"_type": "Audio"}
if isinstance(value, int):
return {"_type": "Value", "dtype": "int64"}
if isinstance(value, float):
return {"_type": "Value", "dtype": "float64"}
if isinstance(value, bool):
return {"_type": "Value", "dtype": "bool"}
if isinstance(value, bytes):
return {"_type": "Value", "dtype": "binary"}
# Otherwise in last resort => convert it to a string
return {"_type": "Value", "dtype": "string"}
#################
# Gradio app #
# Run in app.py #
#################
PARQUET_DATASET_DIR = Path("parquet_dataset")
PARQUET_DATASET_DIR.mkdir(parents=True, exist_ok=True)
scheduler = ParquetScheduler(
repo_id="example-space-to-dataset-parquet",
repo_type="dataset",
folder_path=PARQUET_DATASET_DIR,
path_in_repo="data",
)
client = Client("stabilityai/stable-diffusion")
def generate(prompt: str) -> tuple[str, list[str]]:
"""Generate images on 'submit' button."""
# Generate from https://huggingface.co/spaces/stabilityai/stable-diffusion
out_dir = client.predict(prompt, "", 9, fn_index=1)
with (Path(out_dir) / "captions.json").open() as f:
paths = list(json.load(f).keys())
# Save config used to generate data
with tempfile.NamedTemporaryFile(
mode="w", suffix=".json", delete=False
) as config_file:
json.dump(
{"prompt": prompt, "negative_prompt": "", "guidance_scale": 9}, config_file
)
return config_file.name, paths
def get_selected_index(evt: gr.SelectData) -> int:
"""Select "best" image."""
return evt.index
def save_preference(
config_path: str, gallery: list[dict[str, Any]], selected_index: int
) -> None:
"""Save preference, i.e. move images to a new folder and send paths+config to scheduler."""
save_dir = PARQUET_DATASET_DIR / f"{uuid.uuid4()}"
save_dir.mkdir(parents=True, exist_ok=True)
# Load config
with open(config_path) as f:
data = json.load(f)
# Add selected item + timestamp
data["selected_index"] = selected_index
data["timestamp"] = datetime.datetime.utcnow().isoformat()
# Copy and add images
for index, path in enumerate(x["name"] for x in gallery):
name = f"{index:03d}"
dst_path = save_dir / f"{name}{Path(path).suffix}"
shutil.move(path, dst_path)
data[f"image_{name}"] = dst_path
# Send to scheduler
scheduler.append(data)
def clear() -> tuple[dict, dict, dict]:
"""Clear all values once saved."""
return (gr.update(value=None), gr.update(value=None), gr.update(interactive=False))
def get_demo():
with gr.Group():
prompt = gr.Text(show_label=False, placeholder="Prompt")
config_path = gr.Text(visible=False)
gallery = gr.Gallery(show_label=False).style(
columns=2, rows=2, height="600px", object_fit="scale-down"
)
selected_index = gr.Number(visible=False, precision=0)
save_preference_button = gr.Button("Save preference", interactive=False)
# Generate images on submit
prompt.submit(fn=generate, inputs=prompt, outputs=[config_path, gallery],).success(
fn=lambda: gr.update(interactive=True),
outputs=save_preference_button,
queue=False,
)
# Save preference on click
gallery.select(
fn=get_selected_index,
outputs=selected_index,
queue=False,
)
save_preference_button.click(
fn=save_preference,
inputs=[config_path, gallery, selected_index],
queue=False,
).then(
fn=clear,
outputs=[config_path, gallery, save_preference_button],
queue=False,
)
|