Skip to content
Snippets Groups Projects
Commit 6ce76635 authored by Tobias Seibel's avatar Tobias Seibel
Browse files

fully functional unet + implementation in pipeline

parent 38147ff1
Branches
No related tags found
No related merge requests found
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
from trainer.train import * from trainer.train import *
from dataloader.load import * from dataloader.load import *
from models.Framework import * from models.Framework import *
from models.unet_unconditional_diffusion import * from models.all_unets import *
import torch import torch
from torch import nn from torch import nn
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Prepare experiment # Prepare experiment
1. Choose Hyperparameter Settings 1. Choose Hyperparameter Settings
2. Run notebook on local maschine to generate experiment folder with the JSON files containing the settings 2. Run notebook on local maschine to generate experiment folder with the JSON files containing the settings
3. scp experiment folder to the HPC 3. scp experiment folder to the HPC
4. Run Pipeline by adding following to batch file: 4. Run Pipeline by adding following to batch file:
- Train Model: &emsp;&emsp;&emsp;&emsp;&emsp; `python main.py train "<absolute path of experiment folder in hpc>"` - Train Model: &emsp;&emsp;&emsp;&emsp;&emsp; `python main.py train "<absolute path of experiment folder in hpc>"`
- Sample Images: &emsp;&emsp;&emsp; `python main.py sample "<absolute path of experiment folder in hpc>"` - Sample Images: &emsp;&emsp;&emsp; `python main.py sample "<absolute path of experiment folder in hpc>"`
- Evaluate Model: &emsp;&emsp;&emsp; `python main.py evaluate "<absolute path of experiment folder in hpc>"` - Evaluate Model: &emsp;&emsp;&emsp; `python main.py evaluate "<absolute path of experiment folder in hpc>"`
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import torch import torch
#### ####
# Settings # Settings
#### ####
# Dataset path # Dataset path
datapath = "/work/lect0100/lhq_256" datapath = "/work/lect0100/lhq_256"
# Experiment setup # Experiment setup
run_name = 'batch_timesteps' # WANDB and experiment folder Name! run_name = 'batch_timesteps' # WANDB and experiment folder Name!
checkpoint = None #'model_epoch_8.pth' # Name of checkpoint pth file or None checkpoint = None #'model_epoch_8.pth' # Name of checkpoint pth file or None
experiment_path = '/work/lect0100/experiments_gonzalo/'+ run_name +'/' experiment_path = '/work/lect0100/experiments_gonzalo/'+ run_name +'/'
# Path to save generated experiment folder on local machine # Path to save generated experiment folder on local machine
local_path ="/Users/gonzalo/Desktop/" + run_name + '/settings' local_path ="/Users/gonzalo/Desktop/" + run_name + '/settings'
# Diffusion Model Settings # Diffusion Model Settings
diffusion_steps = 200 diffusion_steps = 200
image_size = 64 image_size = 64
channels = 3 channels = 3
# Training # Training
batchsize = 32 batchsize = 32
epochs = 30 epochs = 30
store_iter = 1 store_iter = 1
eval_iter = 500 eval_iter = 500
learning_rate = 0.0001 learning_rate = 0.0001
optimizername = "torch.optim.AdamW" optimizername = "torch.optim.AdamW"
optimizer_params = None optimizer_params = None
verbose = True verbose = True
# checkpoint = None #(If no checkpoint training, ie. random weights) # checkpoint = None #(If no checkpoint training, ie. random weights)
# Sampling # Sampling
sample_size = 10 sample_size = 10
intermediate = False # True if you want to sample one image and all ist intermediate latents intermediate = False # True if you want to sample one image and all ist intermediate latents
# Evaluating # Evaluating
... ...
### ###
# Advanced Settings Dictionaries # Advanced Settings Dictionaries
### ###
meta_setting = dict(modelname = "UNet_Unconditional_Diffusion_Bottleneck_Variant", meta_setting = dict(modelname = "UNet_Res",
dataset = "UnconditionalDataset", dataset = "UnconditionalDataset",
framework = "DDPM", framework = "DDPM",
trainloop_function = "ddpm_trainer", trainloop_function = "ddpm_trainer",
sampling_function = 'ddpm_sampler', sampling_function = 'ddpm_sampler',
evaluation_function = 'ddpm_evaluator', evaluation_function = 'ddpm_evaluator',
batchsize = batchsize batchsize = batchsize
) )
dataset_setting = dict(fpath = datapath, dataset_setting = dict(fpath = datapath,
img_size = image_size, img_size = image_size,
frac =0.8, frac =0.8,
skip_first_n = 0, skip_first_n = 0,
ext = ".png", ext = ".png",
transform=True transform=True
) )
model_setting = dict( n_channels=64,
fctr = [1,2,4,4,8],
time_dim=256,
)
"""
outdated
model_setting = dict( channels_in=channels, model_setting = dict( channels_in=channels,
channels_out =channels , channels_out =channels ,
activation='relu', # activation function. Options: {'relu', 'leakyrelu', 'selu', 'gelu', 'silu'/'swish'} activation='relu', # activation function. Options: {'relu', 'leakyrelu', 'selu', 'gelu', 'silu'/'swish'}
weight_init='he', # weight initialization. Options: {'he', 'torch'} weight_init='he', # weight initialization. Options: {'he', 'torch'}
projection_features=64, # number of image features after first convolution layer projection_features=64, # number of image features after first convolution layer
time_dim=batchsize, #dont chnage!!! time_dim=batchsize, #dont chnage!!!
time_channels=diffusion_steps, # number of time channels #TODO same as diffusion steps? time_channels=diffusion_steps, # number of time channels #TODO same as diffusion steps?
num_stages=4, # number of stages in contracting/expansive path num_stages=4, # number of stages in contracting/expansive path
stage_list=None, # specify number of features produced by stages stage_list=None, # specify number of features produced by stages
num_blocks=1, # number of ConvResBlock in each contracting/expansive path num_blocks=1, # number of ConvResBlock in each contracting/expansive path
num_groupnorm_groups=32, # number of groups used in Group Normalization inside a ConvResBlock num_groupnorm_groups=32, # number of groups used in Group Normalization inside a ConvResBlock
dropout=0.1, # drop-out to be applied inside a ConvResBlock dropout=0.1, # drop-out to be applied inside a ConvResBlock
attention_list=None, # specify MHA pattern across stages attention_list=None, # specify MHA pattern across stages
num_attention_heads=1, num_attention_heads=1,
) )
"""
framework_setting = dict( framework_setting = dict(
diffusion_steps = diffusion_steps, # dont change!! diffusion_steps = diffusion_steps, # dont change!!
out_shape = (channels,image_size,image_size), # dont change!! out_shape = (channels,image_size,image_size), # dont change!!
noise_schedule = 'linear', noise_schedule = 'linear',
beta_1 = 1e-4, beta_1 = 1e-4,
beta_T = 0.02, beta_T = 0.02,
alpha_bar_lower_bound = 0.9, alpha_bar_lower_bound = 0.9,
var_schedule = 'same', var_schedule = 'same',
kl_loss = 'simplified', kl_loss = 'simplified',
recon_loss = 'nll', recon_loss = 'nll',
) )
training_setting = dict( training_setting = dict(
epochs = epochs, epochs = epochs,
store_iter = store_iter, store_iter = store_iter,
eval_iter = eval_iter, eval_iter = eval_iter,
optimizer_class=optimizername, optimizer_class=optimizername,
optimizer_params = optimizer_params, optimizer_params = optimizer_params,
#optimizer_params=dict(lr=learning_rate), # don't change! #optimizer_params=dict(lr=learning_rate), # don't change!
learning_rate = learning_rate, learning_rate = learning_rate,
run_name=run_name, run_name=run_name,
checkpoint= checkpoint, checkpoint= checkpoint,
experiment_path = experiment_path, experiment_path = experiment_path,
verbose = verbose, verbose = verbose,
T_max = 5*10000, # cosine lr param T_max = 5*10000, # cosine lr param
eta_min= 1e-5, # cosine lr param eta_min= 1e-5, # cosine lr param
) )
sampling_setting = dict( sampling_setting = dict(
checkpoint = checkpoint, checkpoint = checkpoint,
experiment_path = experiment_path, experiment_path = experiment_path,
batch_size = sample_size, batch_size = sample_size,
intermediate = intermediate intermediate = intermediate
) )
# TODO # TODO
evaluation_setting = dict( evaluation_setting = dict(
checkpoint = checkpoint, checkpoint = checkpoint,
experiment_path = experiment_path, experiment_path = experiment_path,
) )
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import os import os
import json import json
f = local_path f = local_path
if os.path.exists(f): if os.path.exists(f):
print("path already exists, pick a new name!") print("path already exists, pick a new name!")
print("break") print("break")
else: else:
print("create folder") print("create folder")
#os.mkdir(f) #os.mkdir(f)
os.makedirs(f, exist_ok=True) os.makedirs(f, exist_ok=True)
print("folder created ") print("folder created ")
with open(f+"/meta_setting.json","w+") as fp: with open(f+"/meta_setting.json","w+") as fp:
json.dump(meta_setting,fp) json.dump(meta_setting,fp)
with open(f+"/dataset_setting.json","w+") as fp: with open(f+"/dataset_setting.json","w+") as fp:
json.dump(dataset_setting,fp) json.dump(dataset_setting,fp)
with open(f+"/model_setting.json","w+") as fp: with open(f+"/model_setting.json","w+") as fp:
json.dump(model_setting,fp) json.dump(model_setting,fp)
with open(f+"/framework_setting.json","w+") as fp: with open(f+"/framework_setting.json","w+") as fp:
json.dump(framework_setting,fp) json.dump(framework_setting,fp)
with open(f+"/training_setting.json","w+") as fp: with open(f+"/training_setting.json","w+") as fp:
json.dump(training_setting,fp) json.dump(training_setting,fp)
with open(f+"/sampling_setting.json","w+") as fp: with open(f+"/sampling_setting.json","w+") as fp:
json.dump(sampling_setting,fp) json.dump(sampling_setting,fp)
with open(f+"/evaluation_setting.json","w+") as fp: with open(f+"/evaluation_setting.json","w+") as fp:
json.dump(evaluation_setting,fp) json.dump(evaluation_setting,fp)
print("stored json files in folder") print("stored json files in folder")
print(meta_setting) print(meta_setting)
print(dataset_setting) print(dataset_setting)
print(model_setting) print(model_setting)
print(framework_setting) print(framework_setting)
print(training_setting) print(training_setting)
print(sampling_setting) print(sampling_setting)
print(evaluation_setting) print(evaluation_setting)
``` ```
%% Output %% Output
create folder create folder
folder created folder created
stored json files in folder stored json files in folder
{'modelname': 'UNet_Unconditional_Diffusion_Bottleneck_Variant', 'dataset': 'UnconditionalDataset', 'framework': 'DDPM', 'trainloop_function': 'ddpm_trainer', 'sampling_function': 'ddpm_sampler', 'evaluation_function': 'ddpm_evaluator', 'batchsize': 32} {'modelname': 'UNet_Unconditional_Diffusion_Bottleneck_Variant', 'dataset': 'UnconditionalDataset', 'framework': 'DDPM', 'trainloop_function': 'ddpm_trainer', 'sampling_function': 'ddpm_sampler', 'evaluation_function': 'ddpm_evaluator', 'batchsize': 32}
{'fpath': '/work/lect0100/lhq_256', 'img_size': 64, 'frac': 0.8, 'skip_first_n': 0, 'ext': '.png', 'transform': True} {'fpath': '/work/lect0100/lhq_256', 'img_size': 64, 'frac': 0.8, 'skip_first_n': 0, 'ext': '.png', 'transform': True}
{'channels_in': 3, 'channels_out': 3, 'activation': 'relu', 'weight_init': 'he', 'projection_features': 64, 'time_dim': 32, 'time_channels': 200, 'num_stages': 4, 'stage_list': None, 'num_blocks': 1, 'num_groupnorm_groups': 32, 'dropout': 0.1, 'attention_list': None, 'num_attention_heads': 1} {'channels_in': 3, 'channels_out': 3, 'activation': 'relu', 'weight_init': 'he', 'projection_features': 64, 'time_dim': 32, 'time_channels': 200, 'num_stages': 4, 'stage_list': None, 'num_blocks': 1, 'num_groupnorm_groups': 32, 'dropout': 0.1, 'attention_list': None, 'num_attention_heads': 1}
{'diffusion_steps': 200, 'out_shape': (3, 64, 64), 'noise_schedule': 'linear', 'beta_1': 0.0001, 'beta_T': 0.02, 'alpha_bar_lower_bound': 0.9, 'var_schedule': 'same', 'kl_loss': 'simplified', 'recon_loss': 'nll'} {'diffusion_steps': 200, 'out_shape': (3, 64, 64), 'noise_schedule': 'linear', 'beta_1': 0.0001, 'beta_T': 0.02, 'alpha_bar_lower_bound': 0.9, 'var_schedule': 'same', 'kl_loss': 'simplified', 'recon_loss': 'nll'}
{'epochs': 30, 'store_iter': 1, 'eval_iter': 500, 'optimizer_class': 'torch.optim.AdamW', 'optimizer_params': None, 'learning_rate': 0.0001, 'run_name': 'batch_timesteps', 'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/', 'verbose': True} {'epochs': 30, 'store_iter': 1, 'eval_iter': 500, 'optimizer_class': 'torch.optim.AdamW', 'optimizer_params': None, 'learning_rate': 0.0001, 'run_name': 'batch_timesteps', 'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/', 'verbose': True}
{'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/', 'batch_size': 10, 'intermediate': False} {'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/', 'batch_size': 10, 'intermediate': False}
{'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/'} {'checkpoint': None, 'experiment_path': '/work/lect0100/experiments_gonzalo/batch_timesteps/'}
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
``` ```
......
from diffusers import UNet2DModel
import json
import sys
from dataloader.load import *
from models.Framework import *
from trainer.train import ddpm_trainer from trainer.train import ddpm_trainer
from evaluation.sample import ddpm_sampler from evaluation.sample import ddpm_sampler
from evaluation.evaluate import ddpm_evaluator from evaluation.evaluate import ddpm_evaluator
from models.all_unets import *
import torch
def train_func(f): def train_func(f):
#load all settings #load all settings
...@@ -33,32 +40,10 @@ def train_func(f): ...@@ -33,32 +40,10 @@ def train_func(f):
test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=batchsize) test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=batchsize)
#model = globals()[meta_setting["modelname"]](**model_setting).to(device) net = globals()[meta_setting["modelname"]](**model_setting).to(device)
#net = torch.compile(model) #net = torch.compile(net)
net = UNet2DModel(
sample_size=64,
in_channels=3,
out_channels=3,
layers_per_block=2,
block_out_channels=(128, 128, 256, 256, 512, 512),
down_block_types=(
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"AttnDownBlock2D",
"DownBlock2D",
),
up_block_types=(
"UpBlock2D",
"AttnUpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
),
)
net = net.to(device) net = net.to(device)
framework = globals()[meta_setting["framework"]](net = net,device=device, **framework_setting) framework = globals()[meta_setting["framework"]](net = net,device=device, **framework_setting)
print(f"META SETTINGS:\n\n {meta_setting}\n\n") print(f"META SETTINGS:\n\n {meta_setting}\n\n")
...@@ -92,31 +77,8 @@ def sample_func(f): ...@@ -92,31 +77,8 @@ def sample_func(f):
# init Unet # init Unet
batchsize = meta_setting["batchsize"] batchsize = meta_setting["batchsize"]
#model = globals()[meta_setting["modelname"]](**model_setting).to(device) net = globals()[meta_setting["modelname"]](**model_setting).to(device)
#net = torch.compile(model) #net = torch.compile(net)
net = UNet2DModel(
sample_size=64,
in_channels=3,
out_channels=3,
layers_per_block=2,
block_out_channels=(128, 128, 256, 256, 512, 512),
down_block_types=(
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"AttnDownBlock2D",
"DownBlock2D",
),
up_block_types=(
"UpBlock2D",
"AttnUpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
),
)
net = net.to(device) net = net.to(device)
# init unconditional diffusion model # init unconditional diffusion model
framework = globals()[meta_setting["framework"]](net = net,device=device, **framework_setting) framework = globals()[meta_setting["framework"]](net = net,device=device, **framework_setting)
...@@ -158,31 +120,8 @@ def evaluate_func(f): ...@@ -158,31 +120,8 @@ def evaluate_func(f):
test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=batchsize) test_dataloader = torch.utils.data.DataLoader(test_dataset,batch_size=batchsize)
# init Unet # init Unet
#model = globals()[meta_setting["modelname"]](**model_setting).to(device) net = globals()[meta_setting["modelname"]](**model_setting).to(device)
#net = torch.compile(model) #net = torch.compile(net)
net = UNet2DModel(
sample_size=64,
in_channels=3,
out_channels=3,
layers_per_block=2,
block_out_channels=(128, 128, 256, 256, 512, 512),
down_block_types=(
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"DownBlock2D",
"AttnDownBlock2D",
"DownBlock2D",
),
up_block_types=(
"UpBlock2D",
"AttnUpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
"UpBlock2D",
),
)
net = net.to(device) net = net.to(device)
# init unconditional diffusion model # init unconditional diffusion model
...@@ -202,29 +141,11 @@ def evaluate_func(f): ...@@ -202,29 +141,11 @@ def evaluate_func(f):
def pipeline_func(f):
# TODO
#train_func(f)
generate_func(f)
#evaluate_func(f)
def hello(name):
print(f'Hello {name}!')
if __name__ == '__main__': if __name__ == '__main__':
import json
import sys
from trainer.train import *
from dataloader.load import *
from models.Framework import *
from models.unet_unconditional_diffusion import *
from models.unet import UNet
import torch
from torch import nn
print(sys.argv) print(sys.argv)
functions = {'train': train_func,'sample': sample_func,'evaluate': evaluate_func,"hello":hello} functions = {'train': train_func,'sample': sample_func,'evaluate': evaluate_func}
functions[sys.argv[1]](sys.argv[2]) functions[sys.argv[1]](sys.argv[2])
......
...@@ -287,7 +287,7 @@ class DDPM(nn.Module): ...@@ -287,7 +287,7 @@ class DDPM(nn.Module):
std (tensor): Batch of std scalars for the complete noise dist. for each image in the batch x_t std (tensor): Batch of std scalars for the complete noise dist. for each image in the batch x_t
pred_noise (tensor): Predicted noise for each image in the batch x_t pred_noise (tensor): Predicted noise for each image in the batch x_t
''' '''
pred_noise = self.net(x_t,t,return_dict=False)[0] pred_noise = self.net(x_t,t)
mean = self.mean_scaler[t-1][:,None,None,None]*(x_t - self.noise_scaler[t-1][:,None,None,None]*pred_noise) mean = self.mean_scaler[t-1][:,None,None,None]*(x_t - self.noise_scaler[t-1][:,None,None,None]*pred_noise)
std = self.std[t-1][:,None,None,None] std = self.std[t-1][:,None,None,None]
return mean, std, pred_noise return mean, std, pred_noise
......
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import einops
import numpy as np
# U-Net model
class UNet_Res(nn.Module):
def __init__(self, attention,channels_in=3, n_channels=64,fctr = [1,2,4,4,8],time_dim=256,**args):
"""
attention : (Bool) wether to use attention layers or not
channels_in : (Int)
n_channels : (Int) Channel size after first convolution
fctr : (list) list of factors for further channel size wrt n_channels
time_dim : (Int) dimenison size for time embeding vector
"""
super().__init__()
channels_out = channels_in
fctr = np.asarray(fctr)*n_channels
# learned time embeddings
self.time_embedder = TimeEmbedding(time_dim = time_dim)
self.time_embedder0 = torch.nn.Sequential(nn.Linear(time_dim,fctr[0]),nn.SELU(),nn.Linear(fctr[0],fctr[0]))
self.time_embedder1 = torch.nn.Sequential(nn.Linear(time_dim,fctr[1]),nn.SELU(),nn.Linear(fctr[1],fctr[1]))
self.time_embedder2 = torch.nn.Sequential(nn.Linear(time_dim,fctr[2]),nn.SELU(),nn.Linear(fctr[2],fctr[2]))
self.time_embedder3 = torch.nn.Sequential(nn.Linear(time_dim,fctr[3]),nn.SELU(),nn.Linear(fctr[3],fctr[3]))
self.time_embedder4 = torch.nn.Sequential(nn.Linear(time_dim,fctr[4]),nn.SELU(),nn.Linear(fctr[4],fctr[4]))
# first conv block
self.first_conv = nn.Conv2d(channels_in,fctr[0],kernel_size=3, padding='same', bias=True)
#down blocks
self.down1 = DownsampleBlock_Res(fctr[0],fctr[1],time_dim)
self.down2 = DownsampleBlock_Res(fctr[1],fctr[2],time_dim)
self.down3 = DownsampleBlock_Res(fctr[2],fctr[3],time_dim,attention=attention)
self.down4 = DownsampleBlock_Res(fctr[3],fctr[4],time_dim,attention=attention)
#middle layer
self.mid1 = MidBlock_Res(fctr[4],time_dim,attention=attention)
#up blocks
self.up1 = UpsampleBlock_Res(fctr[1],fctr[0],time_dim)
self.up2 = UpsampleBlock_Res(fctr[2],fctr[1],time_dim)
self.up3 = UpsampleBlock_Res(fctr[3],fctr[2],time_dim,attention=attention)
self.up4 = UpsampleBlock_Res(fctr[4],fctr[3],time_dim)
# final 1x1 conv
self.end_conv = nn.Conv2d(fctr[0], channels_out, kernel_size=1,bias=True)
# Attention Layers
self.mha21 = MHABlock(fctr[2])
self.mha22 = MHABlock(fctr[2])
self.mha31 = MHABlock(fctr[3])
self.mha32 = MHABlock(fctr[3])
self.mha41 = MHABlock(fctr[4])
self.mha42 = MHABlock(fctr[4])
def forward(self, input, t):
t_emb = self.time_embedder(t).to(input.device)
t_emb0 = self.time_embedder0(t_emb)
t_emb1 = self.time_embedder1(t_emb)
t_emb2 = self.time_embedder2(t_emb)
t_emb3 = self.time_embedder3(t_emb)
t_emb4 = self.time_embedder4(t_emb)
# first two conv layers
x = self.first_conv(input) + t_emb0[:,:,None,None]
#timemb
skip1 =x
skip1,x = self.down1(x,t_emb1)
skip2,x = self.down2(x,t_emb2)
skip3,x = self.down3(x,t_emb3)
skip4,x = self.down4(x,t_emb4)
x = self.mid1(x,t_emb4)
x = self.up4(x,skip4,t_emb3)
x = self.up3(x,skip3,t_emb2)
x = self.up2(x,skip2,t_emb1)
x = self.up1(x,skip1,t_emb0)
x = self.end_conv(x)
return x
#TimeEmbedding
class TimeEmbedding(nn.Module):
def __init__(self, time_dim=64):
super().__init__()
self.time_dim = time_dim
n = 10000
self.factor = torch.pow(n*torch.ones(size=(time_dim//2,)),(-2/time_dim*torch.arange(time_dim//2)))
def forward(self, t):
"""
input is t (B,)
factor dim (time_dim,)
output is (B,time_dim)
"""
self.factor = self.factor.to(t.device)
theta = torch.outer(t,self.factor)
# shape of embedding [time_channels, dim]
emb = torch.zeros(t.size(0), self.time_dim,device=t.device)
emb[:, 0::2] = torch.sin(theta)
emb[:, 1::2] = torch.cos(theta)
return emb
# Self Attention
class MHABlock(nn.Module):
def __init__(self,
channels_in,
num_attention_heads=1 # number of attention heads in MHA
):
super().__init__()
self.channels_in = channels_in
self.num_attention_heads = num_attention_heads
self.self_attention = nn.MultiheadAttention(channels_in, num_heads=self.num_attention_heads)
def forward(self, x):
skip = x
batch_size,_,height,width = x.size()
x = x.permute(2, 3, 0, 1).reshape(height * width, batch_size, -1)
attn_output, _ = self.self_attention(x, x, x)
attn_output = attn_output.reshape(batch_size, -1, height, width)
return attn_output+skip
# Residual Convolution Block
class ConvBlock_Res(nn.Module):
def __init__(self,
channels_in, # number of input channels fed into the block
channels_out, # number of output channels produced by the block
time_dim,
attention,
num_groups=32, # number of groups used in Group Normalization; channels_in must be divisible by num_groups
):
super().__init__()
self.attention = attention
if self.attention:
self.attlayer = MHABlock(channels_in=channels_out)
# Convolution layer 1
self.conv1 = nn.Conv2d(channels_in, channels_out, kernel_size=3, padding='same', bias=True)
self.gn1 = nn.GroupNorm(num_groups, channels_out)
self.act1 = nn.SiLU()
# Convolution layer 2
self.conv2 = nn.Conv2d(channels_out, channels_out, kernel_size=3, padding='same', bias=True)
self.gn2 = nn.GroupNorm(num_groups, channels_out)
self.act2 = nn.SiLU()
# Convolution layer 3
self.conv3 = nn.Conv2d(channels_out, channels_out, kernel_size=3, padding='same', bias=True)
self.gn3 = nn.GroupNorm(num_groups, channels_out)
self.act3 = nn.SiLU()
#Convolution skip
self.res_skip = nn.Conv2d(channels_in,channels_out,kernel_size=1)
nn.init.xavier_normal_(self.conv1.weight)
nn.init.xavier_normal_(self.conv2.weight)
nn.init.xavier_normal_(self.conv3.weight)
def forward(self, x, t):
res = self.res_skip(x)
# second convolution layer
x = self.act1(self.gn1(self.conv1(x)))
h =x + t[:,:,None,None]
# third convolution layer
h = self.act2(self.gn2(self.conv2(h)))
h = self.act3(self.gn3(self.conv3(h)))
if self.attention:
h = self.attlayer(h)
return h +res
# Down Sample
class DownsampleBlock_Res(nn.Module):
def __init__(self, channels_in, channels_out,time_dim,attention=False):
super().__init__()
self.pool = nn.MaxPool2d((2,2), stride=2)
self.convblock = ConvBlock_Res(channels_in, channels_out,time_dim,attention=attention)
def forward(self, x, t):
x = self.convblock(x, t)
h = self.pool(x)
return x,h
# Upsample Block
class UpsampleBlock_Res(nn.Module):
def __init__(self, channels_in, channels_out,time_dim,attention=False):
super().__init__()
self.upconv = nn.ConvTranspose2d(channels_in, channels_in, kernel_size=2, stride=2)
self.convblock = ConvBlock_Res(channels_in, channels_out,time_dim,attention=attention)
def forward(self, x, skip_x, t):
x = self.upconv(x)
# skip-connection - merge features from contracting path to its symmetric counterpart in expansive path
out = x + skip_x
out = self.convblock(out, t)
return out
# Middle Block
class MidBlock_Res(nn.Module):
def __init__(self,channels,time_dim,attention=False):
super().__init__()
self.convblock1 = ConvBlock_Res(channels,channels,time_dim,attention=attention)
self.convblock2 = ConvBlock_Res(channels,channels,time_dim,attention=False)
def forward(self,x,t):
x = self.convblock1(x,t)
return self.convblock2(x,t)
...@@ -8,3 +8,4 @@ wandb ...@@ -8,3 +8,4 @@ wandb
torch torch
torchvision torchvision
torchaudio torchaudio
einops
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment