{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "Tce3stUlHN0L" }, "source": [ "##### Copyright 2018 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "cellView": "form", "execution": { "iopub.execute_input": "2024-01-11T20:33:52.374264Z", "iopub.status.busy": "2024-01-11T20:33:52.373850Z", "iopub.status.idle": "2024-01-11T20:33:52.377353Z", "shell.execute_reply": "2024-01-11T20:33:52.376777Z" }, "id": "tuOe1ymfHZPu" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "MfBg1C5NB3X0" }, "source": [ "# TensorFlow による分散型トレーニング" ] }, { "cell_type": "markdown", "metadata": { "id": "r6P32iYYV27b" }, "source": [ "
![]() | \n",
" ![]() | \n",
" ![]() | \n",
" ![]() | \n",
"
tf.compat.v1.distribute.experimental.ParameterServerStrategy` シンボルを介した Estimator でのみ使用できます。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "E20tG21LFfv1"
},
"source": [
"注意: このストラテジーは現在改善中であり、より多くのシナリオで機能できるようにしていることから、「[`実験的`](https://www.tensorflow.org/guide/versions#what_is_not_covered)」に指定されています。この改善プロセスの一環として、API の振る舞いが今後変更される可能性があることにご注意ください。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "45H0Wa8WKI8z"
},
"source": [
"### CentralStorageStrategy\n",
"\n",
"`tf.distribute.experimental.CentralStorageStrategy` も同期トレーニングを行います。変数はミラーリングされず、代わりに CPU に配置され、演算はすべてのローカル GPU に複製されます。GPU が 1 つしかない場合は、すべての変数と演算はその GPU に配置されます。\n",
"\n",
"次のようにして、`CentralStorageStrategy` のインスタンスを作成します。\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:57.977372Z",
"iopub.status.busy": "2024-01-11T20:33:57.976818Z",
"iopub.status.idle": "2024-01-11T20:33:57.980670Z",
"shell.execute_reply": "2024-01-11T20:33:57.980060Z"
},
"id": "rtjZOyaoMWrP"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0'\n"
]
}
],
"source": [
"central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KY1nJHNkMl7b"
},
"source": [
"これにより、すべての GPU と CPU を使用する `CentralStorageStrategy` インスタンスが作成されます。レプリカでの変数の更新は、変数に適用される前に集計されます。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "aAFycYUiNCUb"
},
"source": [
"注意: このストラテジーは進行中の作業であり、[`experimental`](https://www.tensorflow.org/guide/versions#what_is_not_covered) となっています。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "t2XUdmIxKljq"
},
"source": [
"### その他のストラテジー\n",
"\n",
"上述のストラテジーのほかに、`tf.distribute` API を使用する際のプロトタイピングとデバッグに役立つ可能性のあるストラテジーが 2 つあります。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UD5I1beTpc7a"
},
"source": [
"#### デフォルトストラテジー\n",
"\n",
"明示的な分散ストラテジーがスコープにない場合には、分散ストラテジーとしてデフォルトストラテジーが使用されます。`tf.distribute.Strategy` インターフェースを実装しますが、パススルーであるため、実際の分散を提供しません。たとえば、`strategy.run(fn)` は `fn` だけを呼び出します。このストラテジーを使用して書かれたコードは、ストラテジーを指定せずに書かれたコードとまった同じ振る舞いとなります。「演算なし」ストラテジーとして考えるとわかりやすいかもしれません。\n",
"\n",
"デフォルトストラテジーはシングルトンであり、インスタンスを 1 つしか作成できません。明示的なストラテジーのスコープの外で、`tf.distribute.get_strategy()` を使って取得することができます(明示的なストラテジーのスコープ内で現在のストラテジーを取得するために使用するのと同じ API)。"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:57.984320Z",
"iopub.status.busy": "2024-01-11T20:33:57.983797Z",
"iopub.status.idle": "2024-01-11T20:33:57.986889Z",
"shell.execute_reply": "2024-01-11T20:33:57.986285Z"
},
"id": "ibHleFOOmPn9"
},
"outputs": [],
"source": [
"default_strategy = tf.distribute.get_strategy()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "EkxPl_5ImLzc"
},
"source": [
"このストラテジーには、主に 2 つの目的があります。\n",
"\n",
"- 無条件で分散対応のライブラリコードを記述すること。たとえば、`tf.keras.optimizers` において、`tf.distribute.get_strategy` を実行し、勾配を減らすためにそのストラテジーを使用します。これは常に、`Strategy.reduce` API を呼び出せるストラテジーオブジェクトを返します。\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:57.990118Z",
"iopub.status.busy": "2024-01-11T20:33:57.989566Z",
"iopub.status.idle": "2024-01-11T20:33:57.996414Z",
"shell.execute_reply": "2024-01-11T20:33:57.995870Z"
},
"id": "WECeRzUdT6bU"
},
"outputs": [
{
"data": {
"text/plain": [
"1.0"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# In optimizer or other library code\n",
"# Get currently active strategy\n",
"strategy = tf.distribute.get_strategy()\n",
"strategy.reduce(\"SUM\", 1., axis=None) # reduce some values"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "JURbH-pUT51B"
},
"source": [
"- ライブラリコードと同様に、条件論理を必要とせずに、分散ストラテジーの有無に関係なく機能するエンドユーザーのプログラムを記述すること。これを説明したサンプルコードスニペットを、次に示します。"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:57.999835Z",
"iopub.status.busy": "2024-01-11T20:33:57.999268Z",
"iopub.status.idle": "2024-01-11T20:33:58.020164Z",
"shell.execute_reply": "2024-01-11T20:33:58.019566Z"
},
"id": "O4Vmae5jmSE6"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"MirroredVariable:{\n",
" 0: ,\n",
" 1: ,\n",
" 2: ,\n",
" 3: \n",
"}\n"
]
}
],
"source": [
"if tf.config.list_physical_devices('GPU'):\n",
" strategy = tf.distribute.MirroredStrategy()\n",
"else: # Use the Default Strategy\n",
" strategy = tf.distribute.get_strategy()\n",
"\n",
"with strategy.scope():\n",
" # Do something interesting\n",
" print(tf.Variable(1.))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "kTzsqN4lmJ0d"
},
"source": [
"#### OneDeviceStrategy\n",
"\n",
"`tf.distribute.OneDeviceStrategy` は、すべての変数と計算を単一の指定デバイスに配置するストラテジーです。\n",
"\n",
"```python\n",
"strategy = tf.distribute.OneDeviceStrategy(device=\"/gpu:0\")\n",
"```\n",
"\n",
"このストラテジーは、多数の点において、デフォルトストラテジーと異なります。デフォルトストラテジーでは、分散ストラテジーを使用せずに TensorFlow を実行した場合と比べ、変数の配置ロジックに変化はありません。しかし、`OneDeviceStrategy` を使用した場合、そのスコープで作成されるすべての変数は、指定されたデバイスに明示的に配置されます。さらに、`OneDeviceStrategy.run` から呼び出される関数も、その指定デバイスに配置されるようになります。\n",
"\n",
"このストラテジーを通じて分散された入力は、指定デバイスにプリフェッチされます。デフォルトストラテジーには、入力の分散はありません。\n",
"\n",
"デフォルトストラテジーと同様に、このストラテジーも、複数のデバイス/マシンに実際に分散する別のストラテジーに切り替える前のコードテストに使用することができます。これにより、分散ストラテジーの仕組みはある程度はデフォルトストラテジーよりもが強化されますが、`MirroredStrategy` や `TPUStrategy` などを使用した場合ほど最大限には強化されません。ストラテジーが指定されていないかのようなコードの振る舞いを希望する場合は、デフォルトストラテジーを使用してください。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "hQv1lm9UPDFy"
},
"source": [
"以上、利用できるさまざまなストラテジーとそのインスタンス化の方法を見てきました。以下では、トレーニングを分散化するために使用できるさまざまな方法を見ていきます。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_mcuy3UhPcen"
},
"source": [
"## `tf.keras.Model.fit` を使った `tf.distribute.Strategy` の使用\n",
"\n",
"`tf.distribute.Strategy` は、TensorFlow の [Keras API 仕様](https://keras.io/api/)の実装である `tf.keras` に統合されています。`tf.keras` は、モデルを構築してトレーニングするための高位 API です。`tf.keras` バックエンドに統合することによって、[Model.fitを使用する](https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit) Keras トレーニングフレームワークに記述されたトレーニングの分散をシームレスに行えるようになっています。\n",
"\n",
"コードを次のように変更してください。\n",
"\n",
"1. 適切な `tf.distribute.Strategy` のインスタンスを作成します。\n",
"2. 作成した Keras モデル、オプティマイザ、および指標を`strategy.scope` に移動します。モデルの `call()`、`train_step()`、および `test_step()` メソッド内のコードは、分散化されてアクセラレータで実行されます。\n",
"\n",
"TensorFlow 分散ストラテジーは、[Sequential](https://www.tensorflow.org/guide/keras/sequential_model)、[Functional](https://www.tensorflow.org/guide/keras/functional)、および [subclassed](https://www.tensorflow.org/guide/keras/custom_layers_and_models) のすべての Keras モデルをサポートしています。\n",
"\n",
"次に、1 つの `Dense` レイヤーを持つ非常に単純な Keras モデルに対してこれを実行するコードスニペットを示します。"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:58.023868Z",
"iopub.status.busy": "2024-01-11T20:33:58.023274Z",
"iopub.status.idle": "2024-01-11T20:33:58.142691Z",
"shell.execute_reply": "2024-01-11T20:33:58.141977Z"
},
"id": "gbbcpzRnPZ6V"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')\n"
]
}
],
"source": [
"mirrored_strategy = tf.distribute.MirroredStrategy()\n",
"\n",
"with mirrored_strategy.scope():\n",
" model = tf.keras.Sequential([\n",
" tf.keras.layers.Dense(1, input_shape=(1,),\n",
" kernel_regularizer=tf.keras.regularizers.L2(1e-4))])\n",
" model.compile(loss='mse', optimizer='sgd')"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "773EOxCRVlTg"
},
"source": [
"この例では、複数の GPU を持つマシンで実行できるように、`MirroredStrategy` を使用しています。`strategy.scope()` は、トレーニングの分散に度のストラテジーを使用するかを Keras に示しています。このスコープ内にモデル/オプティマイザー/メトリックを作成することで、通常の変数ではなく、分散化された変数を作成することができます。これをセットアップしたら、通常通り、モデルをフィットさせることができます。利用できる GPU に対するモデルのトレーニングの複製や勾配の集計などは、`MirroredStrategy` によって行われます。"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:33:58.146484Z",
"iopub.status.busy": "2024-01-11T20:33:58.145878Z",
"iopub.status.idle": "2024-01-11T20:34:03.650216Z",
"shell.execute_reply": "2024-01-11T20:34:03.649146Z"
},
"id": "ZMmxEFRTEjH5"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 1/2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective all_reduce tensors: 2 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective all_reduce tensors: 2 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
" 1/10 [==>...........................] - ETA: 31s - loss: 1.0575e-04"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"10/10 [==============================] - 4s 7ms/step - loss: 1.0416e-04\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n",
"I0000 00:00:1705005242.237586 283178 device_compiler.h:186] Compiled cluster using XLA! This line is logged at most once for the lifetime of the process.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 2/2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
" 1/10 [==>...........................] - ETA: 0s - loss: 1.0267e-04"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"10/10 [==============================] - 0s 6ms/step - loss: 1.0196e-04\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
" 1/10 [==>...........................] - ETA: 8s - loss: 1.0130e-04"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"10/10 [==============================] - 1s 4ms/step - loss: 1.0130e-04\n"
]
},
{
"data": {
"text/plain": [
"0.0001013019063975662"
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)\n",
"model.fit(dataset, epochs=2)\n",
"model.evaluate(dataset)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "nofTLwyXWHK8"
},
"source": [
"ここでは、`tf.data.Dataset` を使用してトレーニングと eval 入力を提供していますが、次のように NumPy 配列を使用することもできます。"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:03.654043Z",
"iopub.status.busy": "2024-01-11T20:34:03.653705Z",
"iopub.status.idle": "2024-01-11T20:34:04.139996Z",
"shell.execute_reply": "2024-01-11T20:34:04.139299Z"
},
"id": "Lqgd9SdxW5OW"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 1/2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
" 1/10 [==>...........................] - ETA: 0s - loss: 1.0130e-04"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"10/10 [==============================] - 0s 5ms/step - loss: 1.0099e-04\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Epoch 2/2\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\r",
" 1/10 [==>...........................] - ETA: 0s - loss: 1.0070e-04"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r",
"10/10 [==============================] - 0s 5ms/step - loss: 1.0056e-04\n"
]
},
{
"data": {
"text/plain": [
""
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"\n",
"inputs, targets = np.ones((100, 1)), np.ones((100, 1))\n",
"model.fit(inputs, targets, epochs=2, batch_size=10)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IKqaj7QwX0Zb"
},
"source": [
"いずれの場合(`Dataset` または Numpy)でも、指定された入力の各バッチは、複数のレプリカで均等に分割されます。たとえば、2 GPU で `MirroredStrategy` を使用している場合、サイズが 10 の各バッチは 2 GPU 間で分割されるため、各ステップでそれぞれに 5 つの入力例が提供されます。その後、GPU をさらに追加するにつれ、各エポックはより高速にトレーニングするようになります。一般的に、アクセラレータを増やすたびにバッチサイズを増加することが望ましく、そうすることで、追加の計算パワーを有効活用できるようになります。また、モデルに応じて学習速度を再調整することも必要です。レプリカの数を取得するには、`strategy.num_replicas_in_sync` を使用できます。"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.143822Z",
"iopub.status.busy": "2024-01-11T20:34:04.143233Z",
"iopub.status.idle": "2024-01-11T20:34:04.147857Z",
"shell.execute_reply": "2024-01-11T20:34:04.147235Z"
},
"id": "8ZmJqErtS4A1"
},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mirrored_strategy.num_replicas_in_sync"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.151204Z",
"iopub.status.busy": "2024-01-11T20:34:04.150562Z",
"iopub.status.idle": "2024-01-11T20:34:04.160027Z",
"shell.execute_reply": "2024-01-11T20:34:04.159423Z"
},
"id": "quNNTytWdGBf"
},
"outputs": [],
"source": [
"# Compute a global batch size using a number of replicas.\n",
"BATCH_SIZE_PER_REPLICA = 5\n",
"global_batch_size = (BATCH_SIZE_PER_REPLICA *\n",
" mirrored_strategy.num_replicas_in_sync)\n",
"dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)\n",
"dataset = dataset.batch(global_batch_size)\n",
"\n",
"LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}\n",
"learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "z1Muy0gDZwO5"
},
"source": [
"### 現在、何がサポートされていますか?\n",
"\n",
"トレーニング API | `MirroredStrategy` | `TPUStrategy` | `MultiWorkerMirroredStrategy` | `ParameterServerStrategy` | `CentralStorageStrategy`\n",
"--- | --- | --- | --- | --- | ---\n",
"Keras API | サポート中 | サポート中 | サポート中 | 実験的サポート | 2.3 以降でサポート予定\n",
"\n",
"### 例とチュートリアル\n",
"\n",
"次は、上述した Keras のエンドツーエンド統合を説明するチュートリアルと例の一覧です。\n",
"\n",
"1. MirroredStrategy
で MNIST をトレーニングするチュートリアル\n",
"2. `MultiWorkerMirroredStrategy` を使って MNIST をトレーニングする[チュートリアル](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras)\n",
"3. [ガイド](tpu.ipynb): `Model.fit` と `TPUStrategy` を使用する例を含むガイド。\n",
"4. [チュートリアル](../tutorials/distribute/parameter_server_training.ipynb): `Model.fit` と `ParameterServerStrategy` でパラメータサーバーをトレーニングする。\n",
"5. [チュートリアル](https://www.tensorflow.org/text/tutorials/bert_glue): `Model.fit` と `TPUStrategy` を使用して、GLUE ベンチマークから多くのタスクの BERT を微調整する。\n",
"6. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden [リポジトリ](https://github.com/tensorflow/models/tree/master/official)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "IlYVC0goepdk"
},
"source": [
"## カスタムトレーニングループで tf.distribute.Strategy
を使用する\n",
"\n",
"これまで見てきたように、Keras の `Model.fit` で `tf.distribute.Strategy` を使用するにはコードの数行のみを変更する必要がありました。もう少し手を加えれば、[カスタムトレーニングループ](https://www.tensorflow.org/guide/keras/writing_a_training_loop_from_scratch)でも `tf.distribute.Strategy` を使用できるようになります。\n",
"\n",
"トレーニングループに Estimator や Keras よりも高い柔軟性と制御が必要な場合は、カスタムトレーニングループを作成することができます。たとえば、GANを使用する際に、各ラウンドで異なる数のジェネレータやディスクリミネータのステップを実行することができます。同様に、高度なフレームワークは、強化学習トレーニングにはあまり適していません。\n",
"\n",
"カスタムトレニングループをサポートするために、`tf.distribute.Strategy` クラスでメソッドのコアセットを提供しています。これらを使用するには、最初にコードをわずかに再構成する必要があるかもしれませんが、それを一度行うと、ストラテジーインスタンスを変更するだけで、GPU、TPU、および複数のマシンを切り替えられるようになります。\n",
"\n",
"ここでは、以前と同じ Keras モデルを使って、単純なトレーニングでの使用事例を表した簡単なスニペットを示します。\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "XNHvSY32nVBi"
},
"source": [
"まず、ストラテジーのスコープ内にモデルとオプティマイザーを作成します。こうすることで、そのモデルとオプティマイザーを使って作成された変数がミラーリングされた変数であることを保証することができます。"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.164024Z",
"iopub.status.busy": "2024-01-11T20:34:04.163378Z",
"iopub.status.idle": "2024-01-11T20:34:04.204946Z",
"shell.execute_reply": "2024-01-11T20:34:04.204335Z"
},
"id": "W-3Bn-CaiPKD"
},
"outputs": [],
"source": [
"with mirrored_strategy.scope():\n",
" model = tf.keras.Sequential([\n",
" tf.keras.layers.Dense(1, input_shape=(1,),\n",
" kernel_regularizer=tf.keras.regularizers.L2(1e-4))])\n",
" optimizer = tf.keras.optimizers.SGD()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "mYkAyPeYnlXk"
},
"source": [
"次に、入力データセットを作成し、`tf.distribute.Strategy.experimental_distribute_dataset` を呼び出して、ストラテジーに応じてデータセットを分散します。"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.208108Z",
"iopub.status.busy": "2024-01-11T20:34:04.207850Z",
"iopub.status.idle": "2024-01-11T20:34:04.229222Z",
"shell.execute_reply": "2024-01-11T20:34:04.228616Z"
},
"id": "94BkvkLInkKd"
},
"outputs": [],
"source": [
"dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(\n",
" global_batch_size)\n",
"dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "grzmTlSvn2j8"
},
"source": [
"そして、トレーニングの一ステップを定義します。勾配とオプティマイザーを計算し、その勾配を適用してモデルの変数を更新するために、`tf.GradientTape` を使用します。このトレーニングステップを分散するには、`train_step` 関数に入れて、前に作成した `dist_dataset` から取得するデータセット入力とともに、`tf.distrbute.Strategy.run` に渡します。"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.232609Z",
"iopub.status.busy": "2024-01-11T20:34:04.232357Z",
"iopub.status.idle": "2024-01-11T20:34:04.238444Z",
"shell.execute_reply": "2024-01-11T20:34:04.237841Z"
},
"id": "NJxL5YrVniDe"
},
"outputs": [],
"source": [
"# Sets `reduction=NONE` to leave it to tf.nn.compute_average_loss() below.\n",
"loss_object = tf.keras.losses.BinaryCrossentropy(\n",
" from_logits=True,\n",
" reduction=tf.keras.losses.Reduction.NONE)\n",
"\n",
"def train_step(inputs):\n",
" features, labels = inputs\n",
"\n",
" with tf.GradientTape() as tape:\n",
" predictions = model(features, training=True)\n",
" per_example_loss = loss_object(labels, predictions)\n",
" loss = tf.nn.compute_average_loss(per_example_loss)\n",
" model_losses = model.losses\n",
" if model_losses:\n",
" loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))\n",
"\n",
" gradients = tape.gradient(loss, model.trainable_variables)\n",
" optimizer.apply_gradients(zip(gradients, model.trainable_variables))\n",
" return loss\n",
"\n",
"@tf.function\n",
"def distributed_train_step(dist_inputs):\n",
" per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))\n",
" return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,\n",
" axis=None)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "yRL5u_NLoTvq"
},
"source": [
"上記のコードには、注意すべき点がいくつかあります。\n",
"\n",
"1. `tf.nn.compute_average_loss` を使用して、サンプルごとの予測損失をスカラーに縮小しました。`tf.nn.compute_average_loss` はサンプルごとの損失を加算し、その和をグローバルバッチサイズで除算しています。これは、各レプリカで勾配が計算された後、その勾配を**加算**することで、レプリカ全体の勾配が集計されるため重要です。\n",
"\n",
"デフォルトでは、`tf.get_strategy().num_replicas_in_sync * tf.shape(per_example_loss)[0]` になるようにグローバルバッチサイズが取られます。これはキーワード引数 `global_batch_size=` として明示的に指定することも可能です。短いバッチがない場合、このデフォルトは、`tf.nn.compute_average_loss(..., global_batch_size=global_batch_size)` と上記で定義した `global_batch_size` と同等です。(短いバッチ、およびそれらの回避または処理の方法については、[カスタムトレーニングチュートリアル](../tutorials/distribute/custom_training.ipynb)をご覧ください。)\n",
"\n",
"1. `tf.nn.scale_regularization_loss` を使用して、`Model` オブジェクトに登録されている正則化損失(存在する場合)も、`1/num_replicas_in_sync` によってスケーリングしました。入力に依存する正則化損失の場合、カスタムトレーニングループではなくモデリングコードでレプリカ(!) ごとのバッチサイズの平均化を実行します。そうすることで、モデリングコードはレプリケーションに、トレーニングループは正則化損失の計算方法に依存することがありません。\n",
"\n",
"2. 分散ストラテジーのスコープ内で `apply_gradients` が呼び出されると、その振る舞いが変更されます。具体的には、同期トレーニング中に各並列インスタンスに勾配を適用する前に、勾配の sum-over-all-replicas を実行します。\n",
"\n",
"3. `tf.distribute.Strategy.run` が返した結果を集計するために、`tf.distribute.Strategy.reduce` API も使用しました。`tf.distribute.Strategy.run` はストラテジー内の各ローカルレプリカの結果を返し、この結果の使用方法は多様です。`reduce` で、集約された値を取得することができます。また、`tf.distribute.Strategy.experimental_local_results` を実行して、ローカルレプリカごとに 1 つ、結果に含まれる値のリストを取得することもできます。\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "o9k_6-6vpQ-P"
},
"source": [
"最後に、トレーニングステップの定義が済んだら、`dist_dataset` をイテレートしてトレーニングをループで実行できます。"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:04.241751Z",
"iopub.status.busy": "2024-01-11T20:34:04.241169Z",
"iopub.status.idle": "2024-01-11T20:34:06.682524Z",
"shell.execute_reply": "2024-01-11T20:34:06.681752Z"
},
"id": "Egq9eufToRf6"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective all_reduce tensors: 2 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Collective all_reduce tensors: 2 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"tf.Tensor(1.0082505, shape=(), dtype=float32)\n",
"tf.Tensor(1.0002, shape=(), dtype=float32)\n",
"tf.Tensor(0.9922241, shape=(), dtype=float32)\n",
"tf.Tensor(0.9843224, shape=(), dtype=float32)\n",
"tf.Tensor(0.97649455, shape=(), dtype=float32)\n",
"tf.Tensor(0.96874, shape=(), dtype=float32)\n",
"tf.Tensor(0.96105826, shape=(), dtype=float32)\n",
"tf.Tensor(0.9534487, shape=(), dtype=float32)\n",
"tf.Tensor(0.9459112, shape=(), dtype=float32)\n",
"tf.Tensor(0.9384451, shape=(), dtype=float32)\n",
"tf.Tensor(0.9310499, shape=(), dtype=float32)\n",
"tf.Tensor(0.92372507, shape=(), dtype=float32)\n",
"tf.Tensor(0.91647035, shape=(), dtype=float32)\n",
"tf.Tensor(0.909285, shape=(), dtype=float32)\n",
"tf.Tensor(0.90216863, shape=(), dtype=float32)\n",
"tf.Tensor(0.8951206, shape=(), dtype=float32)\n",
"tf.Tensor(0.8881406, shape=(), dtype=float32)\n",
"tf.Tensor(0.8812279, shape=(), dtype=float32)\n",
"tf.Tensor(0.8743823, shape=(), dtype=float32)\n",
"tf.Tensor(0.867603, shape=(), dtype=float32)\n",
"tf.Tensor(0.8608897, shape=(), dtype=float32)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"tf.Tensor(0.8542416, shape=(), dtype=float32)\n",
"tf.Tensor(0.8476584, shape=(), dtype=float32)\n",
"tf.Tensor(0.8411393, shape=(), dtype=float32)\n",
"tf.Tensor(0.8346843, shape=(), dtype=float32)\n",
"tf.Tensor(0.82829237, shape=(), dtype=float32)\n",
"tf.Tensor(0.8219633, shape=(), dtype=float32)\n",
"tf.Tensor(0.8156963, shape=(), dtype=float32)\n",
"tf.Tensor(0.80949104, shape=(), dtype=float32)\n",
"tf.Tensor(0.80334705, shape=(), dtype=float32)\n",
"tf.Tensor(0.79726344, shape=(), dtype=float32)\n",
"tf.Tensor(0.79124004, shape=(), dtype=float32)\n",
"tf.Tensor(0.7852762, shape=(), dtype=float32)\n",
"tf.Tensor(0.7793713, shape=(), dtype=float32)\n",
"tf.Tensor(0.773525, shape=(), dtype=float32)\n",
"tf.Tensor(0.7677367, shape=(), dtype=float32)\n",
"tf.Tensor(0.7620057, shape=(), dtype=float32)\n",
"tf.Tensor(0.75633174, shape=(), dtype=float32)\n",
"tf.Tensor(0.75071406, shape=(), dtype=float32)\n",
"tf.Tensor(0.74515235, shape=(), dtype=float32)\n",
"tf.Tensor(0.739646, shape=(), dtype=float32)\n",
"tf.Tensor(0.73419434, shape=(), dtype=float32)\n",
"tf.Tensor(0.7287971, shape=(), dtype=float32)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"tf.Tensor(0.7234536, shape=(), dtype=float32)\n",
"tf.Tensor(0.7181633, shape=(), dtype=float32)\n",
"tf.Tensor(0.71292585, shape=(), dtype=float32)\n",
"tf.Tensor(0.70774055, shape=(), dtype=float32)\n",
"tf.Tensor(0.7026069, shape=(), dtype=float32)\n",
"tf.Tensor(0.6975246, shape=(), dtype=float32)\n",
"tf.Tensor(0.69249296, shape=(), dtype=float32)\n"
]
}
],
"source": [
"for dist_inputs in dist_dataset:\n",
" print(distributed_train_step(dist_inputs))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "jK8eQXF_q1Zs"
},
"source": [
"上記の例では、`dist_dataset` をイテレートして、トレーニングに入力を提供しました。また、numpy 入力をサポートするために、`tf.distribute.Strategy.make_experimental_numpy_dataset` も提供しています。`tf.distribute.Strategy.experimental_distribute_dataset` を呼び出す前にこの API を使って、データセットを作成することができます。\n",
"\n",
"データをイテレートするための別の方法は、明示的にイテレータを使用することです。データセット全体をイテレートするのに対し、特定のステップ数だけ実行する場合に、これを行うことができます。上記のイテレートは、最初にイテレータを作成し、それに明示的に`next` を呼び出して入力データを取得するように変更されます。"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {
"execution": {
"iopub.execute_input": "2024-01-11T20:34:06.685849Z",
"iopub.status.busy": "2024-01-11T20:34:06.685576Z",
"iopub.status.idle": "2024-01-11T20:34:07.049104Z",
"shell.execute_reply": "2024-01-11T20:34:07.048390Z"
},
"id": "e5BEvR0-LJAc"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"tf.Tensor(0.6875115, shape=(), dtype=float32)\n",
"tf.Tensor(0.6825797, shape=(), dtype=float32)\n",
"tf.Tensor(0.6776972, shape=(), dtype=float32)\n",
"tf.Tensor(0.6728633, shape=(), dtype=float32)\n",
"tf.Tensor(0.6680777, shape=(), dtype=float32)\n",
"tf.Tensor(0.6633398, shape=(), dtype=float32)\n",
"tf.Tensor(0.6586491, shape=(), dtype=float32)\n",
"tf.Tensor(0.65400517, shape=(), dtype=float32)\n",
"tf.Tensor(0.6494075, shape=(), dtype=float32)\n",
"tf.Tensor(0.6448556, shape=(), dtype=float32)\n"
]
}
],
"source": [
"iterator = iter(dist_dataset)\n",
"for _ in range(10):\n",
" print(distributed_train_step(next(iterator)))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vDJO8mnypqBA"
},
"source": [
"これは、`tf.distribute.Strategy` APIを使用してカスタムトレーニングループを分散する最も単純な事例をカバーしています。これらの API は改善プロセスにあります。この使用事例にはコードを適合させる作業がさらに必要であるため、いずれ、詳細なガイドを別途公開する予定です。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BZjNwCt1qBdw"
},
"source": [
"### 現在、何がサポートされていますか?\n",
"\n",
"トレーニング API | `MirroredStrategy` | `TPUStrategy` | `MultiWorkerMirroredStrategy` | `ParameterServerStrategy` | `CentralStorageStrategy`\n",
":-- | :-- | :-- | :-- | :-- | :--\n",
"カスタムトレーニングループ | サポート中 | サポート中 | サポート中 | 実験的サポート | 2.3 以降でサポート予定\n",
"\n",
"### 例とチュートリアル\n",
"\n",
"次は、カスタムトレーニングループを使って分散ストラテジーを使用するいくつかの例です。\n",
"\n",
"1. `MirroredStrategy` で MNIST をトレーニングする[チュートリアル](https://www.tensorflow.org/tutorials/distribute/custom_training)\n",
"2. `TPUStrategy` による MNIST のトレーニングに関する[ガイド](https://www.tensorflow.org/guide/tpu#train_a_model_using_custom_training_loop)\n",
"3. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden [リポジトリ](https://github.com/tensorflow/models/tree/master/official)\n",
"4. [チュートリアル](../tutorials/distribute/parameter_server_training.ipynb): カスタムトレーニングループと `ParameterServerStrategy` でパラメータサーバーをトレーニングする。\n",
"5. さまざまなストラテジーを使って実装された最新モデルのコレクションが含まれる TensorFlow Model Garden [リポジトリ](https://github.com/tensorflow/models/tree/master/official)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Xk0JdsTHyUnE"
},
"source": [
"## その他のトピック\n",
"\n",
"このセクションでは、複数の使用事例に関連のあるトピックをいくつかカバーしています。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cP6BUIBtudRk"
},
"source": [
"\n",
"\n",
"### TF_CONFIG 環境変数のセットアップ\n",
"\n",
"マルチワーカートレーニングでは、前述のとおり、クラスタで実行されている各バイナリに対して ` TF_CONFIG` 環境変数を設定する必要があります。`TF_CONFIG` 環境変数は、クラスタを構成するタスク、そのアドレス、およびクラスタ内の各タスクのロールを指定する JSON 文字列です。[tensorflow/ecosystem](https://github.com/tensorflow/ecosystem) レポジトリには、トレーニングタスクの `TF_CONFIG` を設定する Kubernotes テンプレートを提供しています。\n",
"\n",
"`TF_CONFIG` には、cluster と task の 2 つのコンポーネントがあります。\n",
"\n",
"- cluster はトレーニングクラスタに関する情報を提供します。これは、worker などのさまざまなタイプのジョブで構成されるディクショナリーです。マルチワーカートレーニングでは、通常、一般的なワーカーの作業に加えて、チェックポイントの保存や TensorBoard のサマリーファイルの書き込みなど、ほかよりタスクを担うワーカーが 1 つあります。こういったワーカーは、「チーフ」ワーカーと呼ばれ、index 0 のワーカーがチーフワーカーに指定されるようになっています(実際、このようにして、`tf.distribute.Strategy` が実装されています)。\n",
"- 一方、task は現在のタスクに関する情報を提供します。最初のコンポーネント cluster はすべてのワーカーで同じであり、2番目のコンポーネント task はワーカーごとに異なり、そのワーカーのタイプとインデックスを指定します。\n",
"\n",
"次は、`TF_CONFIG` の一例です。\n",
"\n",
"```python\n",
"os.environ[\"TF_CONFIG\"] = json.dumps({ \"cluster\": { \"worker\": [\"host1:port\", \"host2:port\", \"host3:port\"], \"ps\": [\"host4:port\", \"host5:port\"] }, \"task\": {\"type\": \"worker\", \"index\": 1} })\n",
"```\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "fezd3aF8wj9r"
},
"source": [
"この `'TF_CONFIG'`は、`\"cluster\"` 内に 3 つのワーカーと 2 つの `\"ps\"` タスク、およびそれらのホストとポートがあることを指定しています。 `\"task\"` の部分は、`\"cluster\"` の現在のタスクのロールである worker `1`(2番目のワーカー)を指定します。クラスタ内の有効なロールは、`\"chief\"`、`\"worker\"`、`\"ps\"`、および `\"evaluator\"` です。`tf.distribute.experimental.ParameterServerStrategy`を使用する場合を除いて、`\"ps\"` ジョブはありません。"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GXIbqSW-sFVg"
},
"source": [
"## 次のステップ\n",
"\n",
"`tf.distribute.Strategy` は積極的な開発が進められています。ぜひお試しいただき、[GitHub 課題](https://github.com/tensorflow/tensorflow/issues/new) に皆さんの感想をお寄せください。"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"Tce3stUlHN0L"
],
"name": "distributed_training.ipynb",
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 0
}