# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Input pipeline.

Please see the [reading data
how-to](https://tensorflow.org/api_guides/python/reading_data)
for context.
"""

from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import indexed_slices
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_assert
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import variable_v1
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner
from tensorflow.python.util import deprecation
from tensorflow.python.util.compat import collections_abc
from tensorflow.python.util.tf_export import tf_export


# pylint: disable=protected-access
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access


@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned is deterministic.

  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the
    pattern(s).
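
  For example, a minimal sketch assuming TF1 graph mode (the glob path here
  is only illustrative):

  ```python
  filenames = tf.compat.v1.train.match_filenames_once("/tmp/data/*.tfrecord")
  with tf.compat.v1.Session() as sess:
    # The result is held in a local variable, so initialize those first.
    sess.run(tf.compat.v1.local_variables_initializer())
    print(sess.run(filenames))  # e.g. [b'/tmp/data/part-0.tfrecord', ...]
  ```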
70 """
71 with ops.name_scope(name, "matching_filenames", [pattern]) as name:
72 return variable_v1.VariableV1(
73 name=name, initial_value=io_ops.matching_files(pattern),
74 trainable=False, validate_shape=False,
75 collections=[ops.GraphKeys.LOCAL_VARIABLES])


@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()` to
  initialize local variables.

  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional). If specified, limits the number
      of steps the output tensor may be evaluated.
    name: A name for the operations (optional).

  Returns:
    tensor or `OutOfRange`.

  Raises:
    ValueError: if `num_epochs` is invalid.
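
  For example, a minimal sketch assuming TF1 graph mode:

  ```python
  limited = tf.compat.v1.train.limit_epochs(tf.constant([1, 2, 3]),
                                            num_epochs=2)
  with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.local_variables_initializer())  # for `epochs`
    print(sess.run(limited))  # [1 2 3]  (epoch 1)
    print(sess.run(limited))  # [1 2 3]  (epoch 2)
    try:
      sess.run(limited)  # the third evaluation raises OutOfRangeError
    except tf.errors.OutOfRangeError:
      pass
  ```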
99 """
100 if num_epochs is None:
101 return tensor
102 if num_epochs <= 0:
103 raise ValueError("num_epochs must be > 0 not %d." % num_epochs)
104 with ops.name_scope(name, "limit_epochs", [tensor]) as name:
105 zero64 = constant_op.constant(0, dtype=dtypes.int64)
106 epochs = variable_v1.VariableV1(
107 zero64, name="epochs", trainable=False,
108 collections=[ops.GraphKeys.LOCAL_VARIABLES])
109 counter = epochs.count_up_to(num_epochs)
110 with ops.control_dependencies([counter]):
111 return array_ops.identity(tensor, name=name)


@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified, `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size will be generated, using this name as part of the tag.
    name: (Optional.) A name for the queue.
    cancel_op: (Optional.) Cancel op for the queue.

  Returns:
    A queue with the output rows. A `QueueRunner` for the queue is
    added to the `QUEUE_RUNNER` collection of the current graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
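
  For example, a minimal sketch assuming TF1 graph mode, dequeuing rows one
  at a time (queue-runner plumbing included for completeness):

  ```python
  rows = tf.constant([[1, 2], [3, 4], [5, 6]])
  queue = tf.compat.v1.train.input_producer(rows, shuffle=False)
  row = queue.dequeue()
  with tf.compat.v1.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.compat.v1.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(row))  # [1 2]
    coord.request_stop()
    coord.join(threads)
  ```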
170 """
171 if context.executing_eagerly():
172 raise RuntimeError(
173 "Input pipelines based on Queues are not supported when eager execution"
174 " is enabled. Please use tf.data to ingest data into your model"
175 " instead.")
176 with ops.name_scope(name, "input_producer", [input_tensor]):
177 input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
178 element_shape = input_tensor.shape[1:].merge_with(element_shape)
179 if not element_shape.is_fully_defined():
180 raise ValueError("Either `input_tensor` must have a fully defined shape "
181 "or `element_shape` must be specified")
183 if shuffle:
184 input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)
186 input_tensor = limit_epochs(input_tensor, num_epochs)
188 q = data_flow_ops.FIFOQueue(capacity=capacity,
189 dtypes=[input_tensor.dtype.base_dtype],
190 shapes=[element_shape],
191 shared_name=shared_name, name=name)
192 enq = q.enqueue_many([input_tensor])
193 queue_runner.add_queue_runner(
194 queue_runner.QueueRunner(
195 q, [enq], cancel_op=cancel_op))
196 if summary_name is not None:
197 summary.scalar(summary_name,
198 math_ops.cast(q.size(), dtypes.float32) * (1. / capacity))
199 return q


@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def string_input_producer(string_tensor,
                          num_epochs=None,
                          shuffle=True,
                          seed=None,
                          capacity=32,
                          shared_name=None,
                          name=None,
                          cancel_op=None):
  """Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings. A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Raises:
    ValueError: If the string_tensor is an empty Python list. At runtime,
      will fail with an assertion if string_tensor becomes an empty tensor.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
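
  For example, a minimal sketch assuming TF1 graph mode (the filenames are
  only dequeued here, not actually read):

  ```python
  queue = tf.compat.v1.train.string_input_producer(
      ["file0.csv", "file1.csv"], num_epochs=1, shuffle=False)
  filename = queue.dequeue()
  with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.local_variables_initializer())  # for `epochs`
    coord = tf.train.Coordinator()
    threads = tf.compat.v1.train.start_queue_runners(sess=sess, coord=coord)
    print(sess.run(filename))  # b'file0.csv'
    print(sess.run(filename))  # b'file1.csv'
    coord.request_stop()
    coord.join(threads)
  ```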
252 """
253 not_null_err = "string_input_producer requires a non-null input tensor"
254 if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
255 raise ValueError(not_null_err)
257 with ops.name_scope(name, "input_producer", [string_tensor]) as name:
258 string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
259 with ops.control_dependencies([
260 control_flow_assert.Assert(
261 math_ops.greater(array_ops.size(string_tensor), 0), [not_null_err])
262 ]):
263 string_tensor = array_ops.identity(string_tensor)
264 return input_producer(
265 input_tensor=string_tensor,
266 element_shape=[],
267 num_epochs=num_epochs,
268 shuffle=shuffle,
269 seed=seed,
270 capacity=capacity,
271 shared_name=shared_name,
272 name=name,
273 summary_name="fraction_of_%d_full" % capacity,
274 cancel_op=cancel_op)


@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers. A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
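
  For example, a short sketch (session and queue-runner setup as in the
  `input_producer` example above):

  ```python
  queue = tf.compat.v1.train.range_input_producer(5, shuffle=False)
  index = queue.dequeue()
  # With queue runners started, successive sess.run(index) calls yield
  # 0, 1, 2, 3, 4, 0, 1, ... (cycling, because num_epochs is None).
  ```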
311 """
312 with ops.name_scope(name, "input_producer", [limit]) as name:
313 range_tensor = math_ops.range(limit)
314 return input_producer(
315 range_tensor, [], num_epochs, shuffle, seed, capacity,
316 shared_name, "fraction_of_%d_full" % capacity, name)


@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces a slice of each `Tensor` in `tensor_list`.

  Implemented using a Queue -- a `QueueRunner` for the Queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Args:
    tensor_list: A list of `Tensor` objects. Every `Tensor` in
      `tensor_list` must have the same size in the first dimension.
    num_epochs: An integer (optional). If specified, `slice_input_producer`
      produces each slice `num_epochs` times before generating
      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`. If the tensor
    in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output
    tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` produces nothing from `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
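
  For example, a short sketch pairing features with labels (session and
  queue-runner setup as in the `input_producer` example above):

  ```python
  images = tf.zeros([4, 28, 28])
  labels = tf.constant([0, 1, 2, 3])
  image, label = tf.compat.v1.train.slice_input_producer(
      [images, labels], shuffle=False)
  # image has shape [28, 28] and label is a scalar; each
  # sess.run([image, label]) yields one aligned (image, label) pair.
  ```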
359 """
360 with ops.name_scope(name, "input_producer", tensor_list):
361 tensor_list = indexed_slices.convert_n_to_tensor_or_indexed_slices(
362 tensor_list)
363 if not tensor_list:
364 raise ValueError(
365 "Expected at least one tensor in slice_input_producer().")
366 range_size = array_ops.shape(tensor_list[0])[0]
367 # TODO(josh11b): Add an assertion that the first dimension of
368 # everything in TensorList matches. Maybe just check the inferred shapes?
369 queue = range_input_producer(range_size, num_epochs=num_epochs,
370 shuffle=shuffle, seed=seed, capacity=capacity,
371 shared_name=shared_name)
372 index = queue.dequeue()
373 output = [array_ops.gather(t, index) for t in tensor_list]
374 return output


# Helpers for the batching functions ------------------------------------------


def _flatten(tensor_list_list):
  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]


class _SparseMetaData:
  """Stores whether a `Tensor` is sparse, plus its `map_op` and rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data.
      rank: The statically known rank of the `SparseTensor`.
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.as_dimension(rank)

  def __eq__(self, other):
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    # Guard: `map_op` is None for dense (non-sparse) metadata.
    map_op_name = self._map_op.name if self._map_op is not None else None
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, map_op_name,
                                             self.rank)

  def merge_with(self, other):
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank


def _as_tensor_list(tensors):
  if isinstance(tensors, dict):
    return [tensors[k] for k in sorted(tensors, key=str)]
  else:
    return tensors


def _as_tensor_list_list(tensors_list):
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  if isinstance(tensors_list[0], dict):
    expected_keys = set(tensors_list[0].keys())
    for tensors in tensors_list[1:]:
      if set(tensors.keys()) != expected_keys:
        raise ValueError("All dictionaries in tensors_list must have "
                         "the same keys")
    return [_as_tensor_list(tensors) for tensors in tensors_list]
  else:
    return tensors_list


def _as_original_type(original_tensors, tensor_list):
  if isinstance(original_tensors, dict):
    if len(original_tensors) == 1:
      # tensor_list is bogusly returned as a single tensor if only one tensor
      # was enqueued. Make it a list again. See b/28117485.
      tensor_list = [tensor_list]
    return {k: tensor_list[i]
            for i, k in enumerate(sorted(original_tensors, key=str))}
  else:
    return tensor_list
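

# For illustration: the two dict helpers above round-trip a dict through a
# list by ordering keys with `sorted(..., key=str)` in both directions, which
# is what keeps dequeued tensors aligned with their original keys. Sketch:
#
#   tensors = {"b": t_b, "a": t_a}
#   as_list = _as_tensor_list(tensors)     # [t_a, t_b]
#   _as_original_type(tensors, as_list)    # {"a": t_a, "b": t_b}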


def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared). This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects. In this case, the sparse
  restore must have a single `SparseTensorMap` from which to read out the
  handles; so a single `SparseTensorMap` must be shared for storing
  across the multiple enqueue operations. This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent times with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: Must be a scalar bool Tensor (not a Python bool). If False,
      don't store.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`. If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of the same length of `_SparseMetaData` objects.
  """
  maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list)

  def _sparse_meta_data(t, storing_op, map_op):
    if not isinstance(t, sparse_tensor.SparseTensor):
      return _SparseMetaData(False, None, None)
    rank = t.dense_shape.shape.with_rank(1).dims[0]
    if enqueue_many:
      rank -= 1
    # If a shared map_op was provided, use that. Otherwise use the name of
    # the operation used to store the SparseTensor.
    return _SparseMetaData(
        sparse=True, map_op=map_op or storing_op, rank=rank)

  def _maybe_store(t, shared_map_op):
    """Store Sparse tensor, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` is wrapped in a tf.cond
  # `Merge`, we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # If there was no sparse input, then the original stored Tensor wasn't
    # sparse and we can just return the original Tensor's Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored tensors by 1 for proper enqueue shape
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list


def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc."""
  (s0, sparse_info_list) = _store_sparse_tensors(
      tensor_list_list[0], enqueue_many, keep_input)
  stored_list_list = [s0]
  for tensor_list in tensor_list_list[1:]:
    s, sparse_info_candidate = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input,
        [st.map_op for st in sparse_info_list])
    if sparse_info_list != sparse_info_candidate:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (tensor_list_list[0], tensor_list))
    sparse_info_list = [
        info.merge_with(candidate)
        for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)]
    stored_list_list.append(s)

  return (stored_list_list, sparse_info_list)


def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc."""
  received_sequence = isinstance(stored_list, collections_abc.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  # Return the dependency-wrapped tensor in the scalar case too, mirroring
  # the sequence path.
  return (ensure_restore_tensors if received_sequence
          else ensure_restore_tensors[0])


def _validate(tensor_list):
  tensor_list = indexed_slices.convert_n_to_tensor_or_indexed_slices(
      tensor_list)
  if not tensor_list:
    raise ValueError("Expected at least one tensor in batch().")
  return tensor_list


def _validate_join(tensor_list_list):
  tensor_list_list = [
      indexed_slices.convert_n_to_tensor_or_indexed_slices(tl)
      for tl in tensor_list_list
  ]
  if not tensor_list_list:
    raise ValueError("Expected at least one input in batch_join().")
  return tensor_list_list


def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions."""
  keep_input = ops.convert_to_tensor(keep_input)
  if keep_input.shape.ndims is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if not enqueue_many and keep_input.shape.ndims == 1:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if keep_input.shape.ndims > 1:
    raise ValueError("`keep_input` must be 0 or 1 dimensions.")
  return keep_input


def _dtypes(tensor_list_list):
  all_types = [[t.dtype for t in tl] for tl in tensor_list_list]
  types = all_types[0]
  for other_types in all_types[1:]:
    if other_types != types:
      raise TypeError("Expected types to be consistent: %s vs. %s." %
                      (", ".join(x.name for x in types),
                       ", ".join(x.name for x in other_types)))
  return types


def _merge_shapes(shape_list, enqueue_many):
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    merged_shape.merge_with(s)
  return merged_shape.as_list()
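

# For illustration: with enqueue_many=True the leading batch dimension is
# dropped before merging, so, for example,
#
#   _merge_shapes([[None, 3, 2], [5, 3, None]], enqueue_many=True)
#     # -> [3, 2]: batch dims stripped, Nones filled in from the other shape
#
# while incompatible shapes (e.g. [3] vs. [4]) raise ValueError from
# `merge_with`.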


def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    A list of shapes aggregating shape inference info from `tensor_list_list`,
    or returning `shapes` if it is not `None`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well defined rank.
  """
  if shapes is None:
    len0 = len(tensor_list_list[0])

    for tl in tensor_list_list:
      for i in range(len0):
        if tl[i].shape.ndims is None:
          raise ValueError("Cannot infer Tensor's rank: %s" % tl[i])

    shapes = [
        _merge_shapes([tl[i].shape.as_list()
                       for tl in tensor_list_list], enqueue_many)
        for i in range(len0)
    ]
  return shapes


def _select_which_to_enqueue(tensor_list, keep_input):
  """Select which examples to enqueue based on vector `keep_input`."""
  select_i = math_ops.cast(keep_input, dtypes.int32)
  tensor_list = [
      data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1]
      for x in tensor_list]
  return tensor_list


def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input))
                   for x in tensor_list_list]
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tl),  # pylint:disable=cell-var-from-loop
        control_flow_ops.no_op) for tl in tensor_list_list]
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Enqueue `tensor_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)] * threads
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _which_queue(dynamic_pad):
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Helper function for `batch` and `maybe_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name:
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    (tensor_list, sparse_info) = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support with different
# readers (either because you don't have that many files or can't
# read that many files in parallel due to the number of seeks required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Helper function for `batch_join` and `maybe_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Helper function for `shuffle_batch` and `maybe_shuffle_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "shuffle_batch",
                      list(tensor_list) + [keep_input]) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list, sparse_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


# Batching functions ----------------------------------------------------------


@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example. An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension. If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`. The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have shape `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0. For strings, this padding is
  the empty string. See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size will fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`. The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
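
  For example, a minimal sketch where `image` and `label` are assumed to be
  single-example tensors produced by an upstream reader:

  ```python
  image_batch, label_batch = tf.compat.v1.train.batch(
      [image, label], batch_size=32, num_threads=4, capacity=1000)
  ```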
1011 """
1012 return _batch(
1013 tensors,
1014 batch_size,
1015 keep_input=True,
1016 num_threads=num_threads,
1017 capacity=capacity,
1018 enqueue_many=enqueue_many,
1019 shapes=shapes,
1020 dynamic_pad=dynamic_pad,
1021 allow_smaller_final_batch=allow_smaller_final_batch,
1022 shared_name=shared_name,
1023 name=name)


@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`. The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.
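
  For example, a minimal sketch where `image` and `label` are assumed to be
  single-example tensors from an upstream reader; only examples with a
  nonnegative label are batched:

  ```python
  image_batch, label_batch = tf.compat.v1.train.maybe_batch(
      [image, label], keep_input=label >= 0, batch_size=32)
  ```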
1068 """
1069 return _batch(
1070 tensors,
1071 batch_size,
1072 keep_input,
1073 num_threads=num_threads,
1074 capacity=capacity,
1075 enqueue_many=enqueue_many,
1076 shapes=shapes,
1077 dynamic_pad=dynamic_pad,
1078 allow_smaller_final_batch=allow_smaller_final_batch,
1079 shared_name=shared_name,
1080 name=name)


@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors. Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate thread
  for each tensor.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension. The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0. For strings, this padding is
  the empty string. See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size will fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensor_list_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensor_list_list[i]`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensor_list_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
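
  For example, a minimal sketch in which `read_example(f)` stands in for a
  hypothetical per-reader parsing function; each element of the list gets
  its own enqueuing thread:

  ```python
  example_list = [read_example(f) for f in file_queues]  # hypothetical
  example_batch = tf.compat.v1.train.batch_join(example_list, batch_size=32)
  ```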
1179 """
1180 return _batch_join(
1181 tensors_list,
1182 batch_size,
1183 keep_input=True,
1184 capacity=capacity,
1185 enqueue_many=enqueue_many,
1186 shapes=shapes,
1187 dynamic_pad=dynamic_pad,
1188 allow_smaller_final_batch=allow_smaller_final_batch,
1189 shared_name=shared_name,
1190 name=name)


@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensor_list_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensor_list_list[i]`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensor_list_list`.
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
                  allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  This function adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors`.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a
  single example. An input tensor with shape `[x, y, z]` will be output
  as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a
  batch of examples, where the first dimension is indexed by example,
  and all members of `tensors` should have the same size in the
  first dimension. If an input tensor has shape `[*, x, y, z]`, the
  output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  For example:

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.compat.v1.train.shuffle_batch(
        [single_image, single_label],
        batch_size=32,
        num_threads=4,
        capacity=50000,
        min_after_dequeue=10000)
  ```

  *N.B.:* You must ensure that either (i) the `shapes` argument is
  passed, or (ii) all of the tensors in `tensors` must have
  fully-defined shapes. `ValueError` will be raised if neither of
  these conditions holds.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size will fail.
1309 Args:
1310 tensors: The list or dictionary of tensors to enqueue.
1311 batch_size: The new batch size pulled from the queue.
1312 capacity: An integer. The maximum number of elements in the queue.
1313 min_after_dequeue: Minimum number elements in the queue after a
1314 dequeue, used to ensure a level of mixing of elements.
1315 num_threads: The number of threads enqueuing `tensor_list`.
1316 seed: Seed for the random shuffling within the queue.
1317 enqueue_many: Whether each tensor in `tensor_list` is a single example.
1318 shapes: (Optional) The shapes for each example. Defaults to the
1319 inferred shapes for `tensor_list`.
1320 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1321 batch to be smaller if there are insufficient items left in the queue.
1322 shared_name: (Optional) If set, this queue will be shared under the given
1323 name across multiple sessions.
1324 name: (Optional) A name for the operations.
1326 Returns:
1327 A list or dictionary of tensors with the types as `tensors`.
1329 Raises:
1330 ValueError: If the `shapes` are not specified, and cannot be
1331 inferred from the elements of `tensors`.
1333 @compatibility(eager)
1334 Input pipelines based on Queues are not supported when eager execution is
1335 enabled. Please use the `tf.data` API to ingest data under eager execution.
1336 @end_compatibility
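
As the deprecation notice suggests, a rough `tf.data` equivalent of the
example above might be (a sketch; `images` and `labels` are hypothetical
in-memory arrays, and the shuffle buffer plays the role of
`min_after_dequeue`):

```python
dataset = (tf.data.Dataset.from_tensor_slices((images, labels))
           .shuffle(buffer_size=10000)  # analogous to min_after_dequeue
           .batch(32))                  # analogous to batch_size
```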
1337 """
1338 return _shuffle_batch(
1339 tensors,
1340 batch_size,
1341 capacity,
1342 min_after_dequeue,
1343 keep_input=True,
1344 num_threads=num_threads,
1345 seed=seed,
1346 enqueue_many=enqueue_many,
1347 shapes=shapes,
1348 allow_smaller_final_batch=allow_smaller_final_batch,
1349 shared_name=shared_name,
1350 name=name)
1353@tf_export(v1=["train.maybe_shuffle_batch"])
1354@deprecation.deprecated(
1355 None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1356 "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
1357 ".")
1358def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
1359 keep_input, num_threads=1, seed=None,
1360 enqueue_many=False, shapes=None,
1361 allow_smaller_final_batch=False, shared_name=None,
1362 name=None):
1363 """Creates batches by randomly shuffling conditionally-enqueued tensors.
1365 See docstring in `shuffle_batch` for more details.
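For example, this sketch (with hypothetical `single_image` and
`single_label` tensors) enqueues only examples with a positive label:

```python
keep = tf.math.greater(single_label, 0)  # scalar bool per example
image_batch, label_batch = tf.compat.v1.train.maybe_shuffle_batch(
    [single_image, single_label], batch_size=32, capacity=50000,
    min_after_dequeue=10000, keep_input=keep)
```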
1367 Args:
1368 tensors: The list or dictionary of tensors to enqueue.
1369 batch_size: An integer. The new batch size pulled from the queue.
1370 capacity: An integer. The maximum number of elements in the queue.
1371 min_after_dequeue: Minimum number of elements in the queue after a
1372 dequeue, used to ensure a level of mixing of elements.
1373 keep_input: A `bool` Tensor. This tensor controls whether the input is
1374 added to the queue or not. If it is a scalar and evaluates to `True`,
1375 then `tensors` are all added to the queue. If it is a vector and
1376 `enqueue_many` is `True`, then each example is added to the queue only
1377 if the corresponding value in `keep_input` is `True`. This tensor
1378 essentially acts as a filtering mechanism.
1379 num_threads: The number of threads enqueuing `tensors`.
1380 seed: Seed for the random shuffling within the queue.
1381 enqueue_many: Whether each tensor in `tensors` is a batch of examples.
1382 shapes: (Optional) The shapes for each example. Defaults to the
1383 inferred shapes for `tensors`.
1384 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1385 batch to be smaller if there are insufficient items left in the queue.
1386 shared_name: (Optional) If set, this queue will be shared under the given
1387 name across multiple sessions.
1388 name: (Optional) A name for the operations.
1390 Returns:
1391 A list or dictionary of tensors with the same types as `tensors`.
1393 Raises:
1394 ValueError: If the `shapes` are not specified, and cannot be
1395 inferred from the elements of `tensors`.
1397 @compatibility(eager)
1398 Input pipelines based on Queues are not supported when eager execution is
1399 enabled. Please use the `tf.data` API to ingest data under eager execution.
1400 @end_compatibility
1401 """
1402 return _shuffle_batch(
1403 tensors,
1404 batch_size,
1405 capacity,
1406 min_after_dequeue,
1407 keep_input,
1408 num_threads=num_threads,
1409 seed=seed,
1410 enqueue_many=enqueue_many,
1411 shapes=shapes,
1412 allow_smaller_final_batch=allow_smaller_final_batch,
1413 shared_name=shared_name,
1414 name=name)
1417@tf_export(v1=["train.shuffle_batch_join"])
1418@deprecation.deprecated(
1419 None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1420 "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
1421 "(batch_size)`.")
1422def shuffle_batch_join(tensors_list, batch_size, capacity,
1423 min_after_dequeue, seed=None, enqueue_many=False,
1424 shapes=None, allow_smaller_final_batch=False,
1425 shared_name=None, name=None):
1426 """Create batches by randomly shuffling tensors.
1428 The `tensors_list` argument is a list of tuples of tensors, or a list of
1429 dictionaries of tensors. Each element in the list is treated similarly
1430 to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`.
1432 This version enqueues a different list of tensors in different threads.
1433 It adds the following to the current `Graph`:
1435 * A shuffling queue into which tensors from `tensors_list` are enqueued.
1436 * A `dequeue_many` operation to create batches from the queue.
1437 * A `QueueRunner`, added to the `QUEUE_RUNNER` collection, to enqueue
1438 the tensors from `tensors_list`.
1440 `len(tensors_list)` threads will be started, with thread `i` enqueuing
1441 the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
1442 `tensors_list[i2][j]` in type and shape, except in the first dimension if
1443 `enqueue_many` is `True`.
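For example, four reader copies yield four enqueuing threads (a sketch
with a hypothetical `read_example` helper that returns an
`(image, label)` pair per call):

```python
example_list = [read_example(filename_queue) for _ in range(4)]
image_batch, label_batch = tf.compat.v1.train.shuffle_batch_join(
    example_list, batch_size=32, capacity=50000, min_after_dequeue=10000)
```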
1445 If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
1446 to represent a single example. An input tensor with shape `[x, y, z]`
1447 will be output as a tensor with shape `[batch_size, x, y, z]`.
1449 If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
1450 represent a batch of examples, where the first dimension is indexed
1451 by example, and all members of `tensors_list[i]` should have the
1452 same size in the first dimension. If an input tensor has shape `[*, x,
1453 y, z]`, the output will have shape `[batch_size, x, y, z]`.
1455 The `capacity` argument controls how long the prefetching is allowed to
1456 grow the queues.
1458 The returned operation is a dequeue operation and will throw
1459 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
1460 operation is feeding another input queue, its queue runner will catch
1461 this exception; however, if this operation is used in your main thread
1462 you are responsible for catching it yourself.
1464 If `allow_smaller_final_batch` is `True`, a batch smaller than
1465 `batch_size` is returned when the queue is closed and there are not enough
1466 elements to fill the batch; otherwise the pending elements are discarded.
1467 In addition, all output tensors' static shapes, as accessed via the
1468 `shape` property, will have a first `Dimension` value of `None`, and
1469 operations that depend on a fixed `batch_size` will fail.
1471 Args:
1472 tensors_list: A list of tuples or dictionaries of tensors to enqueue.
1473 batch_size: An integer. The new batch size pulled from the queue.
1474 capacity: An integer. The maximum number of elements in the queue.
1475 min_after_dequeue: Minimum number of elements in the queue after a
1476 dequeue, used to ensure a level of mixing of elements.
1477 seed: Seed for the random shuffling within the queue.
1478 enqueue_many: Whether each tensor in `tensors_list[i]` is a batch of
1479 examples.
1480 shapes: (Optional) The shapes for each example. Defaults to the
1481 inferred shapes for `tensors_list[i]`.
1482 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1483 batch to be smaller if there are insufficient items left in the queue.
1484 shared_name: (Optional) If set, this queue will be shared under the given
1485 name across multiple sessions.
1486 name: (Optional) A name for the operations.
1488 Returns:
1489 A list or dictionary of tensors with the same number and types as
1490 `tensors_list[i]`.
1492 Raises:
1493 ValueError: If the `shapes` are not specified, and cannot be
1494 inferred from the elements of `tensors_list`.
1496 @compatibility(eager)
1497 Input pipelines based on Queues are not supported when eager execution is
1498 enabled. Please use the `tf.data` API to ingest data under eager execution.
1499 @end_compatibility
1500 """
1501 return _shuffle_batch_join(
1502 tensors_list,
1503 batch_size,
1504 capacity,
1505 min_after_dequeue,
1506 keep_input=True,
1507 seed=seed,
1508 enqueue_many=enqueue_many,
1509 shapes=shapes,
1510 allow_smaller_final_batch=allow_smaller_final_batch,
1511 shared_name=shared_name,
1512 name=name)
1515@tf_export(v1=["train.maybe_shuffle_batch_join"])
1516@deprecation.deprecated(
1517 None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1518 "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
1519 ".batch(batch_size)`.")
1520def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
1521 min_after_dequeue, keep_input, seed=None,
1522 enqueue_many=False, shapes=None,
1523 allow_smaller_final_batch=False, shared_name=None,
1524 name=None):
1525 """Create batches by randomly shuffling conditionally-enqueued tensors.
1527 See docstring in `shuffle_batch_join` for more details.
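For example, with `enqueue_many=True` a vector `keep_input` filters
individual examples within each pre-batched input. A sketch with a
hypothetical `read_batch` helper:

```python
images, labels = read_batch(filename_queue)  # shapes [k, h, w, c], [k]
keep = tf.math.greater(labels, 0)            # [k] bool, one per example
image_batch, label_batch = tf.compat.v1.train.maybe_shuffle_batch_join(
    [[images, labels]], batch_size=32, capacity=50000,
    min_after_dequeue=10000, keep_input=keep, enqueue_many=True)
```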
1529 Args:
1530 tensors_list: A list of tuples or dictionaries of tensors to enqueue.
1531 batch_size: An integer. The new batch size pulled from the queue.
1532 capacity: An integer. The maximum number of elements in the queue.
1533 min_after_dequeue: Minimum number of elements in the queue after a
1534 dequeue, used to ensure a level of mixing of elements.
1535 keep_input: A `bool` Tensor. This tensor controls whether the input is
1536 added to the queue or not. If it is a scalar and evaluates to `True`,
1537 then `tensors` are all added to the queue. If it is a vector and
1538 `enqueue_many` is `True`, then each example is added to the queue only
1539 if the corresponding value in `keep_input` is `True`. This tensor
1540 essentially acts as a filtering mechanism.
1541 seed: Seed for the random shuffling within the queue.
1542 enqueue_many: Whether each tensor in `tensors_list[i]` is a batch of
1543 examples.
1544 shapes: (Optional) The shapes for each example. Defaults to the
1545 inferred shapes for `tensors_list[i]`.
1546 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1547 batch to be smaller if there are insufficient items left in the queue.
1548 shared_name: (Optional) If set, this queue will be shared under the given
1549 name across multiple sessions.
1550 name: (Optional) A name for the operations.
1552 Returns:
1553 A list or dictionary of tensors with the same number and types as
1554 `tensors_list[i]`.
1556 Raises:
1557 ValueError: If the `shapes` are not specified, and cannot be
1558 inferred from the elements of `tensors_list`.
1560 @compatibility(eager)
1561 Input pipelines based on Queues are not supported when eager execution is
1562 enabled. Please use the `tf.data` API to ingest data under eager execution.
1563 @end_compatibility
1564 """
1565 return _shuffle_batch_join(
1566 tensors_list,
1567 batch_size,
1568 capacity,
1569 min_after_dequeue,
1570 keep_input,
1571 seed=seed,
1572 enqueue_many=enqueue_many,
1573 shapes=shapes,
1574 allow_smaller_final_batch=allow_smaller_final_batch,
1575 shared_name=shared_name,
1576 name=name)