import math
import os
from io import BytesIO

import numpy as np
import pylab
from PIL import Image

import chainer
import chainer.functions as F
import chainer.links as L
from chainer import Variable, cuda, optimizers, serializers

image_dir = './images'
out_image_dir = './out_images'
out_model_dir = './out_models'

nz = 100                      # dimensionality of the latent vector z
batchsize = 32
n_epoch = 300
n_train = 15000
weight_decay = 0.00001
image_save_interval = 50000

# Read every training image into memory as raw bytes; they are decoded
# on the fly when minibatches are assembled.
fs = os.listdir(image_dir)
print(len(fs))
dataset = []
for fn in fs:
    with open('%s/%s' % (image_dir, fn), 'rb') as f:
        dataset.append(f.read())


def elu(x, alpha=1.0):
    # Exponential Linear Unit; Chainer provides this as a built-in function.
    return F.elu(x, alpha=alpha)


class Generator(chainer.Chain):
    """Maps a latent vector z to a 96x96 RGB image via four stride-2 deconvolutions."""

    def __init__(self):
        super(Generator, self).__init__(
            l0z=L.Linear(nz, 6 * 6 * 512,
                         initialW=chainer.initializers.Normal(
                             0.02 * math.sqrt(nz) / math.sqrt(10))),
            dc1=L.Deconvolution2D(512, 256, 4, stride=2, pad=1,
                                  initialW=chainer.initializers.Normal(
                                      0.02 * math.sqrt(4 * 4 * 512) / math.sqrt(10))),
            dc2=L.Deconvolution2D(256, 128, 4, stride=2, pad=1,
                                  initialW=chainer.initializers.Normal(
                                      0.02 * math.sqrt(4 * 4 * 256) / math.sqrt(10))),
            dc3=L.Deconvolution2D(128, 64, 4, stride=2, pad=1,
                                  initialW=chainer.initializers.Normal(
                                      0.02 * math.sqrt(4 * 4 * 128) / math.sqrt(10))),
            dc4=L.Deconvolution2D(64, 3, 4, stride=2, pad=1,
                                  initialW=chainer.initializers.Normal(
                                      0.02 * math.sqrt(4 * 4 * 64) / math.sqrt(10))),
            bn0l=L.BatchNormalization(6 * 6 * 512),
            bn0=L.BatchNormalization(512),
            bn1=L.BatchNormalization(256),
            bn2=L.BatchNormalization(128),
            bn3=L.BatchNormalization(64),
        )

    def __call__(self, z):
        h = F.reshape(F.relu(self.bn0l(self.l0z(z))),
                      (z.data.shape[0], 512, 6, 6))
        h = F.relu(self.bn1(self.dc1(h)))
        h = F.relu(self.bn2(self.dc2(h)))
        h = F.relu(self.bn3(self.dc3(h)))
        x = self.dc4(h)         # no activation: outputs roughly in [-1, 1]
        return x


class Discriminator(chainer.Chain):
    """Mirror of the generator: four stride-2 convolutions down to a 2-way classifier."""

    def __init__(self):
        super(Discriminator, self).__init__(
            c0=L.Convolution2D(3, 64, 4, stride=2, pad=1,
                               initialW=chainer.initializers.Normal(
                                   0.02 * math.sqrt(4 * 4 * 3) / math.sqrt(10))),
            c1=L.Convolution2D(64, 128, 4, stride=2, pad=1,
                               initialW=chainer.initializers.Normal(
                                   0.02 * math.sqrt(4 * 4 * 64) / math.sqrt(10))),
            c2=L.Convolution2D(128, 256, 4, stride=2, pad=1,
                               initialW=chainer.initializers.Normal(
                                   0.02 * math.sqrt(4 * 4 * 128) / math.sqrt(10))),
            c3=L.Convolution2D(256, 512, 4, stride=2, pad=1,
                               initialW=chainer.initializers.Normal(
                                   0.02 * math.sqrt(4 * 4 * 256) / math.sqrt(10))),
            l4l=L.Linear(6 * 6 * 512, 2,
                         initialW=chainer.initializers.Normal(
                             0.02 * math.sqrt(6 * 6 * 512) / math.sqrt(10))),
            bn0=L.BatchNormalization(64),
            bn1=L.BatchNormalization(128),
            bn2=L.BatchNormalization(256),
            bn3=L.BatchNormalization(512),
        )

    def __call__(self, x):
        h = elu(self.c0(x))     # no batch norm on the input layer
        h = elu(self.bn1(self.c1(h)))
        h = elu(self.bn2(self.c2(h)))
        h = elu(self.bn3(self.c3(h)))
        l = self.l4l(h)         # L.Linear flattens the 4-D feature map itself
        return l


def clip_img(x):
    # Clamp a single pixel value to [-1, 1] for visualization.
    return np.float32(-1 if x < -1 else (1 if x > 1 else x))


def train_dcgan_labeled(gen, dis, epoch0=0):
    o_gen = optimizers.Adam(alpha=0.0002, beta1=0.5)
    o_dis = optimizers.Adam(alpha=0.0002, beta1=0.5)
    o_gen.setup(gen)
    o_dis.setup(dis)
    o_gen.add_hook(chainer.optimizer.WeightDecay(weight_decay))
    o_dis.add_hook(chainer.optimizer.WeightDecay(weight_decay))

    # Fixed latent vectors for visualization; the bottom half is resampled
    # at every snapshot while the top half stays fixed across epochs.
    zvis = xp.random.uniform(-1, 1, (100, nz), dtype=np.float32)

    for epoch in range(epoch0, n_epoch):
        sum_l_dis = np.float32(0)
        sum_l_gen = np.float32(0)

        for i in range(0, n_train, batchsize):
            # Assemble a minibatch of real images, randomly mirrored
            # and scaled to [-1, 1].
            x2 = np.zeros((batchsize, 3, 96, 96), dtype=np.float32)
            for j in range(batchsize):
                try:
                    rnd = np.random.randint(len(dataset))
                    rnd2 = np.random.randint(2)
                    img = np.asarray(
                        Image.open(BytesIO(dataset[rnd])).convert('RGB')
                    ).astype(np.float32).transpose(2, 0, 1)
                    if rnd2 == 0:
                        x2[j, :, :, :] = (img[:, :, ::-1] - 128.0) / 128.0
                    else:
                        x2[j, :, :, :] = (img - 128.0) / 128.0
                except Exception:
                    print('read image error occurred', fs[rnd])
                    import traceback
                    traceback.print_exc()
                    break

            # Label convention: 0 = real, 1 = generated.
            z = Variable(xp.random.uniform(-1, 1, (batchsize, nz), dtype=np.float32))
            x = gen(z)
            yl = dis(x)
            # The generator wants fakes classified as real (0);
            # the discriminator wants them classified as fake (1).
            L_gen = F.softmax_cross_entropy(yl, Variable(xp.zeros(batchsize, dtype=np.int32)))
            L_dis = F.softmax_cross_entropy(yl, Variable(xp.ones(batchsize, dtype=np.int32)))

            # Discriminator loss on real images.
            x2 = Variable(cuda.to_gpu(x2))
            yl2 = dis(x2)
            L_dis += F.softmax_cross_entropy(yl2, Variable(xp.zeros(batchsize, dtype=np.int32)))

            gen.cleargrads()
            L_gen.backward()
            o_gen.update()

            dis.cleargrads()
            L_dis.backward()
            o_dis.update()

            sum_l_gen += L_gen.data.get()
            sum_l_dis += L_dis.data.get()

            if i % image_save_interval == 0 and epoch % 100 == 0:
                # Save a 10x10 grid of generated samples.
                pylab.rcParams['figure.figsize'] = (16.0, 16.0)
                pylab.clf()
                z = zvis
                z[50:, :] = xp.random.uniform(-1, 1, (50, nz), dtype=np.float32)
                z = Variable(z)
                x = gen(z)
                x = x.data.get()
                for i_ in range(100):
                    tmp = ((np.vectorize(clip_img)(x[i_, :, :, :]) + 1) / 2).transpose(1, 2, 0)
                    pylab.subplot(10, 10, i_ + 1)
                    pylab.imshow(tmp)
                    pylab.axis('off')
                pylab.savefig('%s/vis_%d_%d.png' % (out_image_dir, epoch, i))

        if epoch % 100 == 0:
            serializers.save_hdf5("%s/dcgan_model_dis_%d.h5" % (out_model_dir, epoch), dis)
            serializers.save_hdf5("%s/dcgan_model_gen_%d.h5" % (out_model_dir, epoch), gen)
            serializers.save_hdf5("%s/dcgan_state_dis_%d.h5" % (out_model_dir, epoch), o_dis)
            serializers.save_hdf5("%s/dcgan_state_gen_%d.h5" % (out_model_dir, epoch), o_gen)

        print('epoch end', epoch, sum_l_gen / n_train, sum_l_dis / n_train)


xp = cuda.cupy
cuda.get_device(0).use()

gen = Generator()
dis = Discriminator()
gen.to_gpu()
dis.to_gpu()

try:
    os.mkdir(out_image_dir)
    os.mkdir(out_model_dir)
except OSError:
    pass

train_dcgan_labeled(gen, dis)
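The pixel preprocessing in the training loop maps uint8 pixels from [0, 255] to roughly [-1, 1] via `(x - 128) / 128`, and `clip_img` clamps generator output back into [-1, 1] before display. A minimal pure-NumPy sketch of that round trip (the `normalize` helper name is ours, not part of the script):

```python
import numpy as np

def normalize(img):
    # Map uint8 pixel values into [-1, 0.9921875], as the training loop does.
    return (img.astype(np.float32) - 128.0) / 128.0

def clip_img(x):
    # Same clamp as the script's helper: squash one value into [-1, 1].
    return np.float32(-1 if x < -1 else (1 if x > 1 else x))

pixels = np.array([0, 128, 255], dtype=np.uint8)
scaled = normalize(pixels)            # -1.0, 0.0, 0.9921875
# De-normalize for display, exactly as the visualization code does:
display = (np.vectorize(clip_img)(scaled) + 1) / 2   # back in [0, 1]
```

Note the asymmetry: 255 maps to 127/128, not exactly 1.0, which is why the visualization path clips before rescaling.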
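The generator's four `Deconvolution2D` layers (kernel 4, stride 2, pad 1) each double the spatial size, which is why `l0z` targets a 6x6 map: four doublings reach the 96x96 training crops. A small arithmetic check using the standard deconvolution output-size formula, `out = (in - 1) * stride - 2 * pad + kernel`:

```python
def deconv_out(size, kernel=4, stride=2, pad=1):
    # Output spatial size of a transposed convolution.
    return (size - 1) * stride - 2 * pad + kernel

size = 6
sizes = []
for layer in ('dc1', 'dc2', 'dc3', 'dc4'):
    size = deconv_out(size)
    sizes.append(size)
# sizes == [12, 24, 48, 96]
```

The discriminator runs the same ladder in reverse: stride-2 convolutions with the same kernel and padding take 96 back down through 48, 24, 12 to the 6x6 map consumed by `l4l`.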