Hands-on Machine learning 之 TensorFLow入门


简介

TensorFlow是Google在2015年11月份开源的人工智能系统,是之前所开发的深度学习基础架构DistBelief的改进版本,该系统可以被用于语音识别、图像识别等多个领域。本文将介绍TensorFlow的基本概念和常见用法。


创建图并在会话中运行

一个简单的数据流图

1
2
3
4
5
6
7
# 导入tensorflow
import tensorflow as tf

# 定义变量x,y及计算节点f
x = tf.Variable(3, name='x')
y = tf.Variable(4, name='y')
f = x * x * y + y + 2

值得注意的是,上述代码并没有进行任何的运算,仅仅是创建了一个计算图(computation graph)而已。实际上,连变量都还没有被初始化。

为了对该计算图进行运算,我们必须创建一个会话(session)。然后在会话中初始化变量,以及计算f

1
2
3
4
5
6
sess = tf.Session() # 创建会话
sess.run(x.initializer) # 初始化变量x
sess.run(y.initializer) # 初始化变量y
result = sess.run(f) # 计算f
print(result) # 打印结果
>> 42
1
sess.close() # 关闭会话

重复使用sess.run(...)可能有点繁琐,有一个更好的方式是使用python的with语句。

with语句块开始时,创建的会话会成为计算图的默认会话。在语句块结束时,创建的会话也会自动结束。

1
2
3
4
5
# 创建一个会话并命名为sess
with tf.Session() as sess:
x.initializer.run() # 等同于 sess.run(x.initializer)
y.initializer.run() # 等同于 sess.run(y.initializer)
result = f.eval() # result = sess.run(f)

除了手动初始化每个变量,也可以使用global_variables_initializer()函数初始化所有变量。

1
2
3
4
5
6
# 注意,并没有立即将变量初始化,而是创建一个初始化节点
init = tf.global_variables_initializer()

with tf.Session() as sess:
init.run() # 在这里才真正初始化
result = f.eval()

管理计算图

所有创建的节点(node)都会自动添加进默认图中。

1
2
3
4
x1 = tf.Variable(1)
# 判断变量x1所在的图是否是默认图
x1.graph is tf.get_default_graph()
>> True

但是,有的时候需要管理多个独立的图。我们便可以创建一个临时的图,并在with语句块内将其设置为默认图。

1
2
3
4
5
graph = tf.Graph() # 创建图
# 在with内将graph设置为默认图
with graph.as_default():
# 此时创建的变量应该在图graph里
x2 = tf.Variable(2)
1
2
3
4
5
6
7
# 判断x2所在的图,是不是真的在graph里
x2.graph is graph
>> True

# 同样,判断x2是不是在全局的默认图里(显然不是的)
x2.graph is tf.get_default_graph()
>> False

若想要将默认图重置(删除图中所有的节点),可以使用tf.reset_default_graph()函数。


节点的生命周期

1
2
3
4
5
6
7
8
9
# 创建一个简单的图
w = tf.constant(3)
x = w + 2
y = x + 5
z = x * 3
# 在会话中计算图
with tf.Session() as sess:
print(y.eval()) # 10
print(z.eval()) # 15

在会话中,为了计算y,会自动检测出y依赖于x,而x又依赖于w。那么,将会依次计算wx,最后再计算y。而为了计算z,也同样会依次计算wx

注意在此过程中,wx的值并不会被重复使用!也就是说,上述代码总共对wx计算了两次(即使两次的结果都是一样的)。

因此,对于同一张图的多次运算,除了tf.Variable()变量外,其他节点的值在一次运行结束后都会被丢弃,不会被重复使用。

节点类型 生命周期
tf.Variable() 整个会话
others 会话的某次运行

为了高效地求得yz的值,可以在会话的一次运行内同时计算它们。

1
2
3
4
5
with tf.Session() as sess:
# 同时计算y和z,此时w和x只计算一次
y_val, z_val = sess.run([y, z])
print(y_val) # 10
print(z_val) # 15

使用TensorFlow进行线性回归

TensorFlow的操作(operations,记作ops)可以接收任意多个输入,可以产生任意多个输出。

比如additionmultiplication可以接受2个输入,产生1个输出。
而被成为源操作(source ops)的Constantsvariables,则没有输入。

其中,输入和输出都是多维数组,称之为张量(tensor)。


接下来,将使用sklearn加利福尼亚房屋数据来进行线性回归。

对于线性回归参数theta的拟合,将使用正规方程(Normal Equation)计算:$\theta = (X^T X)^{-1}X^T y$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import numpy as np
from sklearn.datasets import fetch_california_housing

# 载入sklearn自带的加利福尼亚房屋数据
housing = fetch_california_housing()
# 样本数及特征数
m, n = housing.data.shape
# 添加bias
housing_data_plus_bias = np.c_[np.ones((m, 1)), housing.data]

# 创建TensorFlow的常量节点X和y,分别用来存放样本和标签
X = tf.constant(housing_data_plus_bias, dtype=tf.float32, name='X')
y = tf.constant(housing.target.reshape(-1, 1), dtype=tf.float32, name='y')
# 计算X的转置
XT = tf.transpose(X)
# 使用正规方程计算theta
theta = tf.matmul(tf.matmul(tf.matrix_inverse(tf.matmul(XT, X)), XT), y)

# 再次注意,上述代码并没有进行实际的运算,只是在构建计算图而已

with tf.Session() as sess:
# 在会话中计算theta的值
theta_value = theta.eval()

实现梯度下降

接下来将使用批梯度下降(Batch Gradient Descent)方法来进行线性回归参数的拟合。

使用梯度下降方法一般要先对特征进行标准化(normalize,即减均值,除方差)

1
2
3
4
5
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaled_housing_data = scaler.fit_transform(housing.data)
scaled_housing_data_plus_bias = np.c_[np.ones((m, 1)), scaled_housing_data]

手动计算梯度

$\theta := \theta - \frac{\alpha}{m} X^{T} (X\theta - \vec{y})$
其中,$\alpha$是学习率,$m$是批样本数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
n_epochs = 1000 # 遍历1000次数据集
learning_rate = 0.01 # 学习率

# 创建TensorFlow的常量节点X和y,分别用来存放样本和标签
# 特征已经过normalize,并加上了bias
X = tf.constant(scaled_housing_data_plus_bias, dtype=tf.float32, name='X')
y = tf.constant(housing.target.reshape(-1, 1), dtype=tf.float32, name='y')

# 创建TensorFlow的变量节点theta,用来存放待求解的参数(使用均匀分布初始化节点)
theta = tf.Variable(tf.random_uniform([n + 1, 1], -1.0, 1.0), name='theta')

# 创建预测节点
y_pred = tf.matmul(X, theta, name='predictions')

# 创建误差节点
error = y_pred - y

# 创建损失函数节点,使用均方根误差(mean square error)
mse = tf.reduce_mean(tf.square(error), name='mse')

# 创建梯度计算节点
gradients = 1 / m * tf.matmul(tf.transpose(X), error)

# 创建赋值节点,即 theta = theta - learning_rate * gradients
training_op = tf.assign(theta, theta - learning_rate * gradients)

# 创建变量初始化节点
init = tf.global_variables_initializer()

with tf.Session() as sess:
sess.run(init)
for epoch in range(n_epochs):
# 每100个epoch打印当前的loss值
if epoch % 100 == 0:
print('Epoch', epoch, 'MSE=', mse.eval())
# 进行梯度下降更新参数theta
sess.run(training_op)
# 训练完成后,计算最终的theta参数值
best_theta = theta.eval()

使用autodiff

在前面的代码中,需要我们事先手动计算好loss function(MSE)的梯度才能进行训练。
虽然在线性回归里面求解梯度还不算复杂,但是对于深度神经网络来说,梯度的求解将会让人十分头疼。

在TensorFlow中,提供了autodiff能够帮助我们自动计算梯度。

只需将之前的梯度计算替换为

1
gradients = tf.gradients(mse, [theta])[0]

gradients()函数接受一个操作节点(如,mse损失函数计算节点),以及一系列需要求解梯度的变量(如,theta),最终返回对应的梯度列表。


使用优化器

TensorFlow能够自动计算梯度已经很方便了,但是可以将事情变得更加简单——使用优化器。

比如,使用梯度下降优化器(Gardient Descent optimizer)。

便可以简单地将之前gradients = ...training_op = ...直接替换成下列方式:

1
2
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
training_op = optimizer.minimize(mse)

为训练算法提供数据

首先,我们将之前的批梯度下降算法改为小批量梯度下降算法(Mini-batch Gradient Descent)。
那么对于每个batch,我们都需要不断地替换Xy常量节点,来向训练算法提供数据。

在TensorFlow中,一个典型的做法是使用placeholder占位符节点

占位符节点在创建的时候,只需指定其存放的数据类型(如,floa32等),以及存放的数据维度大小即可。
然后,等到训练运行时才真正往占位符里放数据(使用feed_dict参数放数据)。

1
2
3
4
5
6
7
8
9
# None 意味着在该维度不限制大小
A = tf.placeholder(tf.float32, shape=(None, 3))
B = A + 5

with tf.Session() as sess:
# 由于B依赖于A,因此在对B求值的时候,必须先用feed_dict向占位符A提供数据
B_val_1 = B.eval(feed_dict={A: [[1, 2, 3]]})
# 占位符A的第0维度可以是任意大小
B_val_2 = B.eval(feed_dict={A: [[4, 5, 6], [7, 8, 9]]})
1
2
3
4
5
6
print(B_val_1)
>> [[ 6. 7. 8.]]

print(B_val_2)
>> [[ 9. 10. 11.]
[ 12. 13. 14.]]

使用placeholder实现小批量梯度下降算法

1
2
3
4
n_epochs = 10 # 遍历10次数据集
learning_rate = 0.01 # 学习率
batch_size = 100 # 批数据大小
n_batches = int(np.ceil(m / batch_size)) # 完成一次epoch所需要的批次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def fetch_batch(epoch, batch_index, batch_size):
''' 获取批数据

:param epoch: 当前epoch的次数
:param batch_index: 当前batch的序号
:param batch_size: 每个batch的大小
:return: 样本和标签的批数据
'''
# seed的种子设为:当前batch的总序号
# 比如,完成一个epoch需要5个batch
# 当前是第3个epoch的第2个batch
# 那么这个batch总序号就是3 × 5 + 2 = 17
np.random.seed(epoch * n_batches + batch_index)
# 在[0, m)范围内,产生batch_size个索引
indices = np.random.randint(m, size=batch_size)
# 根据索引,获取样本的批数据
X_batch = scaled_housing_data_plus_bias[indices]
# 根据索引,获取标签的批数据
y_batch = housing.target.reshape(-1, 1)[indices]
return X_batch, y_batch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 创建TensorFlow的占位符节点X和y,分别用来存放样本和标签
# 第0维度为None则不指定大小,因为样本数需要在运行时确定
X = tf.placeholder(tf.float32, shape=(None, n + 1), name='X')
y = tf.placeholder(tf.float32, shape=(None, 1), name='y')

# 创建TensorFlow的变量节点theta,用来存放待求解的参数(使用均匀分布初始化节点)
theta = tf.Variable(tf.random_uniform([n + 1, 1], -1.0, 1.0, seed=42), name='thete')

# 创建预测节点
y_pred = tf.matmul(X, theta, name='predictions')

# 创建误差节点
error = y_pred - y

# 创建损失函数节点,使用均方根误差(mean square error)
mse = tf.reduce_mean(tf.square(error), name='mse')

# 创建梯度下降优化器节点 及 对应的训练节点
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
training_op = optimizer.minimize(mse)

# 创建变量初始化节点
init = tf.global_variables_initializer()

with tf.Session() as sess:
sess.run(init)
for epoch in range(n_epochs):
for batch_index in range(n_batches):
# 获取批数据
X_batch, y_batch = fetch_batch(epoch, batch_index, batch_size)
# 由于训练节点依赖于X和y,因此使用feed_dict参数传送一个字典,分别为X和y提供数据
sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
best_theta = theta.eval()

模型的存储及读取

模型一旦训练好,那么应该将相关的信息(如,参数、计算图等)存储在硬盘里,方便以后的使用。
同样,在模型的训练过程中,应该定时地保存检查点(checkpoint)。这使得模型训练中断之后,可以直接从最后一次检查点开始训练而不是重头开始。


存储模型

只需在计算图构造阶段(construction phase)的最后新建一个Saver保存节点即可。
然后在执行阶段(execution phase),调用其save(sess, path)方法即可将模型保存到path路径下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[...]
theta = tf.Variable(tf.random_uniform([n + 1, 1], -1.0, 1.0), name="theta")
[...]
init = tf.global_variables_initializer()
# 在construction phase之后创建Saver node即可
saver = tf.train.Saver()

with tf.Session() as sess:
sess.run(init)
for epoch in range(n_epochs):
if epoch % 100 == 0: # 每100个epoch保存checkpoint
save_path = saver.save(sess, 'tmp/my_model.ckpt')
sess.run(training_op)
best_theta = theta.eval()
# 训练完毕,保存最终模型
save_path = saver.save(sess, 'tmp/my_model_final.ckpt')

读取模型

首先,还是在构造阶段的最后新建一个Saver保存节点。
然后,在执行阶段的开始,调用其restore(sess, path)方法即可将path路径下的模型读取到当前的会话中。

1
2
3
with tf.Session() as sess:
saver.restore(sess, 'tmp/my_model_final.ckpt')
[...]

Saver默认情况下,会存储和读取模型的所有参数。
不过,我们也可以指定只保存哪些变量,以及使用什么名字。

1
2
# 只保存theta变量,命名为weights
saver = tf.train.Saver({"weights": theta})

Saver默认情况下,也会存储计算图的结构,保存在路径的*.meta文件中。
如果需要读取模型的计算图,可以调用tf.train.import_meta_graph()函数。

1
2
3
4
5
6
# 读取模型的计算图
# 这样可以完整地恢复模型,不仅包括模型的参数,还包括模型的计算图结构
saver = tf.train.import_meta_graph("/tmp/my_model_final.ckpt.meta")
with tf.Session() as sess:
saver.restore(sess, "/tmp/my_model_final.ckpt")
[...]

使用TensorBoard进行可视化

在此之前,我们都是用print函数打印出训练过程。然而,有一个更好的选择是:使用TensorBoard

接下来,我们将对线性回归的loss值进行可视化。


首先,新建一个存放数据的日志目录(使用时间戳作为目录名)。

1
2
3
4
5
6
7
8
from datetime import datetime

# 以指定格式获取当前时间
now = datetime.utcnow().strftime('%Y%m%d%H%M%S')
# 根目录
root_logdir = 'tf_logs'
# 日志文件目录
logdir = '{}/run-{}/'.format(root_logdir, now)

其次,在构造阶段的末尾添加以下代码

1
2
3
4
5
6
7
8
9
10
# summary是TensorBoard的一种二进制日志字符串
# 我们使用它的scalar标量类型(还有其他类型,如tf.summary.image可以可视化图像)
# 参数'MSE':可视化时变量的名称
# 参数mse:loss function节点
mse_summary = tf.summary.scalar('MSE', mse)

# FileWriter可以将summaries写进指定的日志文件中
# 参数logdir:指定的日志文件路径
# 参数tf.get_default_graph():需要可视化的计算图结构
file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())

最后,在执行阶段添加以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
[...]
for batch_index in range(n_batches):
X_batch, y_batch = fetch_batch(epoch, batch_index, batch_size)
if batch_index % 10 == 0:
# 计算mse_summary的值
summary_str = mse_summary.eval(feed_dict={X: X_batch, y: y_batch})
step = epoch * n_batches + batch_index
# 将summary添加到日志文件中,同时需要指定当前的step(也就是可视化时的横轴坐标)
file_writer.add_summary(summary_str, step)
sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
[...]
# 关闭FileWriter
file_writer.close()

现在,让我们启动TensorBoard

首先,在终端中输入:

1
2
3
$ tensorboard --logdir tf_logs/
>> Starting TensorBoard on port 6006
(You can navigate to http://0.0.0.0:6006)

然后,在浏览器中打开http://0.0.0.0:6006/ (或http://localhost:6006/),即可访问TensorBoard!

可视化loss值

可视化计算图


命名域

当处理复杂模型的时候,图中可能有成千上万个节点。因此,非常有必要将相关的节点组织起来放到一起。
这就需要TensorFlow中的命名域(Name Scopes)来管理节点!

比如,我们可以将之前代码里的errormse操作节点放在一个名叫loss的命名域里。

1
2
3
with tf.name_scope('loss') as scope:
error = y_pred - y
mse = tf.reduce_mean(tf.square(error), name='mse')
1
2
3
4
print(error.op.name)
>> loss/sub
print(mse.op.name)
>> loss/mse

在TensorBoard中,error和mse将出现在loss命名域内。

TensorBoard中的命名域

1
2
3
4
5
6
7
8
9
10
11
12
13
a1 = tf.Variable(0, name="a")
a2 = tf.Variable(0, name="a")
with tf.name_scope("param"):
a3 = tf.Variable(0, name="a")
with tf.name_scope("param"):
a4 = tf.Variable(0, name="a")
for node in (a1, a2, a3, a4):
print(node.op.name)

>> a
a_1
param/a
param_1/a

模块化

假设现在要对多个ReLU(rectified linear units,修正线性单元)输出进行累加。

由于要计算多次ReLU,所以基于模块化的思想,我们可以将实现ReLU功能的语句单独封装成一个函数以供调用。

1
2
3
4
5
6
7
8
9
10
11
def relu(X):
''' 实现ReLU

:param X: 输入样本
:return: 经过ReLU修正后的输出
'''
w_shape = (int(X.get_shape()[1]), 1)
w = tf.Variable(tf.random_normal(w_shape), name='weights')
b = tf.Variable(0.0, name='bias')
z = tf.add(tf.matmul(X, w), b, name='z')
return tf.maximum(z, 0.0, name='relu')
1
2
3
4
5
6
n_features = 3
X = tf.placeholder(tf.float32, shape=(None, n_features), name='X')
# 调用5次relu函数
relus = [relu(X) for i in range(5)]
# 将5次的结果累加
output = tf.add_n(relus, name='output')

ReLU的计算图(good)


还可以做得更好…

我们将之前讲过的命名域加进来

1
2
3
4
5
6
7
8
def relu(X):
''' 实现ReLU

:param X: 输入样本
:return: 经过ReLU修正后的输出
'''
with tf.name_scope('relu'):
[...]

ReLU的计算图(better)


共享变量

如果你想让计算图中不同的部分共享一个变量(比如CNN中卷积核的权值),一个可能的操作是将变量作为参数传递过去。但是,当计算图中需要共享的变量非常多时,将变得十分麻烦。

TensorFlow有一个更好的解决方案是,使用get_variable()函数来创建(或复用)共享变量。
而选择创建还是选择复用则是由当前的变量域variable_scope()决定的。

1
2
3
# 在relu变量域内创建threshold变量
with tf.variable_scope('relu'):
threshold = tf.get_variable('threshold', shape=(), initializer=tf.constant_initializer(0.0))
1
2
3
# 复用,设置reuse=True
with tf.variable_scope('relu', reuse=True):
threshold = tf.get_variable('threshold')
1
2
3
4
# 另一种复用方式
with tf.variable_scope('relu') as scope:
scope.reuse_variables() # 调用scope的reuse_variables方法
threshold = tf.get_variable('threshold')

复用ReLU的阈值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def relu(X):
# 存在则复用,不存在则创建
threshold = tf.get_variable('threshold', shape=(), initializer=tf.constant_initializer(0.0))
[...]
return tf.maximum(z, threshold, name="max")

X = tf.placeholder(tf.float32, shape=(None, n_features), name='X')
relus = []
for relu_index in range(5):
# 第一次调用relu时reuse为0,则不复用选择创建变量
# 后续调用relu则会选择复用变量
with tf.variable_scope('relu', reuse=(relu_index >= 1)) as scope:
relus.append(relu(X))
output = tf.add_n(relus, name='output')

5个ReLUs共享threshold变量


复用CNN卷积层参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import tensorflow as tf
import numpy as np

input_images = tf.placeholder(tf.float32, shape = (1, 32, 32, 1))

# 定义了一层卷积神经网络
def conv_relu(input, kernel_shape, bias_shape):
# 创建名为weights的变量
weights = tf.get_variable("weights", kernel_shape, initializer=tf.random_normal_initializer())
# 创建名为biases的变量
biases = tf.get_variable("biases", bias_shape, initializer=tf.constant_initializer(0.0))
conv = tf.nn.conv2d(input, weights, strides=[1, 1, 1, 1], padding='SAME')
return tf.nn.relu(conv + biases)

def my_image_filter(input_images):
with tf.variable_scope("conv1"):
# 在名为conv1的variable scope下调用一层神经网络,对应的参数名为
# "conv1/weights", "conv1/biases"
relu1 = conv_relu(input_images, [3, 3, 1, 1], [1])
with tf.variable_scope("conv2"):
# 在名为conv2的variable scope下调用一层神经网络,对应的参数名为
# "conv2/weights", "conv2/biases"
return conv_relu(relu1, [3, 3, 1, 1], [1])

with tf.variable_scope("image_filter") as scope:
result1 = my_image_filter(input_images)
scope.reuse_variables() # 复用变量
result2 = my_image_filter(input_images)

init = tf.global_variables_initializer();

with tf.Session() as sess:
sess.run(init)
image = np.random.rand(1, 32, 32, 1)
result1 = sess.run(result1, feed_dict={input_images: image})
result2 = sess.run(result2, feed_dict={input_images: image})
print(result2.all() == result1.all())

>> True
# 说明第二次的参数没有重新初始化,而是复用了第一次的参数