DCGAN debugging. Getting just garbage
I solved this issue a while ago but forgot to post an answer on Stack Overflow, so I will simply post my code here; it should probably work reasonably well. Some disclaimers:
- I am not entirely sure that it still works, since I wrote it a year ago.
- It is for 128x128 px images (MNIST).
- It is not a vanilla GAN; I used various optimization techniques.
- If you want to use it, you need to change various details, such as the training dataset.
Resources:
``
import os
import shutil
from argparse import Namespace
from collections import OrderedDict
from pathlib import Path

import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from numpy.random import choice
from pytorch_lightning import loggers
from torch.autograd import Variable
from torch.utils.data import DataLoader


def weights_init(m):
    """Initialise one module the DCGAN way; meant to be used with ``Module.apply``.

    Modules whose class name contains ``Conv`` get weights drawn from
    N(0, 0.02); modules whose class name contains ``BatchNorm`` get weights
    drawn from N(1, 0.02) and a zero bias. All other modules are left alone.
    """
    classname = m.__class__.__name__
    if 'Conv' in classname:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif 'BatchNorm' in classname:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)


def noisy_labels(y, p_flip=0.05):
    """Flip a random subset of labels **in place** and return ``y``.

    Args:
        y: label tensor; the first dimension indexes the samples.
        p_flip: fraction of the batch to flip (default 5 %).

    Returns:
        The same tensor object ``y`` with ``int(p_flip * len(y))`` entries
        replaced by ``1 - value``. Indices are drawn with replacement, so
        fewer distinct entries than that may actually change.
    """
    batch = y.shape[0]
    n_select = int(p_flip * batch)
    # choice(n, ...) samples from range(n), same as passing the explicit list.
    flip_ix = choice(batch, size=n_select)
    y[flip_ix] = 1 - y[flip_ix]
    return y


class AddGaussianNoise(object):
    """Callable that adds N(mean, std) noise to a tensor (instance noise)."""

    def __init__(self, mean=0.0, std=0.1):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        # Sample directly on the tensor's own device/dtype instead of forcing
        # everything onto the default CUDA device.
        noise = torch.randn_like(tensor) * self.std + self.mean
        return tensor + noise

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


def resize2d(img, size):
    """Return ``img`` average-pooled to ``size`` x ``size``, cut from the graph.

    The result stays on the device of ``img``.
    """
    return F.adaptive_avg_pool2d(img, size).detach()


def get_valid_labels(img):
    """Return soft "real" targets in (0.8, 1.1], shape ``(batch, 1, 1, 1)``.

    One label per sample of ``img``, placed on the device of ``img``.
    """
    low, high = 0.8, 1.1
    labels = (low - high) * torch.rand(img.shape[0], 1, 1, 1) + high
    return labels.to(img.device)


def get_unvalid_labels(img):
    """Return soft "fake" targets in (0, 0.3] with a few labels flipped.

    One label per sample of ``img``, shape ``(batch, 1, 1, 1)``, placed on
    the device of ``img``. Flipping is delegated to :func:`noisy_labels`.
    """
    low, high = 0.0, 0.3
    labels = (low - high) * torch.rand(img.shape[0], 1, 1, 1) + high
    # Flip on the CPU tensor first (numpy indices), then move to the device.
    return noisy_labels(labels).to(img.device)


class Generator(pl.LightningModule):
    """Transposed-convolution generator with multi-scale image outputs.

    With a ``(N, latent_dim, 1, 1)`` input the feature maps are:

        fc0 -> (ngf*16) x 4 x 4
        fc1 -> (ngf*8)  x 8 x 8
        fc2 -> (ngf*4)  x 16 x 16
        fc3 -> (ngf*2)  x 32 x 32
        fc4 -> (ngf)    x 64 x 64
        fc5 -> (nc)     x 128 x 128   (tanh)

    Each intermediate feature map is additionally projected to ``nc``
    channels by a 1x1 convolution (``fc*_r``) for the multi-scale gradient.
    """

    def __init__(self, ngf, nc, latent_dim):
        super().__init__()
        self.ngf = ngf
        self.latent_dim = latent_dim
        self.nc = nc

        # input is Z, going into a transposed convolution (1x1 -> 4x4)
        self.fc0 = self._up_block(latent_dim, ngf * 16, stride=1, padding=0)
        self.fc1 = self._up_block(ngf * 16, ngf * 8)
        self.fc2 = self._up_block(ngf * 8, ngf * 4)
        self.fc3 = self._up_block(ngf * 4, ngf * 2)
        self.fc4 = self._up_block(ngf * 2, ngf)
        self.fc5 = nn.Sequential(
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),
            nn.Tanh(),
        )

        # For Multi-Scale Gradient: convert the intermediate layers into images
        self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)
        self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)
        self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)
        self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)
        self.fc4_r = nn.Conv2d(ngf, self.nc, 1)

    @staticmethod
    def _up_block(in_ch, out_ch, stride=2, padding=1):
        """Spectral-normed 4x4 transposed conv -> LeakyReLU(0.2) -> BatchNorm."""
        return nn.Sequential(
            nn.utils.spectral_norm(
                nn.ConvTranspose2d(in_ch, out_ch, 4, stride, padding, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(out_ch),
        )

    def forward(self, input):
        """Generate images from latent vectors.

        Returns:
            A 6-tuple ``(x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r)``: the final
            image first, followed by the intermediate images ordered from the
            coarsest (``fc0``) to the finest (``fc4``) layer.
        """
        x_0 = self.fc0(input)
        x_1 = self.fc1(x_0)
        x_2 = self.fc2(x_1)
        x_3 = self.fc3(x_2)
        x_4 = self.fc4(x_3)
        x_5 = self.fc5(x_4)

        # For Multi-Scale Gradient: convert the intermediate layers into images
        x_0_r = self.fc0_r(x_0)
        x_1_r = self.fc1_r(x_1)
        x_2_r = self.fc2_r(x_2)
        x_3_r = self.fc3_r(x_3)
        x_4_r = self.fc4_r(x_4)

        return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r


class Discriminator(pl.LightningModule):
    """Convolutional discriminator that consumes the multi-scale images.

    With a ``(N, nc, 128, 128)`` main input the feature maps are:

        fc0 -> (ndf)    x 64 x 64
        fc1 -> (ndf*2)  x 32 x 32
        fc2 -> (ndf*4)  x 16 x 16
        fc3 -> (ndf*8)  x 8 x 8
        fc4 -> (ndf*16) x 4 x 4
        fc5 -> 1 x 1 x 1              (sigmoid)

    From ``fc1`` on, every block receives ``nc`` extra input channels: the
    image of matching resolution concatenated onto the previous features.
    """

    def __init__(self, ndf, nc):
        super().__init__()
        self.nc = nc
        self.ndf = ndf

        # input is (nc) x 128 x 128; no BatchNorm on the first block
        self.fc0 = nn.Sequential(
            nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # "+ nc" because of the concatenated multi-scale image
        self.fc1 = self._down_block(ndf + nc, ndf * 2)
        self.fc2 = self._down_block(ndf * 2 + nc, ndf * 4)
        self.fc3 = self._down_block(ndf * 4 + nc, ndf * 8)
        self.fc4 = self._down_block(ndf * 8 + nc, ndf * 16)
        self.fc5 = nn.Sequential(
            nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),
            nn.Sigmoid(),
        )

    @staticmethod
    def _down_block(in_ch, out_ch):
        """Spectral-normed 4x4 stride-2 conv -> LeakyReLU(0.2) -> BatchNorm."""
        return nn.Sequential(
            nn.utils.spectral_norm(nn.Conv2d(in_ch, out_ch, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(out_ch),
        )

    def forward(self, input, detach_or_not):
        """Score a pyramid of images.

        Args:
            input: sequence of six tensors in the generator's output order
                ``(x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r)``: the full-size
                image followed by the auxiliary images from coarsest to
                finest.
            detach_or_not: when true, every input is detached so that no
                gradient flows back into whatever produced it.

        Returns:
            Sigmoid scores of shape ``(N, 1, 1, 1)`` for 128x128 inputs.
        """
        x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input
        if detach_or_not:
            # Detach ALL scales, not only the final image: otherwise the
            # discriminator update still back-propagates into the generator
            # through the auxiliary images.
            x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = (
                t.detach() for t in (x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r))

        x_0 = self.fc0(x)
        x_0 = torch.cat((x_0, x_4_r), dim=1)  # Concat Multi-Scale Gradient
        x_1 = self.fc1(x_0)
        x_1 = torch.cat((x_1, x_3_r), dim=1)  # Concat Multi-Scale Gradient
        x_2 = self.fc2(x_1)
        x_2 = torch.cat((x_2, x_2_r), dim=1)  # Concat Multi-Scale Gradient
        x_3 = self.fc3(x_2)
        x_3 = torch.cat((x_3, x_1_r), dim=1)  # Concat Multi-Scale Gradient
        x_4 = self.fc4(x_3)
        x_4 = torch.cat((x_4, x_0_r), dim=1)  # Concat Multi-Scale Gradient
        x_5 = self.fc5(x_4)

        return x_5


class DCGAN(pl.LightningModule):
    """LightningModule that trains :class:`Generator` against :class:`Discriminator`.

    Uses soft/noisy labels, decaying instance noise on the real images and
    multi-scale gradients.

    NOTE(review): the hook signatures and the dict returned by
    ``training_step`` appear to target an early PyTorch Lightning API.
    Confirm against the installed version before upgrading.

    Args:
        hparams: namespace providing ``ngf``, ``ndf``, ``nc``, ``latent_dim``,
            ``level_of_noise``, ``epochs``, ``lr_gen``, ``lr_dis``, ``b1``,
            ``b2``, ``image_size``, ``num_workers``, ``batch_size`` and
            ``save_model_every_epoch``. An optional ``dataset_root``
            overrides the default image folder.
        checkpoint_folder: directory that receives the ``.ckpt`` files.
        experiment_name: prefix of the checkpoint file names.
    """

    # Image folder used when hparams carries no ``dataset_root``.
    DEFAULT_DATASET_ROOT = "ghibli_dataset_small_overfit/"

    def __init__(self, hparams, checkpoint_folder, experiment_name):
        super().__init__()
        self.hparams = hparams
        self.checkpoint_folder = checkpoint_folder
        self.experiment_name = experiment_name

        # networks
        self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)
        self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)
        self.generator.apply(weights_init)
        self.discriminator.apply(weights_init)

        # cache for generated images
        self.generated_imgs = None
        self.last_imgs = None

        # For experience replay (not used by the code in this file)
        self.exp_replay_dis = torch.tensor([])

    def forward(self, z):
        """Run the generator; returns its 6-tuple of images."""
        return self.generator(z)

    def adversarial_loss(self, y_hat, y):
        """Binary cross-entropy between discriminator scores and targets."""
        return F.binary_cross_entropy(y_hat, y)

    def training_step(self, batch, batch_nb, optimizer_idx):
        """Run one generator (``optimizer_idx == 0``) or discriminator (``1``) step.

        Returns an ``OrderedDict`` with ``loss``, ``progress_bar`` and ``log``
        entries, or ``None`` for any other ``optimizer_idx``.
        """
        # Instance noise, see
        # https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/
        # The std decays linearly and reaches 0 halfway through training.
        noise_level = self.hparams.level_of_noise
        progress = self.current_epoch / self.hparams.epochs
        std_gaussian = max(0, noise_level - (noise_level * 2) * progress)
        add_instance_noise = AddGaussianNoise(std=std_gaussian)

        imgs, _ = batch
        imgs = add_instance_noise(imgs)  # instance noise on the real images
        self.last_imgs = imgs

        if optimizer_idx == 0:
            loss_name, loss = 'g_loss', self._generator_loss(imgs)
        elif optimizer_idx == 1:
            loss_name, loss = 'd_loss', self._discriminator_loss(imgs)
        else:
            return None

        tqdm_dict = {loss_name: loss}
        log = {loss_name: loss, "std_gaussian": std_gaussian}
        return OrderedDict({
            'loss': loss,
            'progress_bar': tqdm_dict,
            'log': log
        })

    def _generator_loss(self, imgs):
        """Generate a fresh fake batch (cached on ``self``) and score it as real."""
        # sample noise on the same device as the data
        z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1, device=imgs.device)
        self.generated_imgs = self(z)
        # [0] is the full-resolution image of the last layer
        targets = get_valid_labels(self.generated_imgs[0])
        return self.adversarial_loss(self.discriminator(self.generated_imgs, False), targets)

    def _discriminator_loss(self, imgs):
        """Average of the real-batch loss and the (detached) cached fake-batch loss."""
        # Real pyramid in generator output order: full size, then 4..64 px.
        real_pyramid = [imgs] + [resize2d(imgs, size) for size in (4, 8, 16, 32, 64)]
        real_loss = self.adversarial_loss(
            self.discriminator(real_pyramid, False), get_valid_labels(imgs))

        # Reuses the batch produced by the preceding generator step.
        fake_loss = self.adversarial_loss(
            self.discriminator(self.generated_imgs, True),
            get_unvalid_labels(self.generated_imgs[0]))

        return (real_loss + fake_loss) / 2

    def configure_optimizers(self):
        """Return one Adam optimizer per network: generator first, then discriminator."""
        betas = (self.hparams.b1, self.hparams.b2)
        opt_g = torch.optim.Adam(self.generator.parameters(), lr=self.hparams.lr_gen, betas=betas)
        opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=self.hparams.lr_dis, betas=betas)
        return [opt_g, opt_d], []

    def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:
        """Back-propagate ``loss`` while keeping the graph alive.

        NOTE(review): with the discriminator inputs fully detached,
        ``retain_graph=True`` is presumably no longer required. It is kept to
        leave this hook unchanged; confirm before removing.
        """
        loss.backward(retain_graph=True)

    def train_dataloader(self):
        """Build the shuffled training ``DataLoader`` from an image folder.

        Images are resized to ``image_size`` x ``image_size`` and normalised
        with mean 0.5 / std 0.5. The folder defaults to
        ``DEFAULT_DATASET_ROOT`` and can be overridden with
        ``hparams.dataset_root``. To train on another dataset (e.g.
        ``torchvision.datasets.MNIST``) swap the dataset below and adjust
        ``nc`` to its number of channels.
        """
        side = self.hparams.image_size
        transform = transforms.Compose([
            transforms.Resize((side, side)),
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5]),
        ])
        root = getattr(self.hparams, "dataset_root", self.DEFAULT_DATASET_ROOT)
        train_dataset = torchvision.datasets.ImageFolder(root=root, transform=transform)
        return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
                          batch_size=self.hparams.batch_size)

    def on_epoch_end(self):
        """Save four sample images and, every N epochs, a checkpoint."""
        # Sample on whatever device the generator lives on (CPU or any GPU).
        device = next(self.generator.parameters()).device
        z = torch.randn(4, self.hparams.latent_dim, 1, 1, device=device)

        # No graph is needed for sampling.
        with torch.no_grad():
            sample_imgs = self.generator(z)[0]
        # The generator ends in tanh, i.e. [-1, 1]; save_image clamps to
        # [0, 1], so rescale first or every negative value is saved as black.
        sample_imgs = (sample_imgs + 1) / 2
        torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')

        # save model
        if self.current_epoch % self.hparams.save_model_every_epoch == 0:
            folder = Path(self.checkpoint_folder)
            # Presumably some Lightning versions do not create the target
            # directory themselves, so make sure it exists.
            folder.mkdir(parents=True, exist_ok=True)
            ckpt_name = self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt"
            # Use the trainer attached to this module, not a module-level global.
            self.trainer.save_checkpoint(str(folder / ckpt_name))


args = {
    'batch_size': 128,  # batch size
    'lr_gen': 0.0003,  # generator learning rate (TTUR: may differ from lr_dis); tested value: 0.0002
    'lr_dis': 0.0003,  # discriminator learning rate (TTUR: may differ from lr_gen); tested value: 0.0002
    'b1': 0.5,  # Adam beta1; tested value (DCGAN paper): 0.5
    'b2': 0.999,  # Adam beta2; tested value (DCGAN paper): 0.999
    'latent_dim': 256,  # tested value which worked (in V4_1): 100
    'nc': 3,  # number of color channels
    'ndf': 8,  # number of discriminator features
    'ngf': 8,  # number of generator features
    'epochs': 4,  # the maximum number of epochs the algorithm should run
    'save_model_every_epoch': 1,  # how often we save our model
    'image_size': 128,  # size of the image
    'num_workers': 3,
    'level_of_noise': 0.1,  # std of the instance noise; tested values: 0.15 and 0.1
    'experience_save_per_batch': 1,  # this value should be very low; tested value which works: 1
    'experience_batch_size': 50,  # this value shouldn't be too high; tested value which works: 50
}
hparams = Namespace(**args)

# Parameters
experiment_name = "DCGAN_6_2_MNIST_128px"
dataset_name = "mnist"
checkpoint_folder = "DCGAN/"
tags = ["DCGAN", "128x128"]
dirpath = Path(checkpoint_folder)

# The guard keeps DataLoader worker processes (num_workers > 0) from
# re-running the training when the main module is re-imported on platforms
# that spawn workers.
if __name__ == "__main__":
    # defining net
    net = DCGAN(hparams, checkpoint_folder, experiment_name)

    # Debugging aid: reports the forward op behind a failing backward (slow).
    torch.autograd.set_detect_anomaly(True)

    trainer = pl.Trainer(
        # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt",
        max_epochs=args["epochs"],
        # Use one GPU when available, otherwise the Trainer's default device.
        gpus=1 if torch.cuda.is_available() else None,
    )
    trainer.fit(net)
``