使用逻辑回归对MNIST数字分类

时间:2022-05-02 04:28:45

使用逻辑回归对MNIST数字分类


注意:这部分需要读者熟悉Theano的以下概念:shared variablesbasic arithmetic opsT.gradfloatX。如果准备使用GPU运行代码,你需要阅读Theano的GPU教程
这部分的源代码可以从这里下载。


在这部分中,我们将展示如何使用Theano建立最基本的分类器:逻辑回归(logistic regression)。我们从该模型的快速入门教程开始,这个教程既可以作为对该模型的复习,也将固定这个模型的数学符号,并展示这些数学表达式是如何被映射到Theano图模型中。
作为机器学习的深深传统,这个教程将使用逻辑回归解决令人兴奋的MNIST数字分类问题。

4.1 模型描述

逻辑回归是一个概率性的线性分类器。它的参数包含一个权重矩阵 W 和一个偏置向量 b 。它的原理是将一个输入向量映射到一系列超平面上,每个超平面对应一个分类类别。输入向量与一个超平面之间的距离反映了该输入被归入该超平面对应类别的可能性大小。

从数学的角度,一个输入向量 x 是一个特定类别 i (一个随机变量 Y ) 的可能性或者概率,可以写成:
P(Y=i|x,W,b)=softmax(Wx+b)=eWix+bijeWjx+bj(1)

该模型的预测值 ypred 是最大概率值所对应的类别,即:

ypred=argmaxiP(Y=i|x,W,b)(2)

相应的Theano的代码如下:

        # 初始化权重矩阵为0矩阵,形状为(n_input, n_output) 
self.W = theano.shared(value=np.zeros((n_input, n_output),
dtype=theano.config.floatX),
name="W", borrow=True)
# 初始化偏置向量为0向量,形状为(n_output,)
self.b = theano.shared(value=np.zeros((n_output,), dtype=theano.config.floatX),
name="b", borrow=True)
# 注意下面的量均为符号表达式
# 使用softmax计算分类类别概率矩阵
#`input`是一个符号矩阵,每一行对应一个训练样例
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

# 计算类别预测值
self.y_pred = T.argmax(self.p_y_given_x, axis=1)

因为在训练过程中模型的变量要一直维持一个持久的状态,我们将 W b 设置成共享变量。共享变量是带有初始值的Theano符号变量,并且每次更新后会保持自己的更新值。点乘(dot)与柔性最大值(softmax)操作用来计算向量 P(Y|x,W,b) 。结果p_y_given_x 是一个向量类型的符号变量(注:对于一个样例,它是向量,但是对于多个样例,它对应一个矩阵,每一个行向量对应一个样例)

为了得到实际的预测值,我们使用T.argmax操作计算最大概率所对应的索引值(即最可能的分类类别)

目前我们定义的模型看以前并没有什么用,因为参数仍然是初始值状态。接下来我们将介绍如何通过训练学习到最优的参数值。


注意:有关Theano的数学符号操作的完整列表可以在这里找到。


4.2 定义一个代价函数

学习优化模型的参数涉及到最小化一个代价函数。对于多类别逻辑回归,常使用的代价函数是负对数似然函数。这等价于最大化数据集 D 在给定模型参数 θ 下的似然值。让我们首先定义似然值 ζ 和代价值 L
:

ζ(θ={W,b},D)=i=0|D|log(P(Y=y(i)|x(i),W,b))(3)
L(θ={W,b},D)=ζ(θ={W,b},D)(4)

尽管有很多专门的书讲解最小化优化问题,但梯度下降是目前来说最简单的方法来最小化任意非线性函数。这个教程将使用小批量随机梯度下降方法(mini-batch stochastic gradient, MSGD)。这里是随机梯度下降法一个详细教程。

下面的Theano代码定义一个给定小批量的代价值(符号表达式):

# 这里的T.arange与numpy.arange是一样的,返回每行的索引号
# `y` 代表的是正确的分类类别,所以[T.arange(y.shape[0]),y]
# 提取的是正确类别所对应概率值的索引
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y])

注意:尽管代价值通常是被定义为一个数据集上单个误差的总和值,但是在实际中,我们使用了误差的平均值(T.mean)。这样我们在选择学习速率时可以不用考虑批量的大小。


4.3 创建一个LogisticRegression类

目前我们已经准备好了建立逻辑回归所需的所有工具,那么我们将定义一个LogisticRegression类,这个类可以操控逻辑回归的基本行为。这个代码与我们前面已经讲过的非常相似,并且带有注释。

class LogisticRegression(object):
'''
多类别逻辑回归类
'''

def __init__(self, input, n_input, n_output):
'''
初始化参数
:param input: theano.tensor.TensorType
描述输入的符号变量(一个mini-batch)
:param n_input: int
输入单元数(即输入的维度)
:param n_output: int
输出单元数(类别数)
'''

# 初始化权重矩阵为0矩阵,形状为(n_input, n_output)
self.W = theano.shared(value=np.zeros((n_input, n_output),
dtype=theano.config.floatX),
name="W", borrow=True)
# 初始化偏置向量为0向量,形状为(n_output,)
self.b = theano.shared(value=np.zeros((n_output,), dtype=theano.config.floatX),
name="b", borrow=True)
# 注意下面的量均为符号表达式
# 使用softmax计算分类类别概率矩阵
# `input`是一个符号矩阵,每一行对应一个训练样例
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
# 计算类别预测值
self.y_pred = T.argmax(self.p_y_given_x, axis=1)

# 保存参数(训练更新时要用)
self.params = [self.W, self.b]

# 保存输入
self.input = input

def negative_log_likelihood(self, y):
'''
计算一个小批量样本的负对数似然值的平均值
:param y: theano.tensor.TensorType
一个向量,每一个元素对应一个样例所对应的正确分类类别

'''

# 这里的T.arange与numpy.arange是一样的,返回每行的索引号
# `y` 代表的是正确的分类类别,所以[T.arange(y.shape[0]),y]
# 提取的是正确类别所对应概率值的索引
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y])

def errors(self, y):
'''
计算一个小批量样本中分类的错误率
:param y: theano.tensor.TensorType
一个向量,每一个元素对应一个样例所对应的正确分类类别
:return:
'''

# `y` 要和 `y_pred` 的维度要一样
if y.ndim != self.y_pred.ndim:
raise TypeError("y should have the same shape as self.y_pred",
("y", y.type, "y_pred", self.y_pred.type))

# 保证`y` 是整数类型
if y.dtype.startswith("int"):
## T.nep 是不等于操作,如果对应元素不相同,返回1
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()

我们将实例化这个类:

    # 定义输入与输出的符号变量
x = T.matrix("x") # 输入值
y = T.ivector("y") # 分类标签值

# 构建LogisticRegression实例,每一个MNIST的图像大小为28*28
classifier = LogisticRegression(input=x, n_input=28*28, n_output=10)

我们首先为输入 x 与对应的类别标签值 y 分配符号变量。注意, x y 是定义在LogisticRegression实例外面。但是由于这个类需要输入来建立相应的图模型,所以它作为init函数的一个参数被传递进来。当你想使用这个类实例通过连接建立深度网络时,这非常有用。我们可以直接将上一层的输出传递给当前层的输入。(这个教程中不会建立多层网络,但是在未来的教程中将会使用这一特性)

最后,我们使用实例方法来定义代价值用来最小化:

    # 利用实例方法获得代价函数值
cost = classifier.negative_log_likelihood(y)

注意 x 对于代价函数的定义是一个隐含符号变量,因为这个变量已经在实例化classifier时已经定义(传给了input)。

4.4 训练模型

使用大部分的编程语言(C/C++, Matlab, Python)来实施MSGD时,我们首先要人工推导出代价函数关于给个参数的梯度,在这个例子中是 L/W L/b 。对于复杂的模型来说,这非常棘手,因为 /θ 将变得非常复杂,特别当要考虑到数值稳定性时。

使用Theano,这项工作将大大简化。Theano可以自动求导,并且使用特定的数学变换来提高数值稳定性。

Theano计算梯度值 L/W L/b ,仅仅使用下面的简单代码:

    g_W = T.grad(cost=cost, wrt=classifier.W)
g_b = T.grad(cost=cost, wrt=classifier.b)

g_W和g_b也是符号变量,同样可以作为计算图的一部分。我们定义train_model函数来实施一步梯度下降:

    # 计算更新值,传递的是包含(更新变量,更新表达式)的list
updates = [(classifier.W, classifier.W - learning_rate*g_W),
(classifier.b, classifier.b - learning_rate*g_b)]

# 编译一个Theano函数,这个函数将更新参数并返回代价值
train_model = theano.function(inputs=[index], outputs=cost, updates=updates,
givens={x: X_train[index*batch_size:(index+1)*batch_size],
y: y_train[index*batch_size:(index+1)*batch_size]})

updates是包含成对值的列表类型。在每一对值中,第一个元素是要更新的参数变量,而第二个元素是用于替换旧值的符号表达式。同样地,givens是字典类型,每一个键值对中键表示一个参数变量,而对应的键值指明了要替换该参数的变量值。train_function被定义如下:

  • 输入是小批量索引值index,它与batch_size(是一个固定值,而不是输入)共同定义了训练的样本 x 与对应的标签值 y
  • 函数的返回值是由index定义的训练样本所对应的代价值。
  • 函数每次被调用时,由index定义的训练集切片将替换 x y 。然后将评估该小批量样本对应的代价值,并更新updates列表定义的变量。

每一次train_model(index)被调用,它将计算并返回一个小批量样本的代价值,并同时执行一步MSGD。整个学习算法包含在遍历训练集中所有样本的循环中,一次只训练一个小批量样本,并且重复地调用train_model函数。

4.5 测试模型

当测试一个模型时,我们更关注样本集被错误归类的个数,而不是似然值。因此LogisticRegression类包含一个额外的实例方法,这个实例方法构建了用于返回每一个小批量样本中错误归类数的符号图。代码见上面的类定义。

接下来我们创建test_model和validate_model函数,这两个函数将被调用而返回错误率。很快你就可以看到,validate_model是实施提前停止(early-stopping)的关键。这些函数同样接受一个小批量index作为输入,并计算出这个小批量样本中被该模型错误归类的数量。test_model和validate_model函数的唯一区别是在于他们的样本集不同,前者使用test集,而后者使用validation集。

    # 编译测试模型
test_model = theano.function(inputs=[index], outputs=classifier.errors(y),
givens={x: X_test[index*batch_size:(index+1)*batch_size],
y: y_test[index*batch_size:(index+1)*batch_size]})
valid_model = theano.function(inputs=[index], outputs=classifier.errors(y),
givens={x: X_valid[index*batch_size:(index+1)*batch_size],
y: y_valid[index*batch_size:(index+1)*batch_size]})

4.6 组合所有代码

组合所有代码,最终的终结版本如下:

'''
使用逻辑回归进行分类
'''

import numpy as np
import theano
import theano.tensor as T
import pickle, gzip

class LogisticRegression(object):
'''
多类别逻辑回归类
'''

def __init__(self, input, n_input, n_output):
'''
初始化参数
:param input: theano.tensor.TensorType
描述输入的符号变量(一个mini-batch)
:param n_input: int
输入单元数(即输入的维度)
:param n_output: int
输出单元数(类别数)
'''

# 初始化权重矩阵为0矩阵,形状为(n_input, n_output)
self.W = theano.shared(value=np.zeros((n_input, n_output),
dtype=theano.config.floatX),
name="W", borrow=True)
# 初始化偏置向量为0向量,形状为(n_output,)
self.b = theano.shared(value=np.zeros((n_output,), dtype=theano.config.floatX),
name="b", borrow=True)
# 注意下面的量均为符号表达式
# 使用softmax计算分类类别概率矩阵
# `input`是一个符号矩阵,每一行对应一个训练样例
self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
# 计算类别预测值
self.y_pred = T.argmax(self.p_y_given_x, axis=1)

# 保存参数(训练更新时要用)
self.params = [self.W, self.b]

# 保存输入
self.input = input

def negative_log_likelihood(self, y):
'''
计算一个小批量样本的负对数似然值的平均值
:param y: theano.tensor.TensorType
一个向量,每一个元素对应一个样例所对应的正确分类类别

'''

# 这里的T.arange与numpy.arange是一样的,返回每行的索引号
# `y` 代表的是正确的分类类别,所以[T.arange(y.shape[0]),y]
# 提取的是正确类别所对应概率值的索引
return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y])

def errors(self, y):
'''
计算一个小批量样本中分类的错误率
:param y: theano.tensor.TensorType
一个向量,每一个元素对应一个样例所对应的正确分类类别
:return:
'''

# `y` 要和 `y_pred` 的维度要一样
if y.ndim != self.y_pred.ndim:
raise TypeError("y should have the same shape as self.y_pred",
("y", y.type, "y_pred", self.y_pred.type))

# 保证`y` 是整数类型
if y.dtype.startswith("int"):
## T.nep 是不等于操作,如果对应元素不相同,返回1
return T.mean(T.neq(self.y_pred, y))
else:
raise NotImplementedError()

def load_data(dataset):
'''
加载数据集
:param dataset: string
数据集所在的路径
:return:
'''



with gzip.open(dataset, "rb") as f:
try:
train_set, valid_set, test_set = pickle.load(f, encoding="latin1")
except:
train_set, valid_set, test_set = pickle.load(f)

def shared_dataset(data_xy, borrow=True):
'''
将数据转化为共享变量
:param data_xy:
:param borrow:
:return:
'''

data_x, data_y = data_xy
# 使用GPU时,数据必须以浮点型保存,但是标签值需要是整型。因此这里保存用float,返回值进行了整型转化
shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)

return shared_x, T.cast(shared_y, "int32")

X_train, y_train = shared_dataset(train_set)
X_validation, y_validation = shared_dataset(valid_set)
X_test, y_test = shared_dataset(test_set)

return [(X_train, y_train), (X_validation, y_validation), (X_test, y_test)]

def SGD_mnist(learning_rate=0.13, n_epochs=1000, dataset=r"G:\PYTHON_CODE\keras_learn\dataset\mnist.pkl.gz",
batch_size=32)
:

'''
随机梯度下降法训练模型
:param learning_rate: float
学习速率
:param n_epochs: int
最大迭代次数
:param dataset: string
MNIST数据集存储路径
:param batch_size: int
训练使用的批量样本大小
:return:
'''

# 加载数据集
datasets = load_data(dataset)

X_train, y_train = datasets[0]
X_valid, y_valid = datasets[1]
X_test, y_test = datasets[2]

# 计算批量总数 for training, validation and testing
n_train_batches = X_train.get_value(borrow=True).shape[0] // batch_size
n_valid_batches = X_valid.get_value(borrow=True).shape[0] // batch_size
n_test_batches = X_test.get_value(borrow=True).shape[0] // batch_size

# 建立模型
print("... building the model")

# 定义index的符号变量
index = T.lscalar("index")

# 定义输入与输出的符号变量
x = T.matrix("x") # 输入值
y = T.ivector("y") # 分类标签值

# 构建LogisticRegression实例,每一个MNIST的图像大小为28*28
classifier = LogisticRegression(input=x, n_input=28*28, n_output=10)

# 利用实例方法获得代价函数值
cost = classifier.negative_log_likelihood(y)

# 编译测试模型
test_model = theano.function(inputs=[index], outputs=classifier.errors(y),
givens={x: X_test[index*batch_size:(index+1)*batch_size],
y: y_test[index*batch_size:(index+1)*batch_size]})
valid_model = theano.function(inputs=[index], outputs=classifier.errors(y),
givens={x: X_valid[index*batch_size:(index+1)*batch_size],
y: y_valid[index*batch_size:(index+1)*batch_size]})

# 计算梯度
g_W = T.grad(cost, classifier.W)
g_b = T.grad(cost, classifier.b)

# 计算更新值,传递的是包含(更新变量,更新表达式)的list
updates = [(classifier.W, classifier.W - learning_rate*g_W),
(classifier.b, classifier.b - learning_rate*g_b)]

# 编译一个Theano函数,这个函数将更新参数并返回代价值
train_model = theano.function(inputs=[index], outputs=cost, updates=updates,
givens={x: X_train[index*batch_size:(index+1)*batch_size],
y: y_train[index*batch_size:(index+1)*batch_size]})

# 开始训练模型
print("... training the model")

# 设置 early-stopping 参数
patience = 5000
patience_increase = 2 # 如果一个较好结果出现,patience的递增幅度

improvement_threshold = 0.995 # 相对提高值达到此值时,认为提高显著

validation_frequency = min(n_train_batches, patience//2) # 验证频率

best_validation_loss = np.inf
test_score = 0.0

import timeit
start_time = timeit.default_timer()

done_looping = False
epoch = 0

while (epoch < n_epochs) and (not done_looping):
epoch += 1
for minibatch_index in range(n_train_batches):
minibatch_avg_cost = train_model(minibatch_index)
# 迭代次数
iter_num = (epoch - 1)*n_train_batches + minibatch_index
if (iter_num + 1) % validation_frequency == 0:
# 计算验证集误差
validation_losses = [ valid_model(i) for i in range(n_valid_batches)]
this_validation_loss = np.mean(validation_losses)
print(" epoch {0}, minibatch {1}/{2}, validation error {3} %%".format(
epoch, minibatch_index + 1, n_train_batches, this_validation_loss*100))

# 如果得到的结果比目前最优结果好
if this_validation_loss < best_validation_loss:
# 提高显著
if this_validation_loss < best_validation_loss*improvement_threshold:
patience = max(patience, iter_num*patience_increase)

best_validation_loss = this_validation_loss
# 在测试集上验证
test_losses = [ test_model(i) for i in range(n_test_batches)]
test_score = np.mean(test_losses)

print(" epoch {0}, minibatch {1}/{2}, test error {3} %%".format(
epoch, minibatch_index + 1, n_train_batches, test_score*100))
# 保存最优模型
with open("best_model.pkl", "wb") as f:
pickle.dump(classifier, f)
# 是否提前stop
if patience <= iter_num:
done_looping = True
break
end_time = timeit.default_timer()
print("Optimization complete with best validation score of {0} %%, with best \
performance {1} %%."
.format(best_validation_loss*100, test_score*100))
print("The code run for {0} epochs, with {1} epochs/sec.".format(epoch, 1.0*epoch/(end_time-start_time)))

if __name__ == "__main__":
SGD_mnist()

使用者可以使用这个逻辑回归分类器对MNIST数字进行分类,从DeepLearningTutorials目录下,敲入:

    python code/logistic_sgd.py

预期的运行结果如下:

...
epoch 72, minibatch 83/83, validation error 7.510417 %
epoch 72, minibatch 83/83, test error of best model 7.510417 %
epoch 73, minibatch 83/83, validation error 7.500000 %
epoch 73, minibatch 83/83, test error of best model 7.489583 %
Optimization complete with best validation score of 7.500000 %,with test performance 7.489583 %
The code run for 74 epochs, with 1.936983 epochs/sec

在Intel(R) Core(TM)2 Duo CPU E8400 @ 3.00 Ghz机器上运行这个代码,大约1.936 epochs/sec,并且运行 75 epochs 就可以达到测试错误率为 7.489 。在GPU上,大约是 10.0epochs/sec。其中使用的batch_size为600。

4.7 使用训练的模型进行预测

SGD_mnist序列化并pickle这个模型当新的最低验证错误率出现时。因此,我们可以重新加载这个模型,并使用它直接预测新数据。下面的predict函数展示了如何使用已经训练的模型进行预测:

def predict():
'''
预测函数:通过加载已经训练的模型对test数据集进行预测
:return:
'''

# 加载原来的模型
classifier = pickle.load(open("best_model.pkl", "rb"))

# 编译一个预测函数
predict_model = theano.function(inputs=[classifier.input], outputs=classifier.y_pred)

# 加载数据
datasets = load_data(r"G:\PYTHON_CODE\keras_learn\dataset\mnist.pkl.gz")
X_test, y_test = datasets[2]

pred_values = predict_model(X_test.get_value()[:10])
print("Predicted values for the first 10 examples in test data: ")
print(pred_values)
print("The targeted values for the first 10 examples in test data: ")
print(y_test[:10])

本文出自Theano的DeepLearning教程,更多信息请访问原文地址github地址