{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "rA5Mubike7OJ" }, "source": [ "##### Copyright 2020 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "cellView": "form", "execution": { "iopub.execute_input": "2024-01-11T18:21:23.208425Z", "iopub.status.busy": "2024-01-11T18:21:23.208189Z", "iopub.status.idle": "2024-01-11T18:21:23.211989Z", "shell.execute_reply": "2024-01-11T18:21:23.211409Z" }, "id": "fY0a3LRYfHUl" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "iNz7xXMSsAQa" }, "source": [ "# ParameterServerStrategy でパラメータサーバーをトレーニングする" ] }, { "cell_type": "markdown", "metadata": { "id": "jHyqRIqxsJuc" }, "source": [ "
![]() | \n",
" ![]() | \n",
" ![]() | \n",
" ![]() | \n",
"
tf.distribute.ParameterServerStrategy
トレーニングで評価ループを定義して実行する別の方法で、最新のチェックポイントでチェックポイントを繰り返し読み取り評価を実行する専用の評価タスクを作成します。(チェックポイントの詳細については、[このガイド](../../guide/checkpoint.ipynb)を参照してください)。コーディネータータスクとワーカータスクは評価に時間を費やさないため、反復回数が一定であれば、全体のトレーニング時間は他の評価方法を使用するよりも短くなります。ただし、評価をトリガーするには、追加のエバリュエータタスクと定期的なチェックポイントが必要です。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "HonyjnXK9-ys"
},
"source": [
"サイドカー評価の評価ループを作成するには、次の 2 つのオプションがあります。\n",
"\n",
"1. `tf.keras.utils.SidecarEvaluator` API を使用する。\n",
"2. カスタム評価ループを作成する。\n",
"\n",
"オプション 1 の詳細については、`tf.keras.utils.SidecarEvaluator` API ドキュメントを参照してください。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "U_c0EiwB88OG"
},
"source": [
"サイドカー評価は、単一のタスクでのみサポートされています。 これは、次のことを意味します。\n",
"\n",
"- 各サンプルが 1 回評価されることが保証されます。エバリュエータがプリエンプトまたは再起動された場合、最新のチェックポイントから評価ループを再起動し、再起動前に行われた部分的な評価の進行状況は破棄されます。\n",
"\n",
"- ただし、単一のタスクで評価を実行すると、完全な評価に時間がかかる可能性があります。\n",
"\n",
"- モデルのサイズが大きすぎてエバリュエータのメモリに収まらない場合、単一のサイドカー評価は適用されません。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "VNJoWVc797B1"
},
"source": [
"もう 1 つの注意点は、`tf.keras.utils.SidecarEvaluator` の実装と以下のカスタム評価ループが、一部のチェックポイントをスキップする可能性があるということです。利用可能な最新のチェックポイントは、常に取得され、評価エポック中に複数のチェックポイントがトレーニングクラスタから生成されるからです。すべてのチェックポイントを評価するカスタム評価ループを作成できますが、このチュートリアルでは扱いません。一方、評価の実行にかかる時間よりもチェックポイントの生成頻度が低い場合は、アイドル状態になる可能性があります。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "G5jopxBd85Ji"
},
"source": [
"カスタム評価ループを使用すると、評価するチェックポイントを選択したり、評価とともに実行する追加のロジックを提供したりするなど、詳細を制御できます。以下は、カスタムサイドカー評価ループの例です。\n",
"\n",
"```python\n",
"checkpoint_dir = ...\n",
"eval_model = ...\n",
"eval_data = ...\n",
"checkpoint = tf.train.Checkpoint(model=eval_model)\n",
"\n",
"for latest_checkpoint in tf.train.checkpoints_iterator(\n",
" checkpoint_dir):\n",
" try:\n",
" checkpoint.restore(latest_checkpoint).expect_partial()\n",
" except (tf.errors.OpError,) as e:\n",
" # checkpoint may be deleted by training when it is about to read it.\n",
" continue\n",
"\n",
" # Optionally add callbacks to write summaries.\n",
" eval_model.evaluate(eval_data)\n",
"\n",
" # Evaluation finishes when it has evaluated the last epoch.\n",
" if latest_checkpoint.endswith('-{}'.format(train_epochs)):\n",
" break\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "9TkNbtpPhFRQ"
},
"source": [
"## 現実世界のクラスタ\n",
"\n",
"\n",
"\n",
"注意: このセクションは、このページのチュートリアルコードを実行するためには必要ありません。\n",
"\n",
"実際の運用環境では、すべてのタスクをさまざまなマシンのさまざまなプロセスで実行します。各タスクでクラスタ情報を構成する最も簡単な方法は、`\"TF_CONFIG\"` 環境変数を設定し、`tf.distribute.cluster_resolver.TFConfigClusterResolver` を使用して `\"TF_CONFIG\"` を解析することです。\n",
"\n",
"`\"TF_CONFIG\"` 環境変数の一般的な説明については、[分散トレーニング](../../guide/distributed_training.ipynb)ガイドの「`TF_CONFIG` 環境変数の設定」を参照してください。\n",
"\n",
"Kubernetes やその他の構成テンプレートを使用してトレーニングタスクを開始すると、これらのテンプレートにより `“TF_CONFIG\"` が既に設定されている可能性があります。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "n7AK9SJGt3tQ"
},
"source": [
"### `\"TF_CONFIG\"` 環境変数の設定\n",
"\n",
"3 つのワーカーと 2 つのパラメータサーバーがあるとします。ワーカー 1 の `\"TF_CONFIG\"` は次のようになります。\n",
"\n",
"```python\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"worker\": [\"host1:port\", \"host2:port\", \"host3:port\"],\n",
" \"ps\": [\"host4:port\", \"host5:port\"],\n",
" \"chief\": [\"host6:port\"]\n",
" },\n",
" \"task\": {\"type\": \"worker\", \"index\": 1}\n",
"})\n",
"```\n",
"\n",
"エバリュエータの `\"TF_CONFIG\"` は次のとおりです。\n",
"\n",
"```python\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({\n",
" \"cluster\": {\n",
" \"evaluator\": [\"host7:port\"]\n",
" },\n",
" \"task\": {\"type\": \"evaluator\", \"index\": 0}\n",
"})\n",
"```\n",
"\n",
"上記のエバリュエータの `\"TF_CONFIG\"` 文字列の `\"cluster\"` の部分はオプションです"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fZRjMS0pt1LM"
},
"source": [
"### すべてのタスクで同じバイナリを使用する場合\n",
"\n",
"単一のバイナリを使用してこれらすべてのタスクを実行する場合は、最初にプログラムをさまざまなロールに分岐させる必要があります。\n",
"\n",
"```python\n",
"cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()\n",
"if cluster_resolver.task_type in (\"worker\", \"ps\"):\n",
" # Start a TensorFlow server and wait.\n",
"elif cluster_resolver.task_type == \"evaluator\":\n",
" # Run sidecar evaluation\n",
"else:\n",
" # Run the coordinator.\n",
"```\n",
"\n",
"次のコードは、TensorFlow サーバーを起動して待機します。これは、`\"worker\"` および `\"ps\"` ロールに役立ちます。\n",
"\n",
"```python\n",
"# Set the environment variable to allow reporting worker and ps failure to the\n",
"# coordinator. This is a workaround and won't be necessary in the future.\n",
"os.environ[\"GRPC_FAIL_FAST\"] = \"use_caller\"\n",
"\n",
"server = tf.distribute.Server(\n",
" cluster_resolver.cluster_spec(),\n",
" job_name=cluster_resolver.task_type,\n",
" task_index=cluster_resolver.task_id,\n",
" protocol=cluster_resolver.rpc_layer or \"grpc\",\n",
" start=True)\n",
"server.join()\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZWdYfK593eOL"
},
"source": [
"## タスクの障害の処理"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Bl9eK5r13cOv"
},
"source": [
"### ワーカーの障害\n",
"\n",
"`tf.distribute.coordinator.ClusterCoordinator` カスタムトレーニングループと `Model.fit` アプローチの両方が、ワーカーの障害に対する組み込みのフォールトトレランスを提供します。ワーカーの復旧時に、`ClusterCoordinator` はワーカーでデータセットの再作成を呼び出します。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aP0OHZ1-Ne-B"
},
"source": [
"### パラメータサーバーまたはコーディネータの障害\n",
"\n",
"コーディネータがパラメータサーバーエラーを検出すると、すぐに `UnavailableError` または `AbortedError` が発生します。この場合、コーディネータを再起動できます。また、コーディネータ自体も利用できなくなる可能性があるので、トレーニングの進行状況を失わないようにするためのツールを使用することを推薦します。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "f7m7Itoz8lsI"
},
"source": [
"- `Model.fit` の場合、進行状況の保存と復元を自動的に処理する `BackupAndRestore` コールバックを使用する必要があります。例については、上記の[コールバックとトレーニング](#callbacks-and-training) セクションを参照してください。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-XlLyJp53Z8A"
},
"source": [
"- カスタムトレーニングループの場合、モデル変数を定期的にチェックポイントし、チェックポイントがある場合は、トレーニングを開始する前にモデル変数を読み込む必要があります。オプティマイザがチェックポイントされている場合、トレーニングの進行状況は `optimizer.iterations` からおおよそ推測できます。\n",
"\n",
"```python\n",
"checkpoint_manager = tf.train.CheckpointManager(\n",
" tf.train.Checkpoint(model=model, optimizer=optimizer),\n",
" checkpoint_dir,\n",
" max_to_keep=3)\n",
"if checkpoint_manager.latest_checkpoint:\n",
" checkpoint = checkpoint_manager.checkpoint\n",
" checkpoint.restore(\n",
" checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()\n",
"\n",
"global_steps = int(optimizer.iterations.numpy())\n",
"starting_epoch = global_steps // steps_per_epoch\n",
"\n",
"for _ in range(starting_epoch, num_epochs):\n",
" for _ in range(steps_per_epoch):\n",
" coordinator.schedule(step_fn, args=(per_worker_iterator,))\n",
" coordinator.join()\n",
" checkpoint_manager.save()\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "PlN1P7C53XK9"
},
"source": [
"### `RemoteValue` のフェッチ\n",
"\n",
"関数が正常に実行された場合、`RemoteValue` のフェッチは確実に成功します。これは、現在、関数が実行された後、戻り値がすぐにコーディネータにコピーされるためです。コピー中にワーカーに障害が発生した場合、関数は別の使用可能なワーカーで再試行されます。したがって、パフォーマンスを最適化するには、戻り値なしで関数をスケジュールします。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "iZcR_xNZ3UdU"
},
"source": [
"## エラーレポート\n",
"\n",
"コーディネータは、パラメータサーバーからの `UnavailableError` などのエラーや、`tf.debugging.check_numerics` からの `InvalidArgument` などの他のアプリケーションエラーを確認すると、エラーを発生する前に、保留中およびキューに入れられたすべての関数をキャンセルします。対応する `RemoteValue` をフェッチすると、`CancelledError` が発生します。\n",
"\n",
"エラーが発生した後、コーディネータは同じエラーまたはキャンセルされた関数からのエラーを発生しません。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "QfhbXH-j3NVw"
},
"source": [
"## パフォーマンスの改善\n",
"\n",
"`tf.distribute.ParameterServerStrategy` と `tf.distribute.coordinator.ClusterCoordinator` でトレーニングするときにパフォーマンスの問題が発生することがあります。\n",
"\n",
"一般的に、パラメータサーバーの負荷が不均衡であり、負荷の高い一部のパラメータサーバーが制限容量に達した場合に発生します。 また、複数の根本原因が存在する場合もあります。この問題を軽減する簡単な方法は次のとおりです。\n",
"\n",
"1. `ParameterServerStrategy` を構築するときに `variable_partitioner` を指定して、大規模なモデルの変数を分割します。\n",
"2. 次のようにして、すべてのパラメータサーバーで必要なホットスポット変数を 1 つのステップで作成することは避けてください。\n",
"\n",
"1. オプティマイザで一定の学習率またはサブクラス `tf.keras.optimizers.schedules.LearningRateSchedule` を使用します。これは、デフォルトの動作では、学習率は特定のパラメータサーバーに配置される変数になり、各ステップで他のすべてのパラメータサーバーによって要求されるためです。\n",
"\n",
"2. `tf.keras.optimizers.legacy.Optimizer` を使用します(標準の `tf.keras.optimizers.Optimizer` では、ホットスポット変数になる可能性があります)。\n",
"\n",
"1. 大きな語彙は、Keras の前処理レイヤーに渡す前にシャッフルします。\n",
"\n",
"もう 1 つのパフォーマンスの問題の原因は、コーディネータです。 `schedule`/`join` の実装は Python ベースであるため、スレッドのオーバーヘッドが発生する場合があります。また、コーディネータとワーカー間の待ち時間が長くなる可能性があります。このような場合は、次のようにします。\n",
"\n",
"- `Model.fit` では、`Model.compile` で提供される `steps_per_execution` 引数を 1 より大きい値に設定します。\n",
"\n",
"- カスタムトレーニングループでは、複数のステップを 1 つの `tf.function` にまとめることができます。\n",
"\n",
"```python\n",
"steps_per_invocation = 10\n",
"\n",
"@tf.function\n",
"def step_fn(iterator):\n",
" for _ in range(steps_per_invocation):\n",
" features, labels = next(iterator)\n",
" def replica_fn(features, labels):\n",
" ...\n",
"\n",
" strategy.run(replica_fn, args=(features, labels))\n",
"```\n",
"\n",
"今後ライブラリがさらに最適化されるにつれて、ほとんどのユーザーはステップを手動でまとめる必要がなくなることでしょう。\n",
"\n",
"また、上記の[タスクの障害の処理セクション](#handling_task_failure)で説明したように、パフォーマンスを向上させるために、戻り値なしで関数をスケジュールすることもできます。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "chu5F7M_JmVk"
},
"source": [
"## 既知の制限\n",
"\n",
" \n",
"\n",
"既知の制限のほとんどは、上記のセクションで既に説明されています。このセクションでは、概要を説明します。\n",
"\n",
"### `ParameterServerStrategy` 全般\n",
"\n",
"- `os.environment[\"grpc_fail_fast\"]=\"use_caller\"` は、フォールトトレランスを適切に機能させるために、コーディネータを含むすべてのタスクで必要です。\n",
"- 同期パラメータサーバートレーニングはサポートされていません。\n",
"- 通常、パフォーマンスを最適化するには、複数のステップを 1 つの関数にまとめる必要があります。\n",
"- 分割された変数を含む `tf.saved_model.load` 経由での saved_model の読み込みはサポートされていません。注意: TensorFlow Serving を使用したこのような saved_model の読み込みは機能することが期待されています (詳細については、[サービングのチュートリアル](https://www.tensorflow.org/tfx/tutorials/serving/rest_simple)を参照してください)。\n",
"- コーディネータタスクを再起動せずにパラメータサーバーの障害から回復することできません。\n",
"- `tf.keras.layers.IntegerLookup`、`tf.keras.layers.StringLookup`、`tf.keras.layers.TextVectorization`、などの一部の Keras 前処理レイヤーで一般的に使用される `tf.lookup.StaticHashTable` は、`Strategy.scope` の下に配置する必要があります。そうしないと、リソースがコーディネータに配置され、ワーカーからコーディネータへのルックアップ RPC がパフォーマンスに影響を与えます。\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2MKBF0RPSvzB"
},
"source": [
"### `Model.fit` のみ\n",
"\n",
"- `Model.fit` には `steps_per_epoch` 引数が必要です。エポックで適切な間隔を提供する値を選択します。\n",
"- `ParameterServerStrategy` は、パフォーマンス上の理由から、バッチレベルの呼び出しを持つカスタムコールバックをサポートしていません。これらの呼び出しを適切に選択された `steps_per_epoch` を持つエポックレベルの呼び出しに変換して、`steps_per_epoch` のステップ数ごとに呼び出されるようにする必要があります。バッチレベルの呼び出しはパフォーマンスが向上するように変更されているので、組み込みのコールバックは影響を受けません。`ParameterServerStrategy` のバッチレベルの呼び出しのサポートは計画されています。\n",
"- 同じ理由で、他のストラテジーとは異なり、進捗バーと指標はエポック境界でのみログに記録されます。\n",
"- `run_eagerly` は、サポートされていません。\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "wvY-mg35Sx5L"
},
"source": [
"### カスタムトレーニングループのみ\n",
"\n",
"- `ClusterCoordinator.schedule` は一般にデータセットの評価保証をサポートしていませんが、評価保証は `Model.fit/.evaluate` を通じて可能です。[1 回限りの評価を有効にする](#exactly_once_evaluation)をご覧ください。\n",
"- `ClusterCoordinator.create_per_worker_dataset` が callable と入力として使用される場合、渡された関数内でデータセット全体を作成する必要があります。\n",
"- `tf.data.Options` は、`ClusterCoordinator.create_per_worker_dataset` により作成されたデータセットでは無視されます。"
]
}
],
"metadata": {
"accelerator": "GPU",
"colab": {
"name": "parameter_server_training.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 0
}