##### Copyright 2020 The TensorFlow Authors.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 분산 입력

<table class="tfo-notebook-buttons" align="left">
  <td><a target="_blank" href="https://www.tensorflow.org/tutorials/distribute/input"><img src="https://www.tensorflow.org/images/tf_logo_32px.png">TensorFlow.org에서 보기</a></td>
  <td><a target="_blank" href="https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/ko/tutorials/distribute/input.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png">Google Colab에서 실행하기</a></td>
  <td><a target="_blank" href="https://github.com/tensorflow/docs-l10n/blob/master/site/ko/tutorials/distribute/input.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png">GitHub에서 소스 보기</a></td>
  <td><a href="https://storage.googleapis.com/tensorflow_docs/docs-l10n/site/ko/tutorials/distribute/input.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png">노트북 다운로드하기</a></td>
</table>

[tf.distribute](https://www.tensorflow.org/guide/distributed_training) API는 사용자가 단일 머신에서 여러 머신으로 훈련을 쉽게 확장하는 방법을 제공합니다. 모델을 확장할 때 사용자는 입력을 여러 장치로 분산해야 합니다. `tf.distribute`는 입력을 전체 장치에 자동으로 분산할 수 있는 API를 제공합니다.

이 가이드는 `tf.distribute` API를 사용하여 분산 데이터세트 및 반복기를 생성할 수 있는 다양한 방법을 보여줍니다. 또한 다음 주제들도 다룹니다.

- `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.distribute_datasets_from_function`을 사용할 때의 사용법, 샤딩 및 일괄 처리 옵션
- 분산 데이터세트를 반복할 수 있는 다양한 방법
- Differences between `tf.distribute.Strategy.experimental_distribute_dataset`/`tf.distribute.Strategy.distribute_datasets_from_function` APIs and `tf.data` APIs as well any limitations that users may come across in their usage.

이 가이드는 Keras API를 사용한 분산 입력 사용법에 대해서는 다루지 않습니다.

## 분산 데이터세트

`tf.distribute` API를 사용하여 확장하려면 `tf.data.Dataset`을 사용하여 입력을 나타냅니다. `tf.distribute`는 `tf.data.Dataset`과 효율적으로 작동합니다. 각 가속기 장치에 대한 자동 프리페치 및 정기적인 성능 업데이트를 예로 들 수 있습니다. `tf.data.Dataset` 이외의 것을 사용하는 사용 사례가 있는 경우 이 가이드의 [Tensor 입력 섹션](#tensorinputs)을 참조하세요. 비분산 훈련 루프에서 먼저 `tf.data.Dataset` 인스턴스를 만든 다음 요소를 반복합니다. 예를 들면 다음과 같습니다.


In [2]:
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)

2022-12-15 02:00:49.883291: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-15 02:00:49.883393: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory


2.11.0


In [3]:
# Simulate multiple CPUs with virtual devices
N_VIRTUAL_DEVICES = 2
physical_devices = tf.config.list_physical_devices("CPU")
tf.config.set_logical_device_configuration(
    physical_devices[0], [tf.config.LogicalDeviceConfiguration() for _ in range(N_VIRTUAL_DEVICES)])

In [4]:
print("Available devices:")
for i, device in enumerate(tf.config.list_logical_devices()):
  print("%d) %s" % (i, device))

Available devices:
0) LogicalDevice(name='/device:CPU:0', device_type='CPU')
1) LogicalDevice(name='/device:CPU:1', device_type='CPU')
2) LogicalDevice(name='/device:GPU:0', device_type='GPU')
3) LogicalDevice(name='/device:GPU:1', device_type='GPU')
4) LogicalDevice(name='/device:GPU:2', device_type='GPU')
5) LogicalDevice(name='/device:GPU:3', device_type='GPU')


In [5]:
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))


tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)


사용자가 기존 코드를 최소한으로 변경하면서 `tf.distribute` 전략을 사용할 수 있도록 `tf.data.Dataset` 인스턴스를 분산시키고 분산된 데이터세트 객체를 반환하는 두 개의 API가 도입되었습니다. 그러면 사용자가 이 분산 데이터세트 인스턴스를 반복하고 이전과 같이 모델을 훈련할 수 있습니다. 이제 두 가지 API인 `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.distribute_datasets_from_function`를 자세히 살펴 보겠습니다.

### `tf.distribute.Strategy.experimental_distribute_dataset`

#### 사용법

이 API는 `tf.data.Dataset` 인스턴스를 입력으로 받고 `tf.distribute.DistributedDataset` 인스턴스를 반환합니다. 입력 데이터세트를 전역 배치 크기와 동일한 값으로 배치 처리해야 합니다. 이 전역 배치 크기는 모든 장치에 걸쳐 1스텝에서 처리하려는 샘플의 수입니다. 이 분산 데이터세트를 Python 방식으로 반복하거나 `iter`를 사용하여 반복기를 만들 수 있습니다. 반환된 객체는 `tf.data.Dataset` 인스턴스가 아니며 어떤 식으로든 데이터세트를 변환하거나 검사하는 다른 API를 지원하지 않습니다. 여러 복제본에 걸쳐 입력을 샤딩하려는 특정한 방법이 없는 경우 이것이 권장되는 API입니다.


In [6]:
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:00:56.055218: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args 

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
}, PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtyp

#### 속성

##### 배치 처리

`tf.distribute`는 전역 배치 크기를 동기화된 복제본 수로 나눈 값인 새 배치 크기로 입력 `tf.data.Dataset` 인스턴스의 배치를 다시 처리합니다. 동기화된 복제본의 수는 훈련 중 기울기 allreduce에 참여하는 장치의 수와 같습니다. 사용자가 분산 반복기에서 `next`를 호출하면 각 복제본에서 복제본당 배치 크기의 데이터가 반환됩니다. 배치가 다시 처리된 데이터세트의 카디널리티는 항상 복제본 수의 배수입니다. 다음은 몇 가지 예입니다.

- `tf.data.Dataset.range(6).batch(4, drop_remainder=False)`

    - 분산 제외:

        - 배치 1: [0, 1, 2, 3]
        - 배치 2: [4, 5]

    - 2개 복제본에서 분산 포함. 마지막 배치 ([4, 5])는 2개의 복제본으로 분할됩니다.

    - 배치 1:

        - 복제본 1: [0, 1]
        - 복제본 2: [2, 3]

    - 배치 2:

        - 복제본 1: [4]
        - 복제본 2: [5]

- `tf.data.Dataset.range(4).batch(4)`

    - 분산 제외:
        - 배치 1: [0, 1, 2, 3]
    - 5개 복제본에서 분산:
        - 배치 1:
            - 복제본 1: [0]
            - 복제본 2: [1]
            - 복제본 3: [2]
            - 복제본 4: [3]
            - 복제본 5: []

- `tf.data.Dataset.range(8).batch(4)`

    - 분산 제외:
        - 배치 1: [0, 1, 2, 3]
        - 배치 2: [4, 5, 6, 7]
    - 3개 복제본에서 분산:
        - 배치 1:
            - 복제본 1: [0, 1]
            - 복제본 2: [2, 3]
            - 복제본 3: []
        - 배치 2:
            - 복제본 1: [4, 5]
            - 복제본 2: [6, 7]
            - 복제본 3: []

참고: 위의 예는 전역 배치가 여러 복제본에서 분할되는 방식만 보여줍니다. 구현에 따라 각 복제본은 변경될 수 있으므로 각 복제본에서 발생 수 있는 실제 값에 의존하지 않는 것이 좋습니다.

데이터세트를 다시 배치 처리하면 복제본 수에 비례하여 공간 복잡성이 선형적으로 증가합니다. 이는 다중 작업자 훈련의 사용 사례에서 입력 파이프라인에 OOM 오류가 발생할 수 있음을 의미합니다. 

##### 샤딩

`tf.distribute`는 또한 `MultiWorkerMirroredStrategy` 및 `TPUStrategy`를 사용하여 다중 작업자 훈련에서 입력 데이터세트를 자동 샤딩합니다. 각 데이터세트는 작업자의 CPU 장치에 생성됩니다. 일단의 작업자에 대해 데이터세트를 자동 샤딩한다는 것은 각 작업자에게 전체 데이터세트의 하위 집합이 할당된다는 것을 의미합니다(올바른 `tf.data.experimental.AutoShardPolicy`가 설정된 경우). 이는 각 단계에서 겹치지 않는 데이터세트 요소의 전역 배치 크기가 각 작업자에 의해 처리되도록 하기 위한 것입니다. 자동 샤딩에는 `tf.data.experimental.DistributeOptions`를 사용하여 지정할 수 있는 몇 가지 다른 옵션이 있습니다. `ParameterServerStrategy`를 사용한 다중 작업자 훈련에는 자동 샤딩이 없으며 이 전략을 사용한 데이터세트 생성에 대한 자세한 내용은 [매개변수 서버 전략 튜토리얼](parameter_server_training.ipynb)에서 확인할 수 있습니다. 

In [7]:
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

`tf.data.experimental.AutoShardPolicy`에 대해 세 가지 다른 옵션을 설정할 수 있습니다.

- AUTO: 기본 옵션이며 FILE별 샤딩 시도가 이루어짐을 의미합니다. 파일 기반 데이터세트가 탐지되지 않으면 FILE별 샤딩 시도가 실패합니다. 그러면 `tf.distribute`가 DATA별 샤딩으로 폴백합니다. 입력 데이터세트가 파일 기반이지만 파일 수가 작업자 수보다 적으면, `InvalidArgumentError`가 ​​발생합니다. 이 경우, 정책을 명시적으로 `AutoShardPolicy.DATA`로 설정하거나 파일 수가 작업자 수보다 많도록 입력 소스를 더 작은 파일로 분할합니다.

- FILE: 모든 작업자에 대해 입력 파일을 샤딩하려는 경우 사용하는 옵션입니다. 입력 파일의 수가 작업자의 수보다 훨씬 많고 파일의 데이터가 균등하게 분산된 경우, 이 옵션을 사용해야 합니다. 이 옵션의 단점은 파일의 데이터가 균등하게 분산되어 있지 않으면 유휴 작업자가 생긴다는 것입니다. 파일의 수가 작업자의 수보다 적으면 `InvalidArgumentError`가 발생합니다. 이 경우에는 정책을 명시적으로 `AutoShardPolicy.DATA`로 설정합니다. 예를 들어, 각각 1개의 복제본이 있는 두 작업자에 2개의 파일을 배포해 보겠습니다. 파일 1에는 [0, 1, 2, 3, 4, 5]가 포함되고 파일 2에는 [6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로, 전역 배치 크기를 4로 설정합니다.

    - 작업자 0:
        - 배치 1 =  복제본 1: [0, 1]
        - 배치 2 = 복제본 1: [2, 3]
        - 배치 3 =  복제본 1: [4]
        - 배치 4 =  복제본 1: [5]
    - 작업자 1:
        - 배치 1 =  복제본 2: [6, 7]
        - 배치 2 =  복제본 2: [8, 9]
        - 배치 3 =  복제본 2: [10]
        - 배치 4 =  복제본 2: [11]

- DATA: 모든 작업자에 걸쳐 요소를 자동 샤딩합니다. 각 작업자는 전체 데이터세트를 읽고 할당된 샤드만 처리합니다. 다른 모든 샤드는 삭제됩니다. 이 옵션은 일반적으로 입력 파일의 수가 작업자의 수보다 적고 모든 작업자에서 데이터를 더 잘 샤딩하려는 경우에 사용됩니다. 단점은 각 작업자에서 전체 데이터세트를 읽는다는 것입니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다.

    - 작업자 0:
        - 배치 1 =  복제본 1: [0, 1]
        - 배치 2 =  복제본 1: [4, 5]
        - 배치 3 =  복제본 1: [8, 9]
    - 작업자 1:
        - 배치 1 = 복제본 2: [2, 3]
        - 배치 2 =  복제본 2: [6, 7]
        - 배치 3 = 복제본 2: [10, 11]

- OFF: 자동 샤딩을 끄면 각 작업자가 모든 데이터를 처리합니다. 예를 들어, 두 작업자에 1개의 파일을 배포해 보겠습니다. 파일 1에 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]이 포함됩니다. 동기화된 총 복제본의 수를 2개로 설정합니다. 그러면 각 작업자는 다음과 같은 분산을 볼 수 있습니다.

    - 작업자 0:

        - 배치 1 =  복제본 1: [0, 1]
        - 배치 2 = 복제본 1: [2, 3]
        - 배치 3 = 복제본 1: [4, 5]
        - 배치 4 = 복제본 1: [6, 7]
        - 배치 5 = 복제본 1: [8, 9]
        - 배치 6 = 복제본 1: [10, 11]

    - 작업자 1:

        - 배치 1 = 복제본 2: [0, 1]
        - 배치 2 = 복제본 2: [2, 3]
        - 배치 3 = 복제본 2: [4, 5]
        - 배치 4 = 복제본 2: [6, 7]
        - 배치 5 = 복제본 2: [8, 9]
        - 배치 6 = 복제본 2: [10, 11] 

##### 프리페칭

기본적으로 `tf.distribute`는 사용자가 제공한 `tf.data.Dataset` 인스턴스 끝에 프리페치 변환을 추가합니다. 프리페치 변환에 대한 인수 `buffer_size`는 동기화되는 복제본의 수에 해당합니다.

### `tf.distribute.Strategy.distribute_datasets_from_function`

#### 사용법

이 API는 입력 함수를 받고 `tf.distribute.DistributedDataset` 인스턴스를 반환합니다. 사용자가 전달하는 입력 함수는 `tf.distribute.InputContext` 인수를 갖고 `tf.data.Dataset` 인스턴스를 반환해야 합니다. 이 API를 사용하면 `tf.distribute`가 입력 함수로부터 반환된 사용자의 `tf.data.Dataset` 인스턴스를 더 이상 변경하지 않습니다. 데이터세트를 배치 처리하고 샤딩하는 것은 사용자의 몫입니다. `tf.distribute`는 각 작업자의 CPU 장치에서 입력 함수를 호출합니다. 사용자가 자체 배치 및 샤딩 로직을 지정할 수 있게 해주는 이외에도 이 API는 다중 작업자 훈련을 사용할 때 `tf.distribute.Strategy.experimental_distribute_dataset`에 비해 확장성과 성능이 더 우수합니다.

In [8]:
mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)  # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


#### 속성

##### 배치 처리

입력 함수의 반환 값인 `tf.data.Dataset` 인스턴스는 복제본별 배치 크기를 사용하여 배치 처리해야 합니다. 복제본별 배치 크기는 전역 배치 크기를 동기화 훈련에 참여하는 복제본의 수로 나눈 값입니다. `tf.distribute`가 각 작업자의 CPU 장치에서 입력 함수를 호출하기 때문입니다. 지정된 작업자에서 생성된 데이터세트는 해당 작업자의 모든 복제본에서 사용할 수 있어야 합니다. 

##### 샤딩

사용자의 입력 함수에 대한 인수로 암시적으로 전달되는 `tf.distribute.InputContext` 객체는 내부에서 `tf.distribute`에 의해 생성됩니다. 여기에는 작업자 수, 현재 작업자 ID 등에 대한 정보가 들어 있습니다. 이 입력 함수는 `tf.distribute.InputContext` 객체의 일부인 이러한 속성을 사용하여 사용자가 설정한 정책에 따라 샤딩을 처리할 수 있습니다.


##### 프리페칭

`tf.distribute`는 사용자가 제공한 입력 함수에 의해 반환된 `tf.data.Dataset`의 끝에 프리페치 변환을 추가하지 않으므로 위의 예에서 `Dataset.prefetch`를 명시적으로 호출합니다.

참고: `tf.distribute.Strategy.experimental_distribute_dataset` 및 `tf.distribute.Strategy.distribute_datasets_from_function` 모두 **`tf.data.Dataset` 형식이 아닌 `tf.distribute.DistributedDataset` 인스턴스를 반환합니다.** 분산 반복기 섹션에 표시된 대로 이러한 인스턴스를 반복하고 `element_spec` 속성을 사용할 수 있습니다. 

## 분산 반복기

비분산 `tf.data.Dataset` 인스턴스와 유사하게, `tf.distribute.DistributedDataset` 인스턴스에서 반복기를 생성하여 이를 반복하고 `tf.distribute.DistributedDataset`에서 요소에 액세스해야 합니다. 다음은 `tf.distribute.DistributedIterator`를 생성하고 모델을 훈련할 때 사용하는 방법입니다.


### 사용법

#### 루프 구성에 Python 사용하기

사용자 친화적인 Python 루프를 사용하여 `tf.distribute.DistributedDataset`을 반복할 수 있습니다. `tf.distribute.DistributedIterator`에서 반환된 요소는 단일 `tf.Tensor` 또는 복제본당 값을 포함하는 `tf.distribute.DistributedValues`일 수 있습니다. 루프를 `tf.function` 내에 배치하면 성능이 향상됩니다. 그러나 `break` 및 `return`은 현재 `tf.function` 내에 있는 `tf.distribute.DistributedDataset`에 대한 루프에 대해 지원되지 않습니다.

In [9]:
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:00:56.399901: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:38"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor([[0.7]], shape=(1, 1), dtype=float32),
  1: tf.Tensor([[0.7]], shape=(1, 1), dtype=float32),
  2: tf.Tensor([[0.7]], shape=(1, 1), dtype=float32),
  3: tf.Tensor([[0.7]], shape=(1, 1), dtype=float32)
}


#### `iter`를 사용하여 명시적인 반복기 만들기

`tf.distribute.DistributedDataset` 인스턴스의 요소를 반복하기 위해 `iter` API를 사용하여 `tf.distribute.DistributedIterator`를 작성할 수 있습니다. 명시적인 반복자를 사용하면 고정된 수의 스텝을 반복할 수 있습니다. `tf.distribute.DistributedIterator` 인스턴스 `dist_iterator`에서 다음 요소를 가져오려면 `next(dist_iterator)`, `dist_iterator.get_next()` 또는 `dist_iterator.get_next_as_optional()`을 호출할 수 있습니다. 앞의 두 가지는 본질적으로 동일합니다.

In [10]:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica:{
  0: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  1: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  2: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32),
  3: tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)
}
Loss is  PerReplica

`next` 또는 `tf.distribute.DistributedIterator.get_next()`를 사용하여 `tf.distribute.DistributedIterator`의 끝에 도달하면 OutOfRange 오류가 발생합니다. 클라이언트는 Python 측에서 오류를 포착하고 검사점 및 평가와 같은 다른 작업을 계속할 수 있습니다. 그러나 다음과 같이 호스트 훈련 루프를 사용하는 경우(예: `tf.function`당 여러 스텝 실행) 작동하지 않습니다.

```
@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
```

이 예제 `train_fn`에는 `tf.range` 내부에 스텝 본문을 래핑하여 여러 스텝이 포함되어 있습니다. 이 경우 종속성이 없는 루프의 여러 반복이 병렬로 시작될 수 있으므로 이전 반복의 계산이 완료되기 전에 이후 반복에서 OutOfRange 오류가 트리거될 수 있습니다. OutOfRange 오류가 발생하면 함수의 모든 ops가 즉시 종료됩니다. 이러한 상황을 피하고 싶은 경우 OutOfRange 오류를 발생시키지 않는 대안은 `tf.distribute.DistributedIterator.get_next_as_optional`입니다. `get_next_as_optional`은 다음 요소를 포함하거나 `tf.distribute.DistributedIterator`가 끝에 도달한 경우 값이 없는 `tf.experimental.Optional`을 반환합니다.

In [11]:
# You can break the loop with `get_next_as_optional` by checking if the `Optional` contains a value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x: x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:01:01.859283: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:212"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}



Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


([0], [1], [2], [3])


([4], [5], [6], [7])


([8], [], [], [])


## `element_spec` 속성 사용하기

분산 데이터세트의 요소를 `tf.function`에 전달하고 `tf.TypeSpec` 보장을 원하는 경우 `tf.function`의 `input_signature` 인수를 지정할 수 있습니다. 분산 데이터세트의 출력은 단일 장치 또는 여러 장치에 대한 입력을 나타낼 수 있는 `tf.distribute.DistributedValues`입니다. 이 분산 값에 해당하는 `tf.TypeSpec`을 얻으려면 `tf.distribute.DistributedDataset.element_spec` 또는 `tf.distribute.DistributedIterator.element_spec`을 사용할 수 있습니다.

In [12]:
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:01:02.436659: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:235"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      arg

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

(PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>
},
 PerReplica:{
  0: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  1: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  2: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>,
  3: <tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.]], dty

## 데이터 전처리

지금까지 `tf.data.Dataset`을 배포하는 방법을 배웠습니다. 그러나 데이터를 모델에 맞게 준비하려면 예를 들어 데이터를 정리, 변환 및 보강하는 등 사전에 처리가 필요합니다. 이를 위해 다음과 같은 두 가지 편리한 도구를 사용할 수 있습니다.

- [Keras 전처리 레이어](https://www.tensorflow.org/guide/keras/preprocessing_layers): 개발자가 Keras 네이티브 입력 처리 파이프라인을 구축할 수 있도록 하는 Keras 레이어 집합입니다. 일부 Keras 전처리 레이어에는 훈련할 수 없는 상태가 포함되어 있으며 이는 초기화 시 설정되거나 `adapt`될 수 있습니다(<a>Keras 전처리 레이어 가이드</a>의 <code>adapt</code> 섹션 참조). 상태 저장 전처리 레이어를 배포할 때 상태는 모든 작업자에게 복제되어야 합니다. 이러한 레이어를 사용하려면 모델의 일부로 만들거나 데이터세트에 적용할 수 있습니다.

- [TensorFlow Transform(tf.Transform)](https://www.tensorflow.org/tfx/transform/get_started): 데이터 전처리 파이프라인을 통해 인스턴스 수준 및 전체 패스 데이터 변환을 모두 정의할 수 있는 TensorFlow용 라이브러리입니다. Tensorflow Transform에는 두 단계가 있습니다. 첫 번째는 분석 단계로, 변환에 필요한 통계를 계산하기 위해 원시 훈련 데이터가 전체 패스 프로세스에서 분석되고 변환 논리가 인스턴스 수준 작업으로 생성됩니다. 두 번째 단계는 원시 훈련 데이터가 인스턴스 수준 프로세스에서 변환되는 변환 단계입니다.


### Keras 전처리 레이어 대 Tensorflow Transform

Tensorflow Transform 및 Keras 전처리 레이어는 모두 훈련 중 전처리를 분리하고 추론 중 모델과 전처리를 묶어 훈련/서브 스큐를 줄이는 방법을 제공합니다.

[TFX](https://www.tensorflow.org/tfx)와 긴밀하게 통합된 Tensorflow Transform은 훈련 파이프라인과 별개의 작업에서 모든 크기의 데이터세트를 분석하고 변환하기 위한 확장 가능한 맵 축소 솔루션을 제공합니다. 단일 시스템에 맞지 않을 수 있는 데이터세트에 대한 분석을 실행해야 하는 경우, Tensorflow Transform이 가장 먼저 선택되어야 합니다.

Keras 전처리 레이어는 디스크에서 데이터를 읽은 후 훈련 중에 적용되는 전처리에 더 적합합니다. Keras 라이브러리의 모델 개발과 완벽하게 맞습니다. [`adapt`](https://www.tensorflow.org/guide/keras/preprocessing_layers#the_adapt_method)을 통해 더 작은 데이터세트의 분석을 지원하고 입력 데이터세트를 통과할 때마다 훈련을 위한 다른 예가 생성되는 이미지 데이터 강화와 같은 사용 사례를 지원합니다.

두 라이브러리를 혼합할 수도 있습니다. 여기서 Tensorflow Transform은 입력 데이터의 분석 및 정적 변환에 사용되고 Keras 전처리 레이어는 기차 시간 변환(예: 원-핫 인코딩 또는 데이터 강화)에 사용됩니다.


### tf.distribute의 모범 사례

두 도구를 모두 사용하려면 데이터에 적용할 변환 논리를 초기화해야 하며, 이로부터 Tensorflow 리소스가 생성될 수 있습니다. 이러한 리소스 또는 상태는 작업자 간 또는 작업자-조정자 통신을 저장하기 위해 모든 작업자에게 복제되어야 합니다. 이렇게 하려면 다른 Keras 레이어와 마찬가지로 `tf.distribute.Strategy.scope`에서 `tft.TFTransformOutput.transform_features_layer` 또는 `tft.TransformFeaturesLayer`의 Keras 전처리 레이어를 생성하는 것이 좋습니다.

다음 예는 `tf.distribute.Strategy` API를 높은 수준의 Keras `Model.fit` API 및 사용자 지정 훈련 루프와 함께 개별적으로 사용하는 방법을 보여줍니다.

#### Keras 전처리 레이어 사용자를 위한 추가 참고 사항:

**전처리 레이어 및 방대한 어휘**

다중 작업자 설정(예: `tf.distribute.MultiWorkerMirroredStrategy`, `tf.distribute.experimental.ParameterServerStrategy`, `tf.distribute.TPUStrategy`)에서 방대한 어휘(1기가바이트 이상)를 처리할 때 어휘를 모든 작업자가 액세스할 수 있는 정적 파일(예: Cloud Storage 사용)에 저장하는 것이 좋습니다. 그러면 훈련 중에 모든 작업자에게 어휘를 복제하는 데 소요되는 시간이 줄어듭니다.

**`tf.data` 파이프라인과 모델의 전처리 비교**

Keras 전처리 레이어는 모델의 일부로 적용하거나 `tf.data.Dataset`에 직접 적용할 수 있지만 각 옵션에는 강점이 있습니다.

- 모델 내에서 전처리 레이어를 적용하면 모델을 이식할 수 있고 훈련/서브 스큐를 줄이는 데 도움이 됩니다. 자세한 내용은 [전처리 레이어 작업 가이드](https://www.tensorflow.org/guide/keras/preprocessing_layers#benefits_of_doing_preprocessing_inside_the_model_at_inference_time)의 <em>추론 시 모델 내부에서 전처리를 수행할 때의 이점</em> 섹션을 참조하세요.
- `tf.data` 파이프라인 내에서 적용하면 CPU로 프리패칭하거나 오프로딩할 수 있으므로 일반적으로 가속기를 사용할 때 더 나은 성능을 제공합니다.

하나 이상의 TPU에서 실행할 때 모든 레이어가 TPU를 지원하지는 않고 문자열 ops가 TPU에서 실행되지 않기 때문에 사용자는 거의 항상 `tf.data` 파이프라인에 Keras 전처리 레이어를 배치해야 합니다. 두 가지 예외인 `tf.keras.layers.Normalization` 및 `tf.keras.layers.Rescaling`은 TPU에서 잘 실행되고 이미지 모델에서 첫 레이어로 일반적으로 사용됩니다.

### `Model.fit`으로 전처리하기

`Model.fit`을 사용할 때 `tf.distribute.Strategy.experimental_distribute_dataset` 또는 `tf.distribute.Strategy.distribute_datasets_from_function` 자체를 사용하여 데이터를 배포할 필요가 없습니다. 자세한 내용은 [전처리 레이어로 작업하기](https://www.tensorflow.org/guide/keras/preprocessing_layers) 가이드 및 [Keras를 사용한 분산 훈련](https://www.tensorflow.org/tutorials/distribute/keras) 가이드를 확인하세요. 간단한 예를 들면 다음과 같습니다.

```
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
  # Create the layer(s) under scope.
  integer_preprocessing_layer = tf.keras.layers.IntegerLookup(vocabulary=FILE_PATH)
  model = ...
  model.compile(...)
dataset = dataset.map(lambda x, y: (integer_preprocessing_layer(x), y))
model.fit(dataset)
```


`Model.fit` API를 사용하는 `tf.keras.utils.experimental.DatasetCreator` 사용자는 `tf.distribute.experimental.ParameterServerStrategy`를 입력으로 사용해야 합니다. 자세한 내용은 [매개변수 서버 훈련](https://www.tensorflow.org/tutorials/distribute/parameter_server_training#parameter_server_training_with_modelfit_api) 가이드를 참조하세요.

```
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

with strategy.scope():
  preprocessing_layer = tf.keras.layers.StringLookup(vocabulary=FILE_PATH)
  model = ...
  model.compile(...)

def dataset_fn(input_context):
  ...
  dataset = dataset.map(preprocessing_layer)
  ...
  return dataset

dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
model.fit(dataset_creator, epochs=5, steps_per_epoch=20, callbacks=callbacks)

```

### 사용자 정의 훈련 루프로 전처리하기

[사용자 정의 훈련 루프](https://www.tensorflow.org/tutorials/distribute/custom_training)를 작성할 때 `tf.distribute.Strategy.experimental_distribute_dataset` API 또는 `tf.distribute.Strategy.distribute_datasets_from_function` API를 사용하여 데이터를 배포합니다. `tf.distribute.Strategy.experimental_distribute_dataset`를 통해 데이터세트를 배포하는 경우, 데이터 파이프라인에 이러한 전처리 API를 적용하면 원격 리소스 액세스를 방지하기 위해 데이터 파이프라인과 함께 자동으로 리소스가 배치됩니다. 따라서 여기의 예제는 모두 `tf.distribute.Strategy.distribute_datasets_from_function`을 사용합니다. 이 경우 효율성을 위해 이러한 API의 초기화를 `strategy.scope()` 아래에 배치하는 것이 중요합니다.

In [13]:
strategy = tf.distribute.MirroredStrategy()
vocab = ["a", "b", "c", "d", "f"]

with strategy.scope():
  # Create the layer(s) under scope.
  layer = tf.keras.layers.StringLookup(vocabulary=vocab)

def dataset_fn(input_context):
  # a tf.data.Dataset
  dataset = tf.data.Dataset.from_tensor_slices(["a", "c", "e"]).repeat()

  # Custom your batching, sharding, prefetching, etc.
  global_batch_size = 4
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = dataset.batch(batch_size)
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)

  # Apply the preprocessing layer(s) to the tf.data.Dataset
  def preprocess_with_kpl(input):
    return layer(input)

  processed_ds = dataset.map(preprocess_with_kpl)
  return processed_ds

distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)

# Print out a few example batches.
distributed_dataset_iterator = iter(distributed_dataset)
for _ in range(3):
  print(next(distributed_dataset_iterator))

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


PerReplica:{
  0: tf.Tensor([1], shape=(1,), dtype=int64),
  1: tf.Tensor([3], shape=(1,), dtype=int64),
  2: tf.Tensor([0], shape=(1,), dtype=int64),
  3: tf.Tensor([1], shape=(1,), dtype=int64)
}
PerReplica:{
  0: tf.Tensor([3], shape=(1,), dtype=int64),
  1: tf.Tensor([0], shape=(1,), dtype=int64),
  2: tf.Tensor([1], shape=(1,), dtype=int64),
  3: tf.Tensor([3], shape=(1,), dtype=int64)
}
PerReplica:{
  0: tf.Tensor([0], shape=(1,), dtype=int64),
  1: tf.Tensor([1], shape=(1,), dtype=int64),
  2: tf.Tensor([3], shape=(1,), dtype=int64),
  3: tf.Tensor([0], shape=(1,), dtype=int64)
}


`tf.distribute.experimental.ParameterServerStrategy`로 훈련하는 경우 `tf.distribute.experimental.coordinator.ClusterCoordinator.create_per_worker_dataset`도 호출합니다.

```
@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
```


Tensorflow Transform의 경우 위에서 언급한 것처럼 분석 단계는 훈련과 별도로 수행되므로 여기서는 생략합니다. 자세한 방법은 [튜토리얼](https://www.tensorflow.org/tfx/tutorials/transform/census)을 참조하세요. 일반적으로 이 단계에는 `tf.Transform` 전처리 기능을 만들고 이 전처리 기능을 사용하여 [Apache Beam](https://beam.apache.org/) 파이프라인의 데이터를 변환하는 작업이 포함됩니다. 분석 단계가 끝나면 출력을 훈련과 제공 모두에 사용할 수 있는 TensorFlow 그래프로 내보낼 수 있습니다. 이 예에서는 훈련 파이프라인 부분만 다룹니다.

```
with strategy.scope():
  # working_dir contains the tf.Transform output.
  tf_transform_output = tft.TFTransformOutput(working_dir)
  # Loading from working_dir to create a Keras layer for applying the tf.Transform output to data
  tft_layer = tf_transform_output.transform_features_layer()
  ...

def dataset_fn(input_context):
  ...
  dataset.map(tft_layer, num_parallel_calls=tf.data.AUTOTUNE)
  ...
  return dataset

distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)
```

## 부분 배치

부분 배치는 다음의 경우에 발생합니다. 1) 사용자가 생성하는 `tf.data.Dataset` 인스턴스에 복제본 수로 균등하게 나눌 수 없는 배치 크기가 포함될 수 있습니다. 또는 2) 데이터세트 인스턴스의 카디널리티를 배치 크기로 나눌 수 없습니다. 이는 데이터세트가 여러 복제본에 분산되어 있을 때 일부 반복자에 대한 `next` 호출에서 `tf.errors.OutOfRangeError`가 발생함을 의미합니다. 이 사용 사례를 처리하기 위해 `tf.distribute`는 더 이상 처리할 데이터가 없는 복제본에서 배치 크기가 `0`인 더미 배치를 반환합니다.


단일 작업자의 경우 반복자에 대한 `next` 호출에서 데이터가 반환되지 않으면 배치 크기가 0인 더미 배치가 생성되어 데이터세트의 실제 데이터와 함께 사용됩니다. 부분 배치의 경우, 데이터의 마지막 전역 배치에는 더미 데이터 배치와 함께 실제 데이터가 포함됩니다. 데이터 처리를 위한 중지 조건은 이제 복제본에 데이터가 있는지 확인합니다. 데이터가 없는 복제본이 있으면 `tf.errors.OutOfRangeError`가 발생합니다.

다중 작업자 사례의 경우, 각 작업자에서 데이터의 존재를 나타내는 부울 값은 복제본 간 통신을 사용하여 집계하며, 이는 모든 작업자가 분산 데이터세트의 처리를 완료했는지 식별하는 데 사용됩니다. 여기에는 작업자 간 통신이 포함되므로 성능에 약간의 불이익이 따릅니다.


## 주의 사항

- 다중 작업자 설정과 함께 `tf.distribute.Strategy.experimental_distribute_dataset` API를 사용할 때 파일에서 정보를 읽는 `tf.data.Dataset`을 전달하게 됩니다. `tf.data.experimental.AutoShardPolicy`가 `AUTO` 또는 `FILE`로 설정된 경우 실제 단계별 배치 크기는 전역 배치 크기에 대해 정의한 것보다 작을 수 있습니다. 이는 파일의 나머지 요소가 전역 배치 크기보다 작은 경우 발생할 수 있습니다. 실행할 단계 수에 의존하지 않고 데이터세트를 소진하거나 `tf.data.experimental.AutoShardPolicy`를 `DATA`로 설정하여 문제를 해결할 수 있습니다.

- 상태 저장 데이터세트 변환은 현재 `tf.distribute`에서 지원되지 않으며 데이터세트에 있을 수 있는 상태 저장 ops는 현재 무시됩니다. 예를 들어, 데이터세트에 `map_fn`을 사용하여 이미지를 회전시키는 `tf.random.uniform`이 있는 경우, Python 프로세스가 실행되는 로컬 머신의 상태(예: 임의 시드)에 의존하는 데이터세트 그래프가 있습니다.

- `tf.distribute`와 함께 사용될 때와 같이, 특정 컨텍스트에서 기본적으로 비활성화되어 있는 실험적인 `tf.data.experimental.OptimizationOptions`은 성능 저하를 유발합니다. 배포 설정에서 워크로드의 성능 향상이 확인된 후에만 이 옵션을 사용해야 합니다.

- <code>tf.data</code>로 입력 파이프라인을 최적화하는 방법에 대한 전반적인 내용은 <a>이 가이드</a>를 참조하세요. 몇 가지 추가 팁:

    - 여러 작업자가 있고 `tf.data.Dataset.list_files`를 사용하여 하나 이상의 glob 패턴과 일치하는 모든 파일에서 데이터세트를 생성하는 경우 각 작업자가 파일을 일관되게 샤딩하도록 `seed` 인수를 설정하거나 `shuffle=False`를 설정해야 합니다.

- 입력 파이프라인에 레코드 수준의 데이터 셔플링과 데이터 구문 분석이 모두 포함되어 있는 경우, 구문 분석되지 않은 데이터가 구문 분석된 데이터보다 훨씬 크지 않다면(일반적으로 그렇지 않음), 다음 예제와 같이 먼저 셔플링한 다음 구문 분석하세요. 그러면 메모리 사용량과 성능에 도움이 될 수 있습니다.

```
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
```

- `tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)`은 `buffer_size` 요소의 내부 버퍼를 유지하므로 `buffer_size`를 줄이면 OOM 문제가 완화될 수 있습니다.

- `tf.distribute.experimental_distribute_dataset` 또는 `tf.distribute.distribute_datasets_from_function`을 사용할 때 작업자가 데이터를 처리하는 순서는 보장되지 않습니다. 이는 일반적으로 `tf.distribute`를 사용하여 예측을 확장하는 경우 필요합니다. 그러나 배치의 각 요소에 대한 인덱스를 삽입하고 그에 따라 출력 순서를 정할 수 있습니다. 다음 스니펫은 출력 순서를 정하는 방법을 보여주는 예입니다.

참고: 여기서 `tf.distribute.MirroredStrategy`는 편의상 사용됩니다. 여러 작업자를 사용할 때만 입력 순서를 다시 지정하면 되지만 `tf.distribute.MirroredStrategy`는 단일 작업자에 대한 훈련을 배포하는 데 사용됩니다.

In [14]:
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:01:05.726671: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:340"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: true
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}











{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}


<a name="tensorinputs">## tf.data 대신 텐서 입력</a>

사용자가 `tf.data.Dataset`을 사용하여 입력을 나타내고 이후에 위에 언급한 API를 사용하여 데이터세트를 여러 장치에 분배할 수 없는 경우가 가끔 있습니다. 이러한 경우 생성기의 원시 텐서 또는 입력을 사용할 수 있습니다.

### 임의의 텐서 입력에 experiment_distribute_values_from_function 사용하기

`strategy.run`은 `next(iterator)`의 출력인 `tf.distribute.DistributedValues`를 허용합니다. 텐서 값을 전달하려면 `tf.distribute.DistributedValues`을 사용하여 원시 텐서에서 `tf.distribute.Strategy.experimental_distribute_values_from_function`를 생성합니다. 사용자는 이 옵션을 사용하여 입력 기능에서 자신의 일괄 처리 및 분할 논리를 지정해야 하는데, 이를 위해 `tf.distribute.experimental.ValueContext` 입력 개체를 사용합니다.

In [15]:
mirrored_strategy = tf.distribute.MirroredStrategy()

def value_fn(ctx):
  return tf.constant(ctx.replica_id_in_sync_group)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x: x, args=(distributed_values,))
  print(result)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')




PerReplica:{
  0: tf.Tensor(0, shape=(), dtype=int32),
  1: tf.Tensor(1, shape=(), dtype=int32),
  2: tf.Tensor(2, shape=(), dtype=int32),
  3: tf.Tensor(3, shape=(), dtype=int32)
}
PerReplica:{
  0: tf.Tensor(0, shape=(), dtype=int32),
  1: tf.Tensor(1, shape=(), dtype=int32),
  2: tf.Tensor(2, shape=(), dtype=int32),
  3: tf.Tensor(3, shape=(), dtype=int32)
}
PerReplica:{
  0: tf.Tensor(0, shape=(), dtype=int32),
  1: tf.Tensor(1, shape=(), dtype=int32),
  2: tf.Tensor(2, shape=(), dtype=int32),
  3: tf.Tensor(3, shape=(), dtype=int32)
}
PerReplica:{
  0: tf.Tensor(0, shape=(), dtype=int32),
  1: tf.Tensor(1, shape=(), dtype=int32),
  2: tf.Tensor(2, shape=(), dtype=int32),
  3: tf.Tensor(3, shape=(), dtype=int32)
}


### 생성기에서 입력한 경우 tf.data.Dataset.from_generator 사용하기

사용하려는 생성기 함수가 있는 경우, `from_generator` API를 사용하여 `tf.data.Dataset` 인스턴스를 생성할 수 있습니다.

참고: 현재 `tf.distribute.TPUStrategy`에서는 지원하지 않습니다.

In [16]:
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  result = mirrored_strategy.run(lambda x: x, args=(next(iterator),))
  print(result)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


2022-12-15 02:01:06.101836: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_1"
op: "TensorDataset"
input: "Placeholder/_0"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_INT32
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:364"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT32
        }
      }
    }
  }
}



PerReplica:{
  0: tf.Tensor([0.9051125], shape=(1,), dtype=float32),
  1: tf.Tensor([0.4379109], shape=(1,), dtype=float32),
  2: tf.Tensor([0.3473103], shape=(1,), dtype=float32),
  3: tf.Tensor([0.864537], shape=(1,), dtype=float32)
}
PerReplica:{
  0: tf.Tensor([0.33274612], shape=(1,), dtype=float32),
  1: tf.Tensor([0.40963104], shape=(1,), dtype=float32),
  2: tf.Tensor([0.6077633], shape=(1,), dtype=float32),
  3: tf.Tensor([0.3069321], shape=(1,), dtype=float32)
}
PerReplica:{
  0: tf.Tensor([0.75392634], shape=(1,), dtype=float32),
  1: tf.Tensor([0.17748609], shape=(1,), dtype=float32),
  2: tf.Tensor([0.88923043], shape=(1,), dtype=float32),
  3: tf.Tensor([0.30796507], shape=(1,), dtype=float32)
}
PerReplica:{
  0: tf.Tensor([0.61147165], shape=(1,), dtype=float32),
  1: tf.Tensor([0.33569282], shape=(1,), dtype=float32),
  2: tf.Tensor([0.76590794], shape=(1,), dtype=float32),
  3: tf.Tensor([0.41486776], shape=(1,), dtype=float32)
}
