1. 配套使用: tf.train.Examples将数据转换为二进制,提升IO效率和方便管理

   对于int类型 : tf.train.Examples(features=tf.train.Features(feature=tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))))

   对于bytes类型: tf.train.Examples(features=tf.train.Features(feature=tf.train.Feature(int64_list=tf.train.BytesList(value=[value]))))

2. tf.image.decode_jpeg(image, channel=3) # 将图片进行解码操作,通道数是3

参数说明:image表示图片,channel表示通道数

3. tf.image.encode_jpeg(image, format='rgb', quality=100) # 将解码后的图片,转码成jpeg 

参数说明:image表示图片,format表示rgb的类型,quality表示图片的质量

4. tf.train.Coordinator() # 构建多线程通道,使用coord.join(threads)将线程列表添加

5.t = threading.Thread(target=, args=) # 建立单线程

参数说明:target表示对应的函数,args表示传入的参数

6. tf.python_io.TFRecordWriter(output_path) 构造TFR的写入器writer,使用writer.write将打包的参数写入

参数说明:output_path表示保存TFR文件的路径

7.np.linspace(0, len(3), 3) # 表示将数据进行切分,

参数说明:0表示第一个数,len(3)表示第二个数,3表示切分成3个数,

8. sys.stdout.flush() 进行数据的刷新

 

数据说明:在flower文件中,存在5个文件,每个文件的文件名分别对应的是标签名,在flower_label.txt保存的5个标签名

代码说明:这个采用了多线程的方式,即如果要生成n个shards,有2个thread, 那么每个thread就处理n/2个shards的生成

参数说明:使用tf.app.flags.DEFINE_string 进行参数的设置,使用FLAGS = tf.app.flags.FLAGS将参数并入FLAGS中

 

代码主要分为第一部分和第二部分,使用tf.app.run()执行代码,定义model(),在model中分为两部分

第一部分:用于进行文件名的读取filenames,标签的制作labels,以及存入标签名texts

第二部分:建立多个线程,在每个线程中,调用函数,建立TFR的写入writer,循环一个线程的shards数,将标准化后的example进行写入,关闭writer

第一部分:

      第一步:构建find_image_files, 传入参数为文件名,标签名

      第二步:使用[l.strip() for l in TF.gfile.FastGFile(f).readlines()] 来获得标签值

     第三步: 循环标签值,使用os.path.join(‘%s/%s/*’%())标签值和文件路径进行拼接,获得每一个文件夹的名字

     第四步: 使用tf.gfile.Glob(文件名) 获得文件名中所有的图片名称,如果报错,打印文件名,continue

     第五步:定义filenames, labels, texts = [] 

     第六步:使用.extend() 将图片列表添加到filenames, 使用[label_index] * len(matchfile)制作标签,以及[text] * len(matchfile) 制作标签名,使用.extend() 进行添加

     第七步: 构造shuffed_index = np.arange(len(filenames)), 使用random.shuffle(shuffed_index) 获得混乱的标签索引值

     第八步:使用filenames = [filename[i] for i in shuffed_index] 构造打乱的filenams,labels,和texts, 返回结果

 

第二部分:将生成的filenams,labels和texts, train_shards传入到process_image_files函数

       第一步:使用assert len(filenames) == len(labels),即filenams的大小是否和labeles以及texts的大小相同

       第二步:使用np.linspace(0, len(filenames), train_shards+1) 根据train_shards大小进行切分,

       第三步:循环切分的列表,将两个之间的索引值添加到ranges中, ranges.append([split_index[i], split_index[i+1]]), 使用sys.stdout.flush() 刷新操作

       第四步:实例化coder = ImageCode()

                     第一步:构造png转jpg的初始化函数,使用tf.placeholder初始化,使用tf,image.decode_png()解码png,使用tf.image.encode_jpeg()编码jpeg  

                     第二步:构造jpg的解码的初始化函数, 使用tf.placeholder初始化,使用tf.image.decode_jpeg()解码jpg

                     第三步:构造函数png_to_jpeg(self, image_data), sess.run(self._png_to_jpg, feed_dict={self.png_image:image_data}),进行实际转换

                     第四步:构造函数decode_jpeg(self, image_data):, sess.run(self._decode_jpg) 进行实际转换

         第五步:使用coord = tf.train.Coordinator() 建立线程的通道

         第六步:循环num_thread, 建立args参数,使用threading.Thread(target=_precess_image_files_batch, args) 构建单线程,使用t.start()启动单线程,threads.append(t), 然后再将coord.join(threads)构建出了多线程

                函数target = _precess_image_files_batch 说明:主要是用于构建Tfrecord数据集

                       第一步:将train_shards / num_threads相除,获得每个线程所需要建立的shards的个数

                       第二步:使用np.linspace(range[thread_index][0], range[thread_index][1],num_shards_per_batch + 1) # 获得每一个shards的数据索引

                       第三步:使用ranges[thread_index][1] - ranges[thread_index][0] 获得当前线程所执行的样本数

                       第四步:循环num_shard_per_batch,即每个线程所需要建立的shards个数

                       第五步:使用shard = thread_index * num_per_batch + s,获得当前的shards的索引

                       第六步:使用name,shard, trian_shard建立当前的路径

                        第七步:使用os.path.join() 将文件路径与当前路径进行拼接,获得保存路径

                        第八步:使用writer = tf.python_io.TFRecordWriter(output_path) 构造一个TFR写入的writer函数

                       第九步:循环每一个shards的数据索引值

                       第十步:使用_image_process获得图片的image_buff, 如果是png格式使用coder转换为jpg格式

                       第十一步:使用tf.train.Example(features=tf.train.Features(feature={

}))  # 将参数进行打包,返回example

                       第十二步:writer.write(example.SerializeToString()) 将数据写入到writer中

                       第十三步:如果执行1000次,打印当前时间,第几个线程,第几张图片, 该线程的图片数,使用sys.stdout.flush() 刷新

                       第十四步:一个shards结束后,writer.close() ,打印,并进行刷新

                       第十五步:整个循环结束后,打印,并刷新

 

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function



from datetime import datetime
import os
import random
import sys

import threading
import numpy as np
import tensorflow as tf

tf.app.flags.DEFINE_string('train_directory', './flower_photos/',
                           'Training data directory')
tf.app.flags.DEFINE_string('validation_directory', './flower_photos',
                           'Validation data directory')

tf.app.flags.DEFINE_string('output_directory', './data/',
                           'Output data directory')

tf.app.flags.DEFINE_integer('train_shards', 2, 'Number of shards in training TFRecord files.')

tf.app.flags.DEFINE_integer('validation_shards', 0, 'Number of shards in validation TFRecord files.')

tf.app.flags.DEFINE_integer('num_threads', 2, 'Number of thread to preprocess the images')

tf.app.flags.DEFINE_string('labels_file', './flower_label.txt', 'labels file')

FLAGS = tf.app.flags.FLAGS


def _int64_feature(value):

    if not isinstance(value, list):
        value = [value]
    # 将数据转换为int的列表类型
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

def _bytes_feature(value):
    # 将数据转换为bytes的列表类型
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def _convert_to_example(filename, image_buff, label, text, height, width):

    colorspace= 'RGB'
    channels = 3
    image_format = 'JPEG'
    # 使用tf.train.Example和tf.train.Features构造打包数据
    example = tf.train.Example(features=tf.train.Features(feature={
        'image/height': _int64_feature(height),
        'image/width': _int64_feature(width),
        'image/colorspace': _bytes_feature(tf.compat.as_bytes(colorspace)),
        'image/channels': _int64_feature(channels),
        'image/class/label': _int64_feature(label),
        'image/class/text': _bytes_feature(tf.compat.as_bytes(text)),
        'image/format': _bytes_feature(tf.compat.as_bytes(image_format)),
        'image/filename': _bytes_feature(tf.compat.as_bytes(os.path.basename(filename))),
        'image/encoded': _bytes_feature(tf.compat.as_bytes(image_buff))
    }))
    return example

def is_png(filename):

    return '.png' in filename

def _process_image(filename, coder):

    # 打开文件夹,使用bytes数据类型
    with tf.gfile.FastGFile(filename, 'rb') as f:
        # 进行数据的读取
        image_data = f.read()
    # 如果数据是png的格式的话就转换为jpeg类型
    if is_png(filename):
        image_data = coder.png_to_jpeg(image_data)
    # 将iamge_data进行解码
    image = coder.decode_jpeg(image_data)
    # 判断图片的维度是否是3
    assert len(image.shape) == 3
    # 图片的高度
    height = image.shape[0]
    # 图片的宽度
    width = image.shape[1]

    return image_data, height, width



def _process_image_files_batch(coder, thread_index, ranges, name, filenames, texts, labels, train_shards):

    # 获得线程数
    num_threads = len(ranges)
    # 保证线程和train_shards可以整除
    assert not train_shards % num_threads
    # 每个线程所需要执行的shards数
    num_shards_per_batch = int(train_shards / num_threads)
    # 根据每个线程的shars数,获得每个线程切分后的数据索引值
    shard_ranges = np.linspace(ranges[thread_index][0],
                               ranges[thread_index][1], num_shards_per_batch + 1)
    # 获得线程的样本数目
    num_files_in_thread = ranges[thread_index][1] - ranges[thread_index][0]
    # 用于图片的统计
    counter = 0
    # 循环线程
    for s in range(num_shards_per_batch):
        # 获得当前是第几个shards
        shard = thread_index * num_shards_per_batch + s
        # 构造TFR的路径
        output_filename = '%s-%.5d-of-%.5d.tfrecord'%(name, shard, train_shards)
        # 拼接获得最终的路径
        output_file = os.path.join(FLAGS.output_directory, output_filename)
        # 输入路径,构造TFR的读入器
        writer = tf.python_io.TFRecordWriter(output_file)
        #
        shard_counter = 0
        # 获得数据索引值的列表
        file_in_shard = np.arange(shard_ranges[s], shard_ranges[s+1]).astype('int')
        # 循环列表
        for i in file_in_shard:
            # 获得文件名
            filename = filenames[i]
            # 获得标签值
            label = labels[i]
            # 获得标签名
            text = texts[i]
            # 使用tf.gfile.FastGFile(filename, 'rb'): 打开文件
            image_buff, height, width = _process_image(filename, coder)
            # 将各个参数进行打包
            example = _convert_to_example(filename, image_buff, label, text,
                                          height, width)
            # 将一张图片打包好的参数写入到writer里面
            writer.write(example.SerializeToString())
            shard_counter += 1
            counter += 1
            # 如果读取了1000张图片,打印并进行刷新操作
            if not counter % 1000:
                print('%s [thread %d]:Processed %d of %d images in thread batch.'%(
                    datetime.now(), thread_index, counter, num_files_in_thread
                ))
                sys.stdout.flush()
        # 循环了一个shards就关闭writer,打印和刷新操作
        writer.close()
        print('%s [thread %d]:Processed %d of %d images in thread batch.' % (
            datetime.now(), thread_index, counter, num_files_in_thread
        ))
        sys.stdout.flush()
    # 循环结束,打印和刷新操作
    print('%s [thread %d]:Processed %d of %d images in thread batch.' % (
        datetime.now(), thread_index, counter, num_files_in_thread
    ))
    sys.stdout.flush()


class ImageCoder(object):

    def __init__(self):
        # 构造sess函数
        self._sess = tf.Session()
        # 初始化png的输入数据
        self.png_img = tf.placeholder(dtype=tf.string)
        # 将png进行解码操作,使用tf.image.decode_png
        image = tf.image.decode_png(self.png_img, channels=3)
        # 对解码的图片,使用tf.image.encode_jpeg进行加码操作,format表示rgb, quality表示图片的质量
        self._png_to_jpeg = tf.image.encode_jpeg(image, format='rgb', quality=100)
        # 初始化输入jpeg_image的图片
        self.jpeg_image = tf.placeholder(dtype=tf.string)
        # 使用tf.image.decode_jpeg对图片进行解码操作
        self._decode_jpeg = tf.image.decode_jpeg(self.jpeg_image, channels=3)

    def png_to_jpeg(self, image_data):
        # 输入数据,获得实际的png_to_jpeg的转换
        return self._sess.run(self._png_to_jpeg, feed_dict={self.png_img:image_data})

    def decode_jpeg(self, image_data):
        # 输入数据,进行实际的decode_jpeg的解码操作
        image = self._sess.run(self._decode_jpeg, feed_dict={self.jpeg_image:image_data})

        return image






def _process_image_files(name, filenames, labels, texts, train_shards):
    # 第一步:确认维度是否相等
    assert len(filenames) == len(texts)
    assert len(filenames) == len(labels)

    # 第二步:使用np.linspace()将len(filenames)根据train_shards的个数进行切片,获得每一份的索引值
    ranges = []
    split_index = np.linspace(0, len(filenames), train_shards+1)
    # 第三步:根据索引值,构造range列表,即每一个列表,对于前后的索引值
    for i in range(len(split_index) - 1):
        ranges.append([split_index[i], split_index[i+1]])

    # 刷新操作
    print('Launching %d threads for spacings: %s'%(FLAGS.num_threads, ranges))
    sys.stdout.flush()
    # 第四步:实例化ImageCoder() 进行png_to_jpeg,或者对jpeg进行解码操作
    coder = ImageCoder()

    # 第五步:使用tf.train.Coordinator获得线程的通道
    coord = tf.train.Coordinator()
    # 构造线程的列表
    threads = []
    # 第六步:循环线程的个数,构造多线程
    for thread_index in range(len(ranges)):
        # 构造线程参数,coder表示解码器,thread_index当前是第几个线程,ranges表示线程的范围,name表示名字,filenames表示文件的图片名
        args = (coder, thread_index, ranges, name, filenames, texts, labels, train_shards)
        # 构建线程
        t = threading.Thread(target=_process_image_files_batch, args=args)
        # 开始线程
        t.start()
        # 线程添加到线程列表中
        threads.append(t)
    # 将线程添加到通道中
    coord.join(threads)

def _find_image_files(data_path, labels_file):

    # 使用tf.gfile.FastGFile读取标签文件,获得标签的列表
    upload_labels = [l.strip() for l in tf.gfile.FastGFile(labels_file).readlines()]

    # 第一种类别的标签值
    label_index = 1
    # 文件图片的名字列表
    filenames = []
    # 标签列表
    labels = []
    # 图片标签名列表
    texts = []
    # 循环标签文本
    for text in upload_labels:
        # 构造相对路径
        jpeg_path_join = '%s/%s/*' % (data_path, text)
        try:
            # 使用tf.gfile.Glob遍历相对路径下所有的图片,获得图片的路径
            matchfile = tf.gfile.Glob(jpeg_path_join)
        except:
            print(jpeg_path_join)
            continue

        # 将获得的路径列表添加到filenames中
        filenames.extend(matchfile)
        # 产生[1] * len(filenams)的标签,并添加到标签列表中
        labels.extend([label_index] * len(matchfile))
        # 产生[test] * len(filenames)的标签名,并添加到标签名列表中
        texts.extend([text] * len(matchfile))
        # 标签值加1,作为下一个类比的标签值
        label_index += 1
    # 获得len(filenames)的索引值
    shuffled_index = np.arange(len(filenames))
    # 将索引值进行打乱
    random.seed(1234)
    random.shuffle(shuffled_index)
    # 根据打乱的索引值重新构造filenames,labels,和texts
    filenames = [filenames[i] for i in shuffled_index]
    labels = [labels[i] for i in shuffled_index]
    texts = [texts[i] for i in shuffled_index]
    # 返回filenames,labels和texts
    return filenames, labels, texts

def _process_dataset(name, data_path, train_shards, labels_file):

    # 第一部分:找出文件名,构造标签列表以及文件名列表,并使用random.shuffle获得乱序的索引
    filenames, labels, texts = _find_image_files(data_path, labels_file)
    # 第二部分:将文件图片的名字,标签值,标签名,以及生产shards个数传入,用于生成tfrecord
    _process_image_files(name, filenames, labels, texts, train_shards)




def main(unused_argv):
    # 确认train_shards是够能被线程整除
    assert not FLAGS.train_shards % FLAGS.num_threads, ('在训练过程中线程的数目应该与文件的数目相对应')

    assert not FLAGS.validation_shards % FLAGS.num_threads, ('在验证集制作中线程的数目应该与文件的数目相对应')
    # 构建process_dataset执行函数,传入的参数有name, 训练数据文件路径,训练集的num,以及标签的文件名
    _process_dataset('train', FLAGS.train_directory,
                     FLAGS.train_shards,
                     FLAGS.labels_file)



if __name__ == '__main__':
    tf.app.run()

深度学习原理与框架-Tfrecord数据集的制作  1.tf.train.Examples(数据转换为二进制)  3.tf.image.encode_jpeg(解码图片加码成jpeg) 4.tf.train.Coordinator(构建多线程通道) 5.threading.Thread(建立单线程) 6.tf.python_io.TFR(TFR读入器)