之前我们学习使用TensorFlow对图像数据进行预处理的方法。虽然使用这些图像数据预处理的方法可以减少无关因素对图像识别模型效果的影响,但这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,TensorFlow提供了一套多线程处理输入数据的框架。   下面总结了一个经典的输入数据处理的流程:   下面我们首先学习TensorFlow中队列的概念。在TensorFlow中,队列不仅是一种数据结构,它更提供了多线程机制。队列也是TensorFlow多线程输入数据处理框架的基础。然后再学习上面的流程。最后这个流程将处理好的单个训练数据整理成训练数据 batch,这些batch就可以作为神经网络的输入。 准备知识:多线程的简单介绍   在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程。线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于CPU),而一条流水线必须属于一个车间,一个车间就是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是CPU上的执行单位。   多线程(即多个控制线程)的概念就是:在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。比如成都地铁和西安地铁是不同的进程,而成都地铁3号线是一个线程,成都地铁所有的线程共享成都所有的资源,比如成都所有的乘客可以被所有线拉。   开启多线程的方式: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import time import random from threading import Thread def study(name): print("%s is learning" % name) time.sleep(random.randint(1, 3)) print("%s is playing " % name) if __name__ == '__main__': t = Thread(target=study, args=('james', )) t.start() print("主线程开始运行") ''' 结果展示: james is learning 主线程开始运行 james is playing '''     t.start() 将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程。 1,队列与多线程   在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改他们的状态。对于变量,可以通过赋值操作修改变量的取值。对于队列,修改队列状态的操作主要有Enqueue,EnqueueMany和Dequeue。下面程序展示了如何使用这些函数来操作一个队列。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #_*_coding:utf-8_*_ import tensorflow as tf # 创建一个先进先出的队列,指定队列中最多可以保存两个元素,并指定类型为整数 q = tf.FIFOQueue(2, 'int32') # 使用enqueue_many 函数来初始化队列中的元素。 # 和变量初始化类似,在使用队列之前需要明确的调用这个初始化过程 init = q.enqueue_many(([0, 10], )) # 使用Dequeue 函数将队列中的第一个元素出队列。这个元素的值将被存在变量x中 x = q.dequeue() # 将得到的值加1 y = x + 1 # 将加 1 后的值在重新加入队列 q_inc = q.enqueue([y]) with tf.Session() as sess: # 运行初始化队列的操作 init.run() for _ in range(6): #运行q_inc 将执行数据出队列,出队的元素 +1 ,重新加入队列的整个过程 v, _ = sess.run([x, q_inc]) # 打印出队元素的取值 print('%s'%v) ''' 队列开始有[0, 10] 两个元素,第一个出队的为0, 加1之后为[10, 1] 第二次出队的为10, 加1之后入队的为11, 得到的队列为[1, 11] 以此类推,最后得到的输出为: 0 10 1 11 2 '''   TensorFlow中提供了FIFOQueue 和 RandomShuffleQueue 两种队列。在上面的程序中,已经展示了如何使用FIFOQueue,它的实现的一个先进先出队列。 RandomShuffleQueue 会将队列中的元素打乱,每次出队操作得到的是从当前队列所有元素中随机选择的一个。在训练审计网络时希望每次使用的训练数据尽量随机。 RandomShuffleQueue 就提供了这样的功能。   在TensorFlow中,队列不仅仅是一种数据结构,还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。在后面我们会学习TensorFlow是如何利用队列来实现多线程输入数据处理的。   TensorFlow提供了 tf.Coordinator 和 tf.QueueRunner 两个类来完成多线程协同的功能。tf.Coordinator 主要用于协同多个线程一起停止,并提供了 should_stop, request_stop 和 join 三个函数。在启动线程之前,需要先声明一个 tf.Coordinator 类,并将这个类传入每一个创建的线程中。启动的线程需要一直查询 tf.Coordinator 类中提供的 should_stop 函数,当这个函数的返回值为 True时,则当前线程也需要退出。每一个启动的线程都可以通过调用 request_stop 函数来通知其他线程退出。当某一个线程调用 request_stop 函数之后, should_stop 函数的返回值将被设置为 TRUE,这样其他的线程就可以同时终止了。下面程序展示了如何使用 tf.Coordinator。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 #_*_coding:utf-8_*_ import tensorflow as tf import numpy as np import threading import time # 线程中运行的程序,这个程序每隔1秒判断是否停止并打印自己的ID def MyLoop(coord, worker_id): # 使用 tf.Coordinator 类提供的协同工具判断当前是否需要停止 while not coord.should_stop(): # 随机停止所有的线程 if np.random.rand() < 0.1: print("Stopping from id: %d\n" % worker_id) # 调用 coord.request_stop() 函数来通知其他线程停止 coord.request_stop() else: # 打印当前线程的 ID print("Working on id: %d\n" % worker_id) # 暂停1 s time.sleep(1) # 声明一个 tf.train.Coordinator 类来协同多个线程 coord = tf.train.Coordinator() # 声明创建 5 个线程 threads = [ threading.Thread(target=MyLoop, args=(coord, i, )) for i in range(5) ] # 启动所有的线程 for t in threads: t.start() # 等待所有线程退出 coord.join(threads) ''' Working on id: 0 Working on id: 1 Working on id: 2 Working on id: 3 Working on id: 4 Working on id: 0 Working on id: 1 Working on id: 3 Working on id: 2 Working on id: 4 Working on id: 0 Working on id: 2 Working on id: 1 Working on id: 3 Working on id: 4 Working on id: 2 Working on id: 1 Working on id: 0 Working on id: 3 Working on id: 4 Working on id: 3 Working on id: 0 Working on id: 1 Working on id: 2 Working on id: 4 Working on id: 1 Stopping from id: 0 '''   当所有线程启动之后,每个线程会打印各自的ID,于是前面4行打印出了他们的ID。然后在暂停1秒之后,所有的线程又开始第二遍打印ID。在这个时候有一个线程推出的条件达到,于是调用了coord.request_stop 函数来停止所有其他的线程。然而在打印Stoping_from_id:4之后,可以看到有线程仍然在输出。这是因为这些线程已经执行完 coord.should_stop 的判断,于是仍然会继续输出自己的ID。但在下一轮判断是否需要停止时将推出线程。于是在打印一次ID之后就不会再有输出了。   tf.QueueRunner 主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的 tf.Coordinator 类来统一管理,下面代码展示了如何使用 tf.QueueRunner 和 tf.Coordinator 来管理多线程队列操作。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 #_*_coding:utf-8_*_ import tensorflow as tf # 声明一个先进先出的队列,队列中最多100个元素,类型为实数 queue = tf.FIFOQueue(100, 'float') # 定义队列的入队操作 enqueue_op = queue.enqueue([tf.random_normal([1])]) # 使用 tf.train.QueueRunner 来创建多个线程运行队列的入队操作 # tf.train.QueueRunner 的第一个参数给出了被操作的队列 # [enqueue_op] * 5 表示了需要启动5个线程,每个线程运行的是equeue_op操作 qr = tf.train.QueueRunner(queue, [enqueue_op]*5) # 将定义过的 QueueRunner 加入 TensorFlow计算图上指定的集合 # tf.train.add_queue_runner 函数没有指定集合 # 则加入默认集合 tf.GraphKeys.QUEUE_RUNNERS # 下面的函数就是讲刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS集合 tf.train.add_queue_runner(qr) # 定义出队操作 out_tensor = queue.dequeue() with tf.Session() as sess: # 使用 tf.train.coordinator 来协同启动的线程 coord = tf.train.Coordinator() # 使用tf.train.QueueRunner时,需要明确调用 tf.train.start_queue_runnsers来启动所有线程 # 否则因为没有线程运行入队操作,当调用出队操作时,程序会一直等待入队操作被运行。 # tf.train.start_queue_runners 函数会默认启动 tf.GraphKeys.QUEUE_RUNNERS集合 # 所说的 tf.train.add_queue_runner 函数和 tf.train.start_queue_runners 函数会指定同一个集合 threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 获取队列中的取值 for _ in range(3): print(sess.run(out_tensor)[0]) # s使用 tf.train.Coordinator 来停止所有的线程 coord.request_stop() coord.join(threads) ''' -0.88587755 -0.6659831 -2.9722364 '''    输入文件队列   下面将学习如何使用TensorFlow中的队列管理输入文件列表。这里假设所有的输入数据都已经整理成了TFRecord 格式。虽然一个 TFRecord 文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个 TFRecord 文件来提高处理效率。 TensorFlow 提供了 tf.train.match_filenames_once 函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过 tf.train.string_input_producer 函数进行有效的管理。   tf.train.string_input_producer 函数会使用初始化时提供的文件列表创建一个输入队列,输入对垒中原始的元素为文件列表中的所有文件。如上面的代码所示,创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否已有打开的文件可读,如果没有或者打开的文件以及读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。   通过设置 shuffle 参数,tf.train.string_input_producer 函数支持随机打乱文件列表中文件出队的顺序。当 shuffle 参数为 TRUE时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会泡在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer 函数生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀的分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。   当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer 函数可以设置 num_epochs 参数来限制加载初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报 OutOfRange 的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将 num_epochs 参数设置为1,这样在计算完一轮之后程序将自动停止。在展示 tf.train.match_filenames_once 和 tf.train.string_input_producer 函数的使用方法之前,我们可以先给出一个简单的程序来生成数据。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 #_*_coding:utf-8_*_ import tensorflow as tf # 创建TFReocrd文件的帮助函数 def _int64_feature(value): return tf.train.Feature(int64_list=tf.train.Int64List(value=[value])) # 模拟海量数据情况下降数据写入不同的文件,num_shards 定义了总共写入多少文件 # instances_per_shard 定义了每个文件中有多少个数据 num_shards = 2 instances_per_shard = 2 for i in range(num_shards): # 将数据分为多个文件时,可以将不同文件以类似0000n-of-0000m 的后缀区分 # 其中m表示了数据总共被存在了多少个文件中,n表示当前文件的编号 # 式样的方式既方便了通过正则表达式获取文件列表,又在文件名中加入了更多的信息 filename = ('data.tfrecords-%.5d-of-%.5d' % (i, num_shards)) writer = tf.python_io.TFRecordWriter(filename) # 将数据封装成Example结构并写入 TFRecord 文件 for j in range(instances_per_shard): # Example 结构仅包含当前样例属于第几个文件以及时当前文件的第几个样本 example = tf.train.Example(features=tf.train.Features( feature={ 'i': _int64_feature(i), 'j': _int64_feature(j) } )) writer.write(example.SerializeToString()) writer.close()   程序运行之后,在指定的目录下生产两个文件,每一个文件中存储了两个样例,在生成了样例数据之后,下面代码展示了 tf.train.match_filenames_once 函数 和 tf.train.string_input_producer 函数的使用方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #_*_coding:utf-8_*_ import tensorflow as tf # 使用tf.train.match_filenames_once 函数获取文件列表 files = tf.train.match_filenames_once('path/data.tfrecords-*') # print(files) # 输入队列中的文件列表为 tf.train.match_filenames_once 函数获取的文件列表 # 这里将 shuffle参数设置为FALSE来避免随机打乱读文件的顺序 # 但是一般在解决真实问题,会将shuffle参数设置为TRUE filename_queue = tf.train.string_input_producer(files, shuffle=False) # print(filename_queue) # 读取并解析一个样本 reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) features = tf.parse_single_example( serialized_example, features={ 'i': tf.FixedLenFeature([], tf.int64), 'j': tf.FixedLenFeature([], tf.int64), } ) with tf.Session() as sess: # 虽然在本段程序中没有声明任何变量 # 但在使用 tf.train.match_filenames_once 函数时需要初始化一些变量 # init = tf.global_variables_initializer() # init = tf.initialize_all_variables() init = tf.local_variables_initializer() sess.run(init) # sess.run(files) # sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()]) print(sess.run(files)) # 声明 tf.train.Coordinator 类来协同不同线程,并启动线程 coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 多次执行获取数据的操作 for i in range(6): print(sess.run([features['i'], features['j']])) coord.request_stop() coord.join(threads)   打印结果如下: 1 2 3 4 5 6 7 8 [b'path\\data.tfrecords-00000-of-00002' b'path\\data.tfrecords-00001-of-00002'] [0, 0] [0, 1] [1, 0] [1, 1] [0, 0] [0, 1]   在不打乱文件列表的情况下,会依次独处样例数据中的每一个样例。而且当所有样例都被读完之后,程序会自动从头开始。如果限制 num_epochs=1,那么程序会报错。 组合训练数据(batching)   在上面,我们已经学习了如何从文件列表中读取单个样例,将这些单个样例通过预处理方法进行处理,就可以得到提高给神经网络输入层的训练数据了。在之前学习过,将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后,还需要将他们组织成batch,然后再提供给审计网络的输入层。TensorFlow提供了 tf.train.batch 和 tf.train.shuffle_batch 函数来将单个的样例组织成 batch 的形式输出。这两个函数都会生成一个队列,队列的入队操作时生成单个样例的方法,而每次出队得到的时一个batch的样例。他们唯一的区别自安于是否会将数据顺序打乱。下面代码展示了这两个函数的使用方法。   下面代码展示了 tf.train.batch函数的用法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 #_*_coding:utf-8_*_ import tensorflow as tf # 读取解析得到样例,这里假设Example结构中 i表示一个样例的特征向量 # 比如一张图像的像素矩阵,而j表示该样例对应的标签 # 使用tf.train.match_filenames_once 函数获取文件列表 files = tf.train.match_filenames_once('path/data.tfrecords-*') # 输入队列中的文件列表为 tf.train.match_filenames_once 函数获取的文件列表 # 这里将 shuffle参数设置为FALSE来避免随机打乱读文件的顺序 # 但是一般在解决真实问题,会将shuffle参数设置为TRUE filename_queue = tf.train.string_input_producer(files, shuffle=False) # print(filename_queue) # 读取并解析一个样本 reader = tf.TFRecordReader() _, serialized_example = reader.read(filename_queue) features = tf.parse_single_example( serialized_example, features={ 'i': tf.FixedLenFeature([], tf.int64), 'j': tf.FixedLenFeature([], tf.int64), } ) example, label = features['i'], features['j'] # 一个 batch 中样例的个数 batch_size = 2 # 组合样例的队列中最多可以存储的样例个数。这个队列如果太大, # 那么需要占用很多内存资源,如果太小,那么出队操作可能会因为 # 没有数据而被阻碍(block),从而导致训练效率降低,一般来说 # 这个队列的大小会和每一个batch的大小相关,下面代码给出了设置 # 队列大小的一种方式。 capacity = 1000 + 3 * batch_size # 使用 tf.train.batch 函数来组合样例。[example, label] 参数给 # 出了需要组合的元素,一般 example 和 label分别代表训练样本和这个样本 # 对应的正确标签。batch_size 参数给出了每个batch中样例的个数。 # capacity 给出了队列的最大容量。当队列长度等于容量时,TensorFlow将暂停 # 入队操作,而只是等待元素出队。当元素个数小于容量时,TensorFlow将自动重新启动入队操作 example_batch, label_batch = tf.train.batch( [example, label], batch_size=batch_size, capacity=capacity ) with tf.Session() as sess: tf.global_variables_initializer().run() tf.local_variables_initializer().run() coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(sess=sess, coord=coord) # 获取并打印组合之后的样例,在真实问题中,这个输出一般会作为神经网络的输入 for i in range(3): cur_example_batch, cur_label_batch = sess.run( [example_batch, label_batch] ) print(cur_example_batch, cur_label_batch) coord.request_stop() coord.join(threads) ''' 运行上面的程式会得到下面的输出: [0 0] [0 1] [1 1] [0 1] [0 0] [0 1] 从这个输出可以看到 tf.train.batch函数可以将单个的数据组织成3个一组的batch 在 example, lable 中读取的数据依次为: example:0 label:0 example:0 label:1 example:1 label:1 example:0 label:1 example:0 label:0 example:0 label:1 这是因为 tf.train.batch 函数不会随机打乱顺序,所以在组合之后得到的数据 组成了上面给出的输出。 '''   下面代码展示了 tf.train.shuffle_batch 函数的使用方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 import tensorflow as tf #_*_coding:utf-8_*_ import tensorflow as tf # 读取解析得到样例,这里假设Example结构中 i表示一个样例的特征向量 # 比如一张图像的像素矩阵,而j表示该样例对应的标签 # 使用tf.train