Tensorflow线程队列与IO操作
目录Tensorflow线程队列与IO操作1 线程和队列1.1 前言1.2队列1.3队列管理器1.4线程协调器2 文件读取2.1 流程2.2 文件读取API:3图像读取3.1 图像读取基本知识3.2图像基本操作3.3 图像读取API3.4图片批处理流程3.5 读取图片案...
目录
Tensorflow线程队列与IO操作
1 线程和队列
1.1 前言
IO操作进行大文件读取时,如果一次性进行读取时非常消耗内存的,那么一次性读取就需要一次性训练。这样是非常慢的。Tensorflow是计算型的,重点在计算,所以不能在读写上花太多时间,那么就提供了多线程,队列等机制。在Tensorflow中的多线程是真正的多线程,能并行的执行任务。
1.2 队列
(1)先进先出队列,按顺序出队列 :tf.FIFOQueue
FIFOQueue(capacity, dtypes, name='fifo_queue') 创建一个以先进先出的顺序对元素进行排队的队列
capacity:整数。可能存储在此队列中的元素数量的上限
dtypes:DType对象列表。长度dtypes必须等于每个队列元 素中的张量数,dtype的类型形状,决定了后面进队列元素形状
常用方法:
dequeue(name=None) :取数据,出队列
enqueue(vals, name=None): 放数据
enqueue_many(vals, name=None):放数据,vals列表或者元组
返回一个进队列操作 size(name=None)
(2)随机出队列:tf.RandomShuffleQueue
import tensorflow as tf
# 模拟同步先处理数据,取数据训练
# 1、定义队列
Q = tf.FIFOQueue(10, tf.float32)
# 放入数据,参数如果是[0.1, 0.2, 0.3]会认为是一个张量
enq_many = Q.enqueue_many([[0.1, 0.2, 0.3], ])
# 2、定义处理数据的逻辑,取数据*2, 入队列
out= Q.dequeue()
data = out*2
enter = Q.enqueue(data)
with tf.Session() as sess:
# 初始化队列
sess.run(enq_many)
# 处理数据
for i in range(10):
sess.run(enter)
# 训练数据
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue()))
注意:tensorflow当中,运行操作有依赖性,有操作之间计算的关系才能叫做依赖性
1.3 队列管理器
当数据量很大时,入队操作从硬盘中读取数据,放入内存中, 主线程需要等待入队操作完成,才能进行训练。会话里可以运行多个 线程,实现异步读取。
tf.train.QueueRunner(queue, enqueue_ops=None) 创建一个QueueRunner
queue:一个队列
enqueue_ops:添加线程的队列操作列表,[]*2,指定两个线程。[]里面指定线程做什么操作
方法:create_threads(sess, coord=None,start=False) 创建线程来运行给定会话的入队操作,返回线程的实例
coord:线程协调器,后面线程管理需要用到
start:布尔值,如果True启动线程;如果为False调用者 必须调用start()启动线程
1.4 线程协调器
tf.train.Coordinator() 线程协调员,实现一个简单的机制来协调一 组线程的终止 。返回线程协调员实例
request_stop() :请求停止线程
should_stop(): 检查是否要求停止
join(threads=None, stop_grace_period_secs=120) 等待线程终止,回收线程
import tensorflow as tf
# 模拟异步子线程存入样本, 主线程读取样本
# 1、定义队列
Q = tf.FIFOQueue(10, tf.float32)
# 2、定义处理逻辑 循环值+1, 放入队列当中
var = tf.Variable(0.0)
# 实现一个自增 tf.assign_add
data = tf.assign_add(var, tf.constant(1.0))
# 放数据
enter = Q.enqueue(data)
# 3、定义队列管理器op, 指定开启多少个子线程,子线程的任务
qr = tf.train.QueueRunner(Q, enqueue_ops=[enter] * 2)
# 初始化变量的OP
init_op = tf.global_variables_initializer()
with tf.Session() as sess:
# 初始化变量
sess.run(init_op)
# 开启线程管理器
coord = tf.train.Coordinator()
# 真正开启子线程
threads = qr.create_threads(sess, coord=coord, start=True)
# 主线程,不断读取数据训练
for i in range(100):
print(sess.run(Q.dequeue()))
# 回收
coord.request_stop()
coord.join(threads)
2 文件读取
2.1 流程
(1)构造一个文件队列
(2)构造文件阅读器,read读取队列内容,默认只读取一个样本
①csv文件,默认读取一行;②二进制文件,指定一个样本的bytes读取;③图片文件,按一张一张的读取;④threcords
(3)decode解码
(4)批处理
2.2 文件读取API:
(1)文件队列构造
tf.train.string_input_producer(string_tensor,shuffle=True) 将输出字符串(例如文件名)输入到管道队列
string_tensor: 含有文件名的1阶张量
num_epochs:过几遍数据,默认无限过数据
return:具有输出字符串的队列
(2)文件阅读器
tf.TextLineReader 根据文件格式,选择对应的文件阅读器
阅读文本文件逗号分隔值(CSV)格式,默认按行读取
return:读取器实例
tf.FixedLengthRecordReader(record_bytes)
要读取每个记录是固定数量字节的二进制文件
record_bytes:整型,指定每次读取的字节数
return:读取器实例
tf.TFRecordReader 读取TfRecords文件
注意:有一个共同的读取方法: read(file_queue):从队列中指定数量内容 返回一个Tensors元组(key文件名字,value默认的内容(行,字节))
(3)文件内容解码器
由于从文件中读取的是字符串,需要函数去解析这些字符串到张量
tf.decode_csv(records,record_defaults=None,field_delim = None,name = None) 将CSV转换为张量,与tf.TextLineReader搭配使用
records:tensor型字符串,每个字符串是csv中的记录行
field_delim:默认分割符”,”
record_defaults:参数决定了所得张量的类型,并设置一个值在输入字符串中缺少使用默认值,如 tf.decode_raw(bytes,out_type,little_endian = None,name = None) 将字节转换为一个数字向量表示,字节为一字符串类型的张量,与函数tf.FixedLengthRecordReader搭配使用,二进制读取为uint8格式
(4)开启线程操作
tf.train.start_queue_runners(sess=None,coord=None) 收集所有图中的队列线程,并启动线程
sess:所在的会话中
coord:线程协调器
return:返回所有线程队列
(5)管道读端批处理
tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 读取指定大小(个数)的张量
tensors:可以是包含张量的列表
batch_size:从队列中读取的批处理大小
num_threads:进入队列的线程数
capacity:整数,队列中元素的最大数量
return:tensors
tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue, num_threads=1,) 乱序读取指定大小(个数)的张量
min_after_dequeue:留下队列里的张量个数,能够保持随机打乱
import tensorflow as tf
import os
def csvread(filelist):
"""
读取CSV文件
:param filelist: 文件路径+名字的列表
:return: 读取的内容
"""
# 1、构造文件的队列
file_queue = tf.train.string_input_producer(filelist)
# 2、构造csv阅读器读取队列数据(按一行)
reader = tf.TextLineReader()
key, value = reader.read(file_queue)
# 3、对每行内容解码
# record_defaults:指定每一个样本的每一列的类型,指定默认值[["None"], [4.0]]
records = [["None"], ["None"]]
example, label = tf.decode_csv(value, record_defaults=records,field_delim=" ")
# 4、想要读取多个数据,就需要批处理
example_batch, label_batch = tf.train.batch([example, label], batch_size=4, num_threads=1, capacity=4)
print(example_batch, label_batch)
return example_batch, label_batch
if __name__ == "__main__":
# 1、找到文件,放入列表 路径+名字 ->列表当中
file_name = os.listdir("./floder")
filelist = [os.path.join("./floder", file) for file in file_name ]
# 打印文件名
example_batch, label_batch = csvread(filelist)
# 开启会话运行结果
with tf.Session() as sess:
# 定义一个线程协调器
coord = tf.train.Coordinator()
# 开启读文件的线程
threads = tf.train.start_queue_runners(sess, coord=coord)
# 打印读取的内容
print(sess.run([example_batch, label_batch]))
# 回收子线程
coord.request_stop()
coord.join(threads)
3 图像读取
3.1 图像读取基本知识
机器学习算法输入是特征值+目标值。每个图片由像素组成的,读取的时候是读取像素值去识别。
在图像数字化表示当中,分为黑白和彩色两种。在数字化表示图片的时候,有三个因素。分别是图片的长、图片的宽、图片的颜色通道数。
①黑白图片:颜色通道数为1,个像素点只有一个值,称为灰度值[0-255];
②彩色图片:它有三个颜色通道,分别为RGB,通过三个数字表示一个像素位。TensorFlow支持JPG、PNG图像格式,RGB、RGBA颜色空间。图像用与图像尺寸相同(heightwidthchnanel)张量表示。图像所有像素存在磁盘文件,需要被加载到内存。
3.2 图像基本操作
操作:缩小图片大小,为了所有图片统一特征数(像素值一样)。
目的:①增加图片数据的统一性②所有图片转换成指定大小 ③缩小图片数据量,防止增加开销
图片存储计算的类型:存储uint8(节约空间) 矩阵计算float32(提高精度)
API:
tf.image.resize_images(images, size) 缩小图片
images:4-D形状[batch, height, width, channels]或3-D形状的张 量[height, width, channels]的图片数据
size:1-D int32张量:new_height, new_width,图像的新尺寸
返回4-D格式或者3-D格式图片
3.3 图像读取API
图像读取器 :
①tf.WholeFileReader 将文件的全部内容作为值输出的读取器
return:读取器实例
read(file_queue):输出将是一个文件名(key)和该文件的内容 (值)
图像解码器 :
①tf.image.decode_jpeg(contents) 将JPEG编码的图像解码为uint8张量
return:uint8张量,3-D形状[height, width, channels]
②tf.image.decode_png(contents) 将PNG编码的图像解码为uint8或uint16张量
return:张量类型,3-D形状[height, width, channels]
3.4 图片批处理流程
(1)构造图片文件队列
(2)构造图片阅读器
(3)读取图片数据
(4)处理图片数据
3.5 读取图片案例
import tensorflow as tf
import os
def pictureRead(filelist):
"""
读取狗图片并转换成张量
:param filelist: 文件路径+ 名字的列表
:return: 每张图片的张量
"""
# 1、构造文件队列
file_queue = tf.train.string_input_producer(filelist)
# 2、构造阅读器去读取图片内容(默认读取一张图片)
reader = tf.WholeFileReader()
key, value = reader.read(file_queue)
print(value)
# 3、对读取的图片数据进行解码
image = tf.image.decode_jpeg(value)
print(image)
# 5、处理图片的大小(统一大小)
image_resize = tf.image.resize_images(image, [300, 300])
print(image_resize)
# 注意:一定要把样本的形状固定 [300, 300, 3],在批处理的时候要求所有数据形状必须定义
image_resize.set_shape([300, 300, 3])
print(image_resize)
# 6、进行批处理
image_batch = tf.train.batch([image_resize], batch_size=50, num_threads=2, capacity=50)
print(image_batch)
return image_batch
if __name__ == "__main__":
# 1、找到文件,放入列表 路径+名字 ->列表当中
file_name = os.listdir("./cat")
filelist = [os.path.join("./cat", file) for file in file_name ]
# 图片的张量
image_batch = pictureRead(filelist)
# 开启会话运行结果
with tf.Session() as sess:
# 定义一个线程协调器
coord = tf.train.Coordinator()
# 开启读文件的线程
threads = tf.train.start_queue_runners(sess, coord=coord)
# 打印读取的内容
print(sess.run([image_batch]))
# 回收子线程
coord.request_stop()
coord.join(threads)
4 二进制文件读取
4.1 CIFAR-10 二进制数据读取
网址:http://www.cs.toronto.edu/~kriz/cifar.html
由介绍可知每个样本的大小为:1(目标值)+3072(特征值)=3073字节
import tensorflow as tf
import os
def binaryRead(filelist):
# 定义读取的图片的一些属性
height,width,channel = 32,33,3
# 二进制文件每张图片的字节
label_bytes = 1
image_bytes = height * width * channel
bytes = label_bytes + image_bytes
# 1、构造文件队列
file_queue = tf.train.string_input_producer(filelist)
# 2、构造二进制文件读取器,读取内容, 每个样本的字节数
reader = tf.FixedLengthRecordReader(bytes)
key, value = reader.read(file_queue)
# 3、解码内容, 二进制文件内容的解码
label_image = tf.decode_raw(value, tf.uint8)
print(label_image)
# 4、分割出图片和标签数据,切除特征值和目标值
label = tf.cast(tf.slice(label_image, [0], [label_bytes]), tf.int32)
image = tf.slice(label_image, [label_bytes], [image_bytes])
# 5、对图片的特征数据进行形状的改变 [3072] --> [32, 32, 3]
image_reshape = tf.reshape(image, [height, width, channel])
print(label, image_reshape)
# 6、批处理数据
image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=2, capacity=20)
print(image_batch, label_batch)
return image_batch, label_batch
if __name__ == "__main__":
# 1、找到文件,放入列表 路径+名字 ->列表当中
file_name = os.listdir("./data/cifar-10-batches-bin")
filelist = [os.path.join("./data/cifar-10-batches-bin", file) for file in file_name if file[-3:]=="bin"]
# 二进制的张量
image_batch, label_batch= binaryRead(filelist)
# 开启会话运行结果
with tf.Session() as sess:
# 定义一个线程协调器
coord = tf.train.Coordinator()
# 开启读文件的线程
threads = tf.train.start_queue_runners(sess, coord=coord)
# 打印读取的内容
print(sess.run([image_batch, label_batch]))
# 回收子线程
coord.request_stop()
coord.join(threads)
5 TFRecords分析存储
5.1 简介
FRecords是Tensorflow设计的一种内置文件格式,是一种二进制文件, 它能更好的利用内存,更方便复制和移动。从机器学习角度,一个样本是特征值和目标值组成,FRecords是为了将二进制数据和标签(训练的类别标签)数据存储在同一个文件中
文件格式:*.tfrecords 写入文件内容:Example协议块(类字典的格式)
优点:特征值目标值共同存储,获取的时候只要指定键是什么值是什么就能获取到了。
5.2 TFRecords存储
(1)建立TFRecord存储器
tf.python_io.TFRecordWriter(path) 写入tfrecords文件
path: TFRecords文件的路径
return:写文件
方法method:
write(record):向文件中写入一个字符串记录(就是example)
close():关闭文件写入器
注意:字符串为一个序列化的Example,使用Example.SerializeToString()
(2)构造每个样本的Example协议块
tf.train.Example(features=None)
写入tfrecords文件
features:tf.train.Features类型的特征实例
return:example格式协议块
tf.train.Features(feature=None) 构建每个样本的信息键值对
feature:字典数据,key为要保存的名字, value为tf.train.Feature实例
return:Features类型
tf.train.Feature(**options)
**options:例如:
bytes_list=tf.train. BytesList(value=[Bytes])
int64_list=tf.train. Int64List(value=[Value])
tf.train. Int64List(value=[Value])
tf.train. BytesList(value=[Bytes])
tf.train. FloatList(value=[value])
5.3 TFRecords读取方法
同文件阅读器流程,中间需要解析过程
解析TFRecords的example协议内存块:
tf.parse_single_example(serialized,features=None,name=None)
解析一个单一的Example原型
serialized:标量字符串Tensor,一个序列化的Example
features:dict字典数据,键为读取的名字,值为FixedLenFeature
return:一个键值对组成的字典,键为读取的名字
tf.FixedLenFeature(shape,dtype)
shape:输入数据的形状,一般不指定,为空列表
dtype:输入数据类型,与存储进文件的类型要一致 类型只能是float32,int64,string
5.4 案例
CIFAR-10批处理结果存入tfrecords流程
(1)构造存储器
(2)构造每一个样本的Example
(3)写入序列化的Example
读取tfrecords流程
(1)构造文件队列
(2)构造TFRecords阅读器
(3)解析Example
(4)转换格式,bytes解码
import tensorflow as tf
import os
# 定义数据等命令行参数
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("data_dir", "./data/cifar-10-batches-bin", "文件的目录")
tf.app.flags.DEFINE_string("data_tfrecords", "./tmp/dataTFR.tfrecords", "存进tfrecords的文件")
class TFRRead(object):
"""完成读取二进制文件, 写进tfrecords,读取tfrecords
"""
def __init__(self, filelist):
# 文件列表
self.file_list = filelist
# 定义读取的图片的一些属性
self.height = 32
self.width = 32
self.channel = 3
# 二进制文件每张图片的字节
self.label_bytes = 1
self.image_bytes = self.height * self.width * self.channel
self.bytes = self.label_bytes + self.image_bytes
def read_and_decode(self):
# 1、构造文件队列
file_queue = tf.train.string_input_producer(self.file_list)
# 2、构造二进制文件读取器,读取内容, 每个样本的字节数
reader = tf.FixedLengthRecordReader(self.bytes)
key, value = reader.read(file_queue)
# 3、解码内容, 二进制文件内容的解码
label_image = tf.decode_raw(value, tf.uint8)
# 4、分割出图片和标签数据,切除特征值和目标值
label = tf.cast(tf.slice(label_image, [0], [self.label_bytes]), tf.int32)
image = tf.slice(label_image, [self.label_bytes], [self.image_bytes])
# 5、对图片的特征数据进行形状的改变 [3072] --> [32, 32, 3]
image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
# 6、批处理数据
image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=1, capacity=20)
return image_batch, label_batch
def write_ro_tfrecords(self, image_batch, label_batch):
"""
将图片的特征值和目标值存进tfrecords
:param image_batch: 20张图片的特征值
:param label_batch: 20张图片的目标值
:return: None
"""
# 1、建立TFRecord存储器
writer = tf.python_io.TFRecordWriter(FLAGS.data_tfrecords)
# 2、循环将所有样本写入文件,每张图片样本都要构造example协议
for i in range(20):
# 取出第i个图片数据的特征值和目标值,image_batch[i]是类型,调用eval()获取值,因为是个张量,需要调用.tostring()转换成字符串
image = image_batch[i].eval().tostring()
label = int(label_batch[i].eval()[0])
# 构造一个样本的example
example = tf.train.Example(features=tf.train.Features(feature={
"image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
"label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
}))
# 写入单独的样本,字符串要为一个序列化的Example
writer.write(example.SerializeToString())
# 关闭
writer.close()
return None
def read_from_tfrecords(self):
# 1、构造文件队列
file_queue = tf.train.string_input_producer([FLAGS.data_tfrecords])
# 2、构造文件阅读器,读取内容example,value=一个样本的序列化example
reader = tf.TFRecordReader()
key, value = reader.read(file_queue)
# 3、解析example
features = tf.parse_single_example(value, features={
"image": tf.FixedLenFeature([], tf.string),
"label": tf.FixedLenFeature([], tf.int64),
})
# 4、解码内容, 如果读取的内容格式是string需要解码, 如果是int64,float32不需要解码
image = tf.decode_raw(features["image"], tf.uint8)
# 固定图片的形状,方便与批处理
image_reshape = tf.reshape(image, [self.height, self.width, self.channel])
label = tf.cast(features["label"], tf.int32)
print(image_reshape, label)
# 进行批处理
image_batch, label_batch = tf.train.batch([image_reshape, label], batch_size=20, num_threads=1, capacity=20)
return image_batch, label_batch
if __name__ == "__main__":
# 1、找到文件,放入列表 路径+名字 ->列表当中
file_name = os.listdir(FLAGS.data_dir)
filelist = [os.path.join(FLAGS.data_dir, file) for file in file_name if file[-3:] == "bin"]
# print(file_name)
cf = TFRRead(filelist)
#image_batch, label_batch = cf.read_and_decode()
image_batch, label_batch = cf.read_from_tfrecords()
# 开启会话运行结果
with tf.Session() as sess:
# 定义一个线程协调器
coord = tf.train.Coordinator()
# 开启读文件的线程
threads = tf.train.start_queue_runners(sess, coord=coord)
#存进tfrecords文件
# print("开始存储")
# threads = cf.write_ro_tfrecords(image_batch, label_batch)
# print("结束存储")
# 打印读取的内容
print(sess.run([image_batch, label_batch]))
# 回收子线程
coord.request_stop()
coord.join(threads)
更多推荐
所有评论(0)