# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Input pipeline.

Please see the [reading data
how-to](https://tensorflow.org/api_guides/python/reading_data)
for context.
"""


from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import indexed_slices
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_assert
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import variable_v1
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner
from tensorflow.python.util import deprecation
from tensorflow.python.util.compat import collections_abc
from tensorflow.python.util.tf_export import tf_export


# pylint: disable=protected-access
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access


@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned is deterministic.

  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the pattern(s).
  """
  with ops.name_scope(name, "matching_filenames", [pattern]) as name:
    return variable_v1.VariableV1(
        name=name, initial_value=io_ops.matching_files(pattern),
        trainable=False, validate_shape=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])

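
# Illustrative sketch (not part of the original module): because the match is
# stored in a local variable, callers must run the local-variables initializer
# before reading it. The "./data/*.tfrecord" glob is a hypothetical path.
#
#   filenames = tf.compat.v1.train.match_filenames_once("./data/*.tfrecord")
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.local_variables_initializer())
#     print(sess.run(filenames))  # The matched file list, computed once.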


@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()` to
  initialize local variables.

  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional). If specified, limits the number
      of steps the output tensor may be evaluated.
    name: A name for the operations (optional).

  Returns:
    tensor or `OutOfRange`.

  Raises:
    ValueError: if `num_epochs` is invalid.
  """
  if num_epochs is None:
    return tensor
  if num_epochs <= 0:
    raise ValueError("num_epochs must be > 0, not %d." % num_epochs)
  with ops.name_scope(name, "limit_epochs", [tensor]) as name:
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    epochs = variable_v1.VariableV1(
        zero64, name="epochs", trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    counter = epochs.count_up_to(num_epochs)
    with ops.control_dependencies([counter]):
      return array_ops.identity(tensor, name=name)

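
# Illustrative sketch (not part of the original module): `limit_epochs` only
# bounds how many times the returned op may be evaluated; the `epochs` counter
# it creates is a local variable and must be initialized first.
#
#   limited = tf.compat.v1.train.limit_epochs(tf.constant(42), num_epochs=2)
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.local_variables_initializer())
#     sess.run(limited)  # 42
#     sess.run(limited)  # 42
#     sess.run(limited)  # raises tf.errors.OutOfRangeError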


@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified, `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size will be generated, using this name as part of the tag.
    name: (Optional.) A name for the queue.
    cancel_op: (Optional.) Cancel op for the queue.

  Returns:
    A queue with the output rows. A `QueueRunner` for the queue is
    added to the current `QUEUE_RUNNER` collection of the current
    graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  if context.executing_eagerly():
    raise RuntimeError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  with ops.name_scope(name, "input_producer", [input_tensor]):
    input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
    element_shape = input_tensor.shape[1:].merge_with(element_shape)
    if not element_shape.is_fully_defined():
      raise ValueError("Either `input_tensor` must have a fully defined shape "
                       "or `element_shape` must be specified")

    if shuffle:
      input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)

    input_tensor = limit_epochs(input_tensor, num_epochs)

    q = data_flow_ops.FIFOQueue(capacity=capacity,
                                dtypes=[input_tensor.dtype.base_dtype],
                                shapes=[element_shape],
                                shared_name=shared_name, name=name)
    enq = q.enqueue_many([input_tensor])
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(
            q, [enq], cancel_op=cancel_op))
    if summary_name is not None:
      summary.scalar(summary_name,
                     math_ops.cast(q.size(), dtypes.float32) * (1. / capacity))
    return q

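
# Illustrative sketch (not part of the original module): the producer only
# fills its queue once `start_queue_runners` launches the registered
# `QueueRunner` threads.
#
#   rows = tf.constant([[0, 1], [2, 3], [4, 5]])
#   q = tf.compat.v1.train.input_producer(rows, shuffle=False)
#   with tf.compat.v1.Session() as sess:
#     coord = tf.train.Coordinator()
#     threads = tf.compat.v1.train.start_queue_runners(sess=sess, coord=coord)
#     print(sess.run(q.dequeue()))  # [0, 1]
#     coord.request_stop()
#     coord.join(threads)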


@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
    "(tf.shape(string_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def string_input_producer(string_tensor,
                          num_epochs=None,
                          shuffle=True,
                          seed=None,
                          capacity=32,
                          shared_name=None,
                          name=None,
                          cancel_op=None):
  """Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings. A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Raises:
    ValueError: If `string_tensor` is an empty Python list. At runtime,
      will fail with an assertion if `string_tensor` becomes an empty tensor.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  not_null_err = "string_input_producer requires a non-null input tensor"
  if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
    raise ValueError(not_null_err)

  with ops.name_scope(name, "input_producer", [string_tensor]) as name:
    string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
    with ops.control_dependencies([
        control_flow_assert.Assert(
            math_ops.greater(array_ops.size(string_tensor), 0), [not_null_err])
    ]):
      string_tensor = array_ops.identity(string_tensor)
    return input_producer(
        input_tensor=string_tensor,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        name=name,
        summary_name="fraction_of_%d_full" % capacity,
        cancel_op=cancel_op)

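
# Illustrative sketch (not part of the original module): a typical use was to
# feed filenames to a reader; the file list here is hypothetical.
#
#   filename_queue = tf.compat.v1.train.string_input_producer(
#       ["file0.tfrecord", "file1.tfrecord"], shuffle=False)
#   reader = tf.compat.v1.TFRecordReader()
#   key, value = reader.read(filename_queue)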


@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers. A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", [limit]) as name:
    range_tensor = math_ops.range(limit)
    return input_producer(
        range_tensor, [], num_epochs, shuffle, seed, capacity,
        shared_name, "fraction_of_%d_full" % capacity, name)

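
# Illustrative sketch (not part of the original module): dequeued values are
# scalar int32 indices in [0, limit).
#
#   index_queue = tf.compat.v1.train.range_input_producer(5, shuffle=False)
#   index = index_queue.dequeue()  # yields 0, 1, 2, 3, 4, 0, 1, ...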


@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
    "(tf.shape(tensor_list[0], out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces a slice of each `Tensor` in `tensor_list`.

  Implemented using a Queue -- a `QueueRunner` for the Queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Args:
    tensor_list: A list of `Tensor` objects. Every `Tensor` in
      `tensor_list` must have the same size in the first dimension.
    num_epochs: An integer (optional). If specified, `slice_input_producer`
      produces each slice `num_epochs` times before generating
      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the slices are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`. If the tensor
    in `tensor_list` has shape `[N, a, b, ..., z]`, then the corresponding
    output tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` produces nothing from `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", tensor_list):
    tensor_list = indexed_slices.convert_n_to_tensor_or_indexed_slices(
        tensor_list)
    if not tensor_list:
      raise ValueError(
          "Expected at least one tensor in slice_input_producer().")
    range_size = array_ops.shape(tensor_list[0])[0]
    # TODO(josh11b): Add an assertion that the first dimension of
    # everything in TensorList matches. Maybe just check the inferred shapes?
    queue = range_input_producer(range_size, num_epochs=num_epochs,
                                 shuffle=shuffle, seed=seed, capacity=capacity,
                                 shared_name=shared_name)
    index = queue.dequeue()
    output = [array_ops.gather(t, index) for t in tensor_list]
    return output

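
# Illustrative sketch (not part of the original module): paired slices stay
# aligned because every tensor is gathered with the same dequeued index. The
# data tensors below are hypothetical placeholders.
#
#   images = tf.zeros([100, 28, 28])
#   labels = tf.zeros([100], tf.int32)
#   image, label = tf.compat.v1.train.slice_input_producer(
#       [images, labels], shuffle=True, seed=7)
#   # image has shape [28, 28]; label is the scalar from the same row.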


# Helpers for the batching functions ------------------------------------------


def _flatten(tensor_list_list):
  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]


class _SparseMetaData:
  """Store whether a `Tensor` is sparse, plus its map_op and rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data.
      rank: The statically known rank of the `SparseTensor`.
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.as_dimension(rank)

  def __eq__(self, other):
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name,
                                             self.rank)

  def merge_with(self, other):
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank

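
# Illustrative note (not part of the original module): given a shared map op
# `op` (hypothetical), metadata entries with compatible ranks compare equal
# and can be merged; mismatched known ranks raise ValueError.
#
#   a = _SparseMetaData(sparse=True, map_op=op, rank=2)
#   b = _SparseMetaData(sparse=True, map_op=op, rank=None)
#   a == b                # True: rank 2 is compatible with an unknown rank
#   a.merge_with(b).rank  # Dimension(2)
#   a == _SparseMetaData(sparse=True, map_op=op, rank=3)  # False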


def _as_tensor_list(tensors):
  if isinstance(tensors, dict):
    return [tensors[k] for k in sorted(tensors, key=str)]
  else:
    return tensors


def _as_tensor_list_list(tensors_list):
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  if isinstance(tensors_list[0], dict):
    expected_keys = set(tensors_list[0].keys())
    for tensors in tensors_list[1:]:
      if set(tensors.keys()) != expected_keys:
        raise ValueError("All dictionaries in tensors_list must have "
                         "the same keys")
    return [_as_tensor_list(tensors) for tensors in tensors_list]
  else:
    return tensors_list


def _as_original_type(original_tensors, tensor_list):
  if isinstance(original_tensors, dict):
    if len(original_tensors) == 1:
      # tensor_list is bogusly returned as a single tensor if only one tensor
      # was enqueued. Make it a list again. See b/28117485.
      tensor_list = [tensor_list]
    return {k: tensor_list[i]
            for i, k in enumerate(sorted(original_tensors, key=str))}
  else:
    return tensor_list

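
# Illustrative note (not part of the original module): dictionaries round-trip
# through the sorted-key ordering used by `_as_tensor_list`; `t_a` and `t_b`
# are hypothetical tensors.
#
#   tensors = {"b": t_b, "a": t_a}
#   _as_tensor_list(tensors)                # [t_a, t_b], keys sorted by str
#   _as_original_type(tensors, [t_a, t_b])  # {"a": t_a, "b": t_b}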


def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared). This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects. In this case, the sparse
  restore must have a single `SparseTensorMap` from which to read out the
  handles; so a single `SparseTensorMap` must be shared for storing
  across the multiple enqueue operations. This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent times with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: Must be a scalar bool Tensor (not a Python bool). If False,
      don't store.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`. If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of `_SparseMetaData` objects of the same length.
  """
  maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list)

  def _sparse_meta_data(t, storing_op, map_op):
    if not isinstance(t, sparse_tensor.SparseTensor):
      return _SparseMetaData(False, None, None)
    rank = t.dense_shape.shape.with_rank(1).dims[0]
    if enqueue_many:
      rank -= 1
    # If a shared map_op was provided, use that. Otherwise use the name of
    # the operation used to store the SparseTensor.
    return _SparseMetaData(
        sparse=True, map_op=map_op or storing_op, rank=rank)

  def _maybe_store(t, shared_map_op):
    """Store Sparse tensor, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` is wrapped in a tf.cond `Merge`,
  # we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # If there was no sparse input, then the original stored Tensor wasn't
    # sparse and we can just return the original Tensor's Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored tensors by 1 for proper enqueue shape.
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list


def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc."""
  (s0, sparse_info_list) = _store_sparse_tensors(
      tensor_list_list[0], enqueue_many, keep_input)
  stored_list_list = [s0]
  for tensor_list in tensor_list_list[1:]:
    s, sparse_info_candidate = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input,
        [st.map_op for st in sparse_info_list])
    if sparse_info_list != sparse_info_candidate:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (tensor_list_list[0], tensor_list))
    sparse_info_list = [
        info.merge_with(candidate)
        for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)]
    stored_list_list.append(s)

  return (stored_list_list, sparse_info_list)


def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc."""
  received_sequence = isinstance(stored_list, collections_abc.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  return ensure_restore_tensors if received_sequence else tensors[0]


def _validate(tensor_list):
  tensor_list = indexed_slices.convert_n_to_tensor_or_indexed_slices(
      tensor_list)
  if not tensor_list:
    raise ValueError("Expected at least one tensor in batch().")
  return tensor_list


def _validate_join(tensor_list_list):
  tensor_list_list = [
      indexed_slices.convert_n_to_tensor_or_indexed_slices(tl)
      for tl in tensor_list_list
  ]
  if not tensor_list_list:
    raise ValueError("Expected at least one input in batch_join().")
  return tensor_list_list


def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions."""
  keep_input = ops.convert_to_tensor(keep_input)
  if keep_input.shape.ndims is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if not enqueue_many and keep_input.shape.ndims == 1:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if keep_input.shape.ndims > 1:
    raise ValueError("`keep_input` must be 0 or 1 dimensions.")
  return keep_input


def _dtypes(tensor_list_list):
  all_types = [[t.dtype for t in tl] for tl in tensor_list_list]
  types = all_types[0]
  for other_types in all_types[1:]:
    if other_types != types:
      raise TypeError("Expected types to be consistent: %s vs. %s." %
                      (", ".join(x.name for x in types),
                       ", ".join(x.name for x in other_types)))
  return types


def _merge_shapes(shape_list, enqueue_many):
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    merged_shape.merge_with(s)
  return merged_shape.as_list()


def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    A list of shapes aggregating shape inference info from `tensor_list_list`,
    or returning `shapes` if it is not `None`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well-defined rank.
  """
  if shapes is None:
    len0 = len(tensor_list_list[0])

    for tl in tensor_list_list:
      for i in range(len0):
        if tl[i].shape.ndims is None:
          raise ValueError("Cannot infer Tensor's rank: %s" % tl[i])

    shapes = [
        _merge_shapes([tl[i].shape.as_list()
                       for tl in tensor_list_list], enqueue_many)
        for i in range(len0)
    ]
  return shapes


def _select_which_to_enqueue(tensor_list, keep_input):
  """Select which examples to enqueue based on vector `keep_input`."""
  select_i = math_ops.cast(keep_input, dtypes.int32)
  tensor_list = [
      data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1]
      for x in tensor_list]
  return tensor_list

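
# Illustrative sketch (not part of the original module): partition 1 holds the
# rows whose `keep_input` entry is True, which is exactly what gets enqueued.
#
#   x = tf.constant([[1], [2], [3]])
#   keep = tf.constant([True, False, True])
#   kept = tf.dynamic_partition(
#       x, tf.cast(keep, tf.int32), num_partitions=2)[1]
#   # kept == [[1], [3]]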


def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input))
                   for x in tensor_list_list]
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tl),  # pylint:disable=cell-var-from-loop
        control_flow_ops.no_op) for tl in tensor_list_list]
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Enqueue `tensor_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)] * threads
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _which_queue(dynamic_pad):
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Helper function for `batch` and `maybe_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name:
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    (tensor_list, sparse_info) = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support different
# readers (either because you don't have that many files or can't
# read that many files in parallel due to the number of seeks required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Helper function for `batch_join` and `maybe_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Helper function for `shuffle_batch` and `maybe_shuffle_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "shuffle_batch",
                      list(tensor_list) + [keep_input]) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list, sparse_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)

# Batching functions ----------------------------------------------------------



@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example. An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension. If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`. The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have shape `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0. For strings, this padding is
  the empty string. See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size would fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`. The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input=True,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

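
# Illustrative sketch (not part of the original module): batching single
# examples produced by an upstream reader; `decode_example` and the file list
# are hypothetical.
#
#   filename_queue = tf.compat.v1.train.string_input_producer(["data.tfrecord"])
#   reader = tf.compat.v1.TFRecordReader()
#   _, serialized = reader.read(filename_queue)
#   image, label = decode_example(serialized)  # one example at a time
#   image_batch, label_batch = tf.compat.v1.train.batch(
#       [image, label], batch_size=32, num_threads=4, capacity=1000)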


@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`. The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.
  """
  return _batch(
      tensors,
      batch_size,
      keep_input,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

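
# Illustrative sketch (not part of the original module): a scalar `keep_input`
# gates whole examples, while a vector gates rows when `enqueue_many=True`.
# Here `image` and `label` are hypothetical single-example tensors.
#
#   # Keep only examples whose label is nonzero.
#   image_batch, label_batch = tf.compat.v1.train.maybe_batch(
#       [image, label], keep_input=tf.not_equal(label, 0), batch_size=32)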


@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors. Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate thread
  for each tensor.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension. The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0. For strings, this padding is
  the empty string. See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size would fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input=True,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

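
# Illustrative sketch (not part of the original module): one reader per
# element of `tensors_list`, each enqueuing into the shared batching queue;
# `read_example` and the filename queue are hypothetical.
#
#   examples = [read_example(filename_queue) for _ in range(4)]  # 4 readers
#   example_batch, label_batch = tf.compat.v1.train.batch_join(
#       examples, batch_size=32, capacity=1000)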


@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean. Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


@tf_export(v1=["train.shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
                  allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  This function adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors`.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a
  single example. An input tensor with shape `[x, y, z]` will be output
  as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a
  batch of examples, where the first dimension is indexed by example,
  and all members of `tensors` should have the same size in the
  first dimension. If an input tensor has shape `[*, x, y, z]`, the
  output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching it yourself.

  For example:

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.compat.v1.train.shuffle_batch(
        [single_image, single_label],
        batch_size=32,
        num_threads=4,
        capacity=50000,
        min_after_dequeue=10000)
  ```

  *N.B.:* You must ensure that either (i) the `shapes` argument is
  passed, or (ii) all of the tensors in `tensors` have fully-defined
  shapes. `ValueError` will be raised if neither of these conditions
  holds.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise, the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


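# Illustrative sketch (not part of the original module): the `tf.data`
# replacement named in the deprecation message above, where the shuffle
# buffer plays the role of `min_after_dequeue`. The helper name and the
# range source are hypothetical.
def _shuffle_batch_tf_data_example():
  import tensorflow as tf
  dataset = (tf.data.Dataset.range(100)
             .shuffle(buffer_size=10, seed=42)  # min_after_dequeue analogue
             .batch(32))
  return dataset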

@tf_export(v1=["train.maybe_shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
    ".")
def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                        keep_input, num_threads=1, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


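# Illustrative sketch (not part of the original module): the `tf.data`
# replacement named in the deprecation message above, where `filter(...)`
# plays the role of `keep_input`. The helper name and the predicate are
# hypothetical.
def _maybe_shuffle_batch_tf_data_example():
  import tensorflow as tf
  dataset = (tf.data.Dataset.range(100)
             .filter(lambda x: x % 3 != 0)  # keep_input analogue
             .shuffle(buffer_size=10)       # min_after_dequeue analogue
             .batch(32))
  return dataset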

@tf_export(v1=["train.shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
    "(batch_size)`.")
def shuffle_batch_join(tensors_list, batch_size, capacity,
                       min_after_dequeue, seed=None, enqueue_many=False,
                       shapes=None, allow_smaller_final_batch=False,
                       shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors. Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`.

  This version enqueues a different list of tensors in different threads.
  It adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors_list` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors_list`.

  `len(tensors_list)` threads will be started, with thread `i` enqueuing
  the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first dimension if
  `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor with shape `[x, y, z]`
  will be output as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension. If an input tensor has shape `[*, x,
  y, z]`, the output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching it yourself.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise, the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed `batch_size` will fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


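# Illustrative sketch (not part of the original module): the `tf.data`
# replacement named in the deprecation message above, where `interleave(...)`
# plays the role of the per-thread enqueuing tensor lists. The helper name
# and the three toy sources are hypothetical.
def _shuffle_batch_join_tf_data_example():
  import tensorflow as tf
  dataset = (tf.data.Dataset.range(3)  # one branch per former enqueue thread
             .interleave(lambda i: tf.data.Dataset.from_tensors(i).repeat(50),
                         cycle_length=3)
             .shuffle(buffer_size=25)  # min_after_dequeue analogue
             .batch(16))
  return dataset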

@tf_export(v1=["train.maybe_shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
    ".batch(batch_size)`.")
def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
                             min_after_dequeue, keep_input, seed=None,
                             enqueue_many=False, shapes=None,
                             allow_smaller_final_batch=False, shared_name=None,
                             name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor. This tensor controls whether the input is
      added to the queue or not. If it is a scalar and evaluates `True`, then
      all elements of `tensors_list` are added to the queue. If it is a vector
      and `enqueue_many` is `True`, then each example is added to the queue
      only if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example. Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)


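# Illustrative sketch (not part of the original module): the `tf.data`
# replacement named in the deprecation message above, combining all four
# stages: `interleave` (per-thread sources), `filter` (`keep_input`),
# `shuffle` (`min_after_dequeue`), and `batch` (`batch_size`). The helper
# name, the toy sources, and the predicate are hypothetical.
def _maybe_shuffle_batch_join_tf_data_example():
  import tensorflow as tf
  dataset = (tf.data.Dataset.range(3)  # one branch per former enqueue thread
             .interleave(lambda i: tf.data.Dataset.from_tensors(i).repeat(50),
                         cycle_length=3)
             .filter(lambda x: x % 2 == 0)  # keep_input analogue
             .shuffle(buffer_size=25)       # min_after_dequeue analogue
             .batch(16))
  return dataset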