Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
from datasets import load_dataset, Dataset | |
from transformers import GPT2Tokenizer, GPT2LMHeadModel, Trainer, TrainingArguments | |
import torch | |
import os | |
import matplotlib.pyplot as plt | |
from huggingface_hub import HfApi # ここを修正しました | |
import json | |
import io | |
from datetime import datetime | |
# グローバル変数で検出された列を保存 | |
columns = [] | |
# データセットをロードする関数 | |
def load_data(dataset_name): | |
global columns | |
try: | |
# Hugging Faceのデータセットをロード | |
dataset = load_dataset(dataset_name) | |
# 最初のデータをプレビューとして表示 | |
df = pd.DataFrame(dataset['train']) | |
# 列名を検出 | |
columns = df.columns.tolist() | |
return columns, df.head().to_string(index=False) | |
except Exception as e: | |
return f"エラーが発生しました: {str(e)}" | |
# 列の選択が正しいかを検証 | |
def validate_columns(prompt_col, description_col): | |
if prompt_col not in columns or description_col not in columns: | |
return False | |
return True | |
# モデル訓練関数 | |
def train_model(dataset_name, model_name, epochs, batch_size, learning_rate, output_dir, prompt_col, description_col, hf_token): | |
try: | |
# 列の検証 | |
if not validate_columns(prompt_col, description_col): | |
return "無効な列選択です。データセット内の列を確認してください。" | |
# Hugging Faceのデータセットをロード | |
dataset = load_dataset(dataset_name) | |
# 訓練データを取得 | |
df = pd.DataFrame(dataset['train']) | |
# データのプレビュー | |
preview = df.head().to_string(index=False) | |
# 訓練用テキストの準備 | |
df['text'] = df[prompt_col] + ': ' + df[description_col] | |
train_dataset = Dataset.from_pandas(df[['text']]) | |
# GPT-2のトークナイザーとモデルを初期化 | |
tokenizer = GPT2Tokenizer.from_pretrained(model_name) | |
model = GPT2LMHeadModel.from_pretrained(model_name) | |
# 必要であればパディングトークンを追加 | |
if tokenizer.pad_token is None: | |
tokenizer.add_special_tokens({'pad_token': '[PAD]'}) | |
model.resize_token_embeddings(len(tokenizer)) | |
# データのトークナイズ関数 | |
def tokenize_function(examples): | |
tokens = tokenizer(examples['text'], padding="max_length", truncation=True, max_length=128) | |
tokens['labels'] = tokens['input_ids'].copy() | |
return tokens | |
tokenized_datasets = train_dataset.map(tokenize_function, batched=True) | |
# 訓練のための設定 | |
training_args = TrainingArguments( | |
output_dir=output_dir, | |
overwrite_output_dir=True, | |
num_train_epochs=int(epochs), | |
per_device_train_batch_size=int(batch_size), | |
per_device_eval_batch_size=int(batch_size), | |
warmup_steps=1000, | |
weight_decay=0.01, | |
learning_rate=float(learning_rate), | |
logging_dir="./logs", | |
logging_steps=10, | |
save_steps=500, | |
save_total_limit=2, | |
evaluation_strategy="steps", | |
eval_steps=500, | |
load_best_model_at_end=True, | |
metric_for_best_model="eval_loss" | |
) | |
# Trainer設定 | |
trainer = Trainer( | |
model=model, | |
args=training_args, | |
train_dataset=tokenized_datasets, | |
eval_dataset=tokenized_datasets, | |
) | |
# 訓練開始 | |
trainer.train() | |
eval_results = trainer.evaluate() | |
# Fine-tunedモデルを保存 | |
model.save_pretrained(output_dir) | |
tokenizer.save_pretrained(output_dir) | |
# 訓練損失と評価損失のグラフ生成 | |
train_loss = [x['loss'] for x in trainer.state.log_history if 'loss' in x] | |
eval_loss = [x['eval_loss'] for x in trainer.state.log_history if 'eval_loss' in x] | |
plt.plot(train_loss, label='訓練損失') | |
plt.plot(eval_loss, label='評価損失') | |
plt.xlabel('ステップ数') | |
plt.ylabel('損失') | |
plt.title('訓練と評価の損失') | |
plt.legend() | |
plt.savefig(os.path.join(output_dir, 'training_eval_loss.png')) | |
# モデルのHuggingFaceにアップロード | |
hf_api = HfApi() | |
hf_api.upload_folder( | |
folder_path=output_dir, | |
path_in_repo=".", | |
repo_id="sakaltcommunity/grape-small", | |
token=hf_token | |
) | |
return f"訓練が完了しました。\nデータのプレビュー:\n{preview}", eval_results | |
except Exception as e: | |
return f"エラーが発生しました: {str(e)}" | |
# テキスト生成関数 | |
def generate_text(prompt, temperature, top_k, top_p, max_length, repetition_penalty, use_comma, batch_size): | |
try: | |
model_name = "./fine-tuned-gpt2" | |
tokenizer = GPT2Tokenizer.from_pretrained(model_name) | |
model = GPT2LMHeadModel.from_pretrained(model_name) | |
if use_comma: | |
prompt = prompt.replace('.', ',') | |
inputs = tokenizer(prompt, return_tensors="pt", padding=True) | |
attention_mask = inputs.attention_mask | |
outputs = model.generate( | |
inputs.input_ids, | |
attention_mask=attention_mask, | |
max_length=int(max_length), | |
temperature=float(temperature), | |
top_k=int(top_k), | |
top_p=float(top_p), | |
repetition_penalty=float(repetition_penalty), | |
num_return_sequences=int(batch_size), | |
pad_token_id=tokenizer.eos_token_id | |
) | |
return [tokenizer.decode(output, skip_special_tokens=True) for output in outputs] | |
except Exception as e: | |
return f"エラーが発生しました: {str(e)}" | |
# UI設定 | |
with gr.Blocks() as ui: | |
with gr.Row(): | |
dataset_name = gr.Textbox(label="データセット名", value="imdb") # ここにデータセット名を入力 | |
model_name = gr.Textbox(label="モデル名", value="gpt2") | |
epochs = gr.Number(label="エポック数", value=3, minimum=1) | |
batch_size = gr.Number(label="バッチサイズ", value=4, minimum=1) | |
learning_rate = gr.Number(label="学習率", value=5e-5, minimum=1e-7, maximum=1e-2, step=1e-7) | |
output_dir = gr.Textbox(label="出力ディレクトリ", value="./output") | |
prompt_col = gr.Textbox(label="プロンプト列名", value="text") # 例:IMDBのレビュー列名 | |
description_col = gr.Textbox(label="説明列名", value="label") # 例:IMDBのラベル列名 | |
hf_token = gr.Textbox(label="Hugging Face アクセストークン") | |
with gr.Row(): | |
validate_button = gr.Button("列検証") | |
output = gr.Textbox(label="出力") | |
validate_button.click( | |
load_data, | |
inputs=[dataset_name], | |
outputs=[output] | |
) | |
with gr.Row(): | |
train_button = gr.Button("訓練開始") | |
result_output = gr.Textbox(label="訓練結果", lines=20) | |
train_button.click( | |
train_model, | |
inputs=[dataset_name, model_name, epochs, batch_size, learning_rate, output_dir, prompt_col, description_col, hf_token], | |
outputs=[result_output] | |
) | |
ui.launch() |