import math

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.normal import Normal


class ObservationEncoder(nn.Module):
    """Convolutional encoder that maps image observations to a Gaussian latent.

    Args:
        obs_shape: 3-element shape of one observation; ``obs_shape[0]`` is used
            as the number of input channels of the first convolution.
        state_size: Size of the latent vector (the final linear layer emits
            ``2 * state_size`` values: mean and raw std).
        num_layers: Number of ``Conv2d`` + ``LeakyReLU`` stages.
        num_filters: Output channels of the first stage; stage ``i`` has
            ``num_filters * 2 ** i`` output channels.
        stride: Accepted for interface compatibility; not used (every
            convolution uses stride 2).
    """

    def __init__(self, obs_shape, state_size, num_layers=4, num_filters=32, stride=None):
        super().__init__()
        assert len(obs_shape) == 3
        self.state_size = state_size

        layers = []
        in_channels = obs_shape[0]
        for i in range(num_layers):
            out_channels = num_filters * (2 ** i)
            layers.append(nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                                    kernel_size=4, stride=2))
            layers.append(nn.LeakyReLU())
            in_channels = out_channels
        self.convs = nn.Sequential(*layers)

        # Size the linear layer from the real flattened conv output instead of
        # the previous hard-coded ``256 * obs_shape[0]``, which only matched
        # when the channel count happened to equal the final spatial area
        # (e.g. 9 stacked channels at 84x84 -> 256 * 3 * 3).
        with torch.no_grad():
            flat_size = self.convs(torch.zeros(1, *obs_shape)).numel()
        self.fc = nn.Linear(flat_size, 2 * state_size)

    def forward(self, x):
        """Encode ``x`` and return ``{"sample": tensor, "distribution": dist}``.

        The last three dimensions of ``x`` are treated as one image; all
        leading dimensions are preserved in the outputs.
        """
        batch_dims = x.shape[:-3]
        flat_images = x.reshape(-1, *x.shape[-3:])
        embedding = self.convs(flat_images)
        embedding = torch.reshape(embedding, (*batch_dims, -1))

        # Split the linear output into mean and (raw) standard deviation.
        mean, std = torch.chunk(self.fc(embedding), 2, dim=-1)
        mean = F.elu(mean)
        std = torch.clamp(F.softplus(std), min=0.0, max=1e1)

        dist = self.get_dist(mean, std)
        sample = self.reparameterize(mean, std)
        return {"sample": sample, "distribution": dist}

    def reparameterize(self, mu, std):
        """Draw ``mu + eps * std`` with ``eps`` standard normal."""
        eps = torch.randn_like(std)
        return mu + eps * std

    def get_dist(self, mean, std):
        """Return a diagonal Normal with the last dimension as event dimension."""
        normal = torch.distributions.Normal(mean, std)
        return torch.distributions.independent.Independent(normal, 1)


class ObservationDecoder(nn.Module):
    """Transposed-convolution decoder from a latent vector to an image distribution.

    Args:
        state_size: Size of the input feature vector.
        output_shape: Shape of one decoded observation. ``output_shape[0]`` is
            the number of output channels and ``output_shape[1]`` selects the
            kernel configuration; only 84 and 64 are supported.

    Raises:
        ValueError: If ``output_shape[1]`` is neither 84 nor 64.
    """

    # output size -> (kernel sizes, output paddings) for the four stride-2 stages.
    _DECONV_CONFIGS = {
        84: ([5, 7, 5, 6], [1, 1, 1, 0]),
        64: ([5, 5, 6, 6], [0, 0, 0, 0]),
    }

    def __init__(self, state_size, output_shape):
        super().__init__()
        self.state_size = state_size
        self.output_shape = output_shape

        if output_shape[1] not in self._DECONV_CONFIGS:
            # Previously this fell through and failed later with an opaque
            # AttributeError on ``self.kernels``.
            raise ValueError(
                f"Unsupported output size {output_shape[1]}; "
                f"supported sizes are {sorted(self._DECONV_CONFIGS)}"
            )
        kernels, output_padding = self._DECONV_CONFIGS[output_shape[1]]
        self.kernels = list(kernels)
        self.output_padding = list(output_padding)

        self.input_size = 256 * 3 * 3
        self.in_channels = [self.input_size, 256, 128, 64]
        # The last stage emits as many channels as the target observation
        # (this was hard-coded to 9 before).
        self.out_channels = [256, 128, 64, output_shape[0]]

        self.dense = nn.Linear(state_size, self.input_size)
        layers = []
        last = len(self.kernels) - 1
        for i, kernel in enumerate(self.kernels):
            layers.append(nn.ConvTranspose2d(in_channels=self.in_channels[i],
                                             out_channels=self.out_channels[i],
                                             kernel_size=kernel, stride=2,
                                             output_padding=self.output_padding[i]))
            if i != last:  # no activation after the final layer
                layers.append(nn.LeakyReLU())
        self.convtranspose = nn.Sequential(*layers)

    def forward(self, features):
        """Return a unit-variance Normal over observations, with mean decoded from ``features``."""
        out_batch_shape = features.shape[:-1]
        out = self.dense(features)
        # Treat the dense output as a 1x1 feature map with ``input_size`` channels.
        out = torch.reshape(out, [-1, self.input_size, 1, 1])
        out = self.convtranspose(out)
        mean = torch.reshape(out, (*out_batch_shape, *self.output_shape))
        return torch.distributions.independent.Independent(
            torch.distributions.Normal(mean, 1), len(self.output_shape))


class Actor(nn.Module):
    """MLP policy that outputs a sample from a tanh-squashed Normal.

    Args:
        state_size: Size of the input feature vector.
        hidden_size: Width of each hidden layer.
        action_size: Size of the action vector.
        num_layers: Number of hidden ``Linear`` + ``ReLU`` stages.
        min_std: Constant added to the standard deviation.
        init_std: Value whose inverse softplus is added to the raw std output.
        mean_scale: Scale of the tanh used to bound the mean.
    """

    def __init__(self, state_size, hidden_size, action_size, num_layers=4,
                 min_std=1e-4, init_std=5, mean_scale=5):
        super().__init__()
        self.state_size = state_size
        self.hidden_size = hidden_size
        self.action_size = action_size
        self.num_layers = num_layers
        self._min_std = min_std
        self._init_std = init_std
        self._mean_scale = mean_scale

        layers = []
        in_features = state_size
        for _ in range(self.num_layers):
            layers.append(nn.Linear(in_features, self.hidden_size))
            layers.append(nn.ReLU())
            in_features = self.hidden_size
        # Final layer emits mean and raw std for every action dimension.
        layers.append(nn.Linear(self.hidden_size, 2 * self.action_size))
        self.action_model = nn.Sequential(*layers)

    def get_dist(self, mean, std):
        """Build the tanh-transformed Normal, wrapped in ``SampleDist``."""
        dist = torch.distributions.Normal(mean, std)
        dist = torch.distributions.transformed_distribution.TransformedDistribution(
            dist, TanhBijector())
        dist = torch.distributions.independent.Independent(dist, 1)
        return SampleDist(dist)

    def add_exploration(self, action, action_noise=0.3):
        """Add Normal noise with std ``action_noise`` and clamp to ``[-1, 1]``."""
        noisy = torch.distributions.Normal(action, action_noise).rsample()
        return torch.clamp(noisy, -1, 1)

    def forward(self, features):
        """Return a reparameterized action sample for ``features``."""
        out = self.action_model(features)
        mean, std = torch.chunk(out, 2, dim=-1)
        # Inverse softplus of init_std, so a zero raw output yields init_std.
        raw_init_std = np.log(np.exp(self._init_std) - 1)
        action_mean = self._mean_scale * torch.tanh(mean / self._mean_scale)
        action_std = F.softplus(std + raw_init_std) + self._min_std
        dist = self.get_dist(action_mean, action_std)
        return dist.rsample()

    def reparameterize(self, mu, std):
        """Draw ``mu + eps * std`` with ``eps`` standard normal."""
        eps = torch.randn_like(std)
        return mu + eps * std


class ValueModel(nn.Module):
    """MLP that predicts a unit-variance Normal over a scalar value.

    Args:
        state_size: Size of the input feature vector.
        hidden_size: Width of each hidden layer.
        num_layers: Total number of linear layers; ``num_layers - 1`` hidden
            ``Linear`` + ``LeakyReLU`` stages are followed by one output layer.
    """

    def __init__(self, state_size, hidden_size, num_layers=4):
        super().__init__()
        self.state_size = state_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        layers = []
        in_features = state_size
        for _ in range(self.num_layers - 1):
            layers.append(nn.Linear(in_features, self.hidden_size))
            layers.append(nn.LeakyReLU())
            in_features = self.hidden_size
        layers.append(nn.Linear(self.hidden_size, 1))
        self.value_model = nn.Sequential(*layers)

    def forward(self, state):
        """Return the value distribution for ``state``."""
        value = self.value_model(state)
        return torch.distributions.independent.Independent(
            torch.distributions.Normal(value, 1), 1)


class RewardModel(nn.Module):
    """Three-layer MLP that predicts a unit-variance Normal over a scalar reward."""

    def __init__(self, state_size, hidden_size):
        super().__init__()
        self.reward_model = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, 1),
        )

    def forward(self, state):
        """Return the reward distribution for ``state``."""
        reward = self.reward_model(state)
        return torch.distributions.independent.Independent(
            torch.distributions.Normal(reward, 1), 1)


class TransitionModel(nn.Module):
    """Recurrent latent transition model built around a ``GRUCell``.

    Args:
        state_size: Size of the stochastic latent state.
        hidden_size: Size of the encoded (state, action) vector.
        action_size: Size of the action vector.
        history_size: Size of the GRU hidden state.
    """

    def __init__(self, state_size, hidden_size, action_size, history_size):
        super().__init__()
        self.state_size = state_size
        self.hidden_size = hidden_size
        self.action_size = action_size
        self.history_size = history_size
        self.act_fn = nn.ELU()
        self.fc_state_action = nn.Linear(state_size + action_size, hidden_size)
        self.history_cell = nn.GRUCell(hidden_size, history_size)
        self.fc_state_mu = nn.Linear(history_size + hidden_size, state_size)
        self.fc_state_sigma = nn.Linear(history_size + hidden_size, state_size)
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        self.batch_norm2 = nn.BatchNorm1d(state_size)
        # Bounds of the predicted std in imagine_step.
        self.min_sigma = 1e-4
        self.max_sigma = 1e0

    def init_states(self, batch_size, device):
        """Store zero-initialized previous state, action and history on ``device``."""
        self.prev_state = torch.zeros(batch_size, self.state_size, device=device)
        self.prev_action = torch.zeros(batch_size, self.action_size, device=device)
        self.prev_history = torch.zeros(batch_size, self.history_size, device=device)

    def get_dist(self, mean, std):
        """Return a diagonal Normal with the last dimension as event dimension."""
        normal = torch.distributions.Normal(mean, std)
        return torch.distributions.independent.Independent(normal, 1)

    def stack_states(self, states, dim=0):
        """Stack a list of per-step state dicts into one dict of tensors.

        ``mean``, ``std``, ``sample`` and ``history`` are stacked along
        ``dim``. If every step carries a ``distribution`` entry, the
        distributions are returned as a list under the same key.
        """
        stacked = {
            key: torch.stack([state[key] for state in states], dim=dim)
            for key in ("mean", "std", "sample", "history")
        }
        # The previous check (``'distribution' in states``) tested the string
        # against the list of dicts and was therefore never true.
        if states and all("distribution" in state for state in states):
            stacked["distribution"] = [state["distribution"] for state in states]
        return stacked

    def seq_to_batch(self, state, name):
        """Merge the first two dimensions of ``state[name]``; result is keyed ``sample``."""
        tensor = state[name]
        merged = torch.reshape(
            tensor, (tensor.shape[0] * tensor.shape[1], *tensor.shape[2:]))
        return dict(sample=merged)

    def imagine_step(self, state, action, history):
        """Predict the next latent state from ``state``, ``action`` and ``history``.

        Returns:
            Dict with ``mean``, ``std``, ``sample``, ``history`` and
            ``distribution`` of the predicted next state.
        """
        state_action = torch.cat([state, action], dim=-1)
        state_action_enc = self.act_fn(self.batch_norm(self.fc_state_action(state_action)))
        imag_history = self.history_cell(state_action_enc, history)

        head_input = torch.cat([state_action_enc, imag_history], dim=-1)
        next_state_mu = self.act_fn(self.batch_norm2(self.fc_state_mu(head_input)))
        # Squash the std into [min_sigma, max_sigma].
        unit_sigma = torch.sigmoid(self.fc_state_sigma(head_input))
        next_state_sigma = self.min_sigma + (self.max_sigma - self.min_sigma) * unit_sigma

        next_state_dist = self.get_dist(next_state_mu, next_state_sigma)
        next_state_sample = self.reparemeterize(next_state_mu, next_state_sigma)
        return {"mean": next_state_mu, "std": next_state_sigma,
                "sample": next_state_sample, "history": imag_history,
                "distribution": next_state_dist}

    def imagine_rollout(self, state, action, history, horizon):
        """Roll ``imagine_step`` forward ``horizon`` times with a fixed ``action``.

        Each step feeds the previous step's sample and history back in; the
        per-step results are stacked along dimension 0.
        """
        imagined_priors = []
        for _ in range(horizon):
            prior = self.imagine_step(state, action, history)
            state = prior["sample"]
            history = prior["history"]
            imagined_priors.append(prior)
        return self.stack_states(imagined_priors, dim=0)

    def observe_step(self, prev_state, prev_action, prev_history):
        """Advance the history by one observed step and predict the state statistics.

        Returns:
            Dict with ``mean``, ``std``, ``sample`` and ``history``.
        """
        state_action = torch.cat([prev_state, prev_action], dim=-1)
        state_action_enc = self.act_fn(self.batch_norm(self.fc_state_action(state_action)))
        current_history = self.history_cell(state_action_enc, prev_history)

        # NOTE(review): unlike imagine_step, the heads here read the *previous*
        # history and the std uses a plain softplus instead of the bounded
        # sigmoid -- confirm this asymmetry is intended.
        head_input = torch.cat([state_action_enc, prev_history], dim=-1)
        state_mu = self.act_fn(self.batch_norm2(self.fc_state_mu(head_input)))
        state_sigma = F.softplus(self.fc_state_sigma(head_input))
        sample_state = state_mu + torch.randn_like(state_mu) * state_sigma
        return {"mean": state_mu, "std": state_sigma,
                "sample": sample_state, "history": current_history}

    def observe_rollout(self, rollout_states, rollout_actions, init_history, nonterms):
        """Run ``observe_step`` over the leading dimension of the rollout tensors.

        The carried history is multiplied by ``nonterms[i]`` before step ``i``;
        the per-step results are stacked along dimension 0.
        """
        observed_rollout = []
        history = init_history
        for i in range(rollout_states.shape[0]):
            masked_history = nonterms[i] * history
            state_enc = self.observe_step(rollout_states[i], rollout_actions[i], masked_history)
            history = state_enc["history"]
            observed_rollout.append(state_enc)
        return self.stack_states(observed_rollout, dim=0)

    def reparemeterize(self, mean, std):
        """Draw ``mean + eps * std`` with ``eps`` standard normal."""
        eps = torch.randn_like(mean)
        return mean + eps * std

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    reparameterize = reparemeterize


class TanhBijector(torch.distributions.Transform):
    """``tanh`` transform from the reals to the interval ``[-1, 1]``."""

    def __init__(self):
        super().__init__()
        self.bijective = True
        self.domain = torch.distributions.constraints.real
        self.codomain = torch.distributions.constraints.interval(-1.0, 1.0)

    @property
    def sign(self):
        return 1.

    def _call(self, x):
        return torch.tanh(x)

    def atanh(self, x):
        """Inverse hyperbolic tangent computed as ``0.5 * log((1 + x) / (1 - x))``."""
        return 0.5 * torch.log((1 + x) / (1 - x))

    def _inverse(self, y: torch.Tensor):
        # Pull values inside [-1, 1] slightly away from the boundary so that
        # atanh stays finite; values outside that range are passed through.
        y = torch.where(
            (torch.abs(y) <= 1.),
            torch.clamp(y, -0.99999997, 0.99999997),
            y)
        return self.atanh(y)

    def log_abs_det_jacobian(self, x, y):
        """Return ``log|d tanh(x) / dx|`` as ``2 * (log 2 - x - softplus(-2x))``."""
        # A Python scalar keeps the result on x's device and dtype; the earlier
        # ``torch.log(torch.tensor([2.0]))`` built a CPU tensor on every call.
        return 2.0 * (math.log(2.0) - x - F.softplus(-2.0 * x))


class ProjectionHead(nn.Module):
    """Two-layer MLP with LayerNorm applied to concatenated (state, action)."""

    def __init__(self, state_size, action_size, hidden_size):
        super().__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.projection_model = nn.Sequential(
            nn.Linear(state_size + action_size, hidden_size),
            nn.LayerNorm(hidden_size),
            nn.LeakyReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.LayerNorm(hidden_size),
        )

    def forward(self, state, action):
        """Project the concatenation of ``state`` and ``action`` along the last dim."""
        return self.projection_model(torch.cat([state, action], dim=-1))


class ContrastiveHead(nn.Module):
    """Bilinear similarity ``z_a @ W @ z_pos.T`` with a learned square matrix ``W``."""

    def __init__(self, hidden_size, temperature=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.temperature = temperature
        self.W = nn.Parameter(torch.rand(self.hidden_size, self.hidden_size))

    def forward(self, z_a, z_pos):
        """Return the (B, B) logits between anchors ``z_a`` and positives ``z_pos``."""
        Wz = torch.matmul(self.W, z_pos.T)  # (z_dim, B)
        logits = torch.matmul(z_a, Wz)  # (B, B)
        # Subtract the row-wise maximum for numerical stability.
        logits = logits - torch.max(logits, 1)[0][:, None]
        # NOTE(review): logits are multiplied (not divided) by temperature --
        # confirm this is the intended convention.
        return logits * self.temperature


class CLUBSample(nn.Module):
    """Sampled version of the CLUB estimator.

    Two small MLPs predict the mean and log-variance of ``y`` given ``x``.

    Args:
        x_dim: Size of the conditioning vectors.
        y_dim: Size of the predicted vectors.
        hidden_size: The MLPs use ``hidden_size // 2`` hidden units.
    """

    def __init__(self, x_dim, y_dim, hidden_size):
        super().__init__()
        self.p_mu = nn.Sequential(nn.Linear(x_dim, hidden_size // 2),
                                  nn.ReLU(),
                                  nn.Linear(hidden_size // 2, y_dim))
        # Tanh bounds the log-variance to [-1, 1].
        self.p_logvar = nn.Sequential(nn.Linear(x_dim, hidden_size // 2),
                                      nn.ReLU(),
                                      nn.Linear(hidden_size // 2, y_dim),
                                      nn.Tanh())

    def get_mu_logvar(self, x_samples):
        """Return the predicted ``(mu, logvar)`` for ``x_samples``."""
        return self.p_mu(x_samples), self.p_logvar(x_samples)

    def loglikeli(self, x_samples, y_samples):
        """Return the batch-mean unnormalized Gaussian log-likelihood of ``y_samples``."""
        mu, logvar = self.get_mu_logvar(x_samples)
        return (-(mu - y_samples) ** 2 / logvar.exp() - logvar).sum(dim=1).mean(dim=0)

    def forward(self, x_samples, y_samples, y_negatives):
        """Return half the mean gap between positive and negative log-ratio terms."""
        mu, logvar = self.get_mu_logvar(x_samples)
        var = logvar.exp()
        positive = -(mu - y_samples) ** 2 / var
        negative = -(mu - y_negatives) ** 2 / var
        upper_bound = (positive.sum(dim=-1) - negative.sum(dim=-1)).mean()
        return upper_bound / 2.

    def learning_loss(self, x_samples, y_samples):
        """Return the negative log-likelihood used to train the predictor."""
        return -self.loglikeli(x_samples, y_samples)


class QFunction(nn.Module):
    """MLP for q-function."""

    def __init__(self, obs_dim, action_dim, hidden_dim):
        super().__init__()
        self.trunk = nn.Sequential(
            nn.Linear(obs_dim + action_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, obs, action):
        """Return the Q-value for each (obs, action) row."""
        assert obs.size(0) == action.size(0)
        obs_action = torch.cat([obs, action], dim=1)
        return self.trunk(obs_action)


class Critic(nn.Module):
    """Critic network, employs two q-functions.

    Args:
        obs_shape: Accepted for interface compatibility; not used here.
        action_shape: ``action_shape[0]`` is the action dimension.
        hidden_dim: Hidden width of each Q-function.
        encoder_feature_dim: Size of the feature vector fed to the Q-functions.
        encoder: Optional module called as ``encoder(obs, detach=...)`` to turn
            observations into features. When omitted, ``forward`` expects
            ``obs`` to already be feature vectors.
    """

    def __init__(self, obs_shape, action_shape, hidden_dim, encoder_feature_dim,
                 encoder=None):
        super().__init__()
        # The constructor used to read ``self.encoder.feature_dim`` although no
        # encoder was ever assigned, so it always raised AttributeError.
        self.encoder = encoder
        self.Q1 = QFunction(encoder_feature_dim, action_shape[0], hidden_dim)
        self.Q2 = QFunction(encoder_feature_dim, action_shape[0], hidden_dim)
        self.outputs = dict()

    def forward(self, obs, action, detach_encoder=False):
        """Return ``(q1, q2)`` and record them in ``self.outputs``.

        ``detach_encoder`` stops gradient propagation into the encoder (or into
        ``obs`` itself when no encoder is configured).
        """
        if self.encoder is not None:
            obs = self.encoder(obs, detach=detach_encoder)
        elif detach_encoder:
            obs = obs.detach()

        q1 = self.Q1(obs, action)
        q2 = self.Q2(obs, action)

        self.outputs['q1'] = q1
        self.outputs['q2'] = q2
        return q1, q2


class SampleDist:
    """Wrapper that estimates mean, mode and entropy of a distribution by sampling.

    Attributes not defined here are forwarded to the wrapped distribution.

    Args:
        dist: The wrapped distribution.
        samples: Number of samples used for each estimate.
    """

    def __init__(self, dist, samples=100):
        self._dist = dist
        self._samples = samples

    @property
    def name(self):
        return 'SampleDist'

    def __getattr__(self, name):
        # Only reached for attributes missing on the wrapper itself. Guard
        # ``_dist`` so a half-initialized instance (e.g. during copy/unpickle)
        # raises AttributeError instead of recursing forever.
        if name == '_dist':
            raise AttributeError(name)
        return getattr(self._dist, name)

    def mean(self):
        """Return the average of ``samples`` reparameterized draws."""
        # rsample expects a shape, not a bare int.
        sample = self._dist.rsample((self._samples,))
        return torch.mean(sample, 0)

    def mode(self):
        """Return, per batch element, the drawn sample with the highest log-prob."""
        dist = self._dist.expand((self._samples, *self._dist.batch_shape))
        sample = dist.rsample()
        logprob = dist.log_prob(sample)
        batch_size = sample.size(1)
        feature_size = sample.size(2)
        indices = torch.argmax(logprob, dim=0).reshape(1, batch_size, 1)
        indices = indices.expand(1, batch_size, feature_size)
        return torch.gather(sample, 0, indices).squeeze(0)

    def entropy(self):
        """Return the negative mean log-prob of ``samples`` draws."""
        dist = self._dist.expand((self._samples, *self._dist.batch_shape))
        sample = dist.rsample()
        logprob = dist.log_prob(sample)
        return -torch.mean(logprob, 0)

    def sample(self):
        """Return one (non-reparameterized) sample from the wrapped distribution."""
        return self._dist.sample()


# The previous guard compared nothing: ``"__name__ == __main__"`` is a
# non-empty string and therefore always true, so this ran on every import.
if __name__ == "__main__":
    tr = TransitionModel(50, 512, 1, 256)