• 队列和线程
  • 文件读取,
  • 图片处理

问题:大文件读取,读取速度,

tensorflow中真正的多线程

子线程读取数据 向队列放数据(如每次100个),主线程学习,不用全部数据读取后,开始学习

队列与对垒管理器,线程与协调器

  • dequeue() 出队方法

  • enqueue(vals,name=None) 入队方法

  • enqueue_many(vals,name=None) 多个入队

  • size(name=None) 还有对少数据

tf.RandomShuffleQueue() # 随机出队队列

tf.train.QueueRunner(queue=None,enqueue_ops=None)

  • queue:队列

  • enqueue_ops:添加线程的队列操作列表,[op1,op2]*2 2表示指定两个线程

  • create_threads(sess,coord=None,start=False) 创建线程运行给定的入队操作

  • start:若为True线程自动启动,Fasle需手动start()

  • coord:线程协调器,后面线程管理器用到

  • return:返回线程实例

线程协调器

tf.train.Coordinator()

  • request_stop()

  • should_stop() 检查是否要求停止

  • join(threads=None,stop_grace_period_secs=120) 等待线程终止

  • return 返回线程协调器实例

同步,异步案例

import  tensorflow as tf
def sync():
    # 模拟一下同步  先处理数据,然后取数据训练
    # tensorflow中,运行操作有依赖性
    # 1,首先定义队列
    Q = tf.FIFOQueue(3,tf.float32)
    # 放入数据 tensorflow 会把传入的数据当做张量 a=y 此时y不会当做列表  a=[x,] 此时a是list
    enq_many = Q.enqueue_many([[0.1,0.2,0.3],])
    # 2,定义读取数据,取数据过程, 入队,出队
    out_q = Q.dequeue() # 虽然是个op 但取出来是数据
    data = out_q+1
    en_q = Q.enqueue(data)
    with tf.Session() as sess:
        # 初始化队列
        sess.run(enq_many)
        # 处理数据
        for i in range(100):
            sess.run(en_q) # 有依赖性,会运行out_q,data,qn_q
        # 训练数据
        for i in range(Q.size().eval()):
            print(sess.run(Q.dequeue())) # 33.2 33.3 34.1

模拟异步

import  tensorflow as tf
def asyn():
    # 1,定义队列
    Q = tf.FIFOQueue(1000,tf.float32)
    # 2,定义子线程 操作
    var = tf.Variable(0.0)
    # 实现自增 tf.assign_add() # 主要作用是江var的值改变了
    data = tf.assign_add(var,tf.constant(1.0))
    en_q = Q.enqueue(data)
    # 3,定义队列管理器op,指定子线程该做什么
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q]*2)
    # 初始化变量op
    init = tf.global_variables_initializer()
    with tf.Session() as sess:
        sess.run(init)
        # 开启线程管理器
        coord = tf.train.Coordinator()
        # 开启子线程
        threads = qr.create_threads(sess,start=True,coord=coord) # coord指定管理器
        # 主线程,不断取数据训练
        for i in range(300): # 300 次后主线程结束,sess释放,单子线程还在,会报错,需要回收子线程
            print(sess.run(Q.dequeue()))
        coord.request_stop() # 请求回收
        coord.join(threads) # 等待回收

文件读取

  • 构造文件列表,
  • 构造文件队列,
  • 阅读器 读取内容一行,
  • 解码,
  • 批处理

文件队列

tf.train.string_input_producer(string_tensor=None,shuffle=True) # 奖输出字符串输入到管道队列

  • string_tensor 含有文件名的1阶张量

  • num_epochs:过几遍数据,默认无限

  • return:具有字符串的队列

文件阅读器

  • tf.TextLineReader # csv文件
  • tf.FixedLengthRecordReader(record_bytes=1024) # 二进制
  • tf.TFRecordReader # TfRecords 自带的文件格式

共同方法 read(file_queue)

返回tensors元祖(key文件名字,value 默认的内容,一行,或字节)

解码

  • recodes:上边的value

  • field_delim:默认分隔符

  • record_defaults:指定每列类型,指定默认值

  • 参数决定了张量的类型,并设置一个值,在字符串中缺少使用默认值

tf.decode_raw(bytes,out_type=None,little_endian=None,name=None)

将字节转换为一个数字向量,字节为一字符类型的张量,与函数

tf.FixLengthRecordReader

搭配使用,二进制读取为uint8格式

管道批处理

tf.train.batch()

  • tensors:包含张量的列表

  • batch_size:从队列汇总读取的批处理的大小

  • num_threads:进入队列的线程数

  • capacity:整数,队列的最大数量

案例

import  tensorflow as tf
def readcsv(filelist):
    """
    :param filelist:文件路径列表
    :return:
    """
    import os
    path = r"E:\Workspace\PythonWorkSpace\Machine-Learning\asstes\temp"
    file_name = os.listdir(path)
    file_list = [os.path.join(path, file) for file in file_name]
    # 1,构造文件队列
    file_queue = tf.train.string_input_producer(file_list)
    # 2,构造csv阅读器 读取队列数据
    reader = tf.TextLineReader()
    key,value=reader.read(file_queue)
    # 3,对每行内容进行解码
    records = [["None"],["None"]] # 指定第一列为字符串,默认值为None, 或指定为[1.0] 数字
    # 单行读取
    first,second = tf.decode_csv(value,record_defaults=records) # 返回就是n列,就是n个值接收
    # 批量读取,每次取出的大小只与batch_size:有关 若大于总数,会重复取
    first_batch,second_batch = tf.train.batch([first,second],batch_size=6,num_threads=1,capacity=6)
    with tf.Session() as sess:
        # 定义线程协调器
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess,coord=coord)
        #print(sess.run([first,second]))
        print(sess.run([first_batch,second_batch]))
    coord.request_stop()
    coord.join(threads)
    return None

图像读取

特征值:像素

黑白:单通道[0-255] 像素数

彩色:三通道:RGB 像素数*3

图片数字化

三要素:长度,宽度,通道数

3D张量 [200,200,1] [200,200,3]

图片特征要一样:像素要一样-->大小相同 缩放图片

图片文件读取

API:

图像读取器:tf.WholeFileReader

  • ​ 将文件的全部内容作为值输出的读取器

  • ​ 返回读取实例

  • ​ read(file_queuse) 输出时一个文件名和该文件的值

图像解码器:tf.image.decode_jpeg(contents)

  • ​ 将jpeg编码为的图像解码为uint8张量

  • ​ 返回uint8张量 3D形状 [height,width,channels]

  • ​ tf.image.decode_png(contents)

  • ​ 解码为uint8或uint16

  • ​ 返回3D形状

def picread():
    import os
    import os,tensorflow as tf
    path = r"D:\图片\壁纸\手机壁纸\people"
    file_name = os.listdir(path)
    file_list = [os.path.join(path, file) for file in file_name]
    # 1, 构造文件队列
    file_queue=tf.train.string_input_producer(file_list)
    # 2, 构造文件读取器读取图片内容(默认一张)
    reader = tf.WholeFileReader()
    key,value=reader.read(file_queue)  # value tensor(shape=(),dtype=string)
    # 3, 对读取的图片数据进行解码
    image = tf.image.decode_jpeg(value)  # tensor(shape=(?,?,?),dtype=uint8)
    # 4, 处理图片大小统一
    image_resize = tf.image.resize_images(image,[1080,2160])  # shape(1080,2160,?)
    # 设置通道数,在批处理时要求形状必须固定
    image_resize.set_shape([1080,2160,3])   # 此时 shape(1080,2160,3)
    # 5, 进行批处理
    image_batch = tf.train.batch([image_resize],batch_size=20,num_threads=4,capacity=20) # shape(20,1080,2160,3)

    with tf.Session() as sess:
        # 定义线程协调器
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess, coord=coord)
        # print(sess.run([image_resize])) #
        print(sess.run([image_batch])) #
        coord.request_stop()
        coord.join(threads)
    return None

二进制文件处理

案例分析:https://www.cs.toronto.edu/~kriz/cifar.html

CIFAR-10数据集:6000张32*32彩色图片 分为训练集文件,测试集文件

文件中每一行为(标签 像素值) 即为0-9 1024红色通道,1024绿色,1024蓝色 每行3072个字节 目标值+特征值

TFRecords 分析,存取

  • ​ Tensorflow内置文件格式,二进制文件, 更好的利用内存,速度快,更好的赋值和移动
  • ​ 为了将二进制数据和标签 存储在同一个文件中
  • ​ 文件格式*.tfrecords
  • ​ 写入文件内容:Example协议块 ----->类字典格式
  • ​ 每个example样本:{image:xxx,label:yyy}

API

tf.python_io.TFRecordsWriter(path)

  • ​ path:TFrecords文件路径
  • ​ return:文件写入器
  • ​ write(record):向文件中写入一个字符串记录.,就是一个example
  • ​ close():关闭文件写入器

一个样本--->example

tf.train.Example(features=None)

  • ​ 写入tfrecords文件
  • ​ feature:tf.train.Features类型的特征实例
  • ​ return:example协议块

tf.train.Features(feature=None)

  • ​ 构建每个样本的信息键值对
  • ​ feature:字典数据,key为名字,value为tf.train.Feature 实例
  • ​ return:Features类型

tf.train.Feature(**options)

​ **options:例如

​ bytes_list=tf.train.BytesList(value=[Bytes])

​ int64_list=tf.train.Int64List(value=[Value])

​ float_list=tf.train.FloatList(value=[Value])

TfRecords读取

  • 同文件阅读器流程,中间需要解析过程

  • ​ 解析Tfrecords的example协议块

  • ​ tf.parse_single_example(seralized,features=None,name=None)

    • ​ 解析一个单一的example原型
    • ​ seralized:标量字符串Tensor,一个序列化的Example
    • ​ Features:dcit字典数据,键为读取的名字,值为 Fixed LenFeature
    • ​ return:一个键值对组成的字典,键为读取的名字
  • ​ tf.FixedLengthFeature(shape.dtype)

    • ​ shape:输入数据的形状,一般不指定,为空列表
    • ​ dtype:输入数据的类型,与存储金文件的类型一致,只能为float32,int64,string
import tensorflow as tf
class CifarRead(object):
    """
    完成读取二进制文件,写进tfrecords,读取tfrecords
    """
    def __init__(self,filelist,tfrecords_path):
        # tfrecords文件路径
        self.tfrecords_path = tfrecords_path
        # 文件列表(文件的目录)
        self.filelist = filelist
        #
        # 定义读取图片的属性
        self.height = 32
        self.weight = 32
        self.channel = 3
        self.label_bytes = 1
        self.image_bytes= self.height*self.weight*self.channel
        self.bytes = self.image_bytes+self.label_bytes
    def read_and_decodes(self):
        # 1, 构造文件队列
        file_queue = tf.train.string_input_producer(self.filelist)
        # 2, 构造文件读取器,读取内容,每个人样本字节数
        reader = tf.FixedLengthRecordReader(self.bytes)
        key,value = reader.read(file_queue) # 此时shape() dtype=string
        # 3, 解码内容
        label_image = tf.decode_raw(value,tf.uint8)  # shape(?,) dtype=uint8 一维 图片与,目标值在一起
        # 4,分割图片与标签数据 特征值和目标值 (本例数据中,目标值在前边,1,3072)
        label = tf.slice(label_image,[0],[self.label_bytes]) # 切割出label ,slice(value,从哪读,读多少)
        image = tf.slice(label_image,[self.label_bytes],[self.image_bytes])
        label = tf.cast(label,tf.int32)
        # image = tf.cast(image,tf.float32) # 需要计算的话转为float32
        # 5, 对图片数据进行形状改变,[3072]--->[32,32,3]
        image_reshape = tf.reshape(image,[self.height,self.weight,self.channel])
        # 6, 批处理数据
        image_batch,label_batch = tf.train.batch([image_reshape,label],batch_size=10,num_threads=1,capacity=10)  # shape(10,32,32,3) shape(10,1)
        return image_batch,label_batch
    def write_to_tfrecords(self,image_batch,label_batch):
        """
        将图片的特征值和目标值 存进tfrecords
        :param image_batch: n张图片的目标值
        :param label_batch: n张图片的目标值
        :return: None
        """
        # 1, 构造一个tfrecords文件,使用Tfrecords存储器
        writer = tf.python_io.TFRecordWriter(self.tfrecords_path)
        # 2, 循环写入,每张图片都早example
        for i in range(image_batch.shape[0]):
            # 取出第i个图片的特征值和目标值
            image = image_batch[i] # (32,32,3)
            label = label_batch[i] # (10,1)
            # 要转化为张量,张量值转化为字符串
            image = image.eval().tostring()
            label = int(label.eval()[0])
            # 构造一个样本的example
            example = tf.train.Example(features=tf.train.Features(feature={
                "image":tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
                "label":tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
            }))
            # 写入单独的样本
            writer.write(example.SerializeToString())
        # 关闭
        writer.close()
        return None
    def read_form_tfrecords(self):
        # 1, 构造文件队列
        file_queue = tf.train.string_input_producer([r"I:\人工智能数据\6000_32_32_图片集\cifar-10-batches-bin\all.tfrecords"])
        # 2, 构造文件阅读器,读取内容 example value 为一个样本序列化的example
        reader = tf.TFRecordReader()
        key,value = reader.read(file_queue)
        # 3, 解析example
        features = tf.parse_single_example(value,features={
            "image":tf.FixedLenFeature([],tf.string),
            "label":tf.FixedLenFeature([],tf.int64),
        })
            # features["image"] Tensor(shape=(),dtype=string)
            # features["label"] Tensor(shape=(),dtype=64)
        # 4,解码内容 ,若内容时string 需要解码,是int64,string32 不需要解码
        image = tf.decode_raw(features["image"],tf.uint8)  # Tensor(shape(?,) uint8)
        # 固定形状
        image_reshape = tf.reshape(image,[self.height,self.weight,self.channel])
        label = features["label"]  # Tensor(shape=(),int64)
        # 5, 进行批处理
        image_batch,label_batch = tf.train.batch([image_reshape,label],batch_size=20,num_threads=1,capacity=10)
        return image_batch,label_batch
def readBinary():
    import os
    # 数据地址 https://www.cs.toronto.edu/~kriz/cifar.html
    path = r"I:\人工智能数据\6000_32_32_图片集\cifar-10-batches-bin"
    tfrecords = os.path.join(path,"all.tfrecords")
    file_name = os.listdir(path)
    # 获取文件列表,并过滤 非bin文件
    filelist = [os.path.join(path,file) for file in file_name if file[-3:]=="bin"]
    with tf.Session() as sess:
        c = CifarRead(filelist, tfrecords)
        # 读取数据并存入tfrecords
        image_batch, label_batch = c.read_and_decodes()
        c.write_to_tfrecords(image_batch=image_batch, label_batch=label_batch)
        # 从tfrercords中读取
        # image_batch, label_batch = c.read_form_tfrecords()
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess, coord=coord)
        print(sess.run([image_batch,label_batch]))
        coord.request_stop()
        coord.join(threads)
    return None