深度学习原理与框架-卷积神经网络-cifar10分类(图片分类代码) 1.数据读入 2.模型构建 3.模型参数训练

时间:2021-06-09 10:02:43

卷积神经网络:下面要说的这个网络,由下面三层所组成

卷积网络:卷积层 + 激活层relu+ 池化层max_pool组成

神经网络:线性变化 + 激活层relu

神经网络: 线性变化(获得得分值)

代码说明:

代码主要有三部分组成

第一部分: 数据读入

第二部分:模型的构建,用于生成loss和梯度值

第三部分:将数据和模型输入,使用batch_size数据进行模型参数的训练

第一部分:数据读入

第一步:输入文件的地址

第二步: 创建列表,用于文件数据的保存

第三步:使用pickle.load进行数据的读取

第四步: 创建mask索引,用于生成验证集,训练集,测试集

第五步: 对图像进行均值化操作

第六步:使用.transpose将图像的维度进行转换

第七步:创建字典将data数据进行返回

代码:data_utils.py

import os
import pickle
import sys
import importlib
importlib.reload(sys)
import numpy as np def load_single_data(dir): # 第三步:打开文件,进行数据的读取
with open(dir, 'rb') as f:
# 文件的读取
data_dict = pickle.load(f, encoding='latin1')
# 获得数据
X = data_dict['data']
# 获得标签
y = data_dict['labels']
# 进行数据的维度重构
X = X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype('float')
# 将标签转换为np.array格式
y = np.array(y) return X, y def load_CIFAR10_data(root): # 第二步:创建列表,用于文件数据的保存
xs = []
ys = []
for i in range(1, 2):
filename = os.path.join(root, 'data_batch_%d'%i)
X, y = load_single_data(filename)
xs.append(X)
ys.append(y) xtr = np.concatenate(xs)
ytr = np.concatenate(ys) # 获得测试数据
xte, yte = load_single_data(os.path.join(root, 'test_batch')) return xtr, ytr, xte, yte def get_CIFAR10_data(num_train=5000, num_val=500, num_test=500): # 第一步:输入文件的地址
filename_dir = 'D://BaiduNetdiskDownload//神经网络入门基础(PPT,代码)//绁炵粡缃戠粶鍏ラ棬鍩虹锛圥PT锛屼唬鐮侊級//cifar-10-batches-py//'
# 获得训练数据和测试数据
train_X, train_y, test_X, test_y = load_CIFAR10_data(filename_dir)
# 第四步:创建一个mask索引,用于生成500个验证集val数据, 5000个训练数据和500个测试数据
mask = np.arange(num_train, num_train+num_val)
val_X = train_X[mask]
val_y = train_y[mask] mask = np.arange(num_train)
train_X = train_X[mask]
train_y = train_y[mask] mask = np.arange(num_test)
test_X = test_X[mask]
test_y = test_y[mask]
# 第五步:对图像进行均值化操作
mean_img = np.mean(train_X, axis=0, keepdims=True)
train_X -= mean_img
val_X -= mean_img
test_X -= mean_img
# 第六步:将图片的维度进行转换
train_X = train_X.transpose(0, 3, 1, 2)
val_X = val_X.transpose(0, 3, 1, 2)
test_X = test_X.transpose(0, 3, 1, 2)
# 第七步:创建一个字典返回数据
return {'train_X':train_X, 'train_y':train_y,
'val_X':val_X, 'val_y':val_y,
'test_X':test_X, 'test_y':test_y}

第二部分:进行模型的构建,用于生成lscores, loss和grads

有标签值y输出loss和grads

没有标签值y 输出scores得分

第一步:输入参数的初始化,包括输入图片维度,filter卷积核个数, filter_size卷积核的大小, num_hidden: 隐藏层个数, num_classes:分类的结果,weight_scale表示权重参数的偏置,reg表示正则化惩罚项的力度

第二步:初始化构造卷积的参数维度,构建字典self.params进行存放,将卷积参数进行数据类型的转换,转换为np.float32

第三步:构造loss函数,获得各个层的参数值

第四步:前向传播:

第一步:卷积 + relu激活 + pool的前向传播, 保存输入的cache用于后续的反向传播

第二步: 第一次全连接层的前向传播,这里使用.reshape(N, -1)进行层的维度变化

第三步:第二次全连接层的前向传播,用于计算各个类别的得分值

第四步:如果没有标签值,就返回scores得分值

第五步:反向传播

第一步:计算损失值loss和softmax概率的反向传播dout

第二步:计算第二次全连接的反向传播

第三步:计算第一次全连接的relu和线性的反向传播,这是输入结果与输出结果的维度相同,用于进行(N, 32, 32, 32)的矩阵变换

第四步:pool + relu + conv 的反向传播

第六步:在dw的基础上,加上正则化的梯度求导结果,同时对于加上正则化的损失值

第七步:构建grads字典,返回grads和loss

代码:主函数:cnn.py

import numpy as np
from layer_utils import * class ThreeLayerConvNet(object): # 第一步:输入参数的初始化
def __init__(self, input_image=(3, 32, 32), filter_size=7, num_hidden=100, filter_num = 32,
num_classes=10, weight_scale=1e-3, reg=0.0, dtype=np.float32): # 第二步:建立存放参数的字典,并对参数进行零值化的构造其维度,使用.astype进行类型转换
# 正则化惩罚项
self.reg = reg
# 数据类型
self.dtype = dtype
# 用于存放参数
self.params = {}
# 输入样本的维度
C, H, W = input_image
# 对w参数,进行正态化的初始化,W1卷积层的维度, F卷积核个数, C上一通道的维度(图片维度), HH(卷积核长), WW(卷积核宽),
self.params['W1'] = weight_scale * np.random.randn(filter_num, C, filter_size, filter_size)
# 对b参数,使用零值初始化,b1卷积核的常熟项,F表示b的个数,每一个卷积核对应一个b
self.params['b1'] = np.zeros(filter_num)
# 全连接层,池化后的数据维度为N, int(H*W*filter_num/4) 构造w2.shape(int(H*W*filter_num/4, num_hidden))
self.params['W2'] = weight_scale * np.random.randn(int(H*W*filter_num/4), num_hidden)
# 全连接层b的参数为num_hidden隐藏层的个数
self.params['b2'] = np.zeros(num_hidden)
# 全连接层w3:用于进行得分值得计算,维度为(num_hidden, num_classes)
self.params['W3'] = weight_scale * np.random.randn(num_hidden, num_classes)
# 全连接层b3,得分层计算的偏置项b
self.params['b3'] = np.zeros(num_classes) # 将参数转换为np.float32
for k, v in self.params.items():
self.params[k] = v.astype(dtype=self.dtype) def loss(self, X, y=None): # 第三步:从self.params获得各个层的参数
W1, b1 = self.params['W1'], self.params['b1']
W2, b2 = self.params['W2'], self.params['b2']
W3, b3 = self.params['W3'], self.params['b3']
# 第四步:进行前向传播
# 卷积的步长
stride = 1
# 补零的维度,为了保证卷积后的维度不变
pad = int((W1.shape[2] - 1) / 2)
# 组合成卷积的参数
conv_params = {'stride': stride, 'pad': pad}
# 组合成池化层的参数
pool_params = {'pool_height':2, 'pool_width':2, 'stride':2}
# 进行卷积,激活层和pool池化层的前向传播,保存cache用于后续的反向传播
a1, cache1 = conv_relu_pool_forward(X, W1, b1, conv_params, pool_params)
# 进行全连接层的前向传播,线性变化和relu层
a2, cache2 = affine_relu_forward(a1, W2, b2)
# 进行全连接层的前向传播,线性变化获得得分值
scores, cache3 = affine_forward(a2, W3, b3)
# 如果没有标签值,返回得分
if y is None:
return scores
# 第五步:计算反向传播的结果
# 计算data的损失值loss,以及反向传输softmax的结果,即dloss/dprob的求导结果
data_loss, dscores = softmax_loss(scores, y)
# 进行第二层全连接的反向传播
da2, dW3, db3 = affine_backward(dscores, cache3)
# 进行第一层全连接层的反向传播,包括relu层和线性变换层
da1, dW2, db2 = affine_relu_backward(da2, cache2)
# 进行卷积层的反向传播,包括pool, relu, conv卷积层的反向传播,输出梯度值
dx, dW1, db1 = conv_relu_pool_backward(da1, cache1) # 第六步:加上正则化的损失值,同时梯度dw加上w正则化的求导值
reg_loss = 0.5 * np.sum(W1*W1) + 0.5 * np.sum(W2*W2) + 0.5 * np.sum(W3*W3)
loss = reg_loss + data_loss
dW1 += self.reg * W1
dW2 += self.reg * W2
dW3 += self.reg * W3
# 第七步:构造grads梯度字典,并返回梯度值和损失值
grads = {'W1': dW1, 'b1': db1, 'W2': dW2, 'b2': db2, 'W3': dW3, 'b3': db3} return loss, grads

副函数:layer_utils.py

from layers import *

# 卷积层,激活层,池化层的前向传播
def conv_relu_pool_forward(x, w, b, conv_params, pool_params): # 卷积层的前向传播使用对每个filterF, np.sum对C通道乘积进行加和,再加上b偏置项
a, cache_conv = conv_forward(x, w, b, conv_params)
# relu层的前向传播, np.maxmuim(0, x) 小于零的值使用零表示
r, cache_relu = relu_forward(a)
# pool层的前向传播,对卷积部分的图像求出最大值,作为pool池化后的大小
out, cache_pool = pool_forward(r, pool_params)
# 将各个输入组合成一个cache,用于反向传播
cache = (cache_conv, cache_relu, cache_pool) return out, cache # pool,relu, conv的反向传播
def conv_relu_pool_backward(dout, cache): # 获得三个层的输入参数
cache_conv, cache_relu, cache_pool = cache
# 进行池化层的反向传播,构造最大值的[[false, false], [false, True]]列表,最大值部分不变,其他部位使用0值填充
dpool = pool_backward(dout, cache_pool)
# 进行relu层的反向传播,dout[x<0] = 0, 将输入小于0的dout置为0
drelu = relu_backward(dpool, cache_relu)
# 卷积层的反向传播,对dx, dw, db进行反向传播,dx[i, :, j*s] += dout * w[f], dw[f] += windows * dout, db[f] += dout
dconv, dw, db = conv_backward(drelu, cache_conv) return dconv, dw, db # 线性传播和池化层的前向传播,即全连接层的前向传播
def affine_relu_forward(x, w, b): a, cache_affine = affine_forward(x, w, b)
r, cache_relu = relu_forward(a) cache = (cache_affine, cache_relu) return r, cache
# 线性传播和池化层的反向传播,即全连接层的反向传播
def affine_relu_backward(dout, cache): affine_cache, relu_cache = cache
r = relu_backward(dout, relu_cache)
dx, dw, db = affine_backward(r, affine_cache) return dx, dw, db

副副函数:layers.py

import numpy as np

# 卷积的前向传播
def conv_forward(x, w, b, conv_params): N, C, H, W = x.shape
F, C, HH, WW = w.shape
pad = conv_params['pad']
stride = conv_params['stride']
# 进行补零操作
x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
# 进行卷积后的H和W的维度计算
H_new = int((H - HH + 2*pad) / stride + 1)
W_new = int((W - WW + 2*pad) / stride + 1)
s = stride
# 构造输出矩阵
out = np.zeros((N, F, H_new, W_new))
for i in range(N):
for f in range(F):
for j in range(H_new):
for k in range(W_new):
# 将C通道分别进行相乘,和最后的相加操作,再加上一个b值,作为最后的输出
out[i, f, j, k] = np.sum(x_pad[i, :, j*s:j*s+HH, k*s:k*s+WW] * w[f]) + b[f] cache = (x, w, b, conv_params) return out, cache # 卷积的反向传播
def conv_backward(dout, cache): (x, w, b, conv_params) = cache
N, C, H, W = x.shape
F, C, HH, WW = w.shape
pad = conv_params['pad']
stride = conv_params['stride']
s = stride
H_new = dout.shape[2]
W_new = dout.shape[3]
# 进行补零操作
x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
# 构造dw, dx, db的输出矩阵,即与输入矩阵的维度相同
dw = np.zeros_like(w)
dx = np.zeros_like(x_pad)
db = np.zeros_like(b) for i in range(N):
for f in range(F):
for j in range(H_new):
for k in range(W_new):
# 获得前向传播的x
windows = x_pad[i, :, j*s:j*s+HH, k*s:k*s+WW]
# dw[f] = dout[i, f, j, k] * x
dw[f] += dout[i, f, j, k] * windows
# dx = dout * w
dx[i, :, j*s:j*s+HH, k*s:k*s+WW] += dout[i, f, j, k] * w[f]
# db[f] += dout[i, f, j, k]
db[f] += dout[i, f, j, k]
# 进行裁剪,去除补零部分
dx = dx[:, :, int(pad):pad + H, pad:pad + W] return dx, dw, db # 池化的前向传播
def pool_forward(x, pool_params): N, C, H, W = x.shape
# 池化的维度
pool_height, pool_width = pool_params['pool_height'], pool_params['pool_width']
pool_stride = pool_params['stride']
# 池化后的维度
H_new = int((H - pool_height) / pool_stride + 1)
W_new = int((W - pool_width) / pool_stride + 1)
s = pool_stride
HH = pool_height
WW = pool_width
out = np.zeros((N, C, H_new, W_new))
for i in range(N):
for c in range(C):
for j in range(H_new):
for k in range(W_new):
# 将图像上卷积区域的最大值,赋值给池化后的数据
out[i, c, j, k] = np.max(x[i, c, j*s:j*s+HH, k*s:k*s+WW]) cache = (x, pool_params) return out, cache # 池化层的反向传播
def pool_backward(dout, cache): # 获得输入层的输入
(x, pool_params) = cache
HH = pool_params['pool_height']
WW = pool_params['pool_width']
stride = pool_params['stride']
s = stride
N, C, H, W = x.shape
# 迭代的次数,这与池化层的前向传播的次数是相同的
H_new = int((H - HH) / stride + 1)
W_new = int((W - WW) / stride + 1)
# 构造输出矩阵
out = np.zeros_like(x)
for i in range(N):
for c in range(C):
for j in range(H_new):
for k in range(W_new):
# 生成[[false, false],[false, True]]
window = (np.array(x[i, c, j*s:j*s+HH, k*s:k*s+WW]) == dout[i, c, j, k])
# [[false, false],[false, True]] * dout[i, c, j, k] = [[0, 0], [0, dout[i, c, j, k]]
out[i, c, j*s:j*s+HH, k*s:k*s+WW] = window * dout[i, c, j, k] return out # 线性变化的前向传播
def affine_forward(x, w, b): N = x.shape[0]
x_row = x.reshape(N, -1)
out = np.dot(x_row, w) + b
cache = (x, w, b) return out, cache # 线性变化的反向传播
def affine_backward(dout, cache): x, w, b = cache
dx = np.dot(dout, w.T)
dx = dx.reshape(x.shape)
x_row = x.reshape(x.shape[0], -1)
dw = np.dot(x_row.T, dout)
db = np.sum(dout, axis=0, keepdims=True) return dx, dw, db # relu层的前向传播
def relu_forward(x): out = None
cache = x
out = ReLU(x) return out, cache # relu层的反向传播
def relu_backward(dout, cache): out = None
x = cache
out = dout.copy()
out[x < 0] = 0
return out # relu激活函数
def ReLU(x):
return np.maximum(0, x) # 计算损失值,即dloss/dprob的损失函数对概率的反导
def softmax_loss(scores, y): N = scores.shape[0]
#softmax概率值
probs = np.exp(scores)
probs /= np.sum(probs, axis=1, keepdims=True)
# 计算损失值函数
loss = -np.sum(np.log(probs[np.arange(N), y])) / N # 损失值对softmax概率值求导
dout = probs.copy()
dout[np.arange(N), y] -= 1
dout /= N return loss, dout

第三部分:将数据data和模型model输入,使用batch_size数据进行self.model.loss进行损失值得计算和参数的更新

第一步:获得data中的训练数据和验证集的数据

第二步:使用kwargs.pop()获得传入字典中的参数

第三步: 进行部分参数初始化,同时构造参数对应的学习率和momentum字典,即后续的v

第四步:使用num_data和batch_size, 即num_epoches构造出迭代的次数

第五步:进行循环,使用动量梯度sgd,即self.model.loss计算损失值和更新self.model.params的参数

第六步:每一个print_every 打印损失值loss

第七步:每一个epoch值,进行学习率的衰减

第八步:在开始或者结束,以及每一个epoch值,打印准确率

第九步:对于最好的验证集的准确率,保存当前的self.model.params,迭代结束,将最好的验证集参数赋值给self.model.params

主函数:Solver.py

import numpy as np
import optim class Solver(object): def __init__(self, data, model, **kwargs): # 第一步:从data中获得训练数据和验证集数据
self.train_X = data['train_X']
self.train_y = data['train_y']
self.val_X = data['val_X']
self.val_y = data['val_y']
self.model = model
# 第二步:获得传入的参数,学习率衰减值,训练的batch_size, epoch的大小,学习率和momentum, 梯度下降的方式
self.lr_decay = kwargs.pop('lr_decay', 1.0)
self.print_every = kwargs.pop('print_every', 10)
self.num_epochs = kwargs.pop('num_epochs', 2)
self.batch_size = kwargs.pop('batch_size', 2)
self.update_rule = kwargs.pop('update_rule', 'sgd')
self.optim_config = kwargs.pop('optim_config', {})
self.verbose = kwargs.pop('verbose', True) # 如果存在未知的输入则报错
if len(kwargs) > 0:
extra = ','.join('%s'% k for k in kwargs)
raise ValueError('Unrecognized arguments %s' % extra)
# 如果optim中不存在,梯度下降的方式就报错
if not hasattr(optim, self.update_rule):
raise ValueError('Unrecognized arguments %s' % self.update_rule)
# 将optim中的函数功能赋予函数名self.update_rule
self.update_rule = getattr(optim, self.update_rule)
# 第三步:进行部分初始化操作
self._reset() def _reset(self):
# 迭代epoch的次数
self.epoch = 0
# 损失值的list
self.loss_history = []
# 准确率的list
self.acc_train = []
self.acc_val = []
# 最好的验证集的准确率
self.best_val = 0
# 每个dw和db对应的学习率和momentum
self.optim_configs = {}
# 建立每个参数对应的学习率和momentum
for p in self.model.params:
d = {k:v for k, v in self.optim_config.items()}
self.optim_configs[p] = d def train(self):
# 第四步:使用样本数和batch_size,即epoch_num,构造迭代的次数
num_data = self.train_X.shape[0]
num_every_epoch = max(num_data / self.batch_size, 1)
num_iterations = num_every_epoch * self.num_epochs
for t in range(int(num_iterations)):
# 第五步:循环,计算损失值和梯度值,并使用sgd_momentum进行参数更新
self._step()
# 第六步:每一个print_every打印损失值
if self.verbose and t % self.print_every == 0:
print('%d / %d %f'%(t+1, num_iterations, self.loss_history[-1])) # 第七步:每一个循环进行一次学习率的下降
epoch_end = (t + 1) % self.batch_size == 0
if epoch_end:
self.epoch += 1
for p in self.optim_configs:
self.optim_configs[p]['learning_rate'] *= self.lr_decay start = 0
end = num_iterations - 1
# 第八步:开始或者结束,或者每一个epoch计算准确率,同时获得验证集最好的参数
if t == start or t == end or t+1 % num_every_epoch == 0:
train_acc = self.check_accuracy(self.train_X, self.train_y, num_sample=4)
val_acc = self.check_accuracy(self.val_X, self.val_y, num_sample=4)
print('%d/%d train acc%.2f'%(self.epoch, self.num_epochs,
train_acc))
print('%d/%d val acc%.2f' % (self.epoch, self.num_epochs,
val_acc))
self.acc_train.append(train_acc)
self.acc_val.append(val_acc) if val_acc > self.best_val:
self.best_params = {}
for k, v in self.model.params.items():
self.best_params[k] = v
self.best_val = val_acc
# 将验证集最好的参数赋予给当前的模型参数
self.model.params = self.best_params # 进行准确率的计算
def check_accuracy(self, X, y, num_sample=None, batch_size=2):
# num_sample表示使用多少个数据计算准确率
N = X.shape[0]
if N > num_sample:
# 随机从N个样本中,抽取num_sample个样本
mask = np.random.choice(N, num_sample)
X = X[mask]
y = y[mask]
num_batch = num_sample / batch_size
if num_batch % batch_size != 0:
num_batch += 1
y_pred = []
for i in range(int(num_batch)):
start_id = i * batch_size
end_id = (i + 1) * batch_size
# 不传入y,获得scores得分
scores = self.model.loss(X[start_id:end_id])
y_pred.append(np.argmax(scores, axis=1))
# 将数据进行横向排列
y_pred = np.hstack(y_pred)
# 计算结果的平均值
accr = np.mean((y_pred == y)) return accr # 计算loss和进行参数更新
def _step(self):
# 获得当前样本的个数
self.num_data = self.train_X.shape[0]
# 随机抽取2个样本,用于进行参数的更新
batch_mask = np.random.choice(self.num_data, self.batch_size)
X_batch = self.train_X[batch_mask]
y_batch = self.train_y[batch_mask]
# 计算损失值和梯度方向
loss, grads = self.model.loss(X_batch, y_batch)
# 将损失值的结果进行添加
self.loss_history.append(loss)
# 对每个参数进行循环
for p, v in self.model.params.items():
# 获得当前的学习率和momentum, 以及后续加入的v
config = self.optim_configs[p]
# 获得w和dw梯度值
w, dw = self.model.params[p], grads[p]
# 将w,dw, config传入到动量梯度算法,进行参数更新
next_w, next_config = self.update_rule(w, dw, config)
# 将更新后的config替代字典中的config
self.optim_configs[p] = next_config
# 将更新后的参数替换成模型中的参数
self.model.params[p] = next_w

副函数:optim.py

import numpy as np

# sgd_momentum 计算动量梯度下降
def sgd_momentum(w, dw, configs=None):
# 传入每一个参数对应的学习率和momentum
if configs is None: configs = {}
# 如果不存在该属性,使用默认值
learning_rate = configs.setdefault('learning_rate', 1e-2)
momentum = configs.setdefault('momentum', 1.0)
# 获得前一次传播的v,没有就使用构造全零
v = configs.get('velocity', np.zeros_like(w))
# 进行当前v的更新,即v*momentum - dw * learning_rate
v = v*momentum - dw * learning_rate
# 进行w参数更新
w = v + w
# 将v替换,最为下一次的前一次传播v
configs['velocity'] = v return w, configs

上述代码的主要函数:start.py

import numpy as np
from data_utils import get_CIFAR10_data
from cnn import ThreeLayerConvNet
from solver import Solver
import matplotlib.pyplot as plt # 第一步数据读取
data = get_CIFAR10_data()
# 第二步:建立model用于进行loss和grads的计算
model = ThreeLayerConvNet(reg=0.9)
# 第三步:使用batch_size进行参数的更新
solver = Solver(data, model, lr_decay=0.95,
print_every=10, num_epochs=5, batch_size=2,
update_rule='sgd_momentum',
optim_config={'learning_rate': 5e-4, 'momentum': 0.9}) solver.train() # 画出loss图
plt.subplot(2, 1, 1)
plt.title('Training loss')
plt.plot(solver.loss_history, 'o')
plt.xlabel('Iteration')
# 画出准确率的图
plt.subplot(2, 1, 2)
plt.title('Accuracy')
plt.plot(solver.train_acc_history, '-o', label='train')
plt.plot(solver.val_acc_history, '-o', label='val')
plt.plot([0.5]*len(solver.val_acc_history), 'k--')
plt.xlabel('Epoch')
plt.legend(loc='lower right')
plt.show() # 计算测试值得准确率
best_model = model
test_X, test_y = data['test_X'], data['test_y']
print('test_value', np.mean(np.argmax(model.loss(test_X), axis=1) == test_y))