深度学习原理与框架-Tfrecord数据集的制作 1.tf.train.Examples(数据转换为二进制) 3.tf.image.encode_jpeg(解码图片加码成jpeg) 4.tf.train.Coordinator(构建多线程通道) 5.threading.Thread(建立单线程) 6.tf.python_io.TFR(TFR读入器)

时间:2021-08-14 17:24:42

1. 配套使用: tf.train.Examples将数据转换为二进制,提升IO效率和方便管理

对于int类型 : tf.train.Examples(features=tf.train.Features(feature=tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))))

对于bytes类型: tf.train.Examples(features=tf.train.Features(feature=tf.train.Feature(int64_list=tf.train.BytesList(value=[value]))))

2. tf.image.decode_jpeg(image, channel=3) # 将图片进行解码操作,通道数是3

参数说明:image表示图片,channel表示通道数

3. tf.image.encode_jpeg(image, format='rgb', quality=100) # 将解码后的图片,转码成jpeg

参数说明:image表示图片,format表示rgb的类型,quality表示图片的质量

4. tf.train.Coordinator() # 构建多线程通道,使用coord.join(threads)将线程列表添加

5.t = threading.Thread(target=, args=) # 建立单线程

参数说明:target表示对应的函数,args表示传入的参数

6. tf.python_io.TFRecordWriter(output_path) 构造TFR的写入器writer,使用writer.write将打包的参数写入

参数说明:output_path表示保存TFR文件的路径

7.np.linspace(0, len(3), 3) # 表示将数据进行切分,

参数说明:0表示第一个数,len(3)表示第二个数,3表示切分成3个数,

8. sys.stdout.flush() 进行数据的刷新

数据说明:在flower文件中,存在5个文件,每个文件的文件名分别对应的是标签名,在flower_label.txt保存的5个标签名

代码说明:这个采用了多线程的方式,即如果要生成n个shards,有2个thread, 那么每个thread就处理n/2个shards的生成

参数说明:使用tf.app.flags.DEFINE_string 进行参数的设置,使用FLAGS = tf.app.flags.FLAGS将参数并入FLAGS中

代码主要分为第一部分和第二部分,使用tf.app.run()执行代码,定义model(),在model中分为两部分

第一部分:用于进行文件名的读取filenames,标签的制作labels,以及存入标签名texts

第二部分:建立多个线程,在每个线程中,调用函数,建立TFR的写入writer,循环一个线程的shards数,将标准化后的example进行写入,关闭writer

第一部分:

第一步:构建find_image_files, 传入参数为文件名,标签名

第二步:使用[l.strip() for l in TF.gfile.FastGFile(f).readlines()] 来获得标签值

第三步: 循环标签值,使用os.path.join(‘%s/%s/*’%())标签值和文件路径进行拼接,获得每一个文件夹的名字

第四步: 使用tf.gfile.Glob(文件名) 获得文件名中所有的图片名称,如果报错,打印文件名,continue

第五步:定义filenames, labels, texts = []

第六步:使用.extend() 将图片列表添加到filenames, 使用[label_index] * len(matchfile)制作标签,以及[text] * len(matchfile) 制作标签名,使用.extend() 进行添加

第七步: 构造shuffed_index = np.arange(len(filenames)), 使用random.shuffle(shuffed_index) 获得混乱的标签索引值

第八步:使用filenames = [filename[i] for i in shuffed_index] 构造打乱的filenams,labels,和texts, 返回结果

第二部分:将生成的filenams,labels和texts, train_shards传入到process_image_files函数

第一步:使用assert len(filenames) == len(labels),即filenams的大小是否和labeles以及texts的大小相同

第二步:使用np.linspace(0, len(filenames), train_shards+1) 根据train_shards大小进行切分,

第三步:循环切分的列表,将两个之间的索引值添加到ranges中, ranges.append([split_index[i], split_index[i+1]]), 使用sys.stdout.flush() 刷新操作

第四步:实例化coder = ImageCode()

第一步:构造png转jpg的初始化函数,使用tf.placeholder初始化,使用tf,image.decode_png()解码png,使用tf.image.encode_jpeg()编码jpeg

第二步:构造jpg的解码的初始化函数, 使用tf.placeholder初始化,使用tf.image.decode_jpeg()解码jpg

第三步:构造函数png_to_jpeg(self, image_data), sess.run(self._png_to_jpg, feed_dict={self.png_image:image_data}),进行实际转换

第四步:构造函数decode_jpeg(self, image_data):, sess.run(self._decode_jpg) 进行实际转换

第五步:使用coord = tf.train.Coordinator() 建立线程的通道

第六步:循环num_thread, 建立args参数,使用threading.Thread(target=_precess_image_files_batch, args) 构建单线程,使用t.start()启动单线程,threads.append(t), 然后再将coord.join(threads)构建出了多线程

函数target = _precess_image_files_batch 说明:主要是用于构建Tfrecord数据集

第一步:将train_shards / num_threads相除,获得每个线程所需要建立的shards的个数

第二步:使用np.linspace(range[thread_index][0], range[thread_index][1],num_shards_per_batch + 1) # 获得每一个shards的数据索引

第三步:使用ranges[thread_index][1] - ranges[thread_index][0] 获得当前线程所执行的样本数

第四步:循环num_shard_per_batch,即每个线程所需要建立的shards个数

第五步:使用shard = thread_index * num_per_batch + s,获得当前的shards的索引

第六步:使用name,shard, trian_shard建立当前的路径

第七步:使用os.path.join() 将文件路径与当前路径进行拼接,获得保存路径

第八步:使用writer = tf.python_io.TFRecordWriter(output_path) 构造一个TFR写入的writer函数

第九步:循环每一个shards的数据索引值

第十步:使用_image_process获得图片的image_buff, 如果是png格式使用coder转换为jpg格式

第十一步:使用tf.train.Example(features=tf.train.Features(feature={

}))  # 将参数进行打包,返回example

第十二步:writer.write(example.SerializeToString()) 将数据写入到writer中

第十三步:如果执行1000次,打印当前时间,第几个线程,第几张图片, 该线程的图片数,使用sys.stdout.flush() 刷新

第十四步:一个shards结束后,writer.close() ,打印,并进行刷新

第十五步:整个循环结束后,打印,并刷新

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function from datetime import datetime
import os
import random
import sys import threading
import numpy as np
import tensorflow as tf tf.app.flags.DEFINE_string('train_directory', './flower_photos/',
'Training data directory')
tf.app.flags.DEFINE_string('validation_directory', './flower_photos',
'Validation data directory') tf.app.flags.DEFINE_string('output_directory', './data/',
'Output data directory') tf.app.flags.DEFINE_integer('train_shards', 2, 'Number of shards in training TFRecord files.') tf.app.flags.DEFINE_integer('validation_shards', 0, 'Number of shards in validation TFRecord files.') tf.app.flags.DEFINE_integer('num_threads', 2, 'Number of thread to preprocess the images') tf.app.flags.DEFINE_string('labels_file', './flower_label.txt', 'labels file') FLAGS = tf.app.flags.FLAGS def _int64_feature(value): if not isinstance(value, list):
value = [value]
# 将数据转换为int的列表类型
return tf.train.Feature(int64_list=tf.train.Int64List(value=value)) def _bytes_feature(value):
# 将数据转换为bytes的列表类型
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value])) def _convert_to_example(filename, image_buff, label, text, height, width): colorspace= 'RGB'
channels = 3
image_format = 'JPEG'
# 使用tf.train.Example和tf.train.Features构造打包数据
example = tf.train.Example(features=tf.train.Features(feature={
'image/height': _int64_feature(height),
'image/width': _int64_feature(width),
'image/colorspace': _bytes_feature(tf.compat.as_bytes(colorspace)),
'image/channels': _int64_feature(channels),
'image/class/label': _int64_feature(label),
'image/class/text': _bytes_feature(tf.compat.as_bytes(text)),
'image/format': _bytes_feature(tf.compat.as_bytes(image_format)),
'image/filename': _bytes_feature(tf.compat.as_bytes(os.path.basename(filename))),
'image/encoded': _bytes_feature(tf.compat.as_bytes(image_buff))
}))
return example def is_png(filename): return '.png' in filename def _process_image(filename, coder): # 打开文件夹,使用bytes数据类型
with tf.gfile.FastGFile(filename, 'rb') as f:
# 进行数据的读取
image_data = f.read()
# 如果数据是png的格式的话就转换为jpeg类型
if is_png(filename):
image_data = coder.png_to_jpeg(image_data)
# 将iamge_data进行解码
image = coder.decode_jpeg(image_data)
# 判断图片的维度是否是3
assert len(image.shape) == 3
# 图片的高度
height = image.shape[0]
# 图片的宽度
width = image.shape[1] return image_data, height, width def _process_image_files_batch(coder, thread_index, ranges, name, filenames, texts, labels, train_shards): # 获得线程数
num_threads = len(ranges)
# 保证线程和train_shards可以整除
assert not train_shards % num_threads
# 每个线程所需要执行的shards数
num_shards_per_batch = int(train_shards / num_threads)
# 根据每个线程的shars数,获得每个线程切分后的数据索引值
shard_ranges = np.linspace(ranges[thread_index][0],
ranges[thread_index][1], num_shards_per_batch + 1)
# 获得线程的样本数目
num_files_in_thread = ranges[thread_index][1] - ranges[thread_index][0]
# 用于图片的统计
counter = 0
# 循环线程
for s in range(num_shards_per_batch):
# 获得当前是第几个shards
shard = thread_index * num_shards_per_batch + s
# 构造TFR的路径
output_filename = '%s-%.5d-of-%.5d.tfrecord'%(name, shard, train_shards)
# 拼接获得最终的路径
output_file = os.path.join(FLAGS.output_directory, output_filename)
# 输入路径,构造TFR的读入器
writer = tf.python_io.TFRecordWriter(output_file)
#
shard_counter = 0
# 获得数据索引值的列表
file_in_shard = np.arange(shard_ranges[s], shard_ranges[s+1]).astype('int')
# 循环列表
for i in file_in_shard:
# 获得文件名
filename = filenames[i]
# 获得标签值
label = labels[i]
# 获得标签名
text = texts[i]
# 使用tf.gfile.FastGFile(filename, 'rb'): 打开文件
image_buff, height, width = _process_image(filename, coder)
# 将各个参数进行打包
example = _convert_to_example(filename, image_buff, label, text,
height, width)
# 将一张图片打包好的参数写入到writer里面
writer.write(example.SerializeToString())
shard_counter += 1
counter += 1
# 如果读取了1000张图片,打印并进行刷新操作
if not counter % 1000:
print('%s [thread %d]:Processed %d of %d images in thread batch.'%(
datetime.now(), thread_index, counter, num_files_in_thread
))
sys.stdout.flush()
# 循环了一个shards就关闭writer,打印和刷新操作
writer.close()
print('%s [thread %d]:Processed %d of %d images in thread batch.' % (
datetime.now(), thread_index, counter, num_files_in_thread
))
sys.stdout.flush()
# 循环结束,打印和刷新操作
print('%s [thread %d]:Processed %d of %d images in thread batch.' % (
datetime.now(), thread_index, counter, num_files_in_thread
))
sys.stdout.flush() class ImageCoder(object): def __init__(self):
# 构造sess函数
self._sess = tf.Session()
# 初始化png的输入数据
self.png_img = tf.placeholder(dtype=tf.string)
# 将png进行解码操作,使用tf.image.decode_png
image = tf.image.decode_png(self.png_img, channels=3)
# 对解码的图片,使用tf.image.encode_jpeg进行加码操作,format表示rgb, quality表示图片的质量
self._png_to_jpeg = tf.image.encode_jpeg(image, format='rgb', quality=100)
# 初始化输入jpeg_image的图片
self.jpeg_image = tf.placeholder(dtype=tf.string)
# 使用tf.image.decode_jpeg对图片进行解码操作
self._decode_jpeg = tf.image.decode_jpeg(self.jpeg_image, channels=3) def png_to_jpeg(self, image_data):
# 输入数据,获得实际的png_to_jpeg的转换
return self._sess.run(self._png_to_jpeg, feed_dict={self.png_img:image_data}) def decode_jpeg(self, image_data):
# 输入数据,进行实际的decode_jpeg的解码操作
image = self._sess.run(self._decode_jpeg, feed_dict={self.jpeg_image:image_data}) return image def _process_image_files(name, filenames, labels, texts, train_shards):
# 第一步:确认维度是否相等
assert len(filenames) == len(texts)
assert len(filenames) == len(labels) # 第二步:使用np.linspace()将len(filenames)根据train_shards的个数进行切片,获得每一份的索引值
ranges = []
split_index = np.linspace(0, len(filenames), train_shards+1)
# 第三步:根据索引值,构造range列表,即每一个列表,对于前后的索引值
for i in range(len(split_index) - 1):
ranges.append([split_index[i], split_index[i+1]]) # 刷新操作
print('Launching %d threads for spacings: %s'%(FLAGS.num_threads, ranges))
sys.stdout.flush()
# 第四步:实例化ImageCoder() 进行png_to_jpeg,或者对jpeg进行解码操作
coder = ImageCoder() # 第五步:使用tf.train.Coordinator获得线程的通道
coord = tf.train.Coordinator()
# 构造线程的列表
threads = []
# 第六步:循环线程的个数,构造多线程
for thread_index in range(len(ranges)):
# 构造线程参数,coder表示解码器,thread_index当前是第几个线程,ranges表示线程的范围,name表示名字,filenames表示文件的图片名
args = (coder, thread_index, ranges, name, filenames, texts, labels, train_shards)
# 构建线程
t = threading.Thread(target=_process_image_files_batch, args=args)
# 开始线程
t.start()
# 线程添加到线程列表中
threads.append(t)
# 将线程添加到通道中
coord.join(threads) def _find_image_files(data_path, labels_file): # 使用tf.gfile.FastGFile读取标签文件,获得标签的列表
upload_labels = [l.strip() for l in tf.gfile.FastGFile(labels_file).readlines()] # 第一种类别的标签值
label_index = 1
# 文件图片的名字列表
filenames = []
# 标签列表
labels = []
# 图片标签名列表
texts = []
# 循环标签文本
for text in upload_labels:
# 构造相对路径
jpeg_path_join = '%s/%s/*' % (data_path, text)
try:
# 使用tf.gfile.Glob遍历相对路径下所有的图片,获得图片的路径
matchfile = tf.gfile.Glob(jpeg_path_join)
except:
print(jpeg_path_join)
continue # 将获得的路径列表添加到filenames中
filenames.extend(matchfile)
# 产生[1] * len(filenams)的标签,并添加到标签列表中
labels.extend([label_index] * len(matchfile))
# 产生[test] * len(filenames)的标签名,并添加到标签名列表中
texts.extend([text] * len(matchfile))
# 标签值加1,作为下一个类比的标签值
label_index += 1
# 获得len(filenames)的索引值
shuffled_index = np.arange(len(filenames))
# 将索引值进行打乱
random.seed(1234)
random.shuffle(shuffled_index)
# 根据打乱的索引值重新构造filenames,labels,和texts
filenames = [filenames[i] for i in shuffled_index]
labels = [labels[i] for i in shuffled_index]
texts = [texts[i] for i in shuffled_index]
# 返回filenames,labels和texts
return filenames, labels, texts def _process_dataset(name, data_path, train_shards, labels_file): # 第一部分:找出文件名,构造标签列表以及文件名列表,并使用random.shuffle获得乱序的索引
filenames, labels, texts = _find_image_files(data_path, labels_file)
# 第二部分:将文件图片的名字,标签值,标签名,以及生产shards个数传入,用于生成tfrecord
_process_image_files(name, filenames, labels, texts, train_shards) def main(unused_argv):
# 确认train_shards是够能被线程整除
assert not FLAGS.train_shards % FLAGS.num_threads, ('在训练过程中线程的数目应该与文件的数目相对应') assert not FLAGS.validation_shards % FLAGS.num_threads, ('在验证集制作中线程的数目应该与文件的数目相对应')
# 构建process_dataset执行函数,传入的参数有name, 训练数据文件路径,训练集的num,以及标签的文件名
_process_dataset('train', FLAGS.train_directory,
FLAGS.train_shards,
FLAGS.labels_file) if __name__ == '__main__':
tf.app.run()

深度学习原理与框架-Tfrecord数据集的制作  1.tf.train.Examples(数据转换为二进制)  3.tf.image.encode_jpeg(解码图片加码成jpeg) 4.tf.train.Coordinator(构建多线程通道) 5.threading.Thread(建立单线程) 6.tf.python_io.TFR(TFR读入器)