import math
import os

import kagglehub
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder
from tqdm.auto import tqdm
from transformers import ViTFeatureExtractor, ViTForImageClassification, get_scheduler

# Report whether a CUDA GPU is available and, if so, its basic properties.
gpu_available = torch.cuda.is_available()
print("GPU available:", gpu_available)

if gpu_available:
    print("GPU:", torch.cuda.get_device_name(0))
    print("GPU count:", torch.cuda.device_count())
    print("GPU memory:", torch.cuda.get_device_properties(0).total_memory / (1024 ** 3), "GB")
else:
    print("No GPU available.")

# Download the garbage classification dataset from Kaggle.
kaggle_path = kagglehub.dataset_download("mostafaabla/garbage-classification")
folder_root_path = os.path.join(kaggle_path, "garbage_classification")
print("Path to dataset files:", kaggle_path)

# Shuffle the indices and hold out 20% of the images for validation.
ds = ImageFolder(folder_root_path)
indices = torch.randperm(len(ds)).tolist()
n_val = math.floor(len(indices) * 0.20)
train_ds = Subset(ds, indices[:-n_val])
val_ds = Subset(ds, indices[-n_val:])

print(ds.classes)

label2id = {}
id2label = {}

for i, class_name in enumerate(ds.classes):
    label2id[class_name] = str(i)
    id2label[str(i)] = class_name


class ImageClassificationCollator:
    """Batch PIL images through the ViT feature extractor and attach integer labels."""

    def __init__(self, feature_extractor):
        self.feature_extractor = feature_extractor

    def __call__(self, batch):
        encodings = self.feature_extractor([x[0] for x in batch], return_tensors='pt')
        encodings['labels'] = torch.tensor([x[1] for x in batch], dtype=torch.long)
        return encodings


# Load the pretrained ViT backbone and attach a classification head sized to the dataset.
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')

model = ViTForImageClassification.from_pretrained(
    'google/vit-base-patch16-224-in21k',
    num_labels=len(label2id),
    label2id=label2id,
    id2label=id2label,
)

collator = ImageClassificationCollator(feature_extractor)
train_loader = DataLoader(train_ds, batch_size=16, collate_fn=collator, num_workers=2, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=16, collate_fn=collator, num_workers=2)

optimizer = AdamW(model.parameters(), lr=5e-5)

num_epochs = 10
num_training_steps = num_epochs * len(train_loader)
lr_scheduler = get_scheduler(
    name="linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps
)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

progress_bar = tqdm(range(num_training_steps))

# Fine-tuning loop: the model computes the cross-entropy loss itself because each
# batch carries a 'labels' key.
model.train()
for epoch in range(num_epochs):
    for batch in train_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)

# Run the validation set through the model without gradient tracking.
model.eval()
for batch in val_loader:
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits
    predictions = torch.argmax(logits, dim=-1)
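
# The loop above only produces per-batch predictions; below is a minimal sketch of
# how overall validation accuracy could also be tracked. The running counters
# `correct` and `total` are additions for illustration, not part of the original script.
correct, total = 0, 0
for batch in val_loader:
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)
    predictions = torch.argmax(outputs.logits, dim=-1)
    correct += (predictions == batch['labels']).sum().item()
    total += batch['labels'].size(0)
print("Validation accuracy:", correct / max(total, 1))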
|
import os |
|
|
|
save_directory = "./saved_model" |
|
if not os.path.exists(save_directory): |
|
os.makedirs(save_directory) |
|
|
|
model.save_pretrained(save_directory) |
|
feature_extractor.save_pretrained(save_directory) |
|
|
|
print(f"Model saved: {save_directory}") |
|
|
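
# A minimal sketch of reloading the saved model for single-image inference.
# The path 'example.jpg' is a placeholder assumption, not a file from the dataset.
from PIL import Image

reloaded_extractor = ViTFeatureExtractor.from_pretrained(save_directory)
reloaded_model = ViTForImageClassification.from_pretrained(save_directory).to(device)
reloaded_model.eval()

image = Image.open("example.jpg").convert("RGB")
inputs = reloaded_extractor(image, return_tensors="pt")
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
    logits = reloaded_model(**inputs).logits
predicted_id = logits.argmax(-1).item()
print("Predicted class:", reloaded_model.config.id2label[predicted_id])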