{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "rA5Mubike7OJ" }, "source": [ "##### Copyright 2020 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "cellView": "form", "execution": { "iopub.execute_input": "2022-12-15T02:11:29.011040Z", "iopub.status.busy": "2022-12-15T02:11:29.010481Z", "iopub.status.idle": "2022-12-15T02:11:29.015023Z", "shell.execute_reply": "2022-12-15T02:11:29.014351Z" }, "id": "fY0a3LRYfHUl" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "iNz7xXMSsAQa" }, "source": [ "# ParameterServerStrategy를 사용한 매개변수 서버 훈련" ] }, { "cell_type": "markdown", "metadata": { "id": "jHyqRIqxsJuc" }, "source": [ "
![]() | \n",
" ![]() | \n",
" ![]() | \n",
" ![]() | \n",
"
shuffle
및 `repeat`에 대한 자세한 내용은 tf.data 가이드의 \"훈련 워크플로\" 섹션을 참조하세요."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"execution": {
"iopub.execute_input": "2022-12-15T02:11:36.893850Z",
"iopub.status.busy": "2022-12-15T02:11:36.893541Z",
"iopub.status.idle": "2022-12-15T02:11:37.378678Z",
"shell.execute_reply": "2022-12-15T02:11:37.377535Z"
},
"id": "shAo1CCS7wU1"
},
"outputs": [],
"source": [
"global_batch_size = 64\n",
"\n",
"x = tf.random.uniform((10, 10))\n",
"y = tf.random.uniform((10,))\n",
"\n",
"dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()\n",
"dataset = dataset.batch(global_batch_size)\n",
"dataset = dataset.prefetch(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "v_jhF70K7zON"
},
"source": [
"대신 `tf.keras.utils.experimental.DatasetCreator`를 사용하여 데이터세트를 생성하면 각 작업자 머신의 입력 장치(일반적으로 CPU)에서 `dataset_fn`의 코드가 호출됩니다.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "w60PuWrWwBD4"
},
"source": [
"### 모델 구성 및 컴파일\n",
"\n",
"이제 데모 목적의 간단한 `tf.keras.models.Sequential` 모델인 `tf.keras.Model`을 만든 다음 옵티마이저와 같은 구성 요소 및 `steps_per_execution`과 같은 기타 매개변수를 도입하기 위한 `Model.compile` 호출이 이루어집니다."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2022-12-15T02:11:37.383060Z",
"iopub.status.busy": "2022-12-15T02:11:37.382739Z",
"iopub.status.idle": "2022-12-15T02:11:37.414958Z",
"shell.execute_reply": "2022-12-15T02:11:37.413799Z"
},
"id": "PhTHUYaD74vT"
},
"outputs": [],
"source": [
"with strategy.scope():\n",
" model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])\n",
"\n",
" model.compile(tf.keras.optimizers.legacy.SGD(), loss=\"mse\", steps_per_execution=10)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "nWb_Ekm377YX"
},
"source": [
"### 콜백 및 훈련\n",
"\n",
" \n",
"\n",
"실제 훈련을 위해 Keras `Model.fit`을 호출하기 전에 다음과 같은 일반적인 작업에 필요한 [콜백](https://www.tensorflow.org/guide/keras/train_and_evaluate)을 준비합니다.\n",
"\n",
"- `tf.keras.callbacks.ModelCheckpoint`: 매 epoch 후와 같이 특정 빈도로 모델을 저장합니다.\n",
"- `tf.keras.callbacks.BackupAndRestore`: 클러스터에 사용 불가능한 상황(중단 또는 선점 등)이 발생하는 경우 모델과 현재 epoch 번호를 백업하여 내결함성을 제공합니다. 그런 다음 작업 실패 후 다시 시작할 때 훈련 상태를 복원하고 중단된 epoch의 시작 부분부터 훈련을 계속할 수 있습니다.\n",
"- `tf.keras.callbacks.TensorBoard`: 요약 파일에 TensorBoard 도구에서 시각화할 수 있는 모델 로그를 주기적으로 작성합니다.\n",
"\n",
"참고: 성능 고려 사항으로 인해 사용자 정의 콜백은 `ParameterServerStrategy`와 함께 사용될 때 일괄 처리 수준 콜백을 재정의할 수 없습니다. 사용자 정의 콜백을 수정하여 epoch 수준 호출이 되도록 하고 `steps_per_epoch`를 적절한 값으로 조정합니다. 또한 `steps_per_epoch`는 `ParameterServerStrategy`와 함께 사용할 때 `Model.fit`에 대한 필수 인수입니다."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2022-12-15T02:11:37.419365Z",
"iopub.status.busy": "2022-12-15T02:11:37.418570Z",
"iopub.status.idle": "2022-12-15T02:11:48.712888Z",
"shell.execute_reply": "2022-12-15T02:11:48.711363Z"
},
"id": "3ddUvUZk7_wm"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 1/5\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow/python/data/ops/dataset_ops.py:461: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.\n",
" warnings.warn(\"To make it possible to preserve tf.data options across \"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2022-12-15 02:11:37.794389: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n",
"op: \"TensorSliceDataset\"\n",
"input: \"Placeholder/_0\"\n",
"input: \"Placeholder/_1\"\n",
"attr {\n",
" key: \"Toutput_types\"\n",
" value {\n",
" list {\n",
" type: DT_FLOAT\n",
" type: DT_FLOAT\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"_cardinality\"\n",
" value {\n",
" i: 10\n",
" }\n",
"}\n",
"attr {\n",
" key: \"is_files\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"attr {\n",
" key: \"metadata\"\n",
" value {\n",
" s: \"\\n\\024TensorSliceDataset:0\"\n",
" }\n",
"}\n",
"attr {\n",
" key: \"output_shapes\"\n",
" value {\n",
" list {\n",
" shape {\n",
" dim {\n",
" size: 10\n",
" }\n",
" }\n",
" shape {\n",
" }\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"replicate_on_split\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"experimental_type {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_DATASET\n",
" args {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" }\n",
" }\n",
"}\n",
"\n",
"2022-12-15 02:11:37.794487: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n",
"op: \"TensorSliceDataset\"\n",
"input: \"Placeholder/_0\"\n",
"input: \"Placeholder/_1\"\n",
"attr {\n",
" key: \"Toutput_types\"\n",
" value {\n",
" list {\n",
" type: DT_FLOAT\n",
" type: DT_FLOAT\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"_cardinality\"\n",
" value {\n",
" i: 10\n",
" }\n",
"}\n",
"attr {\n",
" key: \"is_files\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"attr {\n",
" key: \"metadata\"\n",
" value {\n",
" s: \"\\n\\024TensorSliceDataset:0\"\n",
" }\n",
"}\n",
"attr {\n",
" key: \"output_shapes\"\n",
" value {\n",
" list {\n",
" shape {\n",
" dim {\n",
" size: 10\n",
" }\n",
" }\n",
" shape {\n",
" }\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"replicate_on_split\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"experimental_type {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_DATASET\n",
" args {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" }\n",
" }\n",
"}\n",
"\n",
"2022-12-15 02:11:37.810542: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: \"TensorSliceDataset/_2\"\n",
"op: \"TensorSliceDataset\"\n",
"input: \"Placeholder/_0\"\n",
"input: \"Placeholder/_1\"\n",
"attr {\n",
" key: \"Toutput_types\"\n",
" value {\n",
" list {\n",
" type: DT_FLOAT\n",
" type: DT_FLOAT\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"_cardinality\"\n",
" value {\n",
" i: 10\n",
" }\n",
"}\n",
"attr {\n",
" key: \"is_files\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"attr {\n",
" key: \"metadata\"\n",
" value {\n",
" s: \"\\n\\024TensorSliceDataset:0\"\n",
" }\n",
"}\n",
"attr {\n",
" key: \"output_shapes\"\n",
" value {\n",
" list {\n",
" shape {\n",
" dim {\n",
" size: 10\n",
" }\n",
" }\n",
" shape {\n",
" }\n",
" }\n",
" }\n",
"}\n",
"attr {\n",
" key: \"replicate_on_split\"\n",
" value {\n",
" b: false\n",
" }\n",
"}\n",
"experimental_type {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_DATASET\n",
" args {\n",
" type_id: TFT_PRODUCT\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" args {\n",
" type_id: TFT_TENSOR\n",
" args {\n",
" type_id: TFT_FLOAT\n",
" }\n",
" }\n",
" }\n",
" }\n",
"}\n",
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Waiting for all global closures to be finished.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"20/20 - 8s - loss: 0.4865 - 8s/epoch - 419ms/step\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 2/5\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Waiting for all global closures to be finished.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"20/20 - 1s - loss: 0.4072 - 946ms/epoch - 47ms/step\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 3/5\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Waiting for all global closures to be finished.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"WARNING:tensorflow:5 out of the last 5 calls to tf.data.experimental.service
API 문서의 동적 샤딩 섹션 참조)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "H40X-9Gs3i7_"
},
"source": [
"### 사이드카 평가\n",
"\n",
"\n",
"\n",
"`tf.distribute.ParameterServerStrategy` 훈련에서 평가 루프를 정의하고 실행하는 또 다른 방법은 체크포인트를 반복적으로 읽고 최신 체크포인트에서 평가를 실행하는 전용 평가자 작업을 생성하는 *사이드카 평가*입니다(체크포인트에 대한 자세한 내용은 [이 가이드](../../guide/checkpoint.ipynb) 참조). 수석 및 작업자 작업은 평가에 시간을 들이지 않으므로 고정된 반복 횟수에 대해 전체 훈련 시간은 다른 평가 방법을 사용하는 것보다 짧습니다. 그러나 평가를 트리거하려면 추가적인 평가자 작업과 주기적인 체크포인트 절차가 필요합니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HonyjnXK9-ys"
},
"source": [
"사이드카 평가를 위한 평가 루프를 작성할 때 두 가지 옵션이 있습니다.\n",
"\n",
"1. `tf.keras.utils.SidecarEvaluator` API를 사용합니다.\n",
"2. 사용자 정의 평가 루프를 만듭니다.\n",
"\n",
"옵션 1에 대한 자세한 내용은 `tf.keras.utils.SidecarEvaluator` API 문서를 참조하세요."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "U_c0EiwB88OG"
},
"source": [
"사이드카 평가는 단일 작업에서만 지원됩니다. 이것은 다음을 의미합니다.\n",
"\n",
"- 각 예제는 확실하게 한 번만 평가됩니다. 평가자가 선점되거나 다시 시작되는 경우 최근 체크포인트에서 평가 루프를 다시 시작하고 다시 시작하기 전에 이루어진 평가 진행 부분은 폐기됩니다.\n",
"\n",
"- 그러나 단일 작업에 대해 평가를 실행하면 전체 평가에 시간이 오래 걸릴 수 있습니다.\n",
"\n",
"- 모델의 크기가 너무 커서 평가자의 메모리에 맞지 않는 경우 단일 사이드카 평가가 적용되지 않습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "VNJoWVc797B1"
},
"source": [
"또 다른 주의 사항은 `tf.keras.utils.SidecarEvaluator` 구현과 아래의 사용자 정의 평가 루프가 항상 사용 가능한 최신 체크포인트를 선택하고 평가 epoch 동안 훈련 클러스터에서 여러 체크포인트를 생성할 수 있기 때문에 일부 체크포인트를 건너뛸 수 있다는 것입니다. 모든 체크포인트를 평가하는 사용자 정의 평가 루프를 작성할 수 있지만 이 튜토리얼에서는 다루지 않습니다. 반면에 체크포인트가 평가를 실행하는 데 걸리는 시간보다 덜 자주 생성되면 유휴 상태로 있을 수 있습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "G5jopxBd85Ji"
},
"source": [
"사용자 정의 평가 루프는 평가할 체크포인트를 선택하거나 평가와 함께 실행할 추가 논리를 제공하는 등 세부 사항에 대한 더 많은 통제력을 제공합니다. 다음은 가능한 사용자 정의 사이드카 평가 루프입니다.\n",
"\n",
"```python\n",
"checkpoint_dir = ...\n",
"eval_model = ...\n",
"eval_data = ...\n",
"checkpoint = tf.train.Checkpoint(model=eval_model)\n",
"\n",
"for latest_checkpoint in tf.train.checkpoints_iterator(\n",
" checkpoint_dir):\n",
" try:\n",
" checkpoint.restore(latest_checkpoint).expect_partial()\n",
" except (tf.errors.OpError,) as e:\n",
" # checkpoint may be deleted by training when it is about to read it.\n",
" continue\n",
"\n",
" # Optionally add callbacks to write summaries.\n",
" eval_model.evaluate(eval_data)\n",
"\n",
" # Evaluation finishes when it has evaluated the last epoch.\n",
" if latest_checkpoint.endswith('-{}'.format(train_epochs)):\n",
" break\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "9TkNbtpPhFRQ"
},
"source": [
"## 실제 상황에서 클러스터\n",
"\n",
"\n",
"\n",
"참고: 이 섹션은 이 페이지의 튜토리얼 코드를 실행하는 데 필요하지 않습니다.\n",
"\n",
"실제 프로덕션 환경에서는 서로 다른 머신의 서로 다른 프로세스에서 모든 작업을 실행합니다. 각 작업에 대한 클러스터 정보를 구성하는 가장 간단한 방법은 `\"TF_CONFIG\"` 환경 변수를 설정하고 `tf.distribute.cluster_resolver.TFConfigClusterResolver`를 사용하여 `\"TF_CONFIG\"`를 구문 분석하는 것입니다.\n",
"\n",
"`\"TF_CONFIG\"` 환경 변수에 대한 일반적인 설명은 분산 훈련 가이드의 \"TF_CONFIG
환경 변수 설정\"을 참조하세요.\n",
"\n",
"Kubernetes 또는 기타 구성 템플릿을 사용하여 훈련 작업을 시작하는 경우 이러한 템플릿이 이미 `“TF_CONFIG\"`를 설정했을 가능성이 높습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "n7AK9SJGt3tQ"
},
"source": [
"### `\"TF_CONFIG\"` 환경 변수 설정하기\n",
"\n",
"3개의 작업자와 2개의 매개변수 서버가 있다고 가정합니다. 그러면 작업자 1의 `\"TF_CONFIG\"`는 다음과 같을 수 있습니다.\n",
"\n",
"```python\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"worker\": [\"host1:port\", \"host2:port\", \"host3:port\"],\n",
" \"ps\": [\"host4:port\", \"host5:port\"],\n",
" \"chief\": [\"host6:port\"]\n",
" },\n",
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
"})\n",
"```\n",
"\n",
"평가자의 `\"TF_CONFIG\"`는 다음과 같을 수 있습니다.\n",
"\n",
"```python\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"evaluator\": [\"host7:port\"]\n",
" },\n",
" \"task\": {\"type\": \"evaluator\", \"index\": 0}\n",
"})\n",
"```\n",
"\n",
"위의 평가자에 대한 `\"TF_CONFIG\"` 문자열 중 `\"cluster\"` 부분은 선택 사항입니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fZRjMS0pt1LM"
},
"source": [
"### 모든 작업에 동일한 바이너리를 사용하는 경우\n",
"\n",
"단일 바이너리를 사용하여 이러한 모든 작업을 실행하려면 처음에 프로그램이 여러 역할로 분기되도록 해야 합니다.\n",
"\n",
"```python\n",
"cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"if cluster_resolver.task_type in (\"worker\", \"ps\"):\n",
" # Start a TensorFlow server and wait.\n",
"elif cluster_resolver.task_type == \"evaluator\":\n",
" # Run sidecar evaluation\n",
"else:\n",
" # Run the coordinator.\n",
"```\n",
"\n",
"TensorFlow 서버를 시작하고 대기하는 다음 코드는 `\"worker\"` 및 `\"ps\"` 역할에 유용합니다.\n",
"\n",
"```python\n",
"# Set the environment variable to allow reporting worker and ps failure to the\n",
"# coordinator. This is a workaround and won't be necessary in the future.\n",
"os.environ[\"GRPC_FAIL_FAST\"] = \"use_caller\"\n",
"\n",
"server = tf.distribute.Server(\n",
" cluster_resolver.cluster_spec(),\n",
" job_name=cluster_resolver.task_type,\n",
" task_index=cluster_resolver.task_id,\n",
" protocol=cluster_resolver.rpc_layer or \"grpc\",\n",
" start=True)\n",
"server.join()\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZWdYfK593eOL"
},
"source": [
"## 작업 오류 처리하기"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Bl9eK5r13cOv"
},
"source": [
"### 작업자 오류\n",
"\n",
"`tf.distribute.coordinator.ClusterCoordinator` 사용자 정의 훈련 루프와 `Model.fit` 접근 방식은 모두 작업자 오류에 대한 내결함성을 기본적으로 제공합니다. 작업자 복구 시 `ClusterCoordinator`는 작업자에 대한 데이터세트 재생성을 호출합니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aP0OHZ1-Ne-B"
},
"source": [
"### 매개변수 서버 또는 코디네이터 오류\n",
"\n",
"그러나 코디네이터는 매개변수 서버 오류를 발견하면 즉시 `UnavailableError` 또는 `AbortedError`를 발생시킵니다. 이 경우 코디네이터를 다시 시작할 수 있습니다. 코디네이터 자체도 사용할 수 없게 될 수 있습니다. 따라서 훈련 진행 상황을 잃지 않기 위해 특정 도구를 사용하는 것이 좋습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f7m7Itoz8lsI"
},
"source": [
"- `Model.fit`의 경우 진행률 저장 및 복원을 자동으로 처리하는 `BackupAndRestore` 콜백을 사용해야 합니다. 예제는 위의 [콜백 및 훈련](#callbacks-and-training) 섹션을 참조하세요."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-XlLyJp53Z8A"
},
"source": [
"- 사용자 정의 훈련 루프의 경우 훈련이 시작되기 전에 주기적으로 모델 변수를 체크포인트하고 체크포인트에서 모델 변수를 로드해야 합니다(있는 경우). 옵티마이저가 체크포인트된 경우 훈련 진행 상황은 `optimizer.iterations`에서 대략적으로 추론할 수 있습니다.\n",
"\n",
"```python\n",
"checkpoint_manager = tf.train.CheckpointManager(\n",
" tf.train.Checkpoint(model=model, optimizer=optimizer),\n",
" checkpoint_dir,\n",
" max_to_keep=3)\n",
"if checkpoint_manager.latest_checkpoint:\n",
" checkpoint = checkpoint_manager.checkpoint\n",
" checkpoint.restore(\n",
" checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()\n",
"\n",
"global_steps = int(optimizer.iterations.numpy())\n",
"starting_epoch = global_steps // steps_per_epoch\n",
"\n",
"for _ in range(starting_epoch, num_epochs):\n",
" for _ in range(steps_per_epoch):\n",
" coordinator.schedule(step_fn, args=(per_worker_iterator,))\n",
" coordinator.join()\n",
" checkpoint_manager.save()\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "PlN1P7C53XK9"
},
"source": [
"### `RemoteValue` 가져오기\n",
"\n",
"함수가 성공적으로 실행되면 `RemoteValue` 가져오기의 성공이 보장됩니다. 현재는 함수가 실행된 후 반환 값이 즉시 코디네이터에 복사되기 때문입니다. 복사하는 동안 작업자 오류가 발생하면 사용 가능한 다른 작업자에서 함수가 재시도됩니다. 따라서 성능을 최적화하려면 반환 값 없이 함수를 예약할 수 있습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "iZcR_xNZ3UdU"
},
"source": [
"## 오류 보고\n",
"\n",
"코디네이터가 매개변수 서버의 `UnavailableError`와 같은 오류 또는 `tf.debugging.check_numerics`의 `InvalidArgument`와 같은 기타 애플리케이션 오류를 확인하면 오류를 발생시키기 전에 보류 중 및 대기열에 있는 모든 함수를 취소합니다. 해당 `RemoteValue`를 가져오면 `CancelledError`가 발생합니다.\n",
"\n",
"오류가 발생한 후 코디네이터는 동일한 오류 또는 취소된 함수의 오류를 발생시키지 않습니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "QfhbXH-j3NVw"
},
"source": [
"## 성능 향상\n",
"\n",
"`tf.distribute.ParameterServerStrategy` 및 `tf.distribute.coordinator.ClusterCoordinator`로 훈련할 때 성능 문제가 발생할 수 있는 몇 가지 가능한 이유가 있습니다.\n",
"\n",
"한 가지 일반적인 이유는 매개변수 서버의 로드 균형이 맞지 않고 로드가 심한 일부 매개변수 서버가 용량에 도달했기 때문입니다. 또한 여러 근본 원인이 있을 수 있습니다. 다음과 같은 몇 가지 간단한 방법으로 이 문제를 완화할 수 있습니다.\n",
"\n",
"1. `ParameterServerStrategy`를 구성할 때 `variable_partitioner`를 지정하여 큰 모델 변수를 분할합니다.\n",
"2. 가능하면 모든 매개변수 서버에 필요한 핫스팟 변수를 단일 단계로 생성하지 않습니다. 예를 들어, 옵티마이저에서 일정한 학습률 또는 하위 클래스 `tf.keras.optimizers.schedules.LearningRateSchedule`을 사용합니다. 학습률이 특정 매개변수 서버에 배치되고 각 단계에서 다른 모든 매개변수 서버에서 요청하는 변수가 되는 것이 기본 동작이기 때문입니다.\n",
"3. Keras 전처리 레이어에 전달하기 전에 큰 어휘를 섞습니다.\n",
"\n",
"성능 문제의 또 다른 가능한 이유는 코디네이터에 있습니다. `schedule`/`join`의 구현은 Python 기반이므로 스레딩 오버헤드가 있을 수 있습니다. 또한 코디네이터와 작업자 간의 대기 시간이 클 수 있습니다. 이러한 경우라면 다음과 같이 할 수 있습니다.\n",
"\n",
"- `Model.fit`의 경우 `Model.compile`에 제공된 `steps_per_execution` 인수를 1보다 큰 값으로 설정할 수 있습니다.\n",
"\n",
"- 사용자 정의 훈련 루프의 경우 여러 단계를 단일 `tf.function`으로 묶을 수 있습니다.\n",
"\n",
"```python\n",
"steps_per_invocation = 10\n",
"\n",
"@tf.function\n",
"def step_fn(iterator):\n",
" for _ in range(steps_per_invocation):\n",
" features, labels = next(iterator)\n",
" def replica_fn(features, labels):\n",
" ...\n",
"\n",
" strategy.run(replica_fn, args=(features, labels))\n",
"```\n",
"\n",
"라이브러리가 더욱 최적화됨에 따라 앞으로 대부분의 사용자가 수동으로 단계를 묶을 필요가 없게 되기를 바랍니다.\n",
"\n",
"또한 성능 향상을 위한 약간의 요령은 위의 [작업 오류 처리하기](#handling_task_failure) 섹션에서 설명한 대로 반환 값 없이 함수를 예약하는 것입니다."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "chu5F7M_JmVk"
},
"source": [
"## 알려진 제한 사항\n",
"\n",
" \n",
"\n",
"알려진 대부분의 제한 사항은 이미 위 섹션에서 다루었습니다. 이 섹션에서는 요약을 제공합니다.\n",
"\n",
"### `ParameterServerStrategy` 일반\n",
"\n",
"- 내결함성이 제대로 작동하려면 코디네이터를 포함한 모든 작업에 `os.environment[\"grpc_fail_fast\"]=\"use_caller\"`가 필요합니다.\n",
"- 동기 매개변수 서버 훈련은 지원되지 않습니다.\n",
"- 최적의 성능을 얻으려면 일반적으로 여러 단계를 단일 함수로 압축해야 합니다.\n",
"- 샤딩된 변수를 포함하는 `tf.saved_model.load`를 통해 saved_model을 로드하는 것은 지원되지 않습니다. TensorFlow Serving을 사용하여 이러한 stored_model을 로드하는 것은 작동할 것으로 예상됩니다(자세한 내용은 [제공 튜토리얼](https://www.tensorflow.org/tfx/tutorials/serving/rest_simple) 참조).\n",
"- 코디네이터 작업을 다시 시작하지 않고 매개변수 서버 오류에서 복구하는 것은 지원되지 않습니다.\n",
"- `tf.keras.layers.IntegerLookup`, `tf.keras.layers.StringLookup` 및 `tf.keras.layers.TextVectorization`과 같은 일부 Keras 전처리 레이어에서 일반적으로 사용되는 `tf.lookup.StaticHashTable`의 생성은 `Strategy.scope` 아래에 배치해야 합니다. 그렇지 않으면 리소스가 코디네이터에 배치되고 작업자에서 코디네이터로의 조회 RPC가 성능에 영향을 미칩니다.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2MKBF0RPSvzB"
},
"source": [
"### `Model.fit` 특이 사항\n",
"\n",
"- `steps_per_epoch` 인수는 `Model.fit`에 필요합니다. Epoch에서 적절한 간격을 제공하는 값을 선택할 수 있습니다.\n",
"- `ParameterServerStrategy`는 성능상의 이유로 배치 수준 호출이 있는 사용자 정의 콜백을 지원하지 않습니다. 이러한 호출을 적절하게 선택된 `steps_per_epoch`를 이용해 epoch 수준 호출로 변환하여 `steps_per_epoch` 단계 수마다 호출되도록 해야 합니다. 내장 콜백은 영향을 받지 않습니다(해당 배치 수준 호출이 성능을 발휘하도록 수정되었음). `ParameterServerStrategy`에 대한 배치 수준 호출을 지원할 계획에 있습니다.\n",
"- 같은 이유로, 다른 전략과 달리 진행률 표시줄과 메트릭은 epoch 경계에서만 기록됩니다.\n",
"- `run_eagerly`는 지원되지 않습니다.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wvY-mg35Sx5L"
},
"source": [
"### 사용자 정의 훈련 루프 특이 사항\n",
"\n",
"- `ClusterCoordinator.schedule`은 데이터세트에 대한 방문 보장을 지원하지 않습니다.\n",
"- `ClusterCoordinator.create_per_worker_dataset`이 콜러블을 입력으로 사용하는 경우 전달된 함수 내에서 전체 데이터세트를 생성해야 합니다.\n",
"- `tf.data.Options`는 `ClusterCoordinator.create_per_worker_dataset`에 의해 생성된 데이터세트에서 무시됩니다."
]
}
],
"metadata": {
"accelerator": "GPU",
"colab": {
"collapsed_sections": [],
"name": "parameter_server_training.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16"
}
},
"nbformat": 4,
"nbformat_minor": 0
}