libinggen 发表于 2017-07-09 23:06

学习笔记TF025:自编码器

传统机器学习依赖良好的特征工程。深度学习解决有效特征难人工提取问题。无监督学习,不需要标注数据,学习数据内容组织形式,提取频繁出现特征,逐层抽象,从简单到复杂,从微观到宏观。

稀疏编码(Sparse Coding),基本结构组合。自编码器(AutoEncoder),用自身高阶特征编码自己。期望输入/输出一致,使用高阶特征重构自己。

Hinton教授在Science发表文章《Reducing the dimensionality of data with neural networks》,讲解自编码器降维数据。基于深度信念网络(Deep Belief Networks,DBN,多层 RBN堆叠),无监督逐层训练贪心算法,逐层提取特征,极深网络权重初始化到较好位置,辅助监督训练。

1、限制中间隐含层节点数量,降维。中间隐含层权重加L1正则,根据惩罚系数控制隐含节点稀疏程度,惩罚系数越大,学到特征组合越稀疏,实际使用(非零权重)特征数量越少。
2、数据加入噪声,Denoising AutoEncoder(去噪自编码器),从噪声学习数据特征。学习数据频繁出现模式结构,略去无规律噪声。加性高斯噪声(Additive Gaussian Noise,AGN)。Nasking Noise,随机遮挡噪声,部分图像像素置0,从其他像素结构推测被遮挡像素。

一隐含层自动编码器类似主成分分析(PCA)。Hinton DBN 多个隐含层,每个隐含层是限制性玻尔兹曼机 RBM(Restricted Bltzman Machine,特殊连接分布神经网络)。先对每两层无监督预训练(pre-training),多层自编码器,网络权重初始化到理想分布。通过反向传播算法调整模型权重,使用经过标注信息做监督分类训练。解决梯度弥散(Gradient vanishment)。

去噪自编码器使用范围最广最通用。无噪声自编码器,去掉噪声,保证隐含层节点小于输入层节点。Masking Noise自编码器,高斯噪声改为随机遮挡噪声。Variational AutoEncoder(VAE),对中间节点分布强假设,有额外损失项,用特殊SGVB(Stochastic Gradient Variational Bayes)算
法训练。

先导入常用库NumPy,Scikit-lean preprocessing模块(数据预处理、标准化)。MNIST数据加载模块。

Xavier initialization 参数初始化方法,根据网络输入、输出节点数量自动调整最合适分布。Xaiver Glorot和Yoshua Bengio提出,深度学习模型权重初始化太小,信号在每层传递逐渐缩小,失效;初始化太大,逐渐放大,发散、失效。Xavier初始化器,权重满足0均值,方法为2/(nin+nout),分布用均匀分布或高斯分布。tf.random_uniform创建均匀分布,fan_in输入节点数量,fan_out输出节点数量。

去噪自编码类,初始化函数__init__()。输入,n_input(输入变量数),n_hidden(隐含层节点数),transfer_function(隐含层激活函数,默认softplus),optimizer(优化器,默认Adam),scale(高斯噪声系数,默认0.1)。

网络结构,输入x创建维度n_input placeholder。建立提取特征隐含层,输入x加噪声self.x+scale*tf.random_normal((n_input,)),tf.matmul相乘噪声输入和隐含层权重,tf.add加隐含偏置,self.transfer结果激活函数。输出层数据复原、重建,建立reconstruction层,隐含层输出self.hidden乘输出层权重,加输出层偏置。

自编码器损失函数,平方误差(Squared Error)作cost,tf.subtract计算输出(self.reconstruction)与输入(self.x)差,tf.pow求差平方,tf.reduce_sum求和。优化器self.optimizer优化损失self.cost。创建Session,初始化自编码器全部模型参数。

参数初始化函数_initialize_weights,创建all_weights字典dict,权重、偏置存入,返回all_weights。隐含层权重用xavier_init函数初始化,传入输入节点数、隐含层节点数,xavier返回适合softplus激活函数权重初始分布。隐含层偏置用tf.zeros全部置0。输出层self.reconstruction,权重、偏置都置为0。

函数partial_fit用batch数据训练返回当前损失cost。Session执行损失cost、训练过程optimizer计算图节点,输入feed_dict包括输入数据x,噪声系数scale。

求损失cost函数calc_total_cost,Session执行计算图节点self.cost,传入输入数据x,噪声系数scale,在测试集评测模型性能。

Transform函数,返回自编码器隐含层输出结果。提供获取抽象特征,自编码器隐含层最主要功能是学习出数据高阶特征。

Generate函数,隐含层输出结果为输入,复原重建层提取高阶特征为原始数据。

Reconstruct函数,整体运行复原过程,提取高阶特征,再复原数据,transform和generate,输入原始数据,输出复原数据。

GetWeights函数获取隐含层权重。

GetBiases函数获取隐含层偏置系数。

TensorFlow读取示例数据函数输入MNIST数据集。

定义训练、测试数据标准化处理函数。标准化,数据变成0均值且标准差1的分布。先减去均值,再除以标准差。sklearn.preprossing StandardScaler 类,训练集上fit,在训练数据和测试数据上必须用相同的Scaler。

获取随机block数据函数,取从0到len(data)-batch_size随机整数,作为block起始位置,顺序取batch size数据,不放回抽样。

用standard_scale函数标准化交换训练集和测试集。

定义总训练样本数,最大训练轮数(epoch)20,batch_size 128,每隔一轮(epoch)显示一次损失cost。

创建AGN自编码器实例,定义模型输入节点数n_input 784,自编码器隐含节点数n_hidden 200,隐含层激活函数transfer_function softplus,优化器optimizer Adam,学习速率 0.001,噪声系数scale 0.001。

每一轮循环开始,平均损失avg_cost设0,计算总共需要batch数(样本总数除batch大小),不放回抽样,不能保证每个样本都被抽到参与训练。每个batch循环,用get_random_block_from_data函数随机抽取block数据,用partial_fit训练batch数据,计算当前cost,整合到avg_cost。每轮迭代显示当前迭代数和本轮平均cost。

性能测试,用cal_total_cost测试测试集X_test,评价指标平方误差。

import numpy as np
    import sklearn.preprocessing as prep
    import tensorflow as tf
    from tensorflow.examples.tutorials.mnist import input_data
    def xavier_init(fan_in, fan_out, constant = 1):
      low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
      high = constant * np.sqrt(6.0 / (fan_in + fan_out))
      return tf.random_uniform((fan_in, fan_out),
                           minval = low, maxval = high,
                           dtype = tf.float32)
    class AdditiveGaussianNoiseAutoencoder(object):
      def __init__(self, n_input, n_hidden, transfer_function = tf.nn.softplus, optimizer = tf.train.AdamOptimizer(),
               scale = 0.1):
            self.n_input = n_input
            self.n_hidden = n_hidden
            self.transfer = transfer_function
            self.scale = tf.placeholder(tf.float32)
            self.training_scale = scale
            network_weights = self._initialize_weights()
            self.weights = network_weights
            # model
            self.x = tf.placeholder(tf.float32, )
            self.hidden = self.transfer(tf.add(tf.matmul(self.x + scale * tf.random_normal((n_input,)),
                self.weights['w1']),
                self.weights['b1']))
            self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])
            # cost
            self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))
            self.optimizer = optimizer.minimize(self.cost)
            init = tf.global_variables_initializer()
            self.sess = tf.Session()
            self.sess.run(init)
      def _initialize_weights(self):
            all_weights = dict()
            all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.n_hidden))
            all_weights['b1'] = tf.Variable(tf.zeros(, dtype = tf.float32))
            all_weights['w2'] = tf.Variable(tf.zeros(, dtype = tf.float32))
            all_weights['b2'] = tf.Variable(tf.zeros(, dtype = tf.float32))
            return all_weights
      def partial_fit(self, X):
            cost, opt = self.sess.run((self.cost, self.optimizer), feed_dict = {self.x: X,
                                                                            self.scale: self.training_scale
                                                                            })
            return cost
      def calc_total_cost(self, X):
            return self.sess.run(self.cost, feed_dict = {self.x: X,
                                                   self.scale: self.training_scale
                                                   })
      def transform(self, X):
            return self.sess.run(self.hidden, feed_dict = {self.x: X,
                                                       self.scale: self.training_scale
                                                       })
      def generate(self, hidden = None):
            if hidden is None:
                hidden = np.random.normal(size = self.weights["b1"])
            return self.sess.run(self.reconstruction, feed_dict = {self.hidden: hidden})
      def reconstruct(self, X):
            return self.sess.run(self.reconstruction, feed_dict = {self.x: X,
                                                               self.scale: self.training_scale
                                                               })
      def getWeights(self):
            return self.sess.run(self.weights['w1'])
      def getBiases(self):
            return self.sess.run(self.weights['b1'])
         
    mnist = input_data.read_data_sets('MNIST_data', one_hot = True)
    def standard_scale(X_train, X_test):
      preprocessor = prep.StandardScaler().fit(X_train)
      X_train = preprocessor.transform(X_train)
      X_test = preprocessor.transform(X_test)
      return X_train, X_test
    def get_random_block_from_data(data, batch_size):
      start_index = np.random.randint(0, len(data) - batch_size)
      return data
    X_train, X_test = standard_scale(mnist.train.images, mnist.test.images)
    n_samples = int(mnist.train.num_examples)
    training_epochs = 20
    batch_size = 128
    display_step = 1
    autoencoder = AdditiveGaussianNoiseAutoencoder(n_input = 784,
                                             n_hidden = 200,
                                             transfer_function = tf.nn.softplus,
                                             optimizer = tf.train.AdamOptimizer(learning_rate = 0.001),
                                             scale = 0.01)
    for epoch in range(training_epochs):
      avg_cost = 0.
      total_batch = int(n_samples / batch_size)
      # Loop over all batches
      for i in range(total_batch):
            batch_xs = get_random_block_from_data(X_train, batch_size)
            # Fit training using batch data
            cost = autoencoder.partial_fit(batch_xs)
            # Compute average loss
            avg_cost += cost / n_samples * batch_size
      # Display logs per epoch step
      if epoch % display_step == 0:
            print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost))
    print("Total cost: " + str(autoencoder.calc_total_cost(X_test)))

参考资料:
《TensorFlow实践》

欢迎付费咨询(150元每小时),我的微信:qingxingfengzi

页: [1]
查看完整版本: 学习笔记TF025:自编码器