import torch
import torch.nn as nn
import torch.nn.functional as F


class ObservationEncoder(nn.Module):
    """Convolutional encoder mapping image observations to a stochastic latent state."""

    def __init__(self, obs_shape, state_size, num_layers=4, num_filters=32, stride=2):
        super().__init__()
        assert len(obs_shape) == 3
        self.state_size = state_size

        layers = []
        for i in range(num_layers):
            input_channels = obs_shape[0] if i == 0 else output_channels
            output_channels = num_filters * (2 ** i)
            layers.append(nn.Conv2d(in_channels=input_channels,
                                    out_channels=output_channels,
                                    kernel_size=4, stride=stride))
            layers.append(nn.ReLU())
        self.convs = nn.Sequential(*layers)
        # 256 * 3 * 3 is the flattened conv output for 84x84 inputs with the
        # defaults num_layers=4, num_filters=32, stride=2.
        self.fc = nn.Linear(256 * 3 * 3, 2 * state_size)

    def forward(self, x):
        x = self.convs(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        # Split into mean and standard deviation; softplus keeps std positive
        # and the clamp guards against extreme values.
        mean, std = torch.chunk(x, 2, dim=-1)
        std = F.softplus(std)
        std = torch.clamp(std, min=0.0, max=1e5)

        # Sample from N(mean, std) with the reparameterization trick.
        return self.reparameterize(mean, std)

    def reparameterize(self, mu, std):
        eps = torch.randn_like(std)
        return mu + eps * std


class ObservationDecoder(nn.Module):
    """Transposed-convolution decoder mapping a latent state to an image distribution."""

    def __init__(self, state_size, output_shape):
        super().__init__()
        self.state_size = state_size
        self.output_shape = output_shape
        self.input_size = 256 * 3 * 3
        self.in_channels = [self.input_size, 256, 128, 64]
        # The final layer emits the target channel count.
        self.out_channels = [256, 128, 64, output_shape[0]]

        # Kernel sizes and output paddings are chosen so that, starting from a
        # 1x1 feature map with stride-2 deconvolutions, the stack reproduces
        # the supported spatial resolutions exactly.
        if output_shape[1] == 84:
            self.kernels = [5, 7, 5, 6]
            self.output_padding = [1, 1, 1, 0]
        elif output_shape[1] == 64:
            self.kernels = [5, 5, 6, 6]
            self.output_padding = [0, 0, 0, 0]
        else:
            raise ValueError(f"Unsupported output resolution: {output_shape[1]}")

        self.dense = nn.Linear(state_size, self.input_size)

        layers = []
        for i in range(len(self.kernels)):
            layers.append(nn.ConvTranspose2d(in_channels=self.in_channels[i],
                                             out_channels=self.out_channels[i],
                                             kernel_size=self.kernels[i],
                                             stride=2,
                                             output_padding=self.output_padding[i]))
            if i != len(self.kernels) - 1:
                layers.append(nn.ReLU())
        self.convtranspose = nn.Sequential(*layers)

    def forward(self, features):
        out_batch_shape = features.shape[:-1]
        out = self.dense(features)
        out = torch.reshape(out, [-1, self.input_size, 1, 1])
        out = self.convtranspose(out)
        mean = torch.reshape(out, (*out_batch_shape, *self.output_shape))
        # Unit-variance Gaussian over pixels; log_prob sums over the image dims.
        out_dist = torch.distributions.independent.Independent(
            torch.distributions.Normal(mean, 1), len(self.output_shape))
        return out_dist


class TransitionModel(nn.Module):
    """Recurrent transition model with a deterministic history and a stochastic state."""

    def __init__(self, state_size, hidden_size, action_size, history_size):
        super().__init__()
        self.state_size = state_size
        self.hidden_size = hidden_size
        self.action_size = action_size
        self.history_size = history_size
        self.act_fn = nn.ReLU()

        self.fc_state_action = nn.Linear(state_size + action_size, hidden_size)
        self.history_cell = nn.GRUCell(hidden_size + history_size, history_size)
        self.fc_state_prior = nn.Linear(history_size + state_size + action_size, 2 * state_size)
        self.fc_state_posterior = nn.Linear(history_size + state_size + action_size, 2 * state_size)

    def init_states(self, batch_size, device):
        self.prev_state = torch.zeros(batch_size, self.state_size).to(device)
        self.prev_action = torch.zeros(batch_size, self.action_size).to(device)
        self.prev_history = torch.zeros(batch_size, self.history_size).to(device)

    def get_dist(self, mean, std):
        distribution = torch.distributions.Normal(mean, std)
        distribution = torch.distributions.independent.Independent(distribution, 1)
        return distribution

    def imagine_step(self, prev_state, prev_action, prev_history):
        # Update the deterministic history from the embedded state-action pair.
        state_action = self.act_fn(self.fc_state_action(
            torch.cat([prev_state, prev_action], dim=-1)))
        history = self.history_cell(
            torch.cat([state_action, prev_history], dim=-1), prev_history)

        # Predict the prior over the next stochastic state and sample from it.
        state_prior = self.fc_state_prior(
            torch.cat([history, prev_state, prev_action], dim=-1))
        state_prior_mean, state_prior_std = torch.chunk(state_prior, 2, dim=-1)
        state_prior_std = F.softplus(state_prior_std)
        sample_state_prior = self.reparameterize(state_prior_mean, state_prior_std)

        prior = {"mean": state_prior_mean, "std": state_prior_std,
                 "sample": sample_state_prior, "history": history}
        return prior

    def reparameterize(self, mean, std):
        eps = torch.randn_like(std)
        return mean + eps * std


class CLUBSample(nn.Module):
    """Sampled version of the CLUB mutual-information upper-bound estimator."""

    def __init__(self, x_dim, y_dim, hidden_size):
        super().__init__()
        # Variational approximation q(y|x) = N(mu(x), exp(logvar(x))).
        self.p_mu = nn.Sequential(
            nn.Linear(x_dim, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, y_dim))
        self.p_logvar = nn.Sequential(
            nn.Linear(x_dim, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, y_dim),
            nn.Tanh())

    def get_mu_logvar(self, x_samples):
        mu = self.p_mu(x_samples)
        logvar = self.p_logvar(x_samples)
        return mu, logvar

    def loglikeli(self, x_samples, y_samples):
        # Gaussian log-likelihood of y under q(y|x), up to an additive constant.
        mu, logvar = self.get_mu_logvar(x_samples)
        return (-(mu - y_samples) ** 2 / logvar.exp() - logvar).sum(dim=1).mean(dim=0)

    def forward(self, x_samples, y_samples):
        mu, logvar = self.get_mu_logvar(x_samples)

        # Contrast each positive pair (x_i, y_i) against a shuffled negative
        # pair (x_i, y_j); the gap estimates the MI upper bound.
        sample_size = x_samples.shape[0]
        random_index = torch.randperm(sample_size)

        positive = -(mu - y_samples) ** 2 / logvar.exp()
        negative = -(mu - y_samples[random_index]) ** 2 / logvar.exp()
        upper_bound = (positive.sum(dim=-1) - negative.sum(dim=-1)).mean()
        # The factor 1/2 comes from the Gaussian quadratic form.
        return upper_bound / 2.

    def learning_loss(self, x_samples, y_samples):
        # Fit q(y|x) by maximum likelihood before using it for the MI bound.
        return -self.loglikeli(x_samples, y_samples)


if __name__ == "__main__":
    # Smoke tests for the encoder and the CLUB estimator.
    encoder = ObservationEncoder((12, 84, 84), 256)
    x = torch.randn(100, 12, 84, 84)
    print(encoder(x).shape)

    club = CLUBSample(256, 256, 512)
    x = torch.randn(100, 256)
    y = torch.randn(100, 256)
    print(club.learning_loss(x, y))

    x = torch.randn(100, 12, 84, 84)
    y = torch.randn(100, 12, 84, 84)
    x_enc = encoder(x)
    y_enc = encoder(y)
    print(x_enc.shape)
    print(y_enc.shape)
    print(club.learning_loss(x_enc, y_enc))
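# --- Illustrative sketch, not part of the original file ----------------------
# A minimal example of how TransitionModel.imagine_step might be rolled out to
# produce an imagined latent trajectory. The sizes, the `horizon` variable,
# and the random placeholder policy below are assumptions chosen purely for
# illustration; a real agent would supply actions from its policy.
def _demo_imagination_rollout():
    batch_size, state_size, action_size = 8, 256, 6
    hidden_size, history_size, horizon = 512, 128, 5  # assumed values

    model = TransitionModel(state_size, hidden_size, action_size, history_size)
    model.init_states(batch_size, device="cpu")

    state, history = model.prev_state, model.prev_history
    for _ in range(horizon):
        action = torch.randn(batch_size, action_size)  # placeholder policy
        prior = model.imagine_step(state, action, history)
        # Feed the sampled state and updated history into the next step.
        state, history = prior["sample"], prior["history"]
    print(state.shape, history.shape)  # expected: (8, 256), (8, 128)

# _demo_imagination_rollout()  # uncomment to run the rollout sketch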
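# A hedged sketch of one CLUB training step: first fit the variational network
# q(y|x) by minimizing learning_loss, then treat forward() as an (approximate)
# upper bound on I(x; y) that a separate model could minimize. The optimizer,
# learning rate, and batch size here are assumptions, not taken from the
# original file.
def _demo_club_step():
    club = CLUBSample(x_dim=256, y_dim=256, hidden_size=512)
    optim = torch.optim.Adam(club.parameters(), lr=1e-4)  # assumed optimizer

    x = torch.randn(64, 256)
    y = torch.randn(64, 256)

    # Step 1: improve the q(y|x) approximation.
    optim.zero_grad()
    loss = club.learning_loss(x, y)
    loss.backward()
    optim.step()

    # Step 2: evaluate the MI upper bound (no gradient needed here).
    with torch.no_grad():
        print(club(x, y))

# _demo_club_step()  # uncomment to run the CLUB training-step sketch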