{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "wJcYs_ERTnnI" }, "source": [ "##### Copyright 2021 The TensorFlow Authors." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "cellView": "form", "id": "HMUDt0CiUJk9" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] },
{ "cell_type": "markdown", "metadata": { "id": "77z2OchJTk0l" }, "source": [ "# Migrate multi-worker CPU/GPU training\n", "\n", "View on TensorFlow.org | Run in Google Colab | View source on GitHub | Download notebook" ] },
{ "cell_type": "markdown", "metadata": { "id": "meUTrR4I6m1C" }, "source": [ "This guide demonstrates how to migrate your multi-worker distributed training workflow from TensorFlow 1 to TensorFlow 2.\n", "\n", "To perform multi-worker training with CPUs/GPUs:\n", "\n", "- In TensorFlow 1, you traditionally use the `tf.estimator.train_and_evaluate` and `tf.estimator.Estimator` APIs.\n", "- In TensorFlow 2, use the Keras APIs for writing the model, the loss function, the optimizer, and metrics. Then, distribute the training with the Keras `Model.fit` API or a custom training loop (with `tf.GradientTape`) across multiple workers with `tf.distribute.experimental.ParameterServerStrategy` or `tf.distribute.MultiWorkerMirroredStrategy`. For more details, refer to the following tutorials:\n", "  - [Distributed training with TensorFlow](../../guide/distributed_training.ipynb)\n", "  - [Parameter server training with Keras Model.fit/a custom training loop](../../tutorials/distribute/parameter_server_training.ipynb)\n", "  - [MultiWorkerMirroredStrategy with Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", "  - [MultiWorkerMirroredStrategy with a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb)." ] },
{ "cell_type": "markdown", "metadata": { "id": "YdZSoIXEbhg-" }, "source": [ "## Setup" ] },
{ "cell_type": "markdown", "metadata": { "id": "28f46832b54d" }, "source": [ "Start with some necessary imports and a simple dataset for demonstration purposes:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "iE0vSfMXumKI" }, "outputs": [], "source": [ "# The notebook uses a dataset instance for `Model.fit` with\n", "# `ParameterServerStrategy`, which depends on symbols in TF 2.7.\n", "# Install a utility needed for this demonstration.\n", "!pip install portpicker\n", "\n", "import tensorflow as tf\n", "import tensorflow.compat.v1 as tf1" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "m7rnGxsXtDkV" }, "outputs": [], "source": [ "features = [[1., 1.5], [2., 2.5], [3., 3.5]]\n", "labels = [[0.3], [0.5], [0.7]]\n", "eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]\n", "eval_labels = [[0.8], [0.9], [1.]]" ] },
{ "cell_type": "markdown", "metadata": { "id": "T2uaw9QaDM_X" }, "source": [ "You will need the `'TF_CONFIG'` configuration environment variable for training on multiple machines in TensorFlow. Use `'TF_CONFIG'` to specify the `'cluster'` (the addresses of the tasks) and the current `'task'` (its type and index). (Learn more in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "4OUzwoQgXgkG" }, "outputs": [], "source": [ "import json\n", "import os\n", "\n", "tf_config = {\n", "    'cluster': {\n", "        'chief': ['localhost:11111'],\n", "        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],\n", "        'ps': ['localhost:12121', 'localhost:13131'],\n", "    },\n", "    'task': {'type': 'chief', 'index': 0}\n", "}\n", "\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)" ] },
{ "cell_type": "markdown", "metadata": { "id": "PbeoSbbmDdc0" }, "source": [ "Note: Unfortunately, since multi-worker training with `tf.estimator` APIs in TensorFlow 1 requires multiple clients (which would be especially tricky to do here in this Colab notebook), you will make the notebook runnable without a `'TF_CONFIG'` environment variable, so it falls back to local training.
(Learn more in the *Setting up the `'TF_CONFIG'` environment variable* section in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)\n", "\n", "Use the `del` statement to remove the variable (but in real-world multi-worker training in TensorFlow 1, you won't have to do this):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "AHuynAR5D8sU" }, "outputs": [], "source": [ "del os.environ['TF_CONFIG']" ] },
{ "cell_type": "markdown", "metadata": { "id": "4uXff1BEssdE" }, "source": [ "## TensorFlow 1: Multi-worker distributed training with tf.estimator APIs" ] },
{ "cell_type": "markdown", "metadata": { "id": "MpyINdiLEN3c" }, "source": [ "The following code snippet demonstrates the canonical workflow of multi-worker training in TensorFlow 1: you will use a `tf.estimator.Estimator`, a `tf.estimator.TrainSpec`, a `tf.estimator.EvalSpec`, and the `tf.estimator.train_and_evaluate` API to distribute the training:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "lqe9obf7suIj" }, "outputs": [], "source": [ "def _input_fn():\n", "  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)\n", "\n", "def _eval_input_fn():\n", "  return tf1.data.Dataset.from_tensor_slices(\n", "      (eval_features, eval_labels)).batch(1)\n", "\n", "def _model_fn(features, labels, mode):\n", "  logits = tf1.layers.Dense(1)(features)\n", "  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)\n", "  optimizer = tf1.train.AdagradOptimizer(0.05)\n", "  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())\n", "  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)\n", "\n", "estimator = tf1.estimator.Estimator(model_fn=_model_fn)\n", "train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)\n", "eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)\n", "tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)" ] },
{ "cell_type": "markdown", "metadata": { "id": "KEmzBjfnsxwT" }, "source": [ "## TensorFlow 2: Multi-worker training with distribution strategies" ] },
{ "cell_type": "markdown", "metadata": { "id": "Syb66qsbEp1x" }, "source": [ "In TensorFlow 2, distributed training across multiple workers with CPUs, GPUs, and TPUs is done via `tf.distribute.Strategy`s.\n", "\n", "The following example demonstrates how to use two such strategies: `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy`, both of which are designed for CPU/GPU training with multiple workers.\n", "\n", "`ParameterServerStrategy` employs a _coordinator_ (`'chief'`), which makes it a better fit for the environment in this Colab notebook. You will use some utilities here to set up the supporting elements needed for a runnable example: you will create an _in-process cluster_, where threads simulate the parameter servers (`'ps'`) and workers (`'worker'`). For more information about parameter server training, refer to the [Parameter server training with ParameterServerStrategy](../../tutorials/distribute/parameter_server_training.ipynb) tutorial.\n", "\n", "In this example, first define the `'TF_CONFIG'` environment variable, and use a `tf.distribute.cluster_resolver.TFConfigClusterResolver` to provide the cluster information to the strategy. If you are using a cluster management system for your distributed training, check if it provides `'TF_CONFIG'` for you already, in which case you don't need to explicitly set this environment variable.
(Learn more in the *Setting up the `'TF_CONFIG'` environment variable* section in the [Distributed training with TensorFlow](../../guide/distributed_training.ipynb) guide.)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "rp-gFY0H5rF-" }, "outputs": [], "source": [ "# Find ports that are available for the `'chief'` (the coordinator),\n", "# `'worker'`s, and `'ps'` (parameter servers).\n", "import portpicker\n", "\n", "chief_port = portpicker.pick_unused_port()\n", "worker_ports = [portpicker.pick_unused_port() for _ in range(3)]\n", "ps_ports = [portpicker.pick_unused_port() for _ in range(2)]\n", "\n", "# Dump the cluster information to `'TF_CONFIG'`.\n", "tf_config = {\n", "    'cluster': {\n", "        'chief': [\"localhost:%s\" % chief_port],\n", "        'worker': [\"localhost:%s\" % port for port in worker_ports],\n", "        'ps': [\"localhost:%s\" % port for port in ps_ports],\n", "    },\n", "    'task': {'type': 'chief', 'index': 0}\n", "}\n", "os.environ['TF_CONFIG'] = json.dumps(tf_config)\n", "\n", "# Use a cluster resolver to bridge the information to the strategy created below.\n", "cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()" ] },
{ "cell_type": "markdown", "metadata": { "id": "o_8uVvJb6dqq" }, "source": [ "Then, create `tf.distribute.Server`s for the workers and parameter servers one by one:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "ZJopinmG6b2z" }, "outputs": [], "source": [ "# Workers need some inter-op parallelism threads to work properly.\n", "# This is only needed for this notebook demonstration. Real servers\n", "# should not need this.\n", "worker_config = tf.compat.v1.ConfigProto()\n", "worker_config.inter_op_parallelism_threads = 4\n", "\n", "for i in range(3):\n", "  tf.distribute.Server(\n", "      cluster_resolver.cluster_spec(),\n", "      job_name=\"worker\",\n", "      task_index=i,\n", "      config=worker_config)\n", "\n", "for i in range(2):\n", "  tf.distribute.Server(\n", "      cluster_resolver.cluster_spec(),\n", "      job_name=\"ps\",\n", "      task_index=i)" ] },
{ "cell_type": "markdown", "metadata": { "id": "IpfCcF0g6Ao8" }, "source": [ "In real-world distributed training, instead of starting all the `tf.distribute.Server`s on the coordinator, you will be using multiple machines, and the ones that are designated as `\"worker\"`s and `\"ps\"` (parameter servers) will each run a `tf.distribute.Server`. Refer to the *Clusters in the real world* section in the [Parameter server training](../../tutorials/distribute/parameter_server_training.ipynb) tutorial for more details.\n", "\n", "With everything ready, create the `ParameterServerStrategy` object:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "t45iQeBT7Us_" }, "outputs": [], "source": [ "strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)" ] },
{ "cell_type": "markdown", "metadata": { "id": "diNsps1MGRS6" }, "source": [ "Once you have created a strategy object, define the model, the optimizer, and other variables, and call the Keras `Model.compile` within the `Strategy.scope` API to distribute the training. (Refer to the `Strategy.scope` API docs for more information.)\n", "\n", "If you prefer to customize your training by, for instance, defining the forward and backward passes, refer to the *Training with a custom training loop* section in the [Parameter server training](../../tutorials/distribute/parameter_server_training.ipynb) tutorial for more details."
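, "\n", "\n", "As a preview, the following is a minimal sketch of such a custom training loop with `ParameterServerStrategy` (the next code cell sticks with `Model.compile` and `Model.fit`). It is not the tutorial's full example: it assumes the `strategy`, `features`, and `labels` defined above, and it dispatches training steps to the workers through a `tf.distribute.experimental.coordinator.ClusterCoordinator`:\n", "\n",
"```python\n", "# Sketch only: a custom training loop driven by a cluster coordinator.\n", "coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)\n", "\n",
"with strategy.scope():\n", "  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n", "  optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)\n", "\n",
"@tf.function\n", "def train_step(iterator):\n", "  def step_fn(inputs):\n", "    x, y = inputs\n", "    with tf.GradientTape() as tape:\n", "      loss = tf.reduce_mean(tf.square(model(x) - y))\n", "    grads = tape.gradient(loss, model.trainable_variables)\n", "    optimizer.apply_gradients(zip(grads, model.trainable_variables))\n", "    return loss\n", "  # Run the per-replica step with the next batch from the worker's iterator.\n", "  return strategy.run(step_fn, args=(next(iterator),))\n", "\n",
"def per_worker_dataset_fn():\n", "  return strategy.distribute_datasets_from_function(\n", "      lambda _: tf.data.Dataset.from_tensor_slices(\n", "          (features, labels)).shuffle(10).repeat().batch(8))\n", "\n",
"per_worker_iterator = iter(\n", "    coordinator.create_per_worker_dataset(per_worker_dataset_fn))\n", "\n",
"# Schedule steps onto the workers and wait for all of them to finish.\n", "for _ in range(20):\n", "  coordinator.schedule(train_step, args=(per_worker_iterator,))\n", "coordinator.join()\n", "```"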
] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "atVciNgPs0fw" }, "outputs": [], "source": [ "dataset = tf.data.Dataset.from_tensor_slices(\n", "    (features, labels)).shuffle(10).repeat().batch(64)\n", "\n", "eval_dataset = tf.data.Dataset.from_tensor_slices(\n", "    (eval_features, eval_labels)).repeat().batch(1)\n", "\n", "with strategy.scope():\n", "  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n", "  optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)\n", "  model.compile(optimizer, \"mse\")\n", "\n", "model.fit(dataset, epochs=5, steps_per_epoch=10)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "akZ0aaaS1vA9" }, "outputs": [], "source": [ "model.evaluate(eval_dataset, steps=10, return_dict=True)" ] },
{ "cell_type": "markdown", "metadata": { "id": "pXbS71XmMSoO" }, "source": [ "> **Partitioners (`tf.distribute.experimental.partitioners`)**\n", ">\n", "> `ParameterServerStrategy` in TensorFlow 2 supports variable partitioning and offers the same partitioners as TensorFlow 1, with less confusing names:\n", "> - `tf.compat.v1.variable_axis_size_partitioner` -> `tf.distribute.experimental.partitioners.MaxSizePartitioner`: a partitioner that keeps shards under a maximum size.\n", "> - `tf.compat.v1.min_max_variable_partitioner` -> `tf.distribute.experimental.partitioners.MinSizePartitioner`: a partitioner that allocates a minimum size per shard.\n", "> - `tf.compat.v1.fixed_size_partitioner` -> `tf.distribute.experimental.partitioners.FixedShardsPartitioner`: a partitioner that allocates a fixed number of shards." ] },
{ "cell_type": "markdown", "metadata": { "id": "Ig0-uCUbGprd" }, "source": [ "Alternatively, you can use a `MultiWorkerMirroredStrategy` object:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": { "id": "xHXP8bOBGtXL" }, "outputs": [], "source": [ "# Clean up the `TF_CONFIG` used for `ParameterServerStrategy`.\n", "del os.environ['TF_CONFIG']\n", "strategy = tf.distribute.MultiWorkerMirroredStrategy()" ] },
{ "cell_type": "markdown", "metadata": { "id": "tOsmqefTGwUf" }, "source": [ "You can replace the strategy used above with a `MultiWorkerMirroredStrategy` object to perform training with it.\n", "\n", "As with the `tf.estimator` APIs, since `MultiWorkerMirroredStrategy` is a multi-client strategy, there is no easy way to run distributed training in this Colab notebook. Therefore, replacing the code above with this strategy ends up running things locally. The Multi-worker training [with Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)/[a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb) tutorials demonstrate how to run multi-worker training with the `'TF_CONFIG'` variable set up, with two workers on localhost in Colab. In practice, you would create multiple workers on external IP addresses/ports, and use the `'TF_CONFIG'` variable to specify the cluster configuration for each worker."
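, "\n", "\n", "For illustration, here is a minimal sketch of the same Keras workflow under `MultiWorkerMirroredStrategy`, assuming the `strategy`, `features`, and `labels` defined above. Without `'TF_CONFIG'` set it simply runs locally; on a real cluster, every worker runs the same program with its own `'TF_CONFIG'`:\n", "\n",
"```python\n", "# Sketch only: the Keras Model.fit workflow under MultiWorkerMirroredStrategy.\n", "dataset = tf.data.Dataset.from_tensor_slices(\n", "    (features, labels)).shuffle(10).repeat().batch(64)\n", "\n",
"# Model building and compilation must happen inside the strategy scope.\n", "with strategy.scope():\n", "  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])\n", "  model.compile(tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05), 'mse')\n", "\n",
"model.fit(dataset, epochs=5, steps_per_epoch=10)\n", "```"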
] }, { "cell_type": "markdown", "metadata": { "id": "917ef6135660" }, "source": [ "## Next steps" ] },
{ "cell_type": "markdown", "metadata": { "id": "e76fd9d5c98c" }, "source": [ "To learn more about multi-worker distributed training with `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy` in TensorFlow 2, consider the following resources:\n", "\n", "- Tutorial: [Parameter server training with ParameterServerStrategy and Keras Model.fit/a custom training loop](../../tutorials/distribute/parameter_server_training.ipynb)\n", "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and Keras Model.fit](../../tutorials/distribute/multi_worker_with_keras.ipynb)\n", "- Tutorial: [Multi-worker training with MultiWorkerMirroredStrategy and a custom training loop](../../tutorials/distribute/multi_worker_with_ctl.ipynb)\n", "- Guide: [Distributed training with TensorFlow](../../guide/distributed_training.ipynb)\n", "- Guide: [Optimize TensorFlow GPU performance with the TensorFlow Profiler](../../guide/gpu_performance_analysis.ipynb)\n", "- Guide: [Use a GPU](../../guide/gpu.ipynb) (the *Using multiple GPUs* section)" ] } ], "metadata": { "colab": { "collapsed_sections": [], "name": "multi_worker_cpu_gpu_training.ipynb", "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 0 }