Deep Learning Basics: Batch Normalization Principles and Practice, Illustrated and in Detail

Keywords: batch normalization, TensorFlow

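BN overview

Batch normalization (BN) applies an extra processing step to the outputs of a neural network's intermediate layers. After this step, the outputs of every layer are expected to follow, as closely as possible, the same distribution with mean 0 and standard deviation 1. This keeps each layer's outputs stable and free of violent swings, which makes the model easier to train and faster to converge. It also makes training more tolerant of large learning rates by mitigating the gradient explosion they can cause, so BN combined with a large learning rate can speed up convergence and help escape poor local optima.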


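Principle overview

BN is applied per batch of data: the outputs of a hidden (intermediate) layer of the network are batch-normalized. The operation has two parts: a standardization of the outputs, followed by a learnable linear transform (scale w and shift b) that restores the distribution.

The operation is illustrated in the figure below.

Diagram of the two-step BN operation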

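Standardization is easy to understand, but why pull the distribution back after standardizing? Isn't that wasted work? It is not. Standardization destroys the original distribution of the data, which can hurt when the result is fed to a downstream nonlinear function such as an activation function. A restoring linear transform is therefore added to undo the standardization to a suitable degree.

"Suitable degree" means there is no need to worry that destroying the original distribution will harm training, because the restoring layer is there. At the upper bound it fully restores the original distribution (w equal to the standard deviation, b equal to the mean). At the lower bound it keeps the standardized result unchanged (w = 1, b = 0). In between, it nudges the standardized result slightly. How far to restore is learned entirely by the network.

In effect, we impose a uniform standardized distribution by hand but leave the model a way out: if the imposed standardization is unreasonable, the model can roll it back. It amounts to unifying each layer's distribution with a built-in review mechanism, making the distributions as uniform as possible without hindering learning.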
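The two steps and the two bounds of the restoring transform can be sketched in plain Python for a single feature. This is a minimal illustration, not the TensorFlow implementation; the function names, the sample batch, and the EPS value are assumptions chosen for the example.

```python
import math

EPS = 1e-3  # small constant that keeps the denominator away from 0 (assumed value)


def standardize(values, eps=EPS):
    """Step 1: shift the batch to zero mean and scale it to roughly unit standard deviation."""
    mean = sum(values) / len(values)
    var = sum((v - mean) ** 2 for v in values) / len(values)
    normalized = [(v - mean) / math.sqrt(var + eps) for v in values]
    return normalized, mean, var


def restore(normalized, w, b):
    """Step 2: linear transform; in a real network w and b are learned."""
    return [w * v + b for v in normalized]


batch = [2.0, 4.0, 6.0, 8.0]  # one feature across a batch of 4 samples (made-up data)
normalized, mean, var = standardize(batch)

# Lower bound: w = 1, b = 0 keeps the standardized values unchanged.
kept = restore(normalized, w=1.0, b=0.0)

# Upper bound: w = standard deviation, b = mean recovers the original values.
undone = restore(normalized, w=math.sqrt(var + EPS), b=mean)

print(normalized)
print(undone)
```

Any w and b between these two settings give a partially restored distribution, which is the "suitable degree" the network is free to learn.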

With the purpose of each of the two stages explained, a few key questions still remain:


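Computation illustrated

First, the formulas.

BN formulas

Compared with the description above, the formulas contain one extra small term e, usually a tiny number such as 1e-3, whose purpose is to keep the denominator from being 0.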
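For reference, the standard batch normalization computation can be written as follows. It uses the names from the text above: w and b for the restoring transform and e for the small constant. The indexing over a batch of m values x_1, ..., x_m of one feature is notation assumed here.

```latex
\begin{aligned}
\mu &= \frac{1}{m}\sum_{i=1}^{m} x_i \\
\sigma^2 &= \frac{1}{m}\sum_{i=1}^{m} (x_i - \mu)^2 \\
\hat{x}_i &= \frac{x_i - \mu}{\sqrt{\sigma^2 + e}} \\
y_i &= w\,\hat{x}_i + b
\end{aligned}
```

The first three lines are the standardization step; the last line is the restoring linear transform.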

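A closer look at BN, illustrated

Formulas are abstract, so here is a diagram of how BN is computed and applied. The figure above shows the data flow for a (None, 3) input passing through a fully connected layer of shape (3, 4), then BN, then the activation function.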
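The same data flow can be traced with small lists in plain Python. This is a sketch with made-up numbers and hypothetical helper names; the point is that BN computes one mean and one variance per output column (4 here) across the batch, and holds one w and one b per column.

```python
import math

EPS = 1e-3  # assumed value of the small constant e


def dense(batch, weights, bias):
    """Fully connected layer: (n, 3) times (3, 4) plus (4,) gives (n, 4)."""
    return [[sum(x * weights[i][j] for i, x in enumerate(row)) + bias[j]
             for j in range(len(bias))] for row in batch]


def bn(batch, w, b):
    """Normalize every column over the batch, then apply that column's w and b."""
    n, cols = len(batch), len(batch[0])
    means = [sum(row[j] for row in batch) / n for j in range(cols)]
    variances = [sum((row[j] - means[j]) ** 2 for row in batch) / n for j in range(cols)]
    return [[w[j] * (row[j] - means[j]) / math.sqrt(variances[j] + EPS) + b[j]
             for j in range(cols)] for row in batch]


def relu(batch):
    return [[max(0.0, v) for v in row] for row in batch]


inputs = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 10.0]]  # shape (3, 3): batch of 3
weights = [[0.1, -0.2, 0.3, 0.4],
           [0.5, 0.6, -0.7, 0.8],
           [-0.9, 1.0, 1.1, 1.2]]                               # shape (3, 4)
bias = [0.0, 0.1, 0.2, 0.3]

dense_out = dense(inputs, weights, bias)              # shape (3, 4)
bn_out = bn(dense_out, w=[1.0] * 4, b=[0.0] * 4)      # w and b are vectors of length 4
activated = relu(bn_out)                              # shape (3, 4)

print(bn_out)
print(activated)
```

With w = 1 and b = 0 every column of bn_out has mean 0 over the batch, so roughly half of the values are cut to 0 by the ReLU.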


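Estimating the population mean and standard deviation with a moving average

Once the model is trained, prediction needs the mean and variance vectors (the variance gives the standard deviation) of the whole training set at every BN layer. They are obtained by tracking intermediate state during training and continually correcting it toward the final values, so that when training finishes the result is already stored in the network's variables and can be used directly at prediction time. The update formulas are: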

mean_value = mean_value * decay + batch_mean * (1 - decay)
var_value = var_value * decay + batch_var * (1 - decay)

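Here mean_value is updated on every step as training data is consumed, starting from an initial value of 0; var_value works the same way with an initial value of 1. The recommended decay is 0.9 or 0.99. For example: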

a = [1, 2, 1.5, 2, 3, 3, 3.2, 1.2, 0.5, 0.8, 0.3, 2, 2.1, 2.2, 1.6] * 100
sum(a) / len(a)  # 1.759999999999992
mean_value = 0
for i in a:
    mean_value = mean_value * 0.99 + i * 0.01
mean_value  # 1.755411190749514


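In theory, the more batches are processed, the closer the result is to the true value. A larger decay gives a more stable estimate; a smaller decay gives each new batch mean more weight and therefore more fluctuation. Values of 0.9 and above are recommended for stability, but they need more batches. Otherwise the computation stops before the estimate has approached the true value, and the reference mean and variance used on the test set are inaccurate.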
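The trade-off between decay and the number of batches can be checked with a small experiment. The sketch below feeds a constant batch mean (a hypothetical value of 2.0, chosen so the exact answer is known) into the update rule and records how far the estimate still is from the true mean.

```python
def moving_average(values, decay, initial=0.0):
    """Apply mean_value = mean_value * decay + value * (1 - decay) to each value in turn."""
    mean_value = initial
    for value in values:
        mean_value = mean_value * decay + value * (1 - decay)
    return mean_value


TRUE_MEAN = 2.0  # hypothetical constant batch mean

# Remaining gap to the true mean after `steps` batches, for two decay values.
gaps = {
    (decay, steps): TRUE_MEAN - moving_average([TRUE_MEAN] * steps, decay)
    for decay in (0.9, 0.99)
    for steps in (10, 100, 1000)
}

for key in sorted(gaps):
    print(key, gaps[key])
```

For a constant input the gap equals TRUE_MEAN * decay ** steps, so with decay 0.99 about 37% of the initial gap is still left after 100 batches, while with decay 0.9 it is negligible by then. A large decay therefore only pays off when training runs for enough steps.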


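TensorFlow implementation

The recommended approach is from tensorflow.contrib.layers.python.layers import batch_norm. Pass in the tensor to be normalized, and pass the training-versus-test/prediction flag as a tensor as well, using tf.cond with a boolean scalar to select the branch. During training batch_norm is called with is_training=True; during prediction with is_training=False. The prediction branch also sets reuse=True so that training and prediction share the same BN layer. Otherwise two BN layers would be created, and prediction would use the initial moving mean of 0 and moving variance of 1 rather than the trained values, which breaks results on the prediction set.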

def batch_norm_layer(value, is_training, scope):
    def batch_statistics():
        return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=True, scope=scope)

    def population_statistics():
        return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=False, reuse=True, scope=scope)

    return tf.cond(is_training, batch_statistics, population_statistics)

tf.cond takes a boolean tensor, a function to call when it is true, and a function to call when it is false. For example:

a = tf.convert_to_tensor([1, 2, 3])
b = tf.constant(3)
c = tf.constant(False)
result = tf.cond(c, lambda: tf.add(a, a), lambda: tf.square(b))

with tf.Session() as sess:
    res = sess.run(result)
    print(res)  # 9; if c is True the result is [2 4 6]

In addition, the following boilerplate code is required:

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(update_ops):
    # example: build the training op for the loss
    self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)

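The ops that update the moving-average mean and variance during training are stored in tf.GraphKeys.UPDATE_OPS, and they must be run on every training step. tf.control_dependencies ensures this: whatever is defined inside the with block can run only after the ops passed to control_dependencies have completed, so the moving averages are updated before each training step. Without it, the network runs only the loss and optimizer ops and the moving averages are never computed. Training is unaffected because it does not use them, but prediction is all but destroyed, since its reference mean and variance stay at their initial values. An alternative is to take update_ops and merge them with train_step into a single final training op: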

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
if update_ops:
    train_ops = [train_step] + update_ops
    train_op_final = tf.group(*train_ops)

This variant no longer enforces an execution order between the two, but it also appears to work.
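Why skipped updates wreck prediction can be seen without TensorFlow. The sketch below is a hypothetical single-feature BN layer in plain Python (no w and b, for brevity); the run_update_ops flag stands in for whether the ops in UPDATE_OPS are actually executed, and the batch values are made up.

```python
import math


class TinyBatchNorm:
    """Hypothetical single-feature BN layer that tracks moving statistics."""

    def __init__(self, decay=0.9, eps=1e-3):
        self.decay, self.eps = decay, eps
        self.moving_mean, self.moving_var = 0.0, 1.0  # initial values, as in the text

    def __call__(self, values, is_training, run_update_ops=True):
        if is_training:
            # Training normalizes with the statistics of the current batch.
            mean = sum(values) / len(values)
            var = sum((v - mean) ** 2 for v in values) / len(values)
            if run_update_ops:
                self.moving_mean = self.moving_mean * self.decay + mean * (1 - self.decay)
                self.moving_var = self.moving_var * self.decay + var * (1 - self.decay)
        else:
            # Prediction normalizes with the stored moving statistics.
            mean, var = self.moving_mean, self.moving_var
        return [(v - mean) / math.sqrt(var + self.eps) for v in values]


batch = [98.0, 100.0, 102.0]
updated, skipped = TinyBatchNorm(), TinyBatchNorm()
for _ in range(200):
    updated(batch, is_training=True)
    skipped(batch, is_training=True, run_update_ops=False)

good = updated(batch, is_training=False)  # centered around 0, like the training output
bad = skipped(batch, is_training=False)   # still uses mean 0 and variance 1

print(good)
print(bad)
```

Both layers produce identical outputs during training, which is why the problem goes unnoticed there. At prediction time the layer whose updates were skipped passes values near 100 straight through instead of values near 0.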



Code in practice

The following code tests batch_norm in a network, comparing training with and without BN.

import os
import time
import shutil
import pickle
import tensorflow as tf

from tensorflow.contrib.layers.python.layers import batch_norm
from tensorflow.python.saved_model import tag_constants

from preprocessing import get_batch


def batch_norm_layer(value, is_training, scope):
    def batch_statistics():
        return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=True, scope=scope)

    def population_statistics():
        return batch_norm(value, decay=0.9, updates_collections=tf.GraphKeys.UPDATE_OPS, is_training=False, reuse=True, scope=scope)

    return tf.cond(is_training, batch_statistics, population_statistics)


class Model(object):
    def __init__(self, num_class, feature_size, learning_rate=0.5, weight_decay=0.01, decay_learning_rate=0.99):
        self.input_x = tf.placeholder(tf.float32, [None, feature_size], name="input_x")
        self.input_y = tf.placeholder(tf.float32, [None, num_class], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")
        self.batch_normalization = tf.placeholder(tf.bool, name="batch_normalization")
        self.global_step = tf.Variable(0, name="global_step", trainable=False)

        with tf.name_scope('layer_1'):
            dense_out_1 = tf.layers.dense(self.input_x, 64)
            # add
            dense_out_1 = batch_norm_layer(dense_out_1, is_training=self.batch_normalization, scope="bn1")
            dense_out_act_1 = tf.nn.relu(dense_out_1)

        with tf.name_scope('layer_2'):
            dense_out_2 = tf.layers.dense(dense_out_act_1, 32)
            # add
            dense_out_2 = batch_norm_layer(dense_out_2, is_training=self.batch_normalization, scope="bn2")
            dense_out_act_2 = tf.nn.relu(dense_out_2)

        with tf.name_scope('layer_out'):
            self.output = tf.layers.dense(dense_out_act_2, num_class)
            self.probs = tf.nn.softmax(self.output, dim=1, name="probs")

        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.output, labels=self.input_y))
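            # Variable names look like "dense/bias:0", so compare the last component of the
            # op name when excluding biases and BN parameters from L2 regularization.
            train_vars = tf.trainable_variables()
            loss_l2 = tf.add_n([tf.nn.l2_loss(v) for v in train_vars if
                                v.op.name.split('/')[-1] not in ['bias', 'gamma', 'b', 'g', 'beta']]) * weight_decay
            self.loss += loss_l2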

        with tf.name_scope("optimizer"):
            if decay_learning_rate:
                learning_rate = tf.train.exponential_decay(learning_rate, self.global_step, 100, decay_learning_rate)
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            with tf.control_dependencies(update_ops):
                self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)
            # update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            # if update_ops:
            #     train_ops = [self.train_step] + update_ops
            #     self.train_op_final = tf.group(*train_ops)

        with tf.name_scope("metrics"):
            self.accuracy = tf.reduce_mean(
                tf.cast(tf.equal(tf.arg_max(self.probs, 1), tf.arg_max(self.input_y, 1)), dtype=tf.float32))


if __name__ == '__main__':
    train_x, train_y = pickle.load(
        open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/train.pkl", "rb"))
    test_x, test_y = pickle.load(
        open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/test.pkl", "rb"))

    tf.reset_default_graph()
    model = Model(num_class=2, feature_size=15, weight_decay=0)
    saver = tf.train.Saver(tf.global_variables(), max_to_keep=1)

    # all_variable = tf.global_variables()
    # for variable in all_variable:
    #     if "moving" in variable.name:
    #         print(variable.name, variable.eval())
    BASIC_PATH = "/home/myproject/BatchNormalizationTest/batch_normalization_test"
    with tf.Session() as sess:
        init_op = tf.group(tf.global_variables_initializer())
        sess.run(init_op)

        train_batch = get_batch(10, 64, train_x, train_y)
        train_loss_list = []
        train_step_cnt = []
        acc_list = []
        val_feed_dict = {model.input_x: test_x, model.input_y: test_y, model.dropout_keep_prob: 1,
                         model.batch_normalization: False}
        for batch in train_batch:
            epoch, batch_x, batch_y = batch
            feed_dict = {model.input_x: batch_x, model.input_y: batch_y, model.dropout_keep_prob: 1,
                         model.batch_normalization: True}
            _, step, loss_train = sess.run([model.train_step, model.global_step, model.loss], feed_dict=feed_dict)
            train_loss_list.append(loss_train)
            train_step_cnt.append(step)
            if step % 10 == 0:
                print("epoch:", epoch + 1, "step:", step, "loss:", loss_train)
                # ckpt
                saver.save(sess, os.path.join(BASIC_PATH, "./ckpt2/ckpt"))

            if step % 50 == 0:
                loss_val, acc_val, probs = sess.run([model.loss, model.accuracy, model.probs], feed_dict=val_feed_dict)
                print("{:-^30}".format("evaluation"))
                print("[evaluation]", "loss:", loss_val, "acc", acc_val)

        loss_val, acc_val, probs = sess.run([model.loss, model.accuracy, model.probs], feed_dict=val_feed_dict)
        print("{:-^30}".format("evaluation"))
        print("[evaluation]", "loss:", loss_val, "acc", acc_val)

    import matplotlib.pyplot as plt

    plt.plot(train_step_cnt, train_loss_list)
    plt.ylim([0.25, 2])
    plt.show()

    # save
    pb_num = str(int(time.time()))
    pb_path = os.path.join(BASIC_PATH, "./tfserving2", pb_num)
    shutil.rmtree(pb_path, ignore_errors=True)
    tf.reset_default_graph()
    with tf.Session() as sess:
        last_ckpt = tf.train.latest_checkpoint(os.path.join(BASIC_PATH, "./ckpt2"))
        print("Loading ckpt: {}".format(last_ckpt))
        saver = tf.train.import_meta_graph("{}.meta".format(last_ckpt))
        saver.restore(sess, last_ckpt)
        graph = tf.get_default_graph()
        # get tensor
        input_x = graph.get_tensor_by_name("input_x:0")
        dropout_keep_prob = graph.get_tensor_by_name("dropout_keep_prob:0")
        batch_norm_is_train = graph.get_tensor_by_name("batch_normalization:0")
        pred = graph.get_tensor_by_name("layer_out/probs:0")
        builder = tf.saved_model.builder.SavedModelBuilder(pb_path)
        inputs = {'input_x': tf.saved_model.utils.build_tensor_info(input_x),
                  'dropout_keep_prob': tf.saved_model.utils.build_tensor_info(dropout_keep_prob),
                  'batch_norm': tf.saved_model.utils.build_tensor_info(batch_norm_is_train)
                  }
        outputs = {'output': tf.saved_model.utils.build_tensor_info(pred)}
        signature = tf.saved_model.signature_def_utils.build_signature_def(
            inputs=inputs,
            outputs=outputs,
            method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME)

        builder.add_meta_graph_and_variables(sess, [tag_constants.SERVING], {'my_signature': signature})
        builder.save()
    print("pb file saved:", pb_num)

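The code has two fully connected layers; each adds BN after the linear transform and ReLU after BN. BN here uses only the mean shift and not the standard deviation scaling (batch_norm is called with its defaults, center=True and scale=False). Running with a learning rate of 0.5 gives the following results: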

----------evaluation----------
[evaluation] loss: 0.48918334 acc 0.75525653
epoch: 10 step: 2210 loss: 0.38185906
epoch: 10 step: 2220 loss: 0.47259575
epoch: 10 step: 2230 loss: 0.62081766
epoch: 10 step: 2240 loss: 0.5432115
----------evaluation----------
[evaluation] loss: 0.49052832 acc 0.76843286

With the same learning rate of 0.5 and the two BN layers removed, training gives:

----------evaluation----------
[evaluation] loss: 0.6970511 acc 0.50602746
epoch: 10 step: 2210 loss: 0.71779585
epoch: 10 step: 2220 loss: 0.7110723
epoch: 10 step: 2230 loss: 0.72440666
epoch: 10 step: 2240 loss: 0.7036286
----------evaluation----------
[evaluation] loss: 0.6983369 acc 0.50602746

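Now compare how the networks with and without BN converge during training at a relatively large learning rate. Looking first at all training steps, in the fast-convergence phase at the far left the loss curve with BN is clearly narrower (it fluctuates less) than without BN. Later on, the loss without BN shoots up, probably because of exploding gradients.

Convergence comparison

Now look at the first steps of the fast-convergence phase: without BN the loss has still not dropped below 0.6 after 200 steps, while with BN it has already reached a best value of 0.4 and stays around 0.5.

Convergence comparison 2

If the learning rate is brought back to a normal value such as 0.01, the two networks show no noticeable difference.


Prediction works the same way as validation: feed False to the boolean placeholder.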

import os
import pickle

from sklearn.metrics import accuracy_score
import tensorflow as tf
from tensorflow.python.saved_model import tag_constants

BASIC_PATH = "/home/myproject/BatchNormalizationTest/batch_normalization_test"


def predict_pb(input_x_value, pb_file_no=None):
    """Load the model from a pb (SavedModel) export."""
    max_time = pb_file_no
    if max_time is None:
        max_time = max(os.listdir(os.path.join(BASIC_PATH, "./tfserving2")))
    # max_time = "1672132226"
    print("Loading pb version:", max_time)
    with tf.Session(graph=tf.Graph()) as sess:
        tf.saved_model.loader.load(sess, [tag_constants.SERVING], os.path.join(BASIC_PATH, "./tfserving2", max_time))
        graph = tf.get_default_graph()
        input_x = graph.get_operation_by_name("input_x").outputs[0]
        dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
        batch_norm_is_train = graph.get_operation_by_name("batch_normalization").outputs[0]
        probs = graph.get_tensor_by_name("layer_out/probs:0")

        pred = sess.run(probs, feed_dict={input_x: input_x_value, dropout_keep_prob: 1.0, batch_norm_is_train: False})

    return pred


if __name__ == '__main__':
    test_x, test_y = pickle.load(
        open("/home/myproject/BatchNormalizationTest/batch_normalization_test/data/test.pkl", "rb"))
    pred = predict_pb(test_x).tolist()
    pred = [0 if x[1] < 0.5 else 1 for x in pred]
    test_y = [0 if x == [1, 0] else 1 for x in test_y]
    print(accuracy_score(test_y, pred))



Page updated: 2024-04-25
