Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/data/experimental/ops/grouping.py: 40%
94 statements
coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Grouping dataset transformations."""
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import structured_function
from tensorflow.python.data.util import nest
from tensorflow.python.data.util import structure
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_spec
from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import tf_export


@tf_export("data.experimental.group_by_reducer")
def group_by_reducer(key_func, reducer):
  """A transformation that groups elements and performs a reduction.

  This transformation maps each element of a dataset to a key using `key_func`
  and groups the elements by key. The `reducer` is used to process each group;
  its `init_func` is used to initialize state for each group when it is
  created, the `reduce_func` is used to update the state every time an element
  is mapped to the matching group, and the `finalize_func` is used to map the
  final state to an output value.
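
  For example, a minimal sketch that counts how many times each value occurs
  (the toy dataset and counting reducer below are purely illustrative):

  >>> ds = tf.data.Dataset.from_tensor_slices(
  ...     tf.constant([1, 2, 2, 3, 3, 3], tf.int64))
  >>> reducer = tf.data.experimental.Reducer(
  ...     init_func=lambda key: tf.constant(0, tf.int64),
  ...     reduce_func=lambda state, element: state + 1,
  ...     finalize_func=lambda state: state)
  >>> ds = ds.apply(tf.data.experimental.group_by_reducer(
  ...     key_func=lambda x: x, reducer=reducer))
  >>> sorted(int(count) for count in ds.as_numpy_iterator())
  [1, 2, 3]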

  Args:
    key_func: A function mapping a nested structure of tensors
      (having shapes and types defined by `self.output_shapes` and
      `self.output_types`) to a scalar `tf.int64` tensor.
    reducer: An instance of `Reducer`, which captures the reduction logic using
      the `init_func`, `reduce_func`, and `finalize_func` functions.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.
  """

  def _apply_fn(dataset):
    """Function from `Dataset` to `Dataset` that applies the transformation."""
    return _GroupByReducerDataset(dataset, key_func, reducer)

  return _apply_fn


@deprecation.deprecated(None, "Use `tf.data.Dataset.group_by_window(...)`.")
@tf_export("data.experimental.group_by_window")
def group_by_window(key_func,
                    reduce_func,
                    window_size=None,
                    window_size_func=None):
  """A transformation that groups windows of elements by key and reduces them.

  This transformation maps each consecutive element in a dataset to a key
  using `key_func` and groups the elements by key. It then applies
  `reduce_func` to at most `window_size_func(key)` elements matching the same
  key. All except the final window for each key will contain
  `window_size_func(key)` elements; the final window may be smaller.

  You may provide either a constant `window_size` or a window size determined
  by the key through `window_size_func`.
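
  As an illustrative sketch (using the standard `tf.data.Dataset.range` API),
  the following groups even and odd numbers into separate windows of size 5:

  >>> dataset = tf.data.Dataset.range(10)
  >>> window_size = 5
  >>> key_func = lambda x: x % 2
  >>> reduce_func = lambda key, dataset: dataset.batch(window_size)
  >>> dataset = dataset.apply(tf.data.experimental.group_by_window(
  ...     key_func=key_func,
  ...     reduce_func=reduce_func,
  ...     window_size=window_size))
  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [0 2 4 6 8]
  [1 3 5 7 9]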

  Args:
    key_func: A function mapping a nested structure of tensors
      (having shapes and types defined by `self.output_shapes` and
      `self.output_types`) to a scalar `tf.int64` tensor.
    reduce_func: A function mapping a key and a dataset of up to `window_size`
      consecutive elements matching that key to another dataset.
    window_size: A `tf.int64` scalar `tf.Tensor`, representing the number of
      consecutive elements matching the same key to combine in a single
      batch, which will be passed to `reduce_func`. Mutually exclusive with
      `window_size_func`.
    window_size_func: A function mapping a key to a `tf.int64` scalar
      `tf.Tensor`, representing the number of consecutive elements matching
      the same key to combine in a single batch, which will be passed to
      `reduce_func`. Mutually exclusive with `window_size`.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if neither or both of {`window_size`, `window_size_func`} are
      passed.
  """

  def _apply_fn(dataset):
    """Function from `Dataset` to `Dataset` that applies the transformation."""
    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=window_size,
        window_size_func=window_size_func)

  return _apply_fn


@deprecation.deprecated(None,
                        "Use `tf.data.Dataset.bucket_by_sequence_length(...)`.")
@tf_export("data.experimental.bucket_by_sequence_length")
def bucket_by_sequence_length(element_length_func,
                              bucket_boundaries,
                              bucket_batch_sizes,
                              padded_shapes=None,
                              padding_values=None,
                              pad_to_bucket_boundary=False,
                              no_padding=False,
                              drop_remainder=False):
  """A transformation that buckets elements in a `Dataset` by length.

  Elements of the `Dataset` are grouped together by length and then are padded
  and batched.

  This is useful for sequence tasks in which the elements have variable
  length. Grouping together elements that have similar lengths reduces the
  total fraction of padding in a batch, which increases training step
  efficiency.

  Below is an example that bucketizes the input data into the 3 buckets
  "[0, 3), [3, 5), [5, inf)" based on sequence length, with batch size 2.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...     lambda: elements, tf.int64, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[3, 5],
  ...         bucket_batch_sizes=[2, 2, 2]))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[1 2 3 4]
   [5 6 7 0]]
  [[ 7  8  9 10 11  0]
   [13 14 15 16 19 20]]
  [[ 0  0]
   [21 22]]

  You can also pad the dataset up to the bucket boundary and choose the value
  used for padding. The example below uses `-1` as the padding value and shows
  the input data being bucketized into two buckets "[0, 3], [4, 6]".

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...     lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]
  [[21 22 -1]]
  [[13 14 15 16 19 20]]

  When the `pad_to_bucket_boundary` option is used, it is not always possible
  to maintain the bucket batch size. You can drop the batches that do not
  maintain the bucket batch size by using the `drop_remainder` option. Using
  the same input data as in the above example, you get the following result.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...     lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1,
  ...         drop_remainder=True))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]

  Args:
    element_length_func: function from element in `Dataset` to `tf.int32`,
      determines the length of the element, which determines the bucket it
      goes into.
    bucket_boundaries: `list<int>`, upper length boundaries of the buckets.
    bucket_batch_sizes: `list<int>`, batch size per bucket. Length should be
      `len(bucket_boundaries) + 1`.
    padded_shapes: Nested structure of `tf.TensorShape` to pass to
      `tf.data.Dataset.padded_batch`. If not provided, will use
      `dataset.output_shapes`, which will result in variable length dimensions
      being padded out to the maximum length in each batch.
    padding_values: Values to pad with, passed to
      `tf.data.Dataset.padded_batch`. Defaults to padding with 0.
    pad_to_bucket_boundary: bool, if `False`, will pad dimensions with unknown
      size to maximum length in batch. If `True`, will pad dimensions with
      unknown size to bucket boundary minus 1 (i.e., the maximum length in
      each bucket), and the caller must ensure that the source `Dataset` does
      not contain any elements with length longer than
      `max(bucket_boundaries)`.
    no_padding: `bool`, indicates whether to pad the batch features (features
      need to be either of type `tf.sparse.SparseTensor` or of same shape).
    drop_remainder: (Optional.) A `tf.bool` scalar `tf.Tensor`, representing
      whether the last batch should be dropped in the case it has fewer than
      `batch_size` elements; the default behavior is not to drop the smaller
      batch.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if `len(bucket_batch_sizes) != len(bucket_boundaries) + 1`.
  """

  def _apply_fn(dataset):
    return dataset.bucket_by_sequence_length(
        element_length_func=element_length_func,
        bucket_boundaries=bucket_boundaries,
        bucket_batch_sizes=bucket_batch_sizes,
        padded_shapes=padded_shapes,
        padding_values=padding_values,
        pad_to_bucket_boundary=pad_to_bucket_boundary,
        no_padding=no_padding,
        drop_remainder=drop_remainder)

  return _apply_fn

class _GroupByReducerDataset(dataset_ops.UnaryDataset):
  """A `Dataset` that groups its input and performs a reduction."""

  def __init__(self, input_dataset, key_func, reducer):
    """See `group_by_reducer()` for details."""
    self._input_dataset = input_dataset
    self._make_key_func(key_func, input_dataset)
    self._make_init_func(reducer.init_func)
    self._make_reduce_func(reducer.reduce_func, input_dataset)
    self._make_finalize_func(reducer.finalize_func)
    variant_tensor = ged_ops.experimental_group_by_reducer_dataset(
        self._input_dataset._variant_tensor,  # pylint: disable=protected-access
        self._key_func.function.captured_inputs,
        self._init_func.function.captured_inputs,
        self._reduce_func.function.captured_inputs,
        self._finalize_func.function.captured_inputs,
        key_func=self._key_func.function,
        init_func=self._init_func.function,
        reduce_func=self._reduce_func.function,
        finalize_func=self._finalize_func.function,
        **self._flat_structure)
    super(_GroupByReducerDataset, self).__init__(input_dataset, variant_tensor)

  def _make_key_func(self, key_func, input_dataset):
    """Make wrapping defun for key_func."""
    self._key_func = structured_function.StructuredFunctionWrapper(
        key_func, self._transformation_name(), dataset=input_dataset)
    if not self._key_func.output_structure.is_compatible_with(
        tensor_spec.TensorSpec([], dtypes.int64)):
      raise ValueError(
          f"Invalid `key_func`. Expected `key_func` to return a scalar "
          f"tf.int64 tensor, but instead `key_func` has output "
          f"types={self._key_func.output_types} "
          f"and shapes={self._key_func.output_shapes}."
      )

  def _make_init_func(self, init_func):
    """Make wrapping defun for init_func."""
    self._init_func = structured_function.StructuredFunctionWrapper(
        init_func,
        self._transformation_name(),
        input_structure=tensor_spec.TensorSpec([], dtypes.int64))

  def _make_reduce_func(self, reduce_func, input_dataset):
    """Make wrapping defun for reduce_func."""

    # Iteratively rerun the reduce function until reaching a fixed point on
    # `self._state_structure`.
    self._state_structure = self._init_func.output_structure
    state_types = self._init_func.output_types
    state_shapes = self._init_func.output_shapes
    state_classes = self._init_func.output_classes
    need_to_rerun = True
    while need_to_rerun:

      wrapped_func = structured_function.StructuredFunctionWrapper(
          reduce_func,
          self._transformation_name(),
          input_structure=(self._state_structure, input_dataset.element_spec),
          add_to_graph=False)

      # Extract and validate class information from the returned values.
      for new_state_class, state_class in zip(
          nest.flatten(wrapped_func.output_classes),
          nest.flatten(state_classes)):
        if not issubclass(new_state_class, state_class):
          raise TypeError(
              f"Invalid `reducer`. The output class of the "
              f"`reducer.reduce_func` {wrapped_func.output_classes}, "
              f"does not match the class of the reduce state "
              f"{state_classes}.")

      # Extract and validate type information from the returned values.
      for new_state_type, state_type in zip(
          nest.flatten(wrapped_func.output_types), nest.flatten(state_types)):
        if new_state_type != state_type:
          raise TypeError(
              f"Invalid `reducer`. The element types for the new state "
              f"{wrapped_func.output_types} do not match the element types "
              f"of the old state {self._init_func.output_types}."
          )

      # Extract shape information from the returned values.
      flat_state_shapes = nest.flatten(state_shapes)
      flat_new_state_shapes = nest.flatten(wrapped_func.output_shapes)
      weakened_state_shapes = [
          original.most_specific_compatible_shape(new)
          for original, new in zip(flat_state_shapes, flat_new_state_shapes)
      ]

      need_to_rerun = False
      for original_shape, weakened_shape in zip(flat_state_shapes,
                                                weakened_state_shapes):
        if original_shape.ndims is not None and (
            weakened_shape.ndims is None or
            original_shape.as_list() != weakened_shape.as_list()):
          need_to_rerun = True
          break

      if need_to_rerun:
        state_shapes = nest.pack_sequence_as(
            self._init_func.output_shapes, weakened_state_shapes)
        self._state_structure = structure.convert_legacy_structure(
            state_types, state_shapes, state_classes)

    self._reduce_func = wrapped_func
    self._reduce_func.function.add_to_graph(ops.get_default_graph())

  def _make_finalize_func(self, finalize_func):
    """Make wrapping defun for finalize_func."""
    self._finalize_func = structured_function.StructuredFunctionWrapper(
        finalize_func,
        self._transformation_name(),
        input_structure=self._state_structure)

  @property
  def element_spec(self):
    return self._finalize_func.output_structure

  def _functions(self):
    return [
        self._key_func, self._init_func, self._reduce_func, self._finalize_func
    ]

  def _transformation_name(self):
    return "tf.data.experimental.group_by_reducer()"

@tf_export("data.experimental.Reducer")
class Reducer:
  """A reducer is used for reducing a set of elements.

  A reducer is represented as a tuple of the three functions:
    - init_func - to define initial value: key => initial state
    - reduce_func - operation to perform on values with same key:
      (old state, input) => new state
    - finalize_func - value to return in the end: state => result

  For example,

  ```
  def init_func(_):
    return (0.0, 0.0)

  def reduce_func(state, value):
    return (state[0] + value['features'], state[1] + 1)

  def finalize_func(s, n):
    return s / n

  reducer = tf.data.experimental.Reducer(init_func, reduce_func, finalize_func)
  ```
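
  The reducer above can then be applied with
  `tf.data.experimental.group_by_reducer`. A rough sketch, assuming each
  dataset element is a dict with scalar `'label'` and `'features'` components
  (illustrative names only, not part of this API):

  ```
  dataset = dataset.apply(
      tf.data.experimental.group_by_reducer(
          key_func=lambda elem: tf.cast(elem['label'], tf.int64),
          reducer=reducer))
  ```

  This accumulates a running sum and count per `'label'` and finalizes them
  into a mean value for each label.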

411 """ 

412 

413 def __init__(self, init_func, reduce_func, finalize_func): 

414 self._init_func = init_func 

415 self._reduce_func = reduce_func 

416 self._finalize_func = finalize_func 

417 

418 @property 

419 def init_func(self): 

420 return self._init_func 

421 

422 @property 

423 def reduce_func(self): 

424 return self._reduce_func 

425 

426 @property 

427 def finalize_func(self): 

428 return self._finalize_func