Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tensorflow/python/distribute/input_ops.py: 30%

44 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-03 07:57 +0000

1# Copyright 2018 The TensorFlow Authors. All Rights Reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# ============================================================================== 

15"""Input-pipeline utilities for Distribution strategies.""" 

16 

17from tensorflow.python.data.experimental.ops import distribute 

18from tensorflow.python.data.ops import dataset_ops 

19from tensorflow.python.data.ops.options import AutoShardPolicy 

20from tensorflow.python.data.util import traverse 

21from tensorflow.python.framework import op_def_registry 

22from tensorflow.python.framework import ops 

23from tensorflow.python.types import data as data_types 

24from tensorflow.python.types import distribute as distribute_types 

25 

26 

27# pylint: disable=protected-access 

28def auto_shard_dataset(dataset, num_shards, index, num_replicas_in_sync=None): 

29 """Shard the input pipeline by sharding the underlying list of files. 

30 

31 Args: 

32 dataset: A `tf.data.Dataset` instance, typically the result of a bunch of 

33 dataset transformations. 

34 num_shards: A `tf.int64` scalar `tf.Tensor`, representing the number of 

35 shards operating in parallel. Same usage as in `tf.data.Dataset.shard`. 

36 index: A `tf.int64` scalar `tf.Tensor`, representing the worker index. 

37 Same usage as in `tf.data.Dataset.shard`. 

38 num_replicas_in_sync: An integer representing the total number of replicas 

39 across all workers. This is used in the rewrite when sharding by data. 

40 

41 Returns: 

42 A modified `Dataset` obtained by updating the pipeline sharded by the 

43 files. The input dataset will be returned if we cannot automatically 

44 determine a good way to shard the input dataset. 

45 """ 

46 if isinstance(dataset, distribute_types.DistributedDatasetInterface): 

47 return dataset.auto_shard(num_shards, index) 

48 if (dataset.options().experimental_distribute.auto_shard_policy != 

49 AutoShardPolicy.OFF): 

50 if num_replicas_in_sync is None: 

51 num_replicas_in_sync = 1 

52 if isinstance(dataset, data_types.DatasetV1): 

53 return distribute._AutoShardDatasetV1(dataset, num_shards, index, 

54 num_replicas_in_sync) 

55 else: 

56 return distribute._AutoShardDataset(dataset, num_shards, index, 

57 num_replicas_in_sync) 

58 else: 

59 return dataset 

60 

61 

62def _clone_dataset(dataset): 

63 """Returns a cloned version of `dataset`.""" 

64 variant_tensor_ops = traverse.obtain_all_variant_tensor_ops(dataset) 

65 remap_dict = _clone_helper(dataset._variant_tensor.op, variant_tensor_ops) 

66 new_variant_tensor = remap_dict[dataset._variant_tensor.op].outputs[0] 

67 return dataset_ops._VariantDataset(new_variant_tensor, dataset.element_spec) 

68 

69 

70def _get_op_def(op): 

71 return op.op_def or op_def_registry.get(op.type) 

72 

73 

74def _clone_helper(op_to_clone, variant_tensor_ops): 

75 """Helper method that recursively clones `op_to_clone`. 

76 

77 Args: 

78 op_to_clone: The op we want to clone. 

79 variant_tensor_ops: A list of ops that we have to clone along the way. 

80 

81 Returns: 

82 A dictionary mapping old_ops to new_ops created. Includes op_to_clone 

83 as a key. 

84 """ 

85 remap_dict = {} 

86 for input_tensor in op_to_clone.inputs: 

87 input_tensor_op = input_tensor.op 

88 if input_tensor_op in variant_tensor_ops: 

89 recursive_map = _clone_helper(input_tensor_op, variant_tensor_ops) 

90 remap_dict.update(recursive_map) 

91 inputs_list = [] 

92 for input_tensor in op_to_clone.inputs: 

93 input_tensor_op = input_tensor.op 

94 if input_tensor_op in remap_dict: 

95 remapped_input = remap_dict[input_tensor_op].outputs[0] 

96 inputs_list.append(remapped_input) 

97 else: 

98 inputs_list.append(input_tensor_op.outputs[input_tensor.value_index]) 

99 g = ops.get_default_graph() 

100 new_op = g.create_op( 

101 op_to_clone.type, 

102 inputs_list, [o.dtype for o in op_to_clone.outputs], 

103 name=op_to_clone.name, 

104 attrs=op_to_clone.node_def.attr, 

105 op_def=_get_op_def(op_to_clone)) 

106 remap_dict[op_to_clone] = new_op 

107 return remap_dict