aidiary
1/20/2016 - 7:33 AM

Implementing a Stacked Autoencoder with Theano
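
The full script below pre-trains each hidden layer greedily as a denoising autoencoder, stacks a logistic regression layer on top for the ten MNIST classes, and then fine-tunes the whole network by backpropagation with early stopping. Because each autoencoder shares its weights and biases with the corresponding MLP hidden layer, the values found during pre-training carry over directly to fine-tuning.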

#coding: utf-8
import os
import timeit

import numpy as np

import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from logistic_sgd import LogisticRegression, load_data
from mlp import HiddenLayer
from denoising_autoencoder import DenoisingAutoencoder

class StackedDenoisingAutoencoder(object):
    def __init__(self,
                 numpy_rng,
                 n_ins,
                 hidden_layers_sizes,
                 n_outs,
                 corruption_levels):

        # list of hidden layer objects
        self.hidden_layers = []

        # list of denoising autoencoder objects
        self.autoencoder_layers = []

        # list of parameters
        self.params = []

        # number of hidden layers
        self.n_layers = len(hidden_layers_sizes)

        assert self.n_layers > 0

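        # Theano RNG used by the denoising autoencoders to corrupt their inputs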
        theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # symbols for a minibatch of training data (inputs and correct labels);
        # unlike the previous implementations they are stored as attributes
        # because several methods use them
        self.x = T.matrix('x')
        self.y = T.ivector('y')

        # build the network by looping over the hidden layers
        # and stacking them one at a time
        for i in xrange(self.n_layers):
            # number of input units to this layer
            if i == 0:
                input_size = n_ins
            else:
                input_size = hidden_layers_sizes[i - 1]

            # input data fed to this hidden layer
            if i == 0:
                layer_input = self.x
            else:
                layer_input = self.hidden_layers[-1].output

            # hidden layer of the multilayer perceptron;
            # its weights are the ones updated during fine-tuning
            hidden_layer = HiddenLayer(rng=numpy_rng,
                                       input=layer_input,
                                       n_in=input_size,
                                       n_out=hidden_layers_sizes[i],
                                       activation=T.nnet.sigmoid)
            self.hidden_layers.append(hidden_layer)
            self.params.extend(hidden_layer.params)

            # denoising autoencoder whose weights are shared with the MLP hidden layer,
            # so the autoencoder contributes no params of its own;
            # the weights and biases learned during pre-training are then
            # inherited as-is by the fine-tuning stage
            autoencoder_layer = DenoisingAutoencoder(numpy_rng=numpy_rng,
                                                     theano_rng=theano_rng,
                                                     input=layer_input,
                                                     n_visible=input_size,
                                                     n_hidden=hidden_layers_sizes[i],
                                                     W=hidden_layer.W,      # share the hidden layer's weights
                                                     bhid=hidden_layer.b)   # share the hidden layer's bias
            self.autoencoder_layers.append(autoencoder_layer)

        # add a logistic regression layer on top so the network can classify MNIST digits
        self.log_layer = LogisticRegression(
                            input=self.hidden_layers[-1].output,
                            n_in=hidden_layers_sizes[-1],
                            n_out=n_outs)
        self.params.extend(self.log_layer.params)

        # symbol for the fine-tuning cost:
        # the negative log-likelihood, as in the multilayer perceptron
        self.finetune_cost = self.log_layer.negative_log_likelihood(self.y)

        # symbol for the classification error rate
        self.errors = self.log_layer.errors(self.y)

    def pretraining_functions(self, train_set_x, batch_size):
        """自己符号化器を学習するpre-training用の関数リストを返す
        教師なし学習なのでxのみを渡す"""
        # index of the minibatch used for training
        index = T.lscalar('index')

        # symbolic so that each autoencoder can be given different values
        corruption_level = T.scalar('corruption')
        learning_rate = T.scalar('lr')

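        # row range of this minibatch inside the shared training-set variable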
        batch_begin = index * batch_size
        batch_end = batch_begin + batch_size

        # build one training function per autoencoder,
        # appended in order from the layer closest to the input
        pretrain_functions = []
        for autoencoder in self.autoencoder_layers:
            # get the symbols for the cost and the parameter updates
            cost, updates = autoencoder.get_cost_updates(corruption_level, learning_rate)
            fn = theano.function(
                inputs=[
                    index,
                    # arguments wrapped in Param are passed at call time by the
                    # tensor names (corruption, lr), not the Python variable names
                    theano.Param(corruption_level, default=0.2),
                    theano.Param(learning_rate, default=0.1)
                ],
                outputs=cost,
                updates=updates,
                givens={
                    self.x: train_set_x[batch_begin:batch_end]
                }
            )
            pretrain_functions.append(fn)

        return pretrain_functions

    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        """fine-tuning用の関数を返す"""
        train_set_x, train_set_y = datasets[0]
        valid_set_x, valid_set_y = datasets[1]
        test_set_x, test_set_y = datasets[2]

        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
        n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size

        index = T.lscalar('index')

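        # gradients of the fine-tuning cost with respect to all parameters,
        # used to build plain SGD updates for the whole stacked network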
        gparams = T.grad(self.finetune_cost, self.params)

        updates = [
            (param, param - gparam * learning_rate) for param, gparam in zip(self.params, gparams)
        ]

        train_model = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.x: train_set_x[index * batch_size: (index + 1) * batch_size],
                self.y: train_set_y[index * batch_size: (index + 1) * batch_size]
            },
            name='train')

        test_score_i = theano.function(
            inputs=[index],
            outputs=self.errors,
            givens={
                self.x: test_set_x[index * batch_size: (index + 1) * batch_size],
                self.y: test_set_y[index * batch_size: (index + 1) * batch_size]
            },
            name='test')

        valid_score_i = theano.function(
            inputs=[index],
            outputs=self.errors,
            givens={
                self.x: valid_set_x[index * batch_size: (index + 1) * batch_size],
                self.y: valid_set_y[index * batch_size: (index + 1) * batch_size]
            },
            name='validate')

        def valid_score():
            """各ミニバッチのvalid誤差のリストを返す"""
            return [valid_score_i(i) for i in xrange(n_valid_batches)]

        def test_score():
            """各ミニバッチのtest誤差のリストを返す"""
            return [test_score_i(i) for i in xrange(n_test_batches)]

        return train_model, valid_score, test_score

def test_stacked_autoencoder(finetune_lr=0.1, pretraining_epochs=15,
                             pretrain_lr=0.001, training_epochs=200,
                             dataset='mnist.pkl.gz', batch_size=1,
                             hidden_layers_sizes=[1000, 1000, 1000],
                             corruption_levels=[0.1, 0.2, 0.3],
                             valerr_file='validation_error.txt',
                             testerr_file='test_error.txt'):
    datasets = load_data(dataset)
    train_set_x = datasets[0][0]
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
    numpy_rng = np.random.RandomState(89677)

    print "building the model ..."

    sda = StackedDenoisingAutoencoder(
        numpy_rng,
        28 * 28,
        hidden_layers_sizes,
        10,
        corruption_levels)

    # Pre-training
    print "getting the pre-training functions ..."
    pretraining_functions = sda.pretraining_functions(train_set_x=train_set_x,
                                                      batch_size=batch_size)

    print "pre-training the model ..."
    start_time = timeit.default_timer()
    for i in xrange(sda.n_layers):
        # the number of pre-training epochs is fixed
        for epoch in xrange(pretraining_epochs):
            c = []
            for batch_index in xrange(n_train_batches):
                c.append(pretraining_functions[i](index=batch_index,
                                                  corruption=corruption_levels[i],
                                                  lr=pretrain_lr))
            print "Pre-training layer %i, epoch %d, cost %f" % (i, epoch, np.mean(c))

    end_time = timeit.default_timer()
    training_time = end_time - start_time

    print "The pretraining code for file %s ran for %.2fm" % (os.path.split(__file__)[1], training_time / 60.0)


    # Fine-tuning
    print "getting the fine-tuning functions ..."
    train_model, validate_model, test_model = sda.build_finetune_functions(
        datasets=datasets,
        batch_size=batch_size,
        learning_rate=finetune_lr
    )

    print "fine-tuning the model ..."

    # early-stopping parameters
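    # look at at least `patience` minibatches before stopping; patience is
    # extended whenever the validation error drops below improvement_threshold
    # times the previous best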
    patience = 10 * n_train_batches
    patience_increase = 2.0
    improvement_threshold = 0.995
    validation_frequency = min(n_train_batches, patience / 2)

    best_validation_loss = np.inf
    test_score = 0

    start_time = timeit.default_timer()

    epoch = 0
    done_looping = False

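    # files for logging the validation and test error curves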
    fp1 = open(valerr_file, "w")
    fp2 = open(testerr_file, "w")

    while (epoch < training_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):
            train_model(minibatch_index)
            iter = (epoch - 1) * n_train_batches + minibatch_index

            if (iter + 1) % validation_frequency == 0:
                validation_losses = validate_model()
                this_validation_loss = np.mean(validation_losses)
                print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)
                fp1.write("%d\t%f\n" % (epoch, this_validation_loss * 100))

                if this_validation_loss < best_validation_loss:
                    if this_validation_loss < best_validation_loss * improvement_threshold:
                        # a large enough improvement suggests there is still room to improve,
                        # so raise patience and allow the loop to run longer
                        patience = max(patience, iter * patience_increase)
                        print "*** iter %d / patience %d" % (iter, patience)
                    best_validation_loss = this_validation_loss
                    best_iter = iter

                    test_losses = test_model()
                    test_score = np.mean(test_losses)
                    print "    epoch %i, minibatch %i/%i, test error of best model %f %%" % (epoch, minibatch_index + 1, n_train_batches, test_score * 100)
                    fp2.write("%d\t%f\n" % (epoch, test_score * 100))

            # stop once the iteration count exceeds patience
            if patience <= iter:
                done_looping = True
                break

    end_time = timeit.default_timer()
    training_time = (end_time - start_time)

    print "Optimization complete with the best validation score of %f %%, on iteration %i, with test performance %f %%" \
            % (best_validation_loss * 100.0, best_iter + 1, test_score * 100)
    print "The training code for file %s ran for %.2fm" % (os.path.split(__file__)[1], training_time / 60.0)

    fp1.close()
    fp2.close()

if __name__ == "__main__":
    test_stacked_autoencoder(hidden_layers_sizes=[1000, 1000, 1000],
                             corruption_levels=[0.1, 0.2, 0.3])
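
For a quick sanity check before launching the full MNIST run, the entry point can also be called with smaller, throwaway settings; the keyword arguments below all exist in test_stacked_autoencoder's signature, only the values are made-up examples:

    # hypothetical small configuration, just to confirm the pipeline runs end to end
    test_stacked_autoencoder(hidden_layers_sizes=[100, 100],
                             corruption_levels=[0.1, 0.2],
                             pretraining_epochs=1,
                             training_epochs=5,
                             batch_size=20)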