320 lines
16 KiB
Python
320 lines
16 KiB
Python
|
import os
|
||
|
import time
|
||
|
import tensorflow as tf
|
||
|
import numpy as np
|
||
|
from glob import glob
|
||
|
import datetime
|
||
|
import random
|
||
|
from PIL import Image
|
||
|
import matplotlib.pyplot as plt
|
||
|
|
||
|
def generator(z, output_channel_dim, training):
|
||
|
with tf.variable_scope("generator", reuse= not training):
|
||
|
|
||
|
# 8x8x1024
|
||
|
fully_connected = tf.layers.dense(z, 8*8*1024)
|
||
|
fully_connected = tf.reshape(fully_connected, (-1, 8, 8, 1024))
|
||
|
fully_connected = tf.nn.leaky_relu(fully_connected)
|
||
|
|
||
|
# 8x8x1024 -> 16x16x512
|
||
|
trans_conv1 = tf.layers.conv2d_transpose(inputs=fully_connected,
|
||
|
filters=512,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[2,2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name="trans_conv1")
|
||
|
batch_trans_conv1 = tf.layers.batch_normalization(inputs = trans_conv1,
|
||
|
training=training,
|
||
|
epsilon=EPSILON,
|
||
|
name="batch_trans_conv1")
|
||
|
trans_conv1_out = tf.nn.leaky_relu(batch_trans_conv1,
|
||
|
name="trans_conv1_out")
|
||
|
|
||
|
# 16x16x512 -> 32x32x256
|
||
|
trans_conv2 = tf.layers.conv2d_transpose(inputs=trans_conv1_out,
|
||
|
filters=256,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[2,2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name="trans_conv2")
|
||
|
batch_trans_conv2 = tf.layers.batch_normalization(inputs = trans_conv2,
|
||
|
training=training,
|
||
|
epsilon=EPSILON,
|
||
|
name="batch_trans_conv2")
|
||
|
trans_conv2_out = tf.nn.leaky_relu(batch_trans_conv2,
|
||
|
name="trans_conv2_out")
|
||
|
|
||
|
# 32x32x256 -> 64x64x128
|
||
|
trans_conv3 = tf.layers.conv2d_transpose(inputs=trans_conv2_out,
|
||
|
filters=128,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[2,2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name="trans_conv3")
|
||
|
batch_trans_conv3 = tf.layers.batch_normalization(inputs = trans_conv3,
|
||
|
training=training,
|
||
|
epsilon=EPSILON,
|
||
|
name="batch_trans_conv3")
|
||
|
trans_conv3_out = tf.nn.leaky_relu(batch_trans_conv3,
|
||
|
name="trans_conv3_out")
|
||
|
|
||
|
# 64x64x128 -> 128x128x64
|
||
|
trans_conv4 = tf.layers.conv2d_transpose(inputs=trans_conv3_out,
|
||
|
filters=64,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[2,2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name="trans_conv4")
|
||
|
batch_trans_conv4 = tf.layers.batch_normalization(inputs = trans_conv4,
|
||
|
training=training,
|
||
|
epsilon=EPSILON,
|
||
|
name="batch_trans_conv4")
|
||
|
trans_conv4_out = tf.nn.leaky_relu(batch_trans_conv4,
|
||
|
name="trans_conv4_out")
|
||
|
|
||
|
# 128x128x64 -> 128x128x3
|
||
|
logits = tf.layers.conv2d_transpose(inputs=trans_conv4_out,
|
||
|
filters=3,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[1,1],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name="logits")
|
||
|
out = tf.tanh(logits, name="out")
|
||
|
return out
|
||
|
|
||
|
def discriminator(x, reuse):
|
||
|
with tf.variable_scope("discriminator", reuse=reuse):
|
||
|
|
||
|
# 128*128*3 -> 64x64x64
|
||
|
conv1 = tf.layers.conv2d(inputs=x,
|
||
|
filters=64,
|
||
|
kernel_size=[5,5],
|
||
|
strides=[2,2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name='conv1')
|
||
|
batch_norm1 = tf.layers.batch_normalization(conv1,
|
||
|
training=True,
|
||
|
epsilon=EPSILON,
|
||
|
name='batch_norm1')
|
||
|
conv1_out = tf.nn.leaky_relu(batch_norm1,
|
||
|
name="conv1_out")
|
||
|
|
||
|
# 64x64x64-> 32x32x128
|
||
|
conv2 = tf.layers.conv2d(inputs=conv1_out,
|
||
|
filters=128,
|
||
|
kernel_size=[5, 5],
|
||
|
strides=[2, 2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name='conv2')
|
||
|
batch_norm2 = tf.layers.batch_normalization(conv2,
|
||
|
training=True,
|
||
|
epsilon=EPSILON,
|
||
|
name='batch_norm2')
|
||
|
conv2_out = tf.nn.leaky_relu(batch_norm2,
|
||
|
name="conv2_out")
|
||
|
|
||
|
# 32x32x128 -> 16x16x256
|
||
|
conv3 = tf.layers.conv2d(inputs=conv2_out,
|
||
|
filters=256,
|
||
|
kernel_size=[5, 5],
|
||
|
strides=[2, 2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name='conv3')
|
||
|
batch_norm3 = tf.layers.batch_normalization(conv3,
|
||
|
training=True,
|
||
|
epsilon=EPSILON,
|
||
|
name='batch_norm3')
|
||
|
conv3_out = tf.nn.leaky_relu(batch_norm3,
|
||
|
name="conv3_out")
|
||
|
|
||
|
# 16x16x256 -> 16x16x512
|
||
|
conv4 = tf.layers.conv2d(inputs=conv3_out,
|
||
|
filters=512,
|
||
|
kernel_size=[5, 5],
|
||
|
strides=[1, 1],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name='conv4')
|
||
|
batch_norm4 = tf.layers.batch_normalization(conv4,
|
||
|
training=True,
|
||
|
epsilon=EPSILON,
|
||
|
name='batch_norm4')
|
||
|
conv4_out = tf.nn.leaky_relu(batch_norm4,
|
||
|
name="conv4_out")
|
||
|
|
||
|
# 16x16x512 -> 8x8x1024
|
||
|
conv5 = tf.layers.conv2d(inputs=conv4_out,
|
||
|
filters=1024,
|
||
|
kernel_size=[5, 5],
|
||
|
strides=[2, 2],
|
||
|
padding="SAME",
|
||
|
kernel_initializer=tf.truncated_normal_initializer(stddev=WEIGHT_INIT_STDDEV),
|
||
|
name='conv5')
|
||
|
batch_norm5 = tf.layers.batch_normalization(conv5,
|
||
|
training=True,
|
||
|
epsilon=EPSILON,
|
||
|
name='batch_norm5')
|
||
|
conv5_out = tf.nn.leaky_relu(batch_norm5,
|
||
|
name="conv5_out")
|
||
|
|
||
|
flatten = tf.reshape(conv5_out, (-1, 8*8*1024))
|
||
|
logits = tf.layers.dense(inputs=flatten,
|
||
|
units=1,
|
||
|
activation=None)
|
||
|
out = tf.sigmoid(logits)
|
||
|
return out, logits
|
||
|
|
||
|
def model_loss(input_real, input_z, output_channel_dim):
|
||
|
g_model = generator(input_z, output_channel_dim, True)
|
||
|
|
||
|
noisy_input_real = input_real + tf.random_normal(shape=tf.shape(input_real),
|
||
|
mean=0.0,
|
||
|
stddev=random.uniform(0.0, 0.1),
|
||
|
dtype=tf.float32)
|
||
|
|
||
|
d_model_real, d_logits_real = discriminator(noisy_input_real, reuse=False)
|
||
|
d_model_fake, d_logits_fake = discriminator(g_model, reuse=True)
|
||
|
|
||
|
d_loss_real = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=d_logits_real,
|
||
|
labels=tf.ones_like(d_model_real)*random.uniform(0.9, 1.0)))
|
||
|
d_loss_fake = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=d_logits_fake,
|
||
|
labels=tf.zeros_like(d_model_fake)))
|
||
|
d_loss = tf.reduce_mean(0.5 * (d_loss_real + d_loss_fake))
|
||
|
g_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=d_logits_fake,
|
||
|
labels=tf.ones_like(d_model_fake)))
|
||
|
return d_loss, g_loss
|
||
|
|
||
|
def model_optimizers(d_loss, g_loss):
|
||
|
t_vars = tf.trainable_variables()
|
||
|
g_vars = [var for var in t_vars if var.name.startswith("generator")]
|
||
|
d_vars = [var for var in t_vars if var.name.startswith("discriminator")]
|
||
|
|
||
|
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
|
||
|
gen_updates = [op for op in update_ops if op.name.startswith('generator')]
|
||
|
|
||
|
with tf.control_dependencies(gen_updates):
|
||
|
d_train_opt = tf.train.AdamOptimizer(learning_rate=LR_D, beta1=BETA1).minimize(d_loss, var_list=d_vars)
|
||
|
g_train_opt = tf.train.AdamOptimizer(learning_rate=LR_G, beta1=BETA1).minimize(g_loss, var_list=g_vars)
|
||
|
return d_train_opt, g_train_opt
|
||
|
|
||
|
def model_inputs(real_dim, z_dim):
|
||
|
inputs_real = tf.placeholder(tf.float32, (None, *real_dim), name='inputs_real')
|
||
|
inputs_z = tf.placeholder(tf.float32, (None, z_dim), name="input_z")
|
||
|
learning_rate_G = tf.placeholder(tf.float32, name="lr_g")
|
||
|
learning_rate_D = tf.placeholder(tf.float32, name="lr_d")
|
||
|
return inputs_real, inputs_z, learning_rate_G, learning_rate_D
|
||
|
|
||
|
def show_samples(sample_images, name, epoch):
|
||
|
figure, axes = plt.subplots(1, len(sample_images), figsize = (IMAGE_SIZE, IMAGE_SIZE))
|
||
|
for index, axis in enumerate(axes):
|
||
|
axis.axis('off')
|
||
|
image_array = sample_images[index]
|
||
|
axis.imshow(image_array)
|
||
|
image = Image.fromarray(image_array)
|
||
|
image.save(name+"_"+str(epoch)+"_"+str(index)+".png")
|
||
|
plt.savefig(name+"_"+str(epoch)+".png", bbox_inches='tight', pad_inches=0)
|
||
|
|
||
|
def test(sess, input_z, out_channel_dim, epoch):
|
||
|
example_z = np.random.uniform(-1, 1, size=[SAMPLES_TO_SHOW, input_z.get_shape().as_list()[-1]])
|
||
|
samples = sess.run(generator(input_z, out_channel_dim, False), feed_dict={input_z: example_z})
|
||
|
sample_images = [((sample + 1.0) * 127.5).astype(np.uint8) for sample in samples]
|
||
|
show_samples(sample_images, OUTPUT_DIR + "samples", epoch)
|
||
|
|
||
|
def summarize_epoch(epoch, duration, sess, d_losses, g_losses, input_z, data_shape):
|
||
|
minibatch_size = int(data_shape[0]//BATCH_SIZE)
|
||
|
print("Epoch {}/{}".format(epoch, EPOCHS),
|
||
|
"\nDuration: {:.5f}".format(duration),
|
||
|
"\nD Loss: {:.5f}".format(np.mean(d_losses[-minibatch_size:])),
|
||
|
"\nG Loss: {:.5f}".format(np.mean(g_losses[-minibatch_size:])))
|
||
|
fig, ax = plt.subplots()
|
||
|
plt.plot(d_losses, label='Discriminator', alpha=0.6)
|
||
|
plt.plot(g_losses, label='Generator', alpha=0.6)
|
||
|
plt.title("Losses")
|
||
|
plt.legend()
|
||
|
plt.savefig(OUTPUT_DIR + "losses_" + str(epoch) + ".png")
|
||
|
test(sess, input_z, data_shape[3], epoch)
|
||
|
plt.close('all')
|
||
|
|
||
|
def get_batches(data):
|
||
|
batches = []
|
||
|
for i in range(int(data.shape[0]//BATCH_SIZE)):
|
||
|
batch = data[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
|
||
|
augmented_images = []
|
||
|
for img in batch:
|
||
|
image = Image.fromarray(img)
|
||
|
if random.choice([True, False]):
|
||
|
image = image.transpose(Image.FLIP_LEFT_RIGHT)
|
||
|
augmented_images.append(np.asarray(image))
|
||
|
batch = np.asarray(augmented_images)
|
||
|
normalized_batch = (batch / 127.5) - 1.0
|
||
|
batches.append(normalized_batch)
|
||
|
return batches
|
||
|
|
||
|
def train(get_batches, data_shape, checkpoint_to_load=None):
|
||
|
input_images, input_z, lr_G, lr_D = model_inputs(data_shape[1:], NOISE_SIZE)
|
||
|
d_loss, g_loss = model_loss(input_images, input_z, data_shape[3])
|
||
|
d_opt, g_opt = model_optimizers(d_loss, g_loss)
|
||
|
|
||
|
with tf.Session() as sess:
|
||
|
sess.run(tf.global_variables_initializer())
|
||
|
epoch = 0
|
||
|
iteration = 0
|
||
|
d_losses = []
|
||
|
g_losses = []
|
||
|
|
||
|
for epoch in range(EPOCHS):
|
||
|
epoch += 1
|
||
|
start_time = time.time()
|
||
|
|
||
|
for batch_images in get_batches:
|
||
|
iteration += 1
|
||
|
batch_z = np.random.uniform(-1, 1, size=(BATCH_SIZE, NOISE_SIZE))
|
||
|
_ = sess.run(d_opt, feed_dict={input_images: batch_images, input_z: batch_z, lr_D: LR_D})
|
||
|
_ = sess.run(g_opt, feed_dict={input_images: batch_images, input_z: batch_z, lr_G: LR_G})
|
||
|
d_losses.append(d_loss.eval({input_z: batch_z, input_images: batch_images}))
|
||
|
g_losses.append(g_loss.eval({input_z: batch_z}))
|
||
|
|
||
|
summarize_epoch(epoch, time.time()-start_time, sess, d_losses, g_losses, input_z, data_shape)
|
||
|
|
||
|
# Paths
|
||
|
INPUT_DATA_DIR = r"D:\ML_Datasets\Anime_Faces_Dataset\data" # Path to the folder with input images. For more info check simspons_dataset.txt
|
||
|
OUTPUT_DIR = "C:/Users/Harry/source/repos/PracticeGit/output/"
|
||
|
if not os.path.exists(OUTPUT_DIR):
|
||
|
os.makedirs(OUTPUT_DIR)
|
||
|
|
||
|
# Hyperparameters
|
||
|
IMAGE_SIZE = 128
|
||
|
NOISE_SIZE = 100
|
||
|
LR_D = 0.00004
|
||
|
LR_G = 0.0004
|
||
|
BATCH_SIZE = 64
|
||
|
EPOCHS = 10000 # For better results increase this value
|
||
|
BETA1 = 0.5
|
||
|
WEIGHT_INIT_STDDEV = 0.02
|
||
|
EPSILON = 0.00005
|
||
|
SAMPLES_TO_SHOW = 5
|
||
|
|
||
|
images = []
|
||
|
for root, dirs, files in os.walk(INPUT_DATA_DIR):
|
||
|
for file in files:
|
||
|
images.append(os.path.join(root, file))
|
||
|
|
||
|
# Training
|
||
|
input_images = np.asarray([np.asarray(Image.open(file).resize([IMAGE_SIZE, IMAGE_SIZE])) for file in images])
|
||
|
print ("Input: " + str(input_images.shape))
|
||
|
|
||
|
np.random.shuffle(input_images)
|
||
|
|
||
|
sample_images = random.sample(list(input_images), SAMPLES_TO_SHOW)
|
||
|
show_samples(sample_images, OUTPUT_DIR + "inputs", 0)
|
||
|
|
||
|
with tf.Graph().as_default():
|
||
|
train(get_batches(input_images), input_images.shape)
|