import os
import shutil
import subprocess
import zipfile
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.optim import lr_scheduler
import subprocess
import zipfile
from PIL import Image
import gradio as gr
# Step 1: Set up the Kaggle API credentials.
# Make sure the ~/.kaggle directory exists before copying the key into it.
kaggle_dir = os.path.expanduser("~/.kaggle")
# exist_ok=True makes this a no-op when the directory is already present,
# replacing the separate existence check.
os.makedirs(kaggle_dir, exist_ok=True)

# Step 2: Copy the kaggle.json file to the ~/.kaggle directory
kaggle_json_path = "kaggle.json"
kaggle_dest_path = os.path.join(kaggle_dir, "kaggle.json")
if not os.path.exists(kaggle_dest_path):
    shutil.copy(kaggle_json_path, kaggle_dest_path)
    # Restrict the key file to owner read/write only.
    os.chmod(kaggle_dest_path, 0o600)
    print("Kaggle API key copied and permissions set.")
else:
    print("Kaggle API key already exists.")
# Step 3: Download the dataset from Kaggle using Kaggle CLI
dataset_name = "mostafaabla/garbage-classification"
print(f"Downloading the dataset: {dataset_name}")
download_command = f"kaggle datasets download -d {dataset_name}"
# Run the download command. The return code is intentionally not checked:
# a failed download is reported by the zip-file check below.
subprocess.run(download_command, shell=True)

# Step 4: Unzip the downloaded dataset
# NOTE(review): the archive name was blank in the source. The Kaggle CLI is
# expected to save the download as "<dataset-slug>.zip" in the working
# directory -- confirm against an actual download.
dataset_zip = dataset_name.split("/")[-1] + ".zip"
extracted_folder = "./garbage-classification"

# Check if the zip file exists
if os.path.exists(dataset_zip):
    if not os.path.exists(extracted_folder):
        with zipfile.ZipFile(dataset_zip, 'r') as zip_ref:
            zip_ref.extractall(extracted_folder)
        print("Dataset unzipped successfully!")
    else:
        print("Dataset already unzipped.")
else:
    print(f"Dataset zip file '{dataset_zip}' not found.")
# Path to the data directory; it must contain 'train' and 'valid' subfolders,
# each laid out as one folder per class (ImageFolder convention).
data_dir = 'C:\\Users\\kendr\\Downloads\\data'  # Adjust this if necessary

# Define data transformations.
# NOTE(review): only the ColorJitter and Normalize steps survived in the
# source. Resize and ToTensor were reconstructed as the minimum needed for
# batching and for Normalize to receive a tensor -- confirm the intended
# input size and any additional augmentation.
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
    'valid': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}

# Create the datasets from the image folder
image_datasets = {
    x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
    for x in ['train', 'valid']
}

# Create the dataloaders.
# NOTE(review): num_workers > 0 spawns worker processes; on Windows this
# normally requires the script body to sit under `if __name__ == "__main__":`.
dataloaders = {
    x: torch.utils.data.DataLoader(
        image_datasets[x], batch_size=32, shuffle=True, num_workers=4
    )
    for x in ['train', 'valid']
}

# Class names, as discovered by ImageFolder from the training directory
class_names = image_datasets['train'].classes
print(f"Classes: {class_names}")
# Check if a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained ResNet50 model
model = models.resnet50(weights='ResNet50_Weights.DEFAULT')  # Use weights instead of pretrained

# Modify the final layer to match the number of classes
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(class_names))  # Output classes match

# Move the model to the GPU if available
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler: multiply the learning rate by 0.1 every 7 steps
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# Number of epochs
num_epochs = 10
def _clone_state_dict(model):
    """Return an independent copy of ``model``'s state dict."""
    return {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}


# Training function with detailed output for each epoch
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a 'train' phase followed by a 'valid' phase and prints
    the loss and accuracy of both. Reads the module-level ``dataloaders``,
    ``image_datasets`` and ``device``.

    Args:
        model: Network to optimise; expected to already be on ``device``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        num_epochs: Number of full train/valid passes.

    Returns:
        The same ``model`` object, with the weights from the epoch that
        achieved the highest validation accuracy.
    """
    since = time.time()

    # state_dict() hands back references to the live parameters, so a plain
    # assignment would be silently overwritten by later updates. Clone instead.
    best_model_wts = _clone_state_dict(model)
    best_acc = 0.0

    for epoch in range(num_epochs):
        epoch_start = time.time()  # Start time for this epoch
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward; gradients are only tracked in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # Statistics: weight the batch loss by batch size so the
                # epoch figure is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            if phase == 'train':
                scheduler.step()

            # Calculate epoch loss and accuracy
            epoch_loss = running_loss / len(image_datasets[phase])
            epoch_acc = running_corrects.double() / len(image_datasets[phase])

            # Print loss and accuracy for each phase
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Keep an independent copy of the weights if this is the best accuracy
            if phase == 'valid' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = _clone_state_dict(model)

        epoch_end = time.time()  # End time for this epoch
        print(f'Epoch {epoch + 1} completed in {epoch_end - epoch_start:.2f} seconds.')

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model
# Train the model
best_model = train_model(model, criterion, optimizer, scheduler, num_epochs=num_epochs)

# Save the model's state dict; load_model() restores it with load_state_dict().
torch.save(best_model.state_dict(), 'resnet50_garbage_classification.pth')
import pickle
# Training history transcribed by hand from the logs of a previous 10-epoch
# run; these values are hard-coded, not captured from the training loop.
history = {
    'train_loss': [1.0083, 0.7347, 0.6510, 0.5762, 0.5478, 0.5223, 0.4974, 0.3464, 0.2896, 0.2604],
    'train_acc': [0.6850, 0.7687, 0.7913, 0.8126, 0.8210, 0.8272, 0.8355, 0.8870, 0.9049, 0.9136],
    'val_loss': [0.6304, 0.8616, 0.5594, 0.4006, 0.3968, 0.4051, 0.3223, 0.2221, 0.2125, 0.2076],
    'val_acc': [0.7985, 0.7307, 0.8260, 0.8655, 0.8793, 0.8729, 0.9094, 0.9338, 0.9338, 0.9326],
}

# Save the history as a pickle file
with open('training_history.pkl', 'wb') as f:
    pickle.dump(history, f)
print('Training history saved as training_history.pkl')
# Load your model
def load_model(weights_path='resnet50_garbage_classification.pth', num_classes=12):
    """Build a ResNet50 classifier and load fine-tuned weights onto the CPU.

    Args:
        weights_path: Path to a state-dict checkpoint.
        num_classes: Size of the final fully connected layer; must match
            the checkpoint.

    Returns:
        The model in evaluation mode.
    """
    # No pretrained weights are requested: every parameter is overwritten by
    # the checkpoint below, so downloading them would be wasted work.
    model = models.resnet50(weights=None)
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)  # Adjust to the number of classes you have

    # Load the state dict without the weights_only argument.
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    state_dict = torch.load(weights_path, map_location=torch.device('cpu'))
    model.load_state_dict(state_dict)
    model.eval()  # Set to evaluation mode
    return model
# Rebind the module-level model to the CPU copy restored from the checkpoint.
model = load_model()

# Define image transformations for inference.
# NOTE(review): only the Normalize step survived in the source. Resize and
# ToTensor were reconstructed as the minimum needed to turn a PIL image into
# a normalized tensor -- confirm the size matches the one used in training.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
# Class names, indexed by the model's output position.
# NOTE(review): the order is alphabetical, which is presumably how the
# training dataset assigned class indices -- confirm before reordering.
class_names = ['battery', 'biological', 'brown-glass', 'cardboard',
               'clothes', 'green-glass', 'metal', 'paper',
               'plastic', 'shoes', 'trash', 'white-glass']

# Define bin colors for each class
bin_colors = {
    'battery': 'Merah (Red)',          # Hazardous waste
    'biological': 'Cokelat (Brown)',   # Organic waste
    'brown-glass': 'Hijau (Green)',    # Brown-colored glass
    'cardboard': 'Kuning (Yellow)',    # Recyclable waste
    'clothes': 'Biru (Blue)',          # Clothing and textiles
    'green-glass': 'Hijau (Green)',    # Green-colored glass
    'metal': 'Kuning (Yellow)',        # Recyclable waste
    'paper': 'Kuning (Yellow)',        # Recyclable waste
    'plastic': 'Kuning (Yellow)',      # Recyclable waste
    'shoes': 'Biru (Blue)',            # Clothing and textiles
    'trash': 'Hitam (Black)',          # General waste
    'white-glass': 'Putih (White)',    # White-colored glass
}
# Define the prediction function
def predict(image):
    """Classify one image and return its class name and matching bin color.

    Reads the module-level ``transform``, ``model``, ``class_names`` and
    ``bin_colors``.

    Args:
        image: Image as a numpy array, as passed in by the Gradio input.

    Returns:
        A ``(class_name, bin_color)`` tuple of strings.
    """
    # Convert numpy array to PIL Image; force RGB because the normalization
    # step is defined for three channels.
    pil_image = Image.fromarray(image).convert("RGB")
    tensor = transform(pil_image)  # Apply transformations
    batch = tensor.unsqueeze(0)  # Add batch dimension

    with torch.no_grad():
        outputs = model(batch)
        _, predicted = torch.max(outputs, 1)

    class_name = class_names[predicted.item()]  # Predicted class name
    bin_color = bin_colors[class_name]  # Corresponding bin color
    return class_name, bin_color  # Return both class name and bin color
# Make Gradio Interface
iface = gr.Interface(
inputs=gr.Image(type="numpy", label="Unggah Gambar"),
gr.Textbox(label="Jenis Sampah"),
gr.Textbox(label="Tong Sampah yang Sesuai") # 2 output with label
title="Klasifikasi Sampah dengan ResNet50",
description="Unggah gambar sampah, dan model akan mengklasifikasikannya ke dalam salah satu dari 12 kategori bersama dengan warna tempat sampah yang sesuai."