import os
import shutil
import subprocess
import zipfile
import time
import copy
import pickle

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.optim import lr_scheduler

from PIL import Image
import gradio as gr

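# --- Kaggle API setup: place kaggle.json where the Kaggle CLI expects it ---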
kaggle_dir = os.path.expanduser("~/.kaggle")
os.makedirs(kaggle_dir, exist_ok=True)

kaggle_json_path = "kaggle.json"
kaggle_dest_path = os.path.join(kaggle_dir, "kaggle.json")

if not os.path.exists(kaggle_dest_path):
    shutil.copy(kaggle_json_path, kaggle_dest_path)
    # The Kaggle CLI refuses to use a world-readable key file.
    os.chmod(kaggle_dest_path, 0o600)
    print("Kaggle API key copied and permissions set.")
else:
    print("Kaggle API key already exists.")

# Download the dataset via the Kaggle CLI; check=True makes a failed download raise.
dataset_name = "mostafaabla/garbage-classification"
print(f"Downloading the dataset: {dataset_name}")
subprocess.run(["kaggle", "datasets", "download", "-d", dataset_name], check=True)

dataset_zip = "garbage-classification.zip"
extracted_folder = "./garbage-classification"

if os.path.exists(dataset_zip):
    if not os.path.exists(extracted_folder):
        with zipfile.ZipFile(dataset_zip, 'r') as zip_ref:
            zip_ref.extractall(extracted_folder)
        print("Dataset unzipped successfully!")
    else:
        print("Dataset already unzipped.")
else:
    print(f"Dataset zip file '{dataset_zip}' not found.")

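# --- Data loading and augmentation ---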
# Root folder containing 'train' and 'valid' subdirectories (one subfolder per class).
# Note: this hard-coded path differs from extracted_folder above; point it at wherever
# the train/valid split of the extracted dataset actually lives.
data_dir = 'C:\\Users\\kendr\\Downloads\\data'

# Augment the training set; validation uses a deterministic resize + center crop.
# The normalization constants are the standard ImageNet channel means and stds.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomRotation(15),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'valid': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
                  for x in ['train', 'valid']}

# On Windows, num_workers > 0 requires the entry point to be guarded with
# `if __name__ == '__main__':`; otherwise set num_workers=0.
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=32, shuffle=True, num_workers=4)
               for x in ['train', 'valid']}

class_names = image_datasets['train'].classes
print(f"Classes: {class_names}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

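# --- Model setup: ResNet50 pretrained on ImageNet, classifier head replaced ---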
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

# Replace the final fully connected layer to match the number of classes.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_names))

model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Decay the learning rate by 10x every 7 epochs.
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

num_epochs = 10

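# Note: all parameters are fine-tuned here; a cheaper alternative would be to freeze
# the backbone and optimize only model.fc.
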
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    """Run a train and a valid phase per epoch, keeping the best validation weights."""
    since = time.time()

    # Deep-copy the snapshot: state_dict() returns references to the live parameter
    # tensors, so without a copy later training steps would mutate the "best" weights.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        epoch_start = time.time()
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Track gradients only during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / len(image_datasets[phase])
            epoch_acc = running_corrects.double() / len(image_datasets[phase])

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'valid' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        epoch_end = time.time()
        print(f'Epoch {epoch + 1} completed in {epoch_end - epoch_start:.2f} seconds.')

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return model

best_model = train_model(model, criterion, optimizer, scheduler, num_epochs=num_epochs)

# Save the best validation weights (train_model has already loaded them into the model).
torch.save(best_model.state_dict(), 'resnet50_garbage_classification.pth')

# Per-epoch metrics recorded from the training run above, saved for later analysis.
history = {
    'train_loss': [1.0083, 0.7347, 0.6510, 0.5762, 0.5478, 0.5223, 0.4974, 0.3464, 0.2896, 0.2604],
    'train_acc': [0.6850, 0.7687, 0.7913, 0.8126, 0.8210, 0.8272, 0.8355, 0.8870, 0.9049, 0.9136],
    'val_loss': [0.6304, 0.8616, 0.5594, 0.4006, 0.3968, 0.4051, 0.3223, 0.2221, 0.2125, 0.2076],
    'val_acc': [0.7985, 0.7307, 0.8260, 0.8655, 0.8793, 0.8729, 0.9094, 0.9338, 0.9338, 0.9326]
}

with open('training_history.pkl', 'wb') as f:
    pickle.dump(history, f)

print('Training history saved as training_history.pkl')

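# Optional sketch: reload the saved history and plot the curves. Assumes matplotlib
# is installed; the output filename is illustrative.
def plot_history(path='training_history.pkl'):
    import matplotlib.pyplot as plt
    with open(path, 'rb') as f:
        h = pickle.load(f)
    epochs = range(1, len(h['train_loss']) + 1)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
    ax1.plot(epochs, h['train_loss'], label='train')
    ax1.plot(epochs, h['val_loss'], label='valid')
    ax1.set_title('Loss')
    ax1.set_xlabel('epoch')
    ax1.legend()
    ax2.plot(epochs, h['train_acc'], label='train')
    ax2.plot(epochs, h['val_acc'], label='valid')
    ax2.set_title('Accuracy')
    ax2.set_xlabel('epoch')
    ax2.legend()
    fig.savefig('training_curves.png')
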
# --- Inference: rebuild the architecture and load the fine-tuned weights ---
def load_model():
    # No pretrained download needed; the saved state_dict overwrites all weights anyway.
    model = models.resnet50(weights=None)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, 12)

    model.load_state_dict(torch.load('resnet50_garbage_classification.pth', map_location=torch.device('cpu')))

    model.eval()
    return model

model = load_model()

# Preprocessing must match the validation transform used during training.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Class order must match the alphabetical order used by ImageFolder during training.
class_names = ['battery', 'biological', 'brown-glass', 'cardboard',
               'clothes', 'green-glass', 'metal', 'paper',
               'plastic', 'shoes', 'trash', 'white-glass']

# Recommended bin color per class (Indonesian labels with English translations).
bin_colors = {
    'battery': 'Merah (Red)',
    'biological': 'Cokelat (Brown)',
    'brown-glass': 'Hijau (Green)',
    'cardboard': 'Kuning (Yellow)',
    'clothes': 'Biru (Blue)',
    'green-glass': 'Hijau (Green)',
    'metal': 'Kuning (Yellow)',
    'paper': 'Kuning (Yellow)',
    'plastic': 'Kuning (Yellow)',
    'shoes': 'Biru (Blue)',
    'trash': 'Hitam (Black)',
    'white-glass': 'Putih (White)'
}

def predict(image):
    # Gradio delivers the image as a NumPy array; convert, preprocess, add a batch dim.
    image = Image.fromarray(image)
    image = transform(image)
    image = image.unsqueeze(0)

    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs, 1)

    class_name = class_names[predicted.item()]
    bin_color = bin_colors[class_name]
    return class_name, bin_color

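# Optional sanity check before launching the UI (the image path is illustrative):
# import numpy as np
# sample = np.array(Image.open("sample.jpg").convert("RGB"))
# print(predict(sample))
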
# Gradio UI; labels are in Indonesian: "Unggah Gambar" = "Upload Image",
# "Jenis Sampah" = "Waste Type", "Tong Sampah yang Sesuai" = "Matching Bin",
# and the title reads "Garbage Classification with ResNet50".
iface = gr.Interface(
    fn=predict,
    inputs=gr.Image(type="numpy", label="Unggah Gambar"),
    outputs=[
        gr.Textbox(label="Jenis Sampah"),
        gr.Textbox(label="Tong Sampah yang Sesuai")
    ],
    title="Klasifikasi Sampah dengan ResNet50",
    description="Unggah gambar sampah, dan model akan mengklasifikasikannya ke dalam salah satu dari 12 kategori bersama dengan warna tempat sampah yang sesuai."
)

iface.launch(share=True)