##### Copyright 2021 The TensorFlow Federated Authors.

In [None]:
# @title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TFF simulations with accelerators

This tutorial describes how to set up TFF simulations with accelerators. We focus on single-machine (multi-)GPU for now and will update this tutorial with multi-machine and TPU settings.

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/federated/tutorials/simulations_with_accelerators"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/federated/blob/v0.88.0/docs/tutorials/simulations_with_accelerators.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/federated/blob/v0.88.0/docs/tutorials/simulations_with_accelerators.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
  <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/federated/docs/tutorials/simulations_with_accelerators.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

## Before we begin

First, let us make sure the notebook is connected to a backend that has the relevant components compiled. 

In [None]:
# @test {"skip": true}
!pip install --quite --upgrade federated_language
!pip install --quiet --upgrade tensorflow-federated
!pip install -U tensorboard_plugin_profile

In [None]:
# Load the TensorBoard notebook extension, which provides the `%tensorboard`
# magic used later in this notebook.
%load_ext tensorboard

In [None]:
import collections
import time

import federated_language
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

Check if TF can detect physical GPUs and create a virtual multi-GPU environment for TFF GPU simulations. The two virtual GPUs will have limited memory to demonstrate how to configure TFF runtime. 

In [None]:
# Fail fast when TensorFlow cannot see any physical GPU.
gpu_devices = tf.config.list_physical_devices('GPU')
if len(gpu_devices) == 0:
  raise ValueError('Cannot detect physical GPU device in TF')

# Split the first physical GPU into two logical GPUs, each created with
# `memory_limit=1024`.
# TODO: b/277213652 - Remove this call, as it doesn't work with C++ executor
virtual_gpu_configs = [
    tf.config.LogicalDeviceConfiguration(memory_limit=1024) for _ in range(2)
]
tf.config.set_logical_device_configuration(gpu_devices[0], virtual_gpu_configs)

# Display the resulting logical devices as the cell output.
tf.config.list_logical_devices()

[LogicalDevice(name='/device:CPU:0', device_type='CPU'),
 LogicalDevice(name='/device:GPU:0', device_type='GPU'),
 LogicalDevice(name='/device:GPU:1', device_type='GPU')]

Run the following "Hello World"
example to make sure the TFF environment is correctly set up. If it doesn't work,
please refer to the [Installation](../install.md) guide for instructions.

In [None]:
@federated_language.federated_computation
def hello_world():
  """Returns a constant greeting; serves as a smoke test for the runtime."""
  greeting = 'Hello, World!'
  return greeting


# Invoke the computation so its result is shown as the cell output.
hello_world()

b'Hello, World!'

## EMNIST experimental setup

In this tutorial, we train an EMNIST image classifier with the Federated Averaging algorithm. Let us start by loading the EMNIST example from the TFF website.

In [None]:
# Load the federated EMNIST train and test splits from TFF's simulation
# datasets.
# NOTE(review): `only_digits=True` presumably restricts the data to the 10
# digit classes (consistent with `num_classes=10` in `create_cnn`) — confirm
# against the TFF API docs.
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True
)

We define a function preprocessing the EMNIST example following the [simple_fedavg](https://github.com/tensorflow/federated/tree/main/examples/simple_fedavg) example. Note that the argument `client_epochs_per_round` controls the number of local epochs on clients in federated learning. 

In [None]:
def preprocess_emnist_dataset(
    client_epochs_per_round, batch_size, test_batch_size
):
  """Builds the preprocessed EMNIST training and test data.

  Args:
    client_epochs_per_round: How many times each client's training dataset is
      repeated, i.e. the number of local epochs per federated round.
    batch_size: Batch size applied to each client's training dataset.
    test_batch_size: Batch size applied to the test dataset.

  Returns:
    A `(train_set, test_set)` tuple. `train_set` is `emnist_train` with the
    training preprocessing attached through `.preprocess`; `test_set` is the
    batched dataset built from all clients of `emnist_test`.
  """

  def to_model_input(example):
    # Append a trailing axis to the pixels; the label is passed through.
    features = tf.expand_dims(example['pixels'], -1)
    return collections.OrderedDict(x=features, y=example['label'])

  def prepare_for_training(client_dataset):
    # The shuffle buffer matches the maximum client dataset size
    # (418 for Federated EMNIST).
    mapped = client_dataset.map(to_model_input)
    shuffled = mapped.shuffle(buffer_size=418)
    repeated = shuffled.repeat(count=client_epochs_per_round)
    return repeated.batch(batch_size, drop_remainder=False)

  def prepare_for_testing(full_dataset):
    mapped = full_dataset.map(to_model_input)
    return mapped.batch(test_batch_size, drop_remainder=False)

  federated_train = emnist_train.preprocess(prepare_for_training)
  centralized_test = prepare_for_testing(
      emnist_test.create_tf_dataset_from_all_clients()
  )
  return federated_train, centralized_test

We use a VGG-like model, i.e., each block has two 3x3 convolutions and the number of filters is doubled when the feature maps are subsampled. 

In [None]:
def _conv_3x3(input_tensor, filters, strides):
  """Applies a bias-free, 'same'-padded 3x3 `Conv2D` layer to `input_tensor`.

  Args:
    input_tensor: Input the convolution layer is called on.
    filters: Number of output filters of the convolution.
    strides: Strides of the convolution.

  Returns:
    The output of the newly created convolution layer.
  """
  conv_layer = tf.keras.layers.Conv2D(
      filters=filters,
      kernel_size=3,
      strides=strides,
      padding='same',
      use_bias=False,
      kernel_initializer='he_normal',
  )
  return conv_layer(input_tensor)


def _basic_block(input_tensor, filters, strides):
  """Stacks two (3x3 conv -> ReLU) stages.

  Only the first convolution uses `strides`; the second always uses stride 1.
  """
  x = input_tensor
  for conv_strides in (strides, 1):
    x = _conv_3x3(x, filters, conv_strides)
    x = tf.keras.layers.Activation('relu')(x)
  return x


def _vgg_block(input_tensor, size, filters, strides):
  """Chains basic blocks; only the first one uses `strides`.

  The first basic block is always applied; `size - 1` further blocks with
  stride 1 follow it.
  """
  # One stride entry per basic block: the requested stride first, then 1s.
  per_block_strides = [strides] + [1] * (size - 1)
  x = input_tensor
  for block_strides in per_block_strides:
    x = _basic_block(x, filters, strides=block_strides)
  return x


def create_cnn(num_blocks, conv_width_multiplier=1, num_classes=10):
  """Builds a VGG-like Keras CNN for 28x28x1 inputs.

  The network has 6 * num_blocks + 2 weight layers: one stem convolution,
  three VGG stages of 2 * num_blocks convolutions each, and a final dense
  layer.

  Args:
    num_blocks: Number of basic blocks in each of the three VGG stages.
    conv_width_multiplier: Factor applied to the base filter counts
      (16, 32, 64).
    num_classes: Number of units of the final dense layer.

  Returns:
    A `tf.keras.models.Model` named 'cnn-<depth>-<width multiplier>'.
  """
  img_input = tf.keras.layers.Input(shape=(28, 28, 1))  # channels_last
  x = tf.image.per_image_standardization(img_input)

  # Stem convolution, then three stages given as (base filters, strides).
  x = _conv_3x3(x, 16 * conv_width_multiplier, 1)
  for base_filters, stage_strides in ((16, 1), (32, 2), (64, 2)):
    x = _vgg_block(
        x,
        size=num_blocks,
        filters=base_filters * conv_width_multiplier,
        strides=stage_strides,
    )

  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  outputs = tf.keras.layers.Dense(num_classes)(x)

  model_name = 'cnn-{}-{}'.format(6 * num_blocks + 2, conv_width_multiplier)
  return tf.keras.models.Model(img_input, outputs, name=model_name)

Now let us define the training loop for EMNIST. Note that `use_experimental_simulation_loop=True` in `tff.learning.algorithms.build_weighted_fed_avg` is suggested for performant TFF simulation, and required to take advantage of multi-GPUs on a single machine. See the [simple_fedavg](https://github.com/tensorflow/federated/tree/main/examples/simple_fedavg) example for how to define a customized federated learning algorithm that has high performance on GPUs; one of the key features is to explicitly use `for ... iter(dataset)` for training loops. 

In [None]:
def keras_evaluate(model, test_data, metric):
  """Evaluates `model` on `test_data` and returns the metric result.

  Args:
    model: Callable invoked as `model(batch['x'], training=False)`.
    test_data: Iterable of batches; each batch is indexed with 'x' (inputs)
      and 'y' (labels).
    metric: Metric object providing `reset_state()`, `update_state(y_true=...,
      y_pred=...)` and `result()`.

  Returns:
    `metric.result()` after the metric has been updated with every batch.
  """
  # Use `reset_state()`: `reset_states()` is only a deprecated alias in
  # Keras 2 and is not available on newer Keras metrics.
  metric.reset_state()
  for batch in test_data:
    preds = model(batch['x'], training=False)
    metric.update_state(y_true=batch['y'], y_pred=preds)
  return metric.result()


def run_federated_training(
    client_epochs_per_round,
    train_batch_size,
    test_batch_size,
    cnn_num_blocks,
    conv_width_multiplier,
    server_learning_rate,
    client_learning_rate,
    total_rounds,
    clients_per_round,
    rounds_per_eval,
    logdir='logdir',
):
  """Trains the EMNIST CNN with Federated Averaging and prints progress.

  Each round samples `clients_per_round` clients without replacement, runs one
  step of the learning process on their datasets and prints the training loss
  together with the average wall-clock time per round so far. The final round
  runs under `tf.profiler.experimental.Profile(logdir)`.

  Args:
    client_epochs_per_round: Number of local epochs on each client per round.
    train_batch_size: Batch size of the client training datasets.
    test_batch_size: Batch size of the test dataset.
    cnn_num_blocks: `num_blocks` argument passed to `create_cnn`.
    conv_width_multiplier: `conv_width_multiplier` argument passed to
      `create_cnn`.
    server_learning_rate: Learning rate of the server SGDM optimizer.
    client_learning_rate: Learning rate of the client SGDM optimizer.
    total_rounds: Number of federated rounds to run.
    clients_per_round: Number of clients sampled in each round.
    rounds_per_eval: Test accuracy is evaluated on every round whose index is
      a multiple of this value, and always on the last round.
    logdir: Directory passed to the profiler for the last round.
  """

  train_data, test_data = preprocess_emnist_dataset(
      client_epochs_per_round, train_batch_size, test_batch_size
  )
  data_spec = test_data.element_spec

  def _model_fn():
    keras_model = create_cnn(cnn_num_blocks, conv_width_multiplier)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    return tff.learning.models.from_keras_model(
        keras_model, input_spec=data_spec, loss=loss
    )

  server_optimizer = tff.learning.optimizers.build_sgdm(server_learning_rate)
  client_optimizer = tff.learning.optimizers.build_sgdm(client_learning_rate)

  learning_process = tff.learning.algorithms.build_weighted_fed_avg(
      model_fn=_model_fn,
      server_optimizer_fn=server_optimizer,
      client_optimizer_fn=client_optimizer,
      use_experimental_simulation_loop=True,
  )

  metric = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')
  # Separate Keras model that receives the server weights for evaluation.
  eval_model = create_cnn(cnn_num_blocks, conv_width_multiplier)
  # `summary()` prints the model description itself and returns None, so it is
  # called directly instead of being wrapped in `logging.info(...)` (the
  # `logging` module is not imported by this notebook).
  eval_model.summary()

  server_state = learning_process.initialize()
  start_time = time.time()
  for round_num in range(total_rounds):
    sampled_clients = np.random.choice(
        train_data.client_ids, size=clients_per_round, replace=False
    )
    sampled_train_data = [
        train_data.create_tf_dataset_for_client(client)
        for client in sampled_clients
    ]
    # Only the last round is profiled.
    if round_num == total_rounds - 1:
      with tf.profiler.experimental.Profile(logdir):
        result = learning_process.next(server_state, sampled_train_data)
    else:
      result = learning_process.next(server_state, sampled_train_data)
    server_state = result.state
    train_metrics = result.metrics['client_work']['train']
    # The reported time is the average over all rounds completed so far.
    print(
        f'Round {round_num} training loss: {train_metrics["loss"]}, '
        f'time: {(time.time()-start_time)/(round_num+1.)} secs'
    )
    if round_num % rounds_per_eval == 0 or round_num == total_rounds - 1:
      model_weights = learning_process.get_model_weights(server_state)
      model_weights.assign_weights_to(eval_model)
      accuracy = keras_evaluate(eval_model, test_data, metric)
      print(f'Round {round_num} validation accuracy: {accuracy * 100.0}')

## Single GPU execution
The default runtime of TFF is the same as TF: when GPUs are provided, the first GPU will be chosen for execution. We run the previously defined federated training for several rounds with a relatively small model. The last round of execution is profiled with `tf.profiler` and visualized by `tensorboard`. The profiling verified the first GPU is used.  


In [None]:
# Baseline run: a relatively small model (cnn_num_blocks=2) trained for 10
# rounds. The last round is profiled into the default `logdir` directory.
run_federated_training(
    client_epochs_per_round=1,
    train_batch_size=16,
    test_batch_size=128,
    cnn_num_blocks=2,
    conv_width_multiplier=4,
    server_learning_rate=1.0,
    client_learning_rate=0.01,
    total_rounds=10,
    clients_per_round=16,
    rounds_per_eval=2,
)

Model: "cnn-14-4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
tf.image.per_image_standardi (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d (Conv2D)              (None, 28, 28, 64)        576       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation (Activation)      (None, 28, 28, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation_1 (Activation)    (None, 28, 28, 64)        0  

In [None]:
# @test {"skip": true}
# Open TensorBoard on `logdir`, the default profiler directory of
# `run_federated_training`.
%tensorboard --logdir=logdir --port=0

## Larger model and OOM
Let us run a larger model on CPU with fewer federated rounds. 

In [None]:
# Same settings as the previous run, except for a deeper model
# (cnn_num_blocks=4 instead of 2) and fewer rounds (total_rounds=5 instead of
# 10).
run_federated_training(
    client_epochs_per_round=1,
    train_batch_size=16,
    test_batch_size=128,
    cnn_num_blocks=4,
    conv_width_multiplier=4,
    server_learning_rate=1.0,
    client_learning_rate=0.01,
    total_rounds=5,
    clients_per_round=16,
    rounds_per_eval=2,
)

Model: "cnn-26-4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
tf.image.per_image_standardi (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d_39 (Conv2D)           (None, 28, 28, 64)        576       
_________________________________________________________________
conv2d_40 (Conv2D)           (None, 28, 28, 64)        36864     
_________________________________________________________________
activation_36 (Activation)   (None, 28, 28, 64)        0         
_________________________________________________________________
conv2d_41 (Conv2D)           (None, 28, 28, 64)        36864     
_________________________________________________________________
activation_37 (Activation)   (None, 28, 28, 64)        0  

This model might hit an out-of-memory (OOM) issue on a single GPU. The migration from large-scale CPU experiments to GPU simulation can be constrained by memory usage, as GPUs often have limited memory. Several parameters can be tuned in the TFF runtime to mitigate OOM issues:


*   Adjust `max_concurrent_computation_calls` in `tff.backends.native.set_sync_local_cpp_execution_context` to control the concurrency of client training. 


In [None]:
# Control concurrency by `max_concurrent_computation_calls`.
# The value is a count of concurrent calls, so use integer division: `16 / 2`
# would pass the float 8.0 instead of the integer 8. Here the limit is half of
# `clients_per_round` (16) used below.
tff.backends.native.set_sync_local_cpp_execution_context(
    max_concurrent_computation_calls=16 // 2
)

run_federated_training(
    client_epochs_per_round=1,
    train_batch_size=16,
    test_batch_size=128,
    cnn_num_blocks=4,
    conv_width_multiplier=4,
    server_learning_rate=1.0,
    client_learning_rate=0.01,
    total_rounds=5,
    clients_per_round=16,
    rounds_per_eval=2,
)

Model: "cnn-26-4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
tf.image.per_image_standardi (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d (Conv2D)              (None, 28, 28, 64)        576       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation (Activation)      (None, 28, 28, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation_1 (Activation)    (None, 28, 28, 64)        0  

## Optimize performance

Techniques in TF that could achieve better performance can generally be used in TFF, e.g., [mixed precision training](https://www.tensorflow.org/guide/mixed_precision) and [XLA](https://www.tensorflow.org/xla). The speedup (on GPUs like V100) and memory saving of mixed precision can often be significant, which could be examined by `tf.profiler`. 

In [None]:
# Mixed precision training.
# Re-create the execution context with its default arguments (no explicit
# concurrency limit is passed here).
tff.backends.native.set_sync_local_cpp_execution_context()
# Use the stable mixed precision API; the `experimental` namespace
# (`experimental.Policy` / `experimental.set_policy`) is deprecated and absent
# from newer TF releases.
policy = tf.keras.mixed_precision.Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)

# The profile of the last round is written to the `mixed` directory.
run_federated_training(
    client_epochs_per_round=1,
    train_batch_size=16,
    test_batch_size=128,
    cnn_num_blocks=4,
    conv_width_multiplier=4,
    server_learning_rate=1.0,
    client_learning_rate=0.01,
    total_rounds=5,
    clients_per_round=16,
    rounds_per_eval=2,
    logdir='mixed',
)

Model: "cnn-26-4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
tf.image.per_image_standardi (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d (Conv2D)              (None, 28, 28, 64)        576       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation (Activation)      (None, 28, 28, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 28, 28, 64)        36864     
_________________________________________________________________
activation_1 (Activation)    (None, 28, 28, 64)        0  

In [None]:
# @test {"skip": true}
# Open TensorBoard on `mixed`, the profiler directory passed to the mixed
# precision run.
%tensorboard --logdir=mixed --port=0