Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/data/experimental/ops/grouping.py: 40%
94 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Grouping dataset transformations."""
16from tensorflow.python.data.ops import dataset_ops
17from tensorflow.python.data.ops import structured_function
18from tensorflow.python.data.util import nest
19from tensorflow.python.data.util import structure
20from tensorflow.python.framework import dtypes
21from tensorflow.python.framework import ops
22from tensorflow.python.framework import tensor_spec
23from tensorflow.python.ops import gen_experimental_dataset_ops as ged_ops
24from tensorflow.python.util import deprecation
25from tensorflow.python.util.tf_export import tf_export
@tf_export("data.experimental.group_by_reducer")
def group_by_reducer(key_func, reducer):
  """Builds a transformation that groups elements by key and reduces each group.

  Every element of the input dataset is mapped to a key with `key_func`, and
  elements sharing a key form a group. Each group is processed by `reducer`:
  its `init_func` creates the state of a group when the group is first seen,
  its `reduce_func` folds every element mapped to that group into the state,
  and its `finalize_func` turns the final state into the output value.

  Args:
    key_func: A function mapping a dataset element (a nested structure of
      tensors) to a scalar `tf.int64` tensor.
    reducer: An instance of `Reducer`, which captures the reduction logic
      through its `init_func`, `reduce_func`, and `finalize_func` functions.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.
  """

  def _apply_fn(dataset):
    """Wraps `dataset` in a dataset that applies the grouping reduction."""
    grouped = _GroupByReducerDataset(dataset, key_func, reducer)
    return grouped

  return _apply_fn
@deprecation.deprecated(None, "Use `tf.data.Dataset.group_by_window(...)`.")
@tf_export("data.experimental.group_by_window")
def group_by_window(key_func,
                    reduce_func,
                    window_size=None,
                    window_size_func=None):
  """Builds a transformation that reduces per-key windows of elements.

  Each consecutive element of the dataset is mapped to a key with `key_func`
  and the elements are grouped by key. `reduce_func` is then applied to at
  most `window_size_func(key)` elements matching the same key. Every window
  of a key except the last one contains `window_size_func(key)` elements; the
  last window may be smaller.

  The window size may be given either as a constant (`window_size`) or as a
  function of the key (`window_size_func`).

  Args:
    key_func: A function mapping a nested structure of tensors
      (having shapes and types defined by `self.output_shapes` and
      `self.output_types`) to a scalar `tf.int64` tensor.
    reduce_func: A function mapping a key and a dataset of up to `window_size`
      consecutive elements matching that key to another dataset.
    window_size: A `tf.int64` scalar `tf.Tensor`, representing the number of
      consecutive elements matching the same key to combine in a single
      batch, which will be passed to `reduce_func`. Mutually exclusive with
      `window_size_func`.
    window_size_func: A function mapping a key to a `tf.int64` scalar
      `tf.Tensor`, representing the number of consecutive elements matching
      the same key to combine in a single batch, which will be passed to
      `reduce_func`. Mutually exclusive with `window_size`.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if neither or both of {`window_size`, `window_size_func`} are
      passed.
  """

  def _apply_fn(dataset):
    """Forwards the captured arguments to `dataset.group_by_window`."""
    forwarded = dict(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=window_size,
        window_size_func=window_size_func)
    return dataset.group_by_window(**forwarded)

  return _apply_fn
@deprecation.deprecated(None,
                        "Use `tf.data.Dataset.bucket_by_sequence_length(...)`.")
@tf_export("data.experimental.bucket_by_sequence_length")
def bucket_by_sequence_length(element_length_func,
                              bucket_boundaries,
                              bucket_batch_sizes,
                              padded_shapes=None,
                              padding_values=None,
                              pad_to_bucket_boundary=False,
                              no_padding=False,
                              drop_remainder=False):
  """Builds a transformation that buckets the elements of a `Dataset` by length.

  Elements of the `Dataset` are grouped together by length and then are padded
  and batched.

  This is useful for sequence tasks in which the elements have variable
  length. Grouping together elements that have similar lengths reduces the
  total fraction of padding in a batch, which increases training step
  efficiency.

  Below is an example to bucketize the input data to the 3 buckets
  "[0, 3), [3, 5), [5, inf)" based on sequence length, with batch size 2.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...     lambda: elements, tf.int64, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[3, 5],
  ...         bucket_batch_sizes=[2, 2, 2]))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[1 2 3 4]
   [5 6 7 0]]
  [[ 7  8  9 10 11  0]
   [13 14 15 16 19 20]]
  [[ 0  0]
   [21 22]]

  There is also a possibility to pad the dataset till the bucket boundary.
  You can also provide which value to be used while padding the data.
  Below example uses `-1` as padding and it also shows the input data
  being bucketized to two buckets "[0,3], [4,6]".

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...   lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]
  [[21 22 -1]]
  [[13 14 15 16 19 20]]

  When using `pad_to_bucket_boundary` option, it can be seen that it is
  not always possible to maintain the bucket batch size.
  You can drop the batches that do not maintain the bucket batch size by
  using the option `drop_remainder`. Using the same input data as in the
  above example you get the following result.

  >>> elements = [
  ...   [0], [1, 2, 3, 4], [5, 6, 7],
  ...   [7, 8, 9, 10, 11], [13, 14, 15, 16, 19, 20], [21, 22]]

  >>> dataset = tf.data.Dataset.from_generator(
  ...   lambda: elements, tf.int32, output_shapes=[None])

  >>> dataset = dataset.apply(
  ...     tf.data.experimental.bucket_by_sequence_length(
  ...         element_length_func=lambda elem: tf.shape(elem)[0],
  ...         bucket_boundaries=[4, 7],
  ...         bucket_batch_sizes=[2, 2, 2],
  ...         pad_to_bucket_boundary=True,
  ...         padding_values=-1,
  ...         drop_remainder=True))

  >>> for elem in dataset.as_numpy_iterator():
  ...   print(elem)
  [[ 0 -1 -1]
   [ 5  6  7]]
  [[ 1  2  3  4 -1 -1]
   [ 7  8  9 10 11 -1]]

  Args:
    element_length_func: function from element in `Dataset` to `tf.int32`,
      determines the length of the element, which will determine the bucket it
      goes into.
    bucket_boundaries: `list<int>`, upper length boundaries of the buckets.
    bucket_batch_sizes: `list<int>`, batch size per bucket. Length should be
      `len(bucket_boundaries) + 1`.
    padded_shapes: Nested structure of `tf.TensorShape` to pass to
      `tf.data.Dataset.padded_batch`. If not provided, will use
      `dataset.output_shapes`, which will result in variable length dimensions
      being padded out to the maximum length in each batch.
    padding_values: Values to pad with, passed to
      `tf.data.Dataset.padded_batch`. Defaults to padding with 0.
    pad_to_bucket_boundary: bool, if `False`, will pad dimensions with unknown
      size to maximum length in batch. If `True`, will pad dimensions with
      unknown size to bucket boundary minus 1 (i.e., the maximum length in each
      bucket), and caller must ensure that the source `Dataset` does not contain
      any elements with length longer than `max(bucket_boundaries)`.
    no_padding: `bool`, indicates whether to pad the batch features (features
      need to be either of type `tf.sparse.SparseTensor` or of same shape).
    drop_remainder: (Optional.) A `tf.bool` scalar `tf.Tensor`, representing
      whether the last batch should be dropped in the case it has fewer than
      `batch_size` elements; the default behavior is not to drop the smaller
      batch.

  Returns:
    A `Dataset` transformation function, which can be passed to
    `tf.data.Dataset.apply`.

  Raises:
    ValueError: if `len(bucket_batch_sizes) != len(bucket_boundaries) + 1`.
  """

  def _apply_fn(dataset):
    # Every argument is forwarded by keyword, unchanged, to the `Dataset`
    # method that replaces this deprecated endpoint.
    forwarded = dict(
        element_length_func=element_length_func,
        bucket_boundaries=bucket_boundaries,
        bucket_batch_sizes=bucket_batch_sizes,
        padded_shapes=padded_shapes,
        padding_values=padding_values,
        pad_to_bucket_boundary=pad_to_bucket_boundary,
        no_padding=no_padding,
        drop_remainder=drop_remainder)
    return dataset.bucket_by_sequence_length(**forwarded)

  return _apply_fn
class _GroupByReducerDataset(dataset_ops.UnaryDataset):
  """A `Dataset` that groups its input and performs a reduction."""

  def __init__(self, input_dataset, key_func, reducer):
    """See `group_by_reducer()` for details.

    Args:
      input_dataset: The dataset whose elements are grouped and reduced.
      key_func: Function mapping an input element to a scalar `tf.int64` key.
      reducer: Object exposing `init_func`, `reduce_func` and `finalize_func`.
    """
    self._input_dataset = input_dataset
    # The wrappers are built in dependency order: the reduce function is traced
    # against the state structure produced by the init function, and the
    # finalize function against the state structure settled on by the reduce
    # function.
    self._make_key_func(key_func, input_dataset)
    self._make_init_func(reducer.init_func)
    self._make_reduce_func(reducer.reduce_func, input_dataset)
    self._make_finalize_func(reducer.finalize_func)
    variant_tensor = ged_ops.experimental_group_by_reducer_dataset(
        self._input_dataset._variant_tensor,  # pylint: disable=protected-access
        self._key_func.function.captured_inputs,
        self._init_func.function.captured_inputs,
        self._reduce_func.function.captured_inputs,
        self._finalize_func.function.captured_inputs,
        key_func=self._key_func.function,
        init_func=self._init_func.function,
        reduce_func=self._reduce_func.function,
        finalize_func=self._finalize_func.function,
        **self._flat_structure)
    super(_GroupByReducerDataset, self).__init__(input_dataset, variant_tensor)

  def _make_key_func(self, key_func, input_dataset):
    """Make wrapping defun for key_func.

    Raises:
      ValueError: If the output structure of `key_func` is not compatible with
        a scalar `tf.int64` tensor.
    """
    self._key_func = structured_function.StructuredFunctionWrapper(
        key_func, self._transformation_name(), dataset=input_dataset)
    if not self._key_func.output_structure.is_compatible_with(
        tensor_spec.TensorSpec([], dtypes.int64)):
      raise ValueError(
          f"Invalid `key_func`. Expected `key_func` to return a scalar "
          f"tf.int64 tensor, but instead `key_func` has output "
          f"types={self._key_func.output_types} "
          f"and shapes={self._key_func.output_shapes}."
      )

  def _make_init_func(self, init_func):
    """Make wrapping defun for init_func."""
    # The init function takes the group key, a scalar `tf.int64`, as input.
    self._init_func = structured_function.StructuredFunctionWrapper(
        init_func,
        self._transformation_name(),
        input_structure=tensor_spec.TensorSpec([], dtypes.int64))

  def _make_reduce_func(self, reduce_func, input_dataset):
    """Make wrapping defun for reduce_func.

    The reduce function is wrapped against the current state structure and its
    output is compared with that structure. While the state shapes have to be
    weakened to accommodate the output, the function is wrapped again with the
    weakened structure.

    Raises:
      TypeError: If the classes or element types returned by `reduce_func` do
        not match those of the state returned by the init function.
    """

    # Iteratively rerun the reduce function until reaching a fixed point on
    # `self._state_structure`.
    self._state_structure = self._init_func.output_structure
    state_types = self._init_func.output_types
    state_shapes = self._init_func.output_shapes
    state_classes = self._init_func.output_classes
    need_to_rerun = True
    while need_to_rerun:

      # `add_to_graph=False`: only the wrapper from the final iteration is
      # added to the graph, after the loop.
      wrapped_func = structured_function.StructuredFunctionWrapper(
          reduce_func,
          self._transformation_name(),
          input_structure=(self._state_structure, input_dataset.element_spec),
          add_to_graph=False)

      # Extract and validate class information from the returned values.
      for new_state_class, state_class in zip(
          nest.flatten(wrapped_func.output_classes),
          nest.flatten(state_classes)):
        if not issubclass(new_state_class, state_class):
          # Report the local `state_classes`; no `_state_classes` attribute is
          # ever assigned in this class, so reading it here would replace the
          # intended `TypeError` with an `AttributeError`.
          raise TypeError(
              f"Invalid `reducer`. The output class of the "
              f"`reducer.reduce_func` {wrapped_func.output_classes}, "
              f"does not match the class of the reduce state "
              f"{state_classes}.")

      # Extract and validate type information from the returned values.
      for new_state_type, state_type in zip(
          nest.flatten(wrapped_func.output_types), nest.flatten(state_types)):
        if new_state_type != state_type:
          raise TypeError(
              f"Invalid `reducer`. The element types for the new state "
              f"{wrapped_func.output_types} do not match the element types "
              f"of the old state {self._init_func.output_types}."
          )

      # Extract shape information from the returned values.
      flat_state_shapes = nest.flatten(state_shapes)
      flat_new_state_shapes = nest.flatten(wrapped_func.output_shapes)
      weakened_state_shapes = [
          original.most_specific_compatible_shape(new)
          for original, new in zip(flat_state_shapes, flat_new_state_shapes)
      ]

      # A rerun is needed as soon as one state shape of known rank differs
      # from its weakened counterpart (unknown rank or different dimensions).
      need_to_rerun = False
      for original_shape, weakened_shape in zip(flat_state_shapes,
                                                weakened_state_shapes):
        if original_shape.ndims is not None and (
            weakened_shape.ndims is None or
            original_shape.as_list() != weakened_shape.as_list()):
          need_to_rerun = True
          break

      if need_to_rerun:
        state_shapes = nest.pack_sequence_as(
            self._init_func.output_shapes, weakened_state_shapes)
        self._state_structure = structure.convert_legacy_structure(
            state_types, state_shapes, state_classes)

    self._reduce_func = wrapped_func
    self._reduce_func.function.add_to_graph(ops.get_default_graph())

  def _make_finalize_func(self, finalize_func):
    """Make wrapping defun for finalize_func."""
    # The finalize function consumes the state structure settled on by
    # `_make_reduce_func`.
    self._finalize_func = structured_function.StructuredFunctionWrapper(
        finalize_func,
        self._transformation_name(),
        input_structure=self._state_structure)

  @property
  def element_spec(self):
    """The output structure of the finalize function."""
    return self._finalize_func.output_structure

  def _functions(self):
    """Returns the four wrapped functions used by this dataset."""
    return [
        self._key_func, self._init_func, self._reduce_func, self._finalize_func
    ]

  def _transformation_name(self):
    """Returns the name passed to the function wrappers built by this class."""
    return "tf.data.experimental.group_by_reducer()"
@tf_export("data.experimental.Reducer")
class Reducer:
  """A reducer is used for reducing a set of elements.

  A reducer is represented as a tuple of the three functions:
  - init_func - to define initial value: key => initial state
  - reduce_func - operation to perform on values with same key:
    (old state, input) => new state
  - finalize_func - value to return in the end: state => result

  For example,

  ```
  def init_func(_):
    return (0.0, 0.0)

  def reduce_func(state, value):
    return (state[0] + value['features'], state[1] + 1)

  def finalize_func(s, n):
    return s / n

  reducer = tf.data.experimental.Reducer(init_func, reduce_func, finalize_func)
  ```
  """

  def __init__(self, init_func, reduce_func, finalize_func):
    """Stores the three functions that make up the reducer.

    Args:
      init_func: Function defining the initial state: key => initial state.
      reduce_func: Function combining the state with an input element:
        (old state, input) => new state.
      finalize_func: Function producing the result: state => result.
    """
    self._init_func = init_func
    self._reduce_func = reduce_func
    self._finalize_func = finalize_func

  @property
  def init_func(self):
    """The `init_func` passed to the constructor."""
    return self._init_func

  @property
  def reduce_func(self):
    """The `reduce_func` passed to the constructor."""
    return self._reduce_func

  @property
  def finalize_func(self):
    """The `finalize_func` passed to the constructor."""
    return self._finalize_func