# File-viewer listing metadata: 143 lines, 5.4 KiB, Python.
import os
import pickle
import warnings

import matplotlib.pyplot as plt
import torch
import torch.optim as optim
from PIL import Image
from torch.utils.data import random_split
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms

from train_mm_moco import evaluate_and_plot, compute_tsne, MultiModalMoCo

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# TensorBoard writer; it is handed to the model and also used by the
# training loop to log the loss.
writer = SummaryWriter('runs/mmssl')

# Custom dataset
class CustomMultiModalDataset(Dataset):
    """Paired vision/tactile image dataset backed by two folders.

    Each folder's listing is sorted by file name and the two listings are
    paired by position: the i-th vision file goes with the i-th tactile file.

    Args:
        vision_folder: Directory containing the vision images.
        tactile_folder: Directory containing the tactile images.
        transform: Optional callable applied separately to the vision image
            and to the tactile image after each is converted to RGB.

    Raises:
        OSError: If either folder cannot be listed.
    """

    def __init__(self, vision_folder, tactile_folder, transform=None):
        self.vision_folder = vision_folder
        self.tactile_folder = tactile_folder
        self.transform = transform

        self.vision_files = sorted(os.listdir(vision_folder))
        self.tactile_files = sorted(os.listdir(tactile_folder))

        # Pairing is purely positional, so a count mismatch means some files
        # have no partner. Keep going with the pairs that exist, but say so.
        if len(self.vision_files) != len(self.tactile_files):
            warnings.warn(
                f"vision folder has {len(self.vision_files)} files but tactile "
                f"folder has {len(self.tactile_files)}; only the first "
                f"{len(self)} positional pairs will be used",
                stacklevel=2,
            )

    def __len__(self):
        """Return the number of complete (vision, tactile) pairs."""
        # Using the shorter listing keeps every valid index loadable instead
        # of raising IndexError part-way through an epoch.
        return min(len(self.vision_files), len(self.tactile_files))

    @staticmethod
    def _load_rgb(path):
        """Open the image at *path*, return an RGB copy, and close the file."""
        # convert() loads the pixel data and returns a new image, so the
        # underlying file can be closed as soon as the block exits.
        with Image.open(path) as img:
            return img.convert("RGB")

    def __getitem__(self, idx):
        """Return the ``(vision_image, tactile_image)`` pair at position *idx*."""
        vision_path = os.path.join(self.vision_folder, self.vision_files[idx])
        tactile_path = os.path.join(self.tactile_folder, self.tactile_files[idx])

        vision_image = self._load_rgb(vision_path)
        tactile_image = self._load_rgb(tactile_path)

        if self.transform is not None:
            vision_image = self.transform(vision_image)
            tactile_image = self.transform(tactile_image)

        return vision_image, tactile_image


# Initialize augmentation

# Per-channel mean / std handed to Normalize.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Deterministic preprocessing: resize to 275x275, convert to a tensor, then
# normalize each channel.
_preprocess_steps = [
    transforms.Resize((275, 275)),
    # transforms.CenterCrop(500),  # disabled
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
simple_transforms = transforms.Compose(_preprocess_steps)

# Random augmentation pipeline: optional rotation, random resized crop to 224,
# optional horizontal flip, optional grayscale.
_augmentation_steps = [
    transforms.RandomApply([transforms.RandomRotation(150)], p=0.50),
    transforms.RandomResizedCrop(224, scale=(0.2, 1.0)),
    transforms.RandomApply([transforms.RandomHorizontalFlip()], p=0.50),
    # transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),  # disabled
    transforms.RandomGrayscale(p=0.2),
    # transforms.RandomApply([transforms.GaussianBlur(3, sigma=(0.1, 2.0))], p=0.5),  # disabled
]
data_transforms = transforms.Compose(_augmentation_steps)

# Initialize dataset and dataloader
vision_folder = "/home/vedant/Downloads/ssvtp_data/images_rgb"
tactile_folder = "/home/vedant/Downloads/ssvtp_data/images_tac"
dataset = CustomMultiModalDataset(vision_folder, tactile_folder, transform=simple_transforms)

# preload=True  -> reuse the train/test split stored under indices/.
# preload=False -> draw a fresh random 80/20 split and store it there.
preload = True
if not preload:
    # Split the dataset into 80-20
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    # Get the indices of the training and test sets
    train_indices = train_dataset.indices
    test_indices = test_dataset.indices

    # Save these indices to disk. open(..., 'wb') does not create missing
    # parent directories, so make sure indices/ exists first.
    os.makedirs('indices', exist_ok=True)
    with open('indices/train_indices.pkl', 'wb') as f:
        pickle.dump(train_indices, f)
    with open('indices/test_indices.pkl', 'wb') as f:
        pickle.dump(test_indices, f)
else:
    # Load the indices from disk.
    # NOTE: pickle.load can execute arbitrary code embedded in the file; only
    # load index files that were written by this script.
    with open('indices/train_indices.pkl', 'rb') as f:
        train_indices = pickle.load(f)
    with open('indices/test_indices.pkl', 'rb') as f:
        test_indices = pickle.load(f)

# Both branches end with index lists, so the subsets and loaders are built
# once here instead of being duplicated per branch.
train_subset = torch.utils.data.Subset(dataset, train_indices)
test_subset = torch.utils.data.Subset(dataset, test_indices)

train_dataloader = DataLoader(train_subset, batch_size=96, shuffle=True)
test_dataloader = DataLoader(test_subset, batch_size=32, shuffle=False)

# Initialize model
model = MultiModalMoCo(writer, K=4096, m=0.99, T=0.07).to(device)


def _gather_parameters(*modules):
    """Return one flat list holding the parameters of each module, in order."""
    gathered = []
    for module in modules:
        gathered.extend(module.parameters())
    return gathered


# Initialize optimizer: one Adam instance per modality, each covering that
# modality's base network plus its intra- and inter-modal heads.
vision_module = _gather_parameters(
    model.vision_base_q,
    model.vision_head_intra_q,
    model.vision_head_inter_q,
)
tactile_module = _gather_parameters(
    model.tactile_base_q,
    model.tactile_head_intra_q,
    model.tactile_head_inter_q,
)
optim_vision = optim.Adam(vision_module, lr=0.1)
optim_tactile = optim.Adam(tactile_module, lr=0.1)

# Training loop
n_epochs = 500  # Number of epochs
steps_per_epoch = len(train_dataloader)

# torch.save does not create missing parent directories; without this the
# first checkpoint (written at the end of epoch 0) fails if models/ is absent.
os.makedirs('models', exist_ok=True)

for epoch in range(n_epochs):
    for i, (x_vision, x_tactile) in enumerate(train_dataloader):

        # Augment images: two independently augmented views of each modality.
        x_vision_q = data_transforms(x_vision).to(device)
        x_vision_k = data_transforms(x_vision).to(device)

        x_tactile_q = data_transforms(x_tactile).to(device)
        x_tactile_k = data_transforms(x_tactile).to(device)

        # Forward pass to get the loss
        loss = model(x_vision_q, x_vision_k, x_tactile_q, x_tactile_k, epoch, i, steps_per_epoch)

        # Backward pass and optimization: a single backward pass feeds both
        # per-modality optimizers.
        optim_vision.zero_grad()
        optim_tactile.zero_grad()
        loss.backward()
        optim_vision.step()
        optim_tactile.step()

        # Logging (every 10th step).
        # NOTE(review): the original indentation was lost; the scalar write is
        # assumed to sit under the same condition as the print - confirm.
        if i % 10 == 0:
            loss_value = loss.item()
            print(f"Epoch [{epoch+1}/{n_epochs}], Step [{i+1}/{steps_per_epoch}], Loss: {loss_value:.4f}")
            writer.add_scalar('training loss', loss_value, epoch * steps_per_epoch + i)

    # Evaluate and plot (currently disabled)
    # compute_tsne(model, test_dataloader, writer, epoch)
    # evaluate_and_plot(model, test_dataloader, epoch, writer, device)

    # Checkpoint every 10th epoch, and also after the last epoch so the final
    # weights are not lost when n_epochs - 1 is not a multiple of 10.
    if epoch % 10 == 0 or epoch == n_epochs - 1:
        torch.save(model.state_dict(), 'models/model.pth')

# Push any buffered TensorBoard events to disk before the script ends.
writer.flush()