-
队列和线程
-
文件读取,
-
图片处理
问题:大文件读取,读取速度,
在tensorflow中真正的多线程
子线程读取数据 向队列放数据(如每次100个),主线程学习,不用全部数据读取后,开始学习
队列与对垒管理器,线程与协调器
-
dequeue() 出队方法
-
enqueue(vals,name=None) 入队方法
-
enqueue_many(vals,name=None) 多个入队
-
size(name=None) 还有对少数据
tf.RandomShuffleQueue() # 随机出队队列
tf.train.QueueRunner(queue=None,enqueue_ops=None)
-
queue:队列
-
enqueue_ops:添加线程的队列操作列表,[op1,op2]*2 2表示指定两个线程
-
create_threads(sess,coord=None,start=False) 创建线程运行给定的入队操作
-
start:若为True线程自动启动,Fasle需手动start()
-
coord:线程协调器,后面线程管理器用到
-
return:返回线程实例
线程协调器
tf.train.Coordinator()
-
request_stop()
-
should_stop() 检查是否要求停止
-
join(threads=None,stop_grace_period_secs=120) 等待线程终止
-
return 返回线程协调器实例
同步,异步案例
import tensorflow as tf
def sync():
# 模拟一下同步 先处理数据,然后取数据训练
# tensorflow中,运行操作有依赖性
# 1,首先定义队列
Q = tf.FIFOQueue(3,tf.float32)
# 放入数据 tensorflow 会把传入的数据当做张量 a=y 此时y不会当做列表 a=[x,] 此时a是list
enq_many = Q.enqueue_many([[0.1,0.2,0.3],])
# 2,定义读取数据,取数据过程, 入队,出队
out_q = Q.dequeue() # 虽然是个op 但取出来是数据
data = out_q+1
en_q = Q.enqueue(data)
with tf.Session() as sess:
# 初始化队列
sess.run(enq_many)
# 处理数据
for i in range(100):
sess.run(en_q) # 有依赖性,会运行out_q,data,qn_q
# 训练数据
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue())) # 33.2 33.3 34.1
模拟异步
import tensorflow as tf
def asyn():
# 1,定义队列
Q = tf.FIFOQueue(1000,tf.float32)
# 2,定义子线程 操作
var = tf.Variable(0.0)
# 实现自增 tf.assign_add() # 主要作用是江var的值改变了
data = tf.assign_add(var,tf.constant(1.0))
en_q = Q.enqueue(data)
# 3,定义队列管理器op,指定子线程该做什么
qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
# 初始化变量op
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
# 开启线程管理器
coord = tf.train.Coordinator()
# 开启子线程
threads = qr.create_threads(sess,start=True,coord=coord) # coord指定管理器
# 主线程,不断取数据训练
for i in range(300): # 300 次后主线程结束,sess释放,单子线程还在,会报错,需要回收子线程
print(sess.run(Q.dequeue()))
coord.request_stop() # 请求回收
coord.join(threads) # 等待回收
文件读取
- 构造文件列表,
- 构造文件队列,
- 阅读器 读取内容一行,
- 解码,
- 批处理
文件队列
tf.train.string_input_producer(string_tensor=None,shuffle=True) # 奖输出字符串输入到管道队列
-
string_tensor 含有文件名的1阶张量
-
num_epochs:过几遍数据,默认无限
-
return:具有字符串的队列
文件阅读器
- tf.TextLineReader # csv文件
- tf.FixedLengthRecordReader(record_bytes=1024) # 二进制
- tf.TFRecordReader # TfRecords 自带的文件格式
共同方法 read(file_queue)
dequeue() 出队方法
enqueue(vals,name=None) 入队方法
enqueue_many(vals,name=None) 多个入队
size(name=None) 还有对少数据
queue:队列
enqueue_ops:添加线程的队列操作列表,[op1,op2]*2 2表示指定两个线程
create_threads(sess,coord=None,start=False) 创建线程运行给定的入队操作
start:若为True线程自动启动,Fasle需手动start()
coord:线程协调器,后面线程管理器用到
return:返回线程实例
request_stop()
should_stop() 检查是否要求停止
join(threads=None,stop_grace_period_secs=120) 等待线程终止
return 返回线程协调器实例
import tensorflow as tf
def sync():
# 模拟一下同步 先处理数据,然后取数据训练
# tensorflow中,运行操作有依赖性
# 1,首先定义队列
Q = tf.FIFOQueue(3,tf.float32)
# 放入数据 tensorflow 会把传入的数据当做张量 a=y 此时y不会当做列表 a=[x,] 此时a是list
enq_many = Q.enqueue_many([[0.1,0.2,0.3],])
# 2,定义读取数据,取数据过程, 入队,出队
out_q = Q.dequeue() # 虽然是个op 但取出来是数据
data = out_q+1
en_q = Q.enqueue(data)
with tf.Session() as sess:
# 初始化队列
sess.run(enq_many)
# 处理数据
for i in range(100):
sess.run(en_q) # 有依赖性,会运行out_q,data,qn_q
# 训练数据
for i in range(Q.size().eval()):
print(sess.run(Q.dequeue())) # 33.2 33.3 34.1
import tensorflow as tf
def asyn():
# 1,定义队列
Q = tf.FIFOQueue(1000,tf.float32)
# 2,定义子线程 操作
var = tf.Variable(0.0)
# 实现自增 tf.assign_add() # 主要作用是江var的值改变了
data = tf.assign_add(var,tf.constant(1.0))
en_q = Q.enqueue(data)
# 3,定义队列管理器op,指定子线程该做什么
qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
# 初始化变量op
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
# 开启线程管理器
coord = tf.train.Coordinator()
# 开启子线程
threads = qr.create_threads(sess,start=True,coord=coord) # coord指定管理器
# 主线程,不断取数据训练
for i in range(300): # 300 次后主线程结束,sess释放,单子线程还在,会报错,需要回收子线程
print(sess.run(Q.dequeue()))
coord.request_stop() # 请求回收
coord.join(threads) # 等待回收
string_tensor 含有文件名的1阶张量
num_epochs:过几遍数据,默认无限
return:具有字符串的队列
返回tensors元祖(key文件名字,value 默认的内容,一行,或字节)
解码
-
recodes:上边的value
-
field_delim:默认分隔符
-
record_defaults:指定每列类型,指定默认值
-
参数决定了张量的类型,并设置一个值,在字符串中缺少使用默认值
tf.decode_raw(bytes,out_type=None,little_endian=None,name=None)
recodes:上边的value
field_delim:默认分隔符
record_defaults:指定每列类型,指定默认值
参数决定了张量的类型,并设置一个值,在字符串中缺少使用默认值
将字节转换为一个数字向量,字节为一字符类型的张量,与函数
tf.FixLengthRecordReader
搭配使用,二进制读取为uint8格式
管道批处理
tf.train.batch()
-
tensors:包含张量的列表
-
batch_size:从队列汇总读取的批处理的大小
-
num_threads:进入队列的线程数
-
capacity:整数,队列的最大数量
案例
import tensorflow as tf
def readcsv(filelist):
"""
:param filelist:文件路径列表
:return:
"""
import os
path = r"E:\Workspace\PythonWorkSpace\Machine-Learning\asstes\temp"
file_name = os.listdir(path)
file_list = [os.path.join(path, file) for file in file_name]
# 1,构造文件队列
file_queue = tf.train.string_input_producer(file_list)
# 2,构造csv阅读器 读取队列数据
reader = tf.TextLineReader()
key,value=reader.read(file_queue)
# 3,对每行内容进行解码
records = [["None"],["None"]] # 指定第一列为字符串,默认值为None, 或指定为[1.0] 数字
# 单行读取
first,second = tf.decode_csv(value,record_defaults=records) # 返回就是n列,就是n个值接收
# 批量读取,每次取出的大小只与batch_size:有关 若大于总数,会重复取
first_batch,second_batch = tf.train.batch([first,second],batch_size=6,num_threads=1,capacity=6)
with tf.Session() as sess:
# 定义线程协调器
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess,coord=coord)
#print(sess.run([first,second]))
print(sess.run([first_batch,second_batch]))
coord.request_stop()
coord.join(threads)
return None
图像读取
特征值:像素
黑白:单通道[0-255] 像素数
彩色:三通道:RGB 像素数*3
图片数字化
三要素:长度,宽度,通道数
3D张量 [200,200,1] [200,200,3]
图片特征要一样:像素要一样-->大小相同 缩放图片
图片文件读取
API:
图像读取器:tf.WholeFileReader
-
将文件的全部内容作为值输出的读取器
-
返回读取实例
-
read(file_queuse) 输出时一个文件名和该文件的值
图像解码器:tf.image.decode_jpeg(contents)
-
将jpeg编码为的图像解码为uint8张量
-
返回uint8张量 3D形状 [height,width,channels]
-
tf.image.decode_png(contents)
-
解码为uint8或uint16
-
返回3D形状
def picread():
import os
import os,tensorflow as tf
path = r"D:\图片\壁纸\手机壁纸\people"
file_name = os.listdir(path)
file_list = [os.path.join(path, file) for file in file_name]
# 1, 构造文件队列
file_queue=tf.train.string_input_producer(file_list)
# 2, 构造文件读取器读取图片内容(默认一张)
reader = tf.WholeFileReader()
key,value=reader.read(file_queue) # value tensor(shape=(),dtype=string)
# 3, 对读取的图片数据进行解码
image = tf.image.decode_jpeg(value) # tensor(shape=(?,?,?),dtype=uint8)
# 4, 处理图片大小统一
image_resize = tf.image.resize_images(image,[1080,2160]) # shape(1080,2160,?)
# 设置通道数,在批处理时要求形状必须固定
image_resize.set_shape([1080,2160,3]) # 此时 shape(1080,2160,3)
# 5, 进行批处理
image_batch = tf.train.batch([image_resize],batch_size=20,num_threads=4,capacity=20) # shape(20,1080,2160,3)
with tf.Session() as sess:
# 定义线程协调器
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess, coord=coord)
# print(sess.run([image_resize])) #
print(sess.run([image_batch])) #
coord.request_stop()
coord.join(threads)
return None
二进制文件处理
案例分析:https://www.cs.toronto.edu/~kriz/cifar.html
CIFAR-10数据集:6000张32*32彩色图片 分为训练集文件,测试集文件
文件中每一行为(标签 像素值) 即为0-9 1024红色通道,1024绿色,1024蓝色 每行3072个字节 目标值+特征值
TFRecords 分析,存取
- Tensorflow内置文件格式,二进制文件, 更好的利用内存,速度快,更好的赋值和移动
- 为了将二进制数据和标签 存储在同一个文件中
- 文件格式*.tfrecords
- 写入文件内容:Example协议块 ----->类字典格式
- 每个example样本:{image:xxx,label:yyy}
API
tf.python_io.TFRecordsWriter(path)
- path:TFrecords文件路径
- return:文件写入器
- write(record):向文件中写入一个字符串记录.,就是一个example
- close():关闭文件写入器
一个样本--->example
tf.train.Example(features=None)
- 写入tfrecords文件
- feature:tf.train.Features类型的特征实例
- return:example协议块
tf.train.Features(feature=None)
- 构建每个样本的信息键值对
- feature:字典数据,key为名字,value为tf.train.Feature 实例
- return:Features类型
tf.train.Feature(**options)
**options:例如
bytes_list=tf.train.BytesList(value=[Bytes])
int64_list=tf.train.Int64List(value=[Value])
float_list=tf.train.FloatList(value=[Value])
TfRecords读取
-
同文件阅读器流程,中间需要解析过程
-
解析Tfrecords的example协议块
-
tf.parse_single_example(seralized,features=None,name=None)
- 解析一个单一的example原型
- seralized:标量字符串Tensor,一个序列化的Example
- Features:dcit字典数据,键为读取的名字,值为 Fixed LenFeature
- return:一个键值对组成的字典,键为读取的名字
-
tf.FixedLengthFeature(shape.dtype)
- shape:输入数据的形状,一般不指定,为空列表
- dtype:输入数据的类型,与存储金文件的类型一致,只能为float32,int64,string
import tensorflow as tf
class CifarRead(object):
"""
完成读取二进制文件,写进tfrecords,读取tfrecords
"""
def __init__(self,filelist,tfrecords_path):
# tfrecords文件路径
self.tfrecords_path = tfrecords_path
# 文件列表(文件的目录)
self.filelist = filelist
#
# 定义读取图片的属性
self.height = 32
self.weight = 32
self.channel = 3
self.label_bytes = 1
self.image_bytes= self.height*self.weight*self.channel
self.bytes = self.image_bytes+self.label_bytes
def read_and_decodes(self):
# 1, 构造文件队列
file_queue = tf.train.string_input_producer(self.filelist)
# 2, 构造文件读取器,读取内容,每个人样本字节数
reader = tf.FixedLengthRecordReader(self.bytes)
key,value = reader.read(file_queue) # 此时shape() dtype=string
# 3, 解码内容
label_image = tf.decode_raw(value,tf.uint8) # shape(?,) dtype=uint8 一维 图片与,目标值在一起
# 4,分割图片与标签数据 特征值和目标值 (本例数据中,目标值在前边,1,3072)
label = tf.slice(label_image,[0],[self.label_bytes]) # 切割出label ,slice(value,从哪读,读多少)
image = tf.slice(label_image,[self.label_bytes],[self.image_bytes])
label = tf.cast(label,tf.int32)
# image = tf.cast(image,tf.float32) # 需要计算的话转为float32
# 5, 对图片数据进行形状改变,[3072]--->[32,32,3]
image_reshape = tf.reshape(image,[self.height,self.weight,self.channel])
# 6, 批处理数据
image_batch,label_batch = tf.train.batch([image_reshape,label],batch_size=10,num_threads=1,capacity=10) # shape(10,32,32,3) shape(10,1)
return image_batch,label_batch
def write_to_tfrecords(self,image_batch,label_batch):
"""
将图片的特征值和目标值 存进tfrecords
:param image_batch: n张图片的目标值
:param label_batch: n张图片的目标值
:return: None
"""
# 1, 构造一个tfrecords文件,使用Tfrecords存储器
writer = tf.python_io.TFRecordWriter(self.tfrecords_path)
# 2, 循环写入,每张图片都早example
for i in range(image_batch.shape[0]):
# 取出第i个图片的特征值和目标值
image = image_batch[i] # (32,32,3)
label = label_batch[i] # (10,1)
# 要转化为张量,张量值转化为字符串
image = image.eval().tostring()
label = int(label.eval()[0])
# 构造一个样本的example
example = tf.train.Example(features=tf.train.Features(feature={
"image":tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
"label":tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
}))
# 写入单独的样本
writer.write(example.SerializeToString())
# 关闭
writer.close()
return None
def read_form_tfrecords(self):
# 1, 构造文件队列
file_queue = tf.train.string_input_producer([r"I:\人工智能数据\6000_32_32_图片集\cifar-10-batches-bin\all.tfrecords"])
# 2, 构造文件阅读器,读取内容 example value 为一个样本序列化的example
reader = tf.TFRecordReader()
key,value = reader.read(file_queue)
# 3, 解析example
features = tf.parse_single_example(value,features={
"image":tf.FixedLenFeature([],tf.string),
"label":tf.FixedLenFeature([],tf.int64),
})
# features["image"] Tensor(shape=(),dtype=string)
# features["label"] Tensor(shape=(),dtype=64)
# 4,解码内容 ,若内容时string 需要解码,是int64,string32 不需要解码
image = tf.decode_raw(features["image"],tf.uint8) # Tensor(shape(?,) uint8)
# 固定形状
image_reshape = tf.reshape(image,[self.height,self.weight,self.channel])
label = features["label"] # Tensor(shape=(),int64)
# 5, 进行批处理
image_batch,label_batch = tf.train.batch([image_reshape,label],batch_size=20,num_threads=1,capacity=10)
return image_batch,label_batch
def readBinary():
import os
# 数据地址 https://www.cs.toronto.edu/~kriz/cifar.html
path = r"I:\人工智能数据\6000_32_32_图片集\cifar-10-batches-bin"
tfrecords = os.path.join(path,"all.tfrecords")
file_name = os.listdir(path)
# 获取文件列表,并过滤 非bin文件
filelist = [os.path.join(path,file) for file in file_name if file[-3:]=="bin"]
with tf.Session() as sess:
c = CifarRead(filelist, tfrecords)
# 读取数据并存入tfrecords
image_batch, label_batch = c.read_and_decodes()
c.write_to_tfrecords(image_batch=image_batch, label_batch=label_batch)
# 从tfrercords中读取
# image_batch, label_batch = c.read_form_tfrecords()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess, coord=coord)
print(sess.run([image_batch,label_batch]))
coord.request_stop()
coord.join(threads)
return None
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:深度学习_1_Tensorflow_2_数据_文件读取 - Python技术站