はじめに
各ライブラリを利用した時のDCGANのコードを羅列しただけの記事です。 使い方の雰囲気だけでも比較したいときに見てください(基本的にただのメモです)。
それぞれについて要点だけを説明しておきます。
PyTorch
generator
PyTorchにはtorch.nn.Sequentialというクラスがあります。 この中にはtorch.nnモジュールで実装されているクラスを複数個渡すことができます。
nn.Sequentialは渡した順番で演算を実行することができるため、あたかもこれで1つの層かのように扱うことができ、 大きなモデルを作りたい場合に非常に便利です。
class Generator(nn.Module): def __init__(self): super(Generator, self).__init__() self.fc = nn.Sequential( nn.Linear(62, 1024), nn.BatchNorm1d(1024), nn.ReLU(), nn.Linear(1024, 128 * 7 * 7), nn.BatchNorm1d(128 * 7 * 7), nn.ReLU(), ) self.deconv = nn.Sequential( nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(64), nn.ReLU(), nn.ConvTranspose2d(64, 1, kernel_size=4, stride=2, padding=1), nn.Sigmoid(), ) def forward(self, input): x = self.fc(input) x = x.view(-1, 128, 7, 7) x = self.deconv(x) return x
nn.Sequentialを使うことで、forwardではたった2つの層の順伝播をを書くだけで済みます。
ただし、Linear層からConvTranspose2d層に渡すときにデータの形を変更しなければなりません。viewメソッドはnumpyのreshapeメソッドに相当します。
discriminator
こちらも同様。
class Discriminator(nn.Module): def __init__(self): super(Discriminator, self).__init__() self.conv = nn.Sequential( nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1), nn.LeakyReLU(0.2), nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1), nn.BatchNorm2d(128), nn.LeakyReLU(0.2), ) self.fc = nn.Sequential( nn.Linear(128 * 7 * 7, 1024), nn.BatchNorm1d(1024), nn.LeakyReLU(0.2), nn.Linear(1024, 1), nn.Sigmoid(), ) def forward(self, input): x = self.conv(input) x = x.view(-1, 128 * 7 * 7) x = self.fc(x) return x
学習部分
PyTorchでは、モデル毎にgpuへの転送を明示しなければなりません。 しかし、単にcudaメソッドを用いればよく、特に難しいことはありません。
optimizerには、そのoptimizerに更新させたいパラメータを予め渡しておきます。 これによってGANのような交互に更新を行いたい場合や、あるいはそれよりも多くのモデルを学習させたい場合に非常に便利です。
G = Generator() D = Discriminator() if cuda: G.cuda() D.cuda() # optimizer G_optimizer = optim.Adam(G.parameters(), lr=lr, betas=(0.5, 0.999)) D_optimizer = optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.999)) # loss criterion = nn.BCELoss() data_loader = ## mnistなりFmnistなり def train(D, G, criterion, D_optimizer, G_optimizer, data_loader): D.train() G.train() y_real = Variable(torch.ones(batch_size, 1)) y_fake = Variable(torch.zeros(batch_size, 1)) if cuda: y_real = y_real.cuda() y_fake = y_fake.cuda() D_running_loss = 0 G_running_loss = 0 for batch_idx, (real_images, _) in enumerate(data_loader): if real_images.size()[0] != batch_size: break z = torch.rand((batch_size, z_dim)) if cuda: real_images, z = real_images.cuda(), z.cuda() real_images, z = Variable(real_images), Variable(z) D_optimizer.zero_grad() D_real = D(real_images) D_real_loss = criterion(D_real, y_real) z = torch.rand((batch_size, z_dim)) if cuda: z = z.cuda() z = Variable(z) fake_images = G(z) D_fake = D(fake_images.detach()) D_fake_loss = criterion(D_fake, y_fake) D_loss = D_real_loss + D_fake_loss D_loss.backward() D_optimizer.step() D_running_loss += D_loss.data[0] G_optimizer.zero_grad() z = torch.rand((batch_size, z_dim)) if cuda: z = z.cuda() z = Variable(z) fake_images = G(z) D_fake = D(fake_images) G_loss = criterion(D_fake, y_real) G_loss.backward() G_optimizer.step() G_running_loss += G_loss.data[0] D_running_loss /= len(data_loader) G_running_loss /= len(data_loader) return D_running_loss, G_running_loss for epoch in range(100): D_loss, G_loss = train(D, G, criterion, D_optimizer, G_optimizer, data_loader) print('epoch %d, D_loss: %.4f G_loss: %.4f' % (epoch + 1, D_loss, G_loss))
PyTorchでのDCGANの実装に関しては
に詳しい情報があります。
TensorFlow eager
TensorFlowのDefine by Runモードです。
generator
PyTorchにはnn.Moduleクラスにtrainメソッドとevalメソッドがあり、これらによってドロップアウトやバッチ正規化などの 検証時と訓練時で振る舞いの変わる層の制御が可能です。
いまのところこの機能はTFEagerモードには備わっていないため、そのようなベースとなるクラスを予め作っておきます。
これを継承したクラスを作って、self.trainingを使ってバッチ正規化の振る舞いを制御するようにします。 Sequentialクラスも存在しないので、ベタに順伝播を書かなければなりませんが、まあ読みづらいくらいでそれほど面倒ではありません。
もう1つポイントとしては、pytorchのTensor型には多くのメソッドが備わっていますが、 TensorFlowのTensor型は基本的に関数でゴリゴリ処理を施していくことになります。 ここが意外と()が入れ子になって書きづらい印象です。
class Base_model(tfe.Network): ''' 今のところtrainとevalを切り替える良い仕様は無いので 手動で切り替えるクラスを作っておきます。 ''' def __init__(self, *args): super(Base_model, self).__init__() self.training = True def train_mode(self): self.training = True def eval_mode(self): self.training = False class Generator(Base_model): ''' 潜在変数(ノイズ)を受け取って28*28の白黒画像へ バッチ正規化などはfunctionとして実装し、self.trainingを引数にする gpu利用時にはPytorchなどのように(batch, channel, h, w)の形式が良い ''' def __init__(self): super(Generator, self).__init__() self.fc1 = self.track_layer(tf.layers.Dense(1024)) self.fc2 = self.track_layer(tf.layers.Dense(128*7*7)) self.deconv1 = self.track_layer( tf.layers.Conv2DTranspose(128, [4, 4], [2, 2], "same", data_format='channels_first')) self.deconv2 = self.track_layer( tf.layers.Conv2DTranspose(1, [4, 4], [2, 2], "same", data_format='channels_first')) def call(self, x): x = tf.nn.relu(tf.layers.batch_normalization( self.fc1(x), training=self.training)) x = tf.nn.relu(tf.layers.batch_normalization( self.fc2(x), training=self.training)) x = tf.reshape(x, [-1, 128, 7, 7]) x = tf.nn.relu(tf.layers.batch_normalization( self.deconv1(x), training=self.training, axis=1)) x = tf.nn.sigmoid(self.deconv2(x)) return x
discriminator
class Discriminator(Base_model): ''' 画像を受け取ったら、それがfake画像がreal画像かを判別する 注意事項はGenerator同様 ''' def __init__(self): super(Discriminator, self).__init__() self.conv1 = self.track_layer( tf.layers.Conv2D(64, [4,4], [2,2], "same", data_format='channels_first')) self.conv2 = self.track_layer( tf.layers.Conv2D(128, [4,4], [2,2], "same", data_format='channels_first')) self.fc1 = self.track_layer(tf.layers.Dense(1024)) self.fc2 = self.track_layer(tf.layers.Dense(1)) def call(self, x): x = tf.nn.leaky_relu(tf.layers.batch_normalization( self.conv1(x), training=self.training, axis=1)) x = tf.nn.leaky_relu(tf.layers.batch_normalization( self.conv2(x), training=self.training, axis=1)) x = tf.layers.flatten(x) x = tf.nn.leaky_relu(tf.layers.batch_normalization( self.fc1(x), training=self.training)) x = self.fc2(x) return x
loss
def D_loss(real_outputs, fake_outputs): ''' DiscriminatorのlossはDiscriminatorが real画像に対してどのような判定を行ったかのlogitsと fake画像に対してどのような判定を行ったかのlogitsが あれば計算できる。 ''' loss_on_real = tf.losses.sigmoid_cross_entropy( multi_class_labels=tf.ones_like(real_outputs), logits=real_outputs, label_smoothing=0.25) loss_on_fake = tf.losses.sigmoid_cross_entropy( multi_class_labels=tf.zeros_like(fake_outputs), logits=fake_outputs) return loss_on_real + loss_on_fake def G_loss(fake_outputs): ''' GeneratorのlossはDiscriminatorが fake画像に対してどのような判定を行ったかのlogitsが あれば計算できる ''' loss = tf.losses.sigmoid_cross_entropy( multi_class_labels=tf.ones_like(fake_outputs), logits=fake_outputs) return loss
学習部分
1回のパラメータ更新をpythonの関数で書いておきます。
TFeagerではwith tf.device
で利用するハードを指定するのが良いと思われます。
Tensorに関してはgpu()
メソッドやcpu()
メソッドがありますが、tfe.Networkクラスにはこのようなメソッドがないため、
with tf.device
で一挙に処理したほうがやり忘れが無いように思います。
ここでは特にデバイスは指定せずに実装しておき、関数を実行するときに、関数をwith tf.device
で包むことにします。
正しい、大きいサイズの行列計算を必要としないような値を計算したい場合は、cpuに任せるようにしてもいいでしょう。
def train(G, D, G_optimizer, D_optimizer, G_loss, D_loss, data_loader, z_dim): # 訓練モードへ D.train_mode() G.train_mode() D_running_loss = 0 G_running_loss = 0 with tf.device("/cpu:0"): count = 0 for batch_idx, (real_images, _) in enumerate(tfe.Iterator(data_loader)): current_batchsize = real_images.shape[0] z = tf.random_uniform( shape=[current_batchsize, z_dim], minval=-1., maxval=1., seed=batch_idx) with tfe.GradientTape(persistent=True) as g: ''' tfe.GradientTapeクラスは dloss/dwの自動微分計算をしたい場合に g = tfe.GradientTape()として g(loss, w)とすればいい。 そのために必要なlossの計算を行う。 ''' fake_images = G(z) D_fake = D(fake_images) D_real = D(real_images) D_loss_value = D_loss(D_real, D_fake) D_running_loss += D_loss_value G_loss_value = G_loss(D_fake) G_running_loss += G_loss_value G_grad = g.gradient(G_loss_value, G.variables) D_grad = g.gradient(D_loss_value, D.variables) with tf.variable_scope('generator'): ''' optimizer.apply_gradientsには パラメータによる勾配と現在のパラメータの値をタプルで渡す。 ''' G_optimizer.apply_gradients( zip(G_grad, G.variables)) with tf.variable_scope('discriminator'): D_optimizer.apply_gradients( zip(D_grad, D.variables)) with tf.device("/cpu:0"): count += 1 D_running_loss /= count G_running_loss /= count return D_running_loss, G_running_loss
こちらの方で、利用するデバイスを指定します。
data_loader = ## with tf.device("/gpu:0"): D_loss_list = [] G_loss_list = [] for epoch in range(1, num_epochs+1): D_loss_value, G_loss_value = train(D=D, G=G, G_loss=G_loss, D_loss=D_loss, D_optimizer=D_optimizer, G_optimizer=G_optimizer, data_loader=ds_train, z_dim=z_dim) D_loss_list.append(D_loss_value.numpy()) G_loss_list.append(G_loss_value.numpy()) print('G_loss {}, D_loss {}'.format(G_loss_value.numpy(), D_loss_value.numpy()))
edward
エドワードではGANを学習させるためのAPIが提供されています。 なので学習コードを明示的に書く必要はありません。
placeholderとノイズ入力
まず、バッチデータのgeneratorを作ります。 これは、他のライブラリに対しても使えます。練習のため。
EdwardはTensorFlowのGraphモードのみ対応なので、プレースホルダーを使ってバッチデータとtrainingモードか否かをboolで送るようにします。
def generator(array, batch_size): start = 0 while True: stop = start + batch_size diff = stop - array.shape[0] if diff <= 0: batch = array[start:stop] start += batch_size else: batch = np.concatenate((array[start:], array[:diff])) start = diff batch = batch.astype(np.float32) / 255.0 # batch = np.random.binomial(1, batch) yield batch batch_size = 128 x_train_generator = generator(X_train, batch_size) x_ph = tf.placeholder(tf.float32, [batch_size, 1, 28, 28]) training = tf.placeholder(tf.bool)
generetor
ここは普通にTensorFlowのtf.layers
を使います。
generatorの方はインスタンスを生成しておくことに注意。
def g_net(eps): net = tf.layers.dense(eps, 1024, activation=tf.nn.relu) net = tf.layers.batch_normalization(net, axis=1, training=training) net = tf.layers.dense(net, 128*7*7, activation=tf.nn.relu) net = tf.layers.batch_normalization(net, axis=1, training=training) net = tf.reshape(net, [-1, 128, 7, 7]) net = tf.layers.conv2d_transpose(net, 128, [4, 4], [2, 2], "same", activation=None, data_format="channels_first") net = tf.layers.batch_normalization(net, axis=1, training=training) net = tf.nn.relu(net) net = tf.layers.conv2d_transpose(net, 1, [4, 4], [2, 2], "same", activation=tf.nn.sigmoid, data_format="channels_first") return net latent_dim = 100 with tf.variable_scope("g_net"): eps = Uniform(tf.zeros([batch_size, latent_dim])) x = g_net(eps)
discriminator
こちらはPython関数で書いておくだけで、インスタンスを生成しないようです。 このような作りになっている理由は謎…。
def d_net(x): net = tf.layers.conv2d(x, 64, [4, 4], [2, 2], "same", data_format="channels_first", activation=None) net = tf.layers.batch_normalization(net, axis=1, training=training) net = tf.nn.leaky_relu(net) net = tf.layers.conv2d(net, 128, [4, 4], [2, 2], "same", data_format="channels_first", activation=None) net = tf.layers.batch_normalization(net, axis=1, training=training) net = tf.nn.leaky_relu(net) net = tf.layers.flatten(net) net = tf.layers.dense(net, 256, activation=tf.nn.leaky_relu) net = tf.layers.dense(net, 1, activation=None) return net
学習部分
ed.GANInference
にdiscriminatorのpython関数を渡します。
inference = ed.GANInference( data={x: x_ph}, discriminator=d_net) optimizer = tf.train.AdamOptimizer(2e-4) optimizer_d = tf.train.AdamOptimizer(2e-4) inference.initialize( optimizer=optimizer, optimizer_d=optimizer_d, n_iter=45000, n_print=1000) sess = ed.get_session() tf.global_variables_initializer().run() idx = np.random.randint(batch_size, size=16) i = 0 for t in range(inference.n_iter): ## 画像保存用コード # if t % inference.n_print == 0: # samples = sess.run(x, {training: False}) # samples = samples[idx, ] # fig = plot(samples) # plt.savefig(os.path.join(out_dir, '{}.png').format( # str(i).zfill(3)), bbox_inches='tight') # plt.close(fig) # i += 1 x_batch = next(x_train_generator) info_dict = inference.update(feed_dict={x_ph: x_batch, training: True}) inference.print_progress(info_dict)