1、知识点

"""
注意:在tensorflow当中,运行操作具有依赖性

1、CPU操作计算与IO计算区别:
        CPU操作:
            1、tensorflow是一个正真的多线程,并行的执行任务
            2、使用tfrecords对文件读取进行改善
            
        IO操作:
            1、一次性读取数据,消耗内存
            2、一次性进行训练
        
2、队列API:        
        1、tf.FIFOQueue(capacity, dtypes, name='fifo_queue') 先进先出队列,按顺序出队列
                capacity:整数。可能存储在此队列中的元素数量的上限
                dtypes:DType对象列表。长度dtypes必须等于每个队列元素中的张量数,dtype的类型形状,决定了后面进队列元素形状
                return:返回一个进队列操作
                        dequeue(name=None) #从队列获取一个数据
                        enqueue(vals, name=None) #将数据存放在队列
                        enqueue_many(vals, name=None):放入数据,其中vals列表或者元组
    
        2、tf.RandomShuffleQueue 随机出队列
        
3、队列管理器:qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q*2])
            qr.create_threads(sess,start=True)    #开启子线程    

4、线程协调器:tf.train.Coordinator() ,线程协调员,实现一个简单的机制来协调一组线程的终止
            返回对象方法:
                request_stop() 
                should_stop() 检查是否要求停止
                join(threads=None, stop_grace_period_secs=120)  等待线程终止
                return:线程协调员实例

5、CSV文件读取步骤:
        1、先找到文件,构造一个列表 
            file_name = os.listdir("./csvData/")
            file_list = [os.path.join(file) for file in file_name]
        2、构造文件列队
            
        3、构造阅读器,读取队列内容(一行)
        4、解码内容
        5、批处理(多个样本)

6、文件读取API-文件队列构造:tf.train.string_input_producer(string_tensor,,shuffle=True) 将输出字符串(例如文件名)输入到管道队列
         参数:   
            string_tensor    含有文件名的1阶张量
            num_epochs:过几遍数据,默认无限过数据
            return:具有输出字符串的队列
            
7、文件读取API-文件阅读器:根据文件格式,选择对应的文件阅读器
    a) class tf.TextLineReader() 阅读文本文件逗号分隔值(CSV)格式,默认按行读取
            return:读取器实例
    b) tf.FixedLengthRecordReader(record_bytes)要读取每个记录是固定数量字节的二进制文件
            record_bytes:整型,指定每次读取的字节数
            return:读取器实例
    c) tf.TFRecordReader    读取TfRecords文件
    共同的读取方法:read(file_queue):从队列中指定数量内容 ,返回一个Tensors元组(key文件名字,value默认的内容(行,字节))

8、文件读取API-文件内容解码器:由于从文件中读取的是字符串,需要函数去解析这些字符串到张量
    a) tf.decode_csv(records,record_defaults=None,field_delim = None,name = None)  将CSV转换为张量,与tf.TextLineReader搭配使用
        records:tensor型字符串,每个字符串是csv中的记录行
        field_delim:默认分割符”,”
        record_defaults:参数决定了所得张量的类型,并设置一个值在输入字符串中缺少使用默认值,如
    b) tf.decode_raw(bytes,out_type,little_endian = None,name = None) 
        将字节转换为一个数字向量表示,字节为一字符串类型的张量,与函数tf.FixedLengthRecordReader搭配使用,二进制读取为uint8格式

9、开启线程操作
    tf.train.start_queue_runners(sess=None,coord=None) 收集所有图中的队列线程,并启动线程
            sess:所在的会话中
            coord:线程协调器
            return:返回所有线程队列

9、管道读端批处理:
    a) tf.train.batch(tensors,batch_size,num_threads = 1,capacity = 32,name=None) 读取指定大小(个数)的张量
            tensors:可以是包含张量的列表
            batch_size:从队列中读取的批处理大小
            num_threads:进入队列的线程数
            capacity:整数,队列中元素的最大数量
            return:tensors
    b) tf.train.shuffle_batch(tensors,batch_size,capacity,min_after_dequeue,num_threads=1,) 乱序读取指定大小(个数)的张量
            min_after_dequeue:留下队列里的张量个数,能够保持随机打乱

10、错误
    OutOfRangeError (see above for traceback): FIFOQueue '_1_batch/fifo_queue' is closed and has insufficient elements (requested 9, current size 0)
    解决方法:由于从上可知需要9个数据,但是读取为0,因此可能是数据有问题,即数据文件或者读取路径有问题
"""

2、代码

# coding = utf-8

import tensorflow as tf
import  os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

def readCSVFile(filelist):
    """
    读取CSV文件
    :param filelist:  文件路径+名字列表
    :return: 读取的内容
    """
    #1、构造文件队列
    file_queue = tf.train.string_input_producer(file_list)
    #2、构造CSV阅读器读取队列数据(读一行)
    reader = tf.TextLineReader()
    key , value = reader.read(file_queue)

    #3、对每行内容解码
    #record_defaults:指定每一个样本的每一列的类型,指定默认值[["None"], [4.0]]
    records =[["None"],["None"]]
    example , label = tf.decode_csv(value,record_defaults=records)
    #批处理大小跟队列、数据的数量没有影响,只决定这批次取多少数据batch_size

    ##############批处理####################
   #读取多个数据,就需要使用批处理
    example_batch,label_batch = tf.train.batch([example,label],batch_size=20,num_threads=1,capacity=90)
    print(example_batch, label_batch)
    return example_batch,label_batch
    #return example,label


############队列################
def queue():
    #1、首先定义数据
    Q = tf.FIFOQueue(3,tf.float32)

    #2、放入数据
    enq_many = Q.enqueue_many([[0.1,0.2,0.3],])

    #定义一些数据处理的逻辑
    out_q = Q.dequeue()
    out_q = out_q + 1
    en_q = Q.enqueue(out_q)

    #运行会话
    with tf.Session() as sess:
        #初始化队列
        sess.run(enq_many)
        #处理数据
        for i in range(100):
            sess.run(en_q)
        for i in range(Q.size().eval()):
            print(sess.run(Q.dequeue()))
    return None


#############异步执行#########################
def unasynQueue():
    """
    异步读取
    :return:
    """
    #1、定义一个队列,1000
    Q = tf.FIFOQueue(1000,tf.float32)
    #2、定义要做的事,并放入队列中
    var = tf.Variable(0.0)
    #实现自增
    data = tf.assign_add(var,tf.constant(1.0))
    en_q = Q.enqueue(data)
    #3、定义队列管理器,指定多少个子线程,子线程做事
    qr = tf.train.QueueRunner(Q,enqueue_ops=[en_q,]*2)

    #初始化变量OP
    init_op = tf.global_variables_initializer()
    with tf.Session() as sess:
        sess.run(init_op)

        # 开启线程管理器
        coord = tf.train.Coordinator()
        #开启子线程
        threads = qr.create_threads(sess,coord,start=True)
        #主线程不断读取数据
        for i in range(300):
            print(sess.run(Q.dequeue()))

        #回收线程
        coord.request_stop()
        coord.join(threads)
    return  None

if __name__ == '__main__':
    file_name = os.listdir("./csvData/")
    file_list = [os.path.join("./csvData/",file) for file in file_name]
    example_batch ,label_batch = readCSVFile(file_list)
    with tf.Session() as sess:
        # #定义一个线程协调器
        coord = tf.train.Coordinator()
        # #开启读文件的线程
        threads = tf.train.start_queue_runners(sess,coord=coord)
        #打印读取的内容
        print(sess.run([example_batch,label_batch]))
        #回收线程
        coord.request_stop()
        coord.join(threads)