DCGAN debugging. Getting just garbage DCGAN debugging. Getting just garbage python python

DCGAN debugging. Getting just garbage

So I solved this issue a while ago, but forgot to post an answer on stack overflow. So I will simply post my code here which should work probably pretty good.Some disclaimer:

  • I am not quite sure if it works since I did this a year ago
  • its for 128x128px Images MNIST
  • It's not a vanilla GAN I used various optimization techniques
  • If you want to use it you need to change various details, such as the training dataset



import torchfrom torch.autograd import Variableimport torch.nn as nnimport torch.nn.functional as Fimport torchvisionimport torchvision.transforms as transformsfrom torch.utils.data import DataLoaderimport pytorch_lightning as plfrom pytorch_lightning import loggersfrom numpy.random import choiceimport osfrom pathlib import Pathimport shutilfrom collections import OrderedDict# custom weights initialization called on netG and netDdef weights_init(m):    classname = m.__class__.__name__    if classname.find('Conv') != -1:        nn.init.normal_(m.weight.data, 0.0, 0.02)    elif classname.find('BatchNorm') != -1:        nn.init.normal_(m.weight.data, 1.0, 0.02)        nn.init.constant_(m.bias.data, 0)# randomly flip some labelsdef noisy_labels(y, p_flip=0.05):  # # flip labels with 5% probability    # determine the number of labels to flip    n_select = int(p_flip * y.shape[0])    # choose labels to flip    flip_ix = choice([i for i in range(y.shape[0])], size=n_select)    # invert the labels in place    y[flip_ix] = 1 - y[flip_ix]    return yclass AddGaussianNoise(object):    def __init__(self, mean=0.0, std=0.1):        self.std = std        self.mean = mean    def __call__(self, tensor):        tensor = tensor.cuda()        return tensor + (torch.randn(tensor.size()) * self.std + self.mean).cuda()    def __repr__(self):        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)def resize2d(img, size):    return (F.adaptive_avg_pool2d(img, size).data).cuda()def get_valid_labels(img):    return ((0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1).cuda()  # soft labelsdef get_unvalid_labels(img):    return (noisy_labels((0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3)).cuda()  # soft labelsclass Generator(pl.LightningModule):    def __init__(self, ngf, nc, latent_dim):        super(Generator, self).__init__()        self.ngf = ngf        self.latent_dim = latent_dim        self.nc = nc        self.fc0 = nn.Sequential(            # input is Z, going into a convolution            nn.utils.spectral_norm(nn.ConvTranspose2d(latent_dim, ngf * 16, 4, 1, 0, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ngf * 16)        )        self.fc1 = nn.Sequential(            # state size. (ngf*8) x 4 x 4            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ngf * 8)        )        self.fc2 = nn.Sequential(            # state size. (ngf*4) x 8 x 8            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ngf * 4)        )        self.fc3 = nn.Sequential(            # state size. (ngf*2) x 16 x 16            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ngf * 2)        )        self.fc4 = nn.Sequential(            # state size. (ngf) x 32 x 32            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ngf)        )        self.fc5 = nn.Sequential(            # state size. (nc) x 64 x 64            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),            nn.Tanh()        )        # state size. (nc) x 128 x 128        # For Multi-Scale Gradient        # Converting the intermediate layers into images        self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)        self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)        self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)        self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)        self.fc4_r = nn.Conv2d(ngf, self.nc, 1)    def forward(self, input):        x_0 = self.fc0(input)        x_1 = self.fc1(x_0)        x_2 = self.fc2(x_1)        x_3 = self.fc3(x_2)        x_4 = self.fc4(x_3)        x_5 = self.fc5(x_4)        # For Multi-Scale Gradient        # Converting the intermediate layers into images        x_0_r = self.fc0_r(x_0)        x_1_r = self.fc1_r(x_1)        x_2_r = self.fc2_r(x_2)        x_3_r = self.fc3_r(x_3)        x_4_r = self.fc4_r(x_4)        return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_rclass Discriminator(pl.LightningModule):    def __init__(self, ndf, nc):        super(Discriminator, self).__init__()        self.nc = nc        self.ndf = ndf        self.fc0 = nn.Sequential(            # input is (nc) x 128 x 128            nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True)        )        self.fc1 = nn.Sequential(            # state size. (ndf) x 64 x 64            nn.utils.spectral_norm(nn.Conv2d(ndf + nc, ndf * 2, 4, 2, 1, bias=False)),            # "+ nc" because of multi scale gradient            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ndf * 2)        )        self.fc2 = nn.Sequential(            # state size. (ndf*2) x 32 x 32            nn.utils.spectral_norm(nn.Conv2d(ndf * 2 + nc, ndf * 4, 4, 2, 1, bias=False)),            # "+ nc" because of multi scale gradient            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ndf * 4)        )        self.fc3 = nn.Sequential(            # state size. (ndf*4) x 16 x 16e            nn.utils.spectral_norm(nn.Conv2d(ndf * 4 + nc, ndf * 8, 4, 2, 1, bias=False)),            # "+ nc" because of multi scale gradient            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ndf * 8),        )        self.fc4 = nn.Sequential(            # state size. (ndf*8) x 8 x 8            nn.utils.spectral_norm(nn.Conv2d(ndf * 8 + nc, ndf * 16, 4, 2, 1, bias=False)),            nn.LeakyReLU(0.2, inplace=True),            nn.BatchNorm2d(ndf * 16)        )        self.fc5 = nn.Sequential(            # state size. (ndf*8) x 4 x 4            nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),            nn.Sigmoid()        )        # state size. 1 x 1 x 1    def forward(self, input, detach_or_not):        # When we train i ncombination with generator we use multi scale gradient.        x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input        if detach_or_not:            x = x.detach()        x_0 = self.fc0(x)        x_0 = torch.cat((x_0, x_4_r), dim=1)  # Concat Multi-Scale Gradient        x_1 = self.fc1(x_0)        x_1 = torch.cat((x_1, x_3_r), dim=1)  # Concat Multi-Scale Gradient        x_2 = self.fc2(x_1)        x_2 = torch.cat((x_2, x_2_r), dim=1)  # Concat Multi-Scale Gradient        x_3 = self.fc3(x_2)        x_3 = torch.cat((x_3, x_1_r), dim=1)  # Concat Multi-Scale Gradient        x_4 = self.fc4(x_3)        x_4 = torch.cat((x_4, x_0_r), dim=1)  # Concat Multi-Scale Gradient        x_5 = self.fc5(x_4)        return x_5class DCGAN(pl.LightningModule):    def __init__(self, hparams, checkpoint_folder, experiment_name):        super().__init__()        self.hparams = hparams        self.checkpoint_folder = checkpoint_folder        self.experiment_name = experiment_name        # networks        self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)        self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)        self.generator.apply(weights_init)        self.discriminator.apply(weights_init)        # cache for generated images        self.generated_imgs = None        self.last_imgs = None        # For experience replay        self.exp_replay_dis = torch.tensor([])    def forward(self, z):        return self.generator(z)    def adversarial_loss(self, y_hat, y):        return F.binary_cross_entropy(y_hat, y)    def training_step(self, batch, batch_nb, optimizer_idx):        # For adding Instance noise for more visit: https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/        std_gaussian = max(0, self.hparams.level_of_noise - (                (self.hparams.level_of_noise * 2) * (self.current_epoch / self.hparams.epochs)))        AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian)  # the noise decays over time        imgs, _ = batch        imgs = AddGaussianNoiseInst(imgs)  # Adding instance noise to real images        self.last_imgs = imgs        # train generator        if optimizer_idx == 0:            # sample noise            z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda()            # generate images            self.generated_imgs = self(z)            # ground truth result (ie: all fake)            g_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, False), get_valid_labels(self.generated_imgs[0]))  # adversarial loss is binary cross-entropy; [0] is the image of the last layer            tqdm_dict = {'g_loss': g_loss}            log = {'g_loss': g_loss, "std_gaussian": std_gaussian}            output = OrderedDict({                'loss': g_loss,                'progress_bar': tqdm_dict,                'log': log            })            return output        # train discriminator        if optimizer_idx == 1:            # Measure discriminator's ability to classify real from generated samples            # how well can it label as real?            real_loss = self.adversarial_loss(                self.discriminator([imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16), resize2d(imgs, 32), resize2d(imgs, 64)],                                   False), get_valid_labels(imgs))            fake_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, True), get_unvalid_labels(                self.generated_imgs[0]))  # how well can it label as fake?; [0] is the image of the last layer            # discriminator loss is the average of these            d_loss = (real_loss + fake_loss) / 2            tqdm_dict = {'d_loss': d_loss}            log = {'d_loss': d_loss, "std_gaussian": std_gaussian}            output = OrderedDict({                'loss': d_loss,                'progress_bar': tqdm_dict,                'log': log            })            return output    def configure_optimizers(self):        lr_gen = self.hparams.lr_gen        lr_dis = self.hparams.lr_dis        b1 = self.hparams.b1        b2 = self.hparams.b2        opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2))        opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2))        return [opt_g, opt_d], []    def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:        loss.backward(retain_graph=True)    def train_dataloader(self):        # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),        #                                 transforms.ToTensor(),        #                                 transforms.Normalize([0.5], [0.5])])        # dataset = torchvision.datasets.MNIST(os.getcwd(), train=False, download=True, transform=transform)        # return DataLoader(dataset, batch_size=self.hparams.batch_size)        # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),        #                                 transforms.ToTensor(),        #                                 transforms.Normalize([0.5], [0.5])        #                                 ])        # train_dataset = torchvision.datasets.ImageFolder(        #     root="./drive/My Drive/datasets/flower_dataset/",        #     # root="./drive/My Drive/datasets/ghibli_dataset_small_overfit/",        #     transform=transform        # )        # return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,        #                   batch_size=self.hparams.batch_size)        transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),                                        transforms.ToTensor(),                                        transforms.Normalize([0.5], [0.5])                                        ])        train_dataset = torchvision.datasets.ImageFolder(            root="ghibli_dataset_small_overfit/",            transform=transform        )        return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,                          batch_size=self.hparams.batch_size)    def on_epoch_end(self):        z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda()        # match gpu device (or keep as cpu)        if self.on_gpu:            z = z.cuda(self.last_imgs.device.index)        # log sampled images        sample_imgs = self.generator(z)[0]        torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')        # save model        if self.current_epoch % self.hparams.save_model_every_epoch == 0:            trainer.save_checkpoint(                self.checkpoint_folder + "/" + self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt")from argparse import Namespaceargs = {    'batch_size': 128, # batch size    'lr_gen': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002    'lr_dis': 0.0003,  # TTUR;learnin rate of both networks; tested value: 0.0002    'b1': 0.5,  # Momentum for adam; tested value(dcgan paper): 0.5    'b2': 0.999,  # Momentum for adam; tested value(dcgan paper): 0.999    'latent_dim': 256,  # tested value which worked(in V4_1): 100    'nc': 3,  # number of color channels    'ndf': 8,  # number of discriminator features    'ngf': 8,  # number of generator features    'epochs': 4,  # the maxima lamount of epochs the algorith should run    'save_model_every_epoch': 1,  # how often we save our model    'image_size': 128, # size of the image    'num_workers': 3,    'level_of_noise': 0.1,  # how much instance noise we introduce(std; tested value: 0.15 and 0.1    'experience_save_per_batch': 1,  # this value should be very low; tested value which works: 1    'experience_batch_size': 50  # this value shouldnt be too high; tested value which works: 50}hparams = Namespace(**args)# Parametersexperiment_name = "DCGAN_6_2_MNIST_128px"dataset_name = "mnist"checkpoint_folder = "DCGAN/"tags = ["DCGAN", "128x128"]dirpath = Path(checkpoint_folder)# defining netnet = DCGAN(hparams, checkpoint_folder, experiment_name)torch.autograd.set_detect_anomaly(True)trainer = pl.Trainer( # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt",    max_epochs=args["epochs"],    gpus=1)trainer.fit(net)
