##### Copyright 2021 The TensorFlow Authors.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Reading data from BigQuery with TFX and Vertex Pipelines


<div class="devsite-table-wrapper"><table class="tfo-notebook-buttons" align="left">
<td><a target="_blank" href="https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_bq">
<img src="https://www.tensorflow.org/images/tf_logo_32px.png"/>View on TensorFlow.org</a></td>
<td><a target="_blank" href="https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/gcp/vertex_pipelines_bq.ipynb">
<img src="https://www.tensorflow.org/images/colab_logo_32px.png">Run in Google Colab</a></td>
<td><a target="_blank" href="https://github.com/tensorflow/tfx/tree/master/docs/tutorials/tfx/gcp/vertex_pipelines_bq.ipynb">
<img width=32px src="https://www.tensorflow.org/images/GitHub-Mark-32px.png">View source on GitHub</a></td>
<td><a href="https://storage.googleapis.com/tensorflow_docs/tfx/docs/tutorials/tfx/gcp/vertex_pipelines_bq.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a></td>
<td><a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?q=download_url%3Dhttps%253A%252F%252Fraw.githubusercontent.com%252Ftensorflow%252Ftfx%252Fmaster%252Fdocs%252Ftutorials%252Ftfx%252Fgcp%252Fvertex_pipelines_bq.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Run in Google Cloud Vertex AI Workbench</a></td>
</table></div>


This notebook-based tutorial will use
[Google Cloud BigQuery](https://cloud.google.com/bigquery) as a data source to
train an ML model. The ML pipeline will be constructed using TFX and run on
Google Cloud Vertex Pipelines.

This notebook is based on the TFX pipeline we built in
[Simple TFX Pipeline for Vertex Pipelines Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_simple).
If you have not read that tutorial yet, you should read it before proceeding
with this notebook.

[BigQuery](https://cloud.google.com/bigquery) is serverless, highly scalable,
and cost-effective multi-cloud data warehouse designed for business agility.
TFX can be used to read training data from BigQuery and to
[publish the trained model](https://www.tensorflow.org/tfx/api_docs/python/tfx/v1/extensions/google_cloud_big_query/Pusher)
to BigQuery.

In this tutorial, we will use the `BigQueryExampleGen` component which reads
data from BigQuery to TFX pipelines.



This notebook is intended to be run on
[Google Colab](https://colab.research.google.com/notebooks/intro.ipynb) or on
[AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). If you
are not using one of these, you can simply click "Run in Google Colab" button
above.

## Set up
If you have completed
[Simple TFX Pipeline for Vertex Pipelines Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_simple),
you will have a working GCP project and a GCS bucket and that is all we need
for this tutorial. Please read the preliminary tutorial first if you missed it.

### Install python packages

We will install required Python packages including TFX and KFP to author ML
pipelines and submit jobs to Vertex Pipelines.

In [2]:
# Use the latest version of pip.
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"







































































































































#### Did you restart the runtime?

If you are using Google Colab, the first time that you run
the cell above, you must restart the runtime by clicking
above "RESTART RUNTIME" button or using "Runtime > Restart
runtime ..." menu. This is because of the way that Colab
loads packages.

If you are not on Colab, you can restart runtime with following cell.

In [None]:
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

### Login in to Google for this notebook
If you are running this notebook on Colab, authenticate with your user account:

In [3]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

**If you are on AI Platform Notebooks**, authenticate with Google Cloud before
running the next section, by running
```sh
gcloud auth login
```
**in the Terminal window** (which you can open via **File** > **New** in the
menu). You only need to do this once per notebook instance.

Check the package versions.

In [4]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

2024-05-08 09:24:47.541894: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-08 09:24:47.541946: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-08 09:24:47.543649: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


TensorFlow version: 2.15.1


TFX version: 1.15.0
KFP version: 1.8.22


### Set up variables

We will set up some variables used to customize the pipelines below. Following
information is required:

* GCP Project id and number. See
[Identifying your project id and number](https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects).
* GCP Region to run pipelines. For more information about the regions that
Vertex Pipelines is available in, see the
[Vertex AI locations guide](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability).
* Google Cloud Storage Bucket to store pipeline outputs.

**Enter required values in the cell below before running it**.


In [5]:
GOOGLE_CLOUD_PROJECT = ''         # <--- ENTER THIS
GOOGLE_CLOUD_PROJECT_NUMBER = ''  # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''          # <--- ENTER THIS
GCS_BUCKET_NAME = ''              # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and  GOOGLE_CLOUD_PROJECT_NUMBER and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')

ERROR:absl:Please set all required parameters.


Set `gcloud` to use your project.

In [6]:
!gcloud config set project {GOOGLE_CLOUD_PROJECT}

[1;31mERROR:[0m (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help


In [7]:
PIPELINE_NAME = 'penguin-bigquery'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs:///pipeline_root/penguin-bigquery


By default the Vertex Pipelines uses the default GCE VM service account of
format `[project-number]-compute@developer.gserviceaccount.com`. We need to
give a permission to use BigQuery to this account to access BigQuery in the
pipeline. We will add 'BigQuery User' role to the account.

In [8]:
!gcloud projects add-iam-policy-binding {GOOGLE_CLOUD_PROJECT} \
  --member=serviceAccount:{GOOGLE_CLOUD_PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
  --role=roles/bigquery.user

[1;31mERROR:[0m (gcloud.projects.add-iam-policy-binding) argument PROJECT_ID: Must be specified.
Usage: gcloud projects add-iam-policy-binding PROJECT_ID --member=PRINCIPAL --role=ROLE [optional flags]
  optional flags may be  --condition | --condition-from-file | --help

For detailed information on this command and its flags, run:
  gcloud projects add-iam-policy-binding --help


Please see
[Vertex documentation](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project)
to learn more about service accounts and IAM configuration.

## Create a pipeline

TFX pipelines are defined using Python APIs as we did in
[Simple TFX Pipeline for Vertex Pipelines Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_simple).
We previously used `CsvExampleGen` which reads data from a CSV file. In this
tutorial, we will use
[`BigQueryExampleGen`](https://www.tensorflow.org/tfx/api_docs/python/tfx/v1/extensions/google_cloud_big_query/BigQueryExampleGen)
component which reads data from BigQuery.


### Prepare BigQuery query

We will use the same
[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html). However, we will read it from a BigQuery table
`tfx-oss-public.palmer_penguins.palmer_penguins` which is populated using the
same CSV file.

If you are using Google Colab, you can examine the content of the BigQuery
table directly.

In [None]:
# docs_infra: no_execute
%%bigquery --project {GOOGLE_CLOUD_PROJECT}
SELECT *
FROM `tfx-oss-public.palmer_penguins.palmer_penguins`
LIMIT 5

All features were already normalized to 0~1 except `species` which is the
label. We will build a classification model which predicts the `species` of
penguins.

`BigQueryExampleGen` requires a query to specify which data to fetch. Because
we will use all the fields of all rows in the table, the query is quite simple.
You can also specify field names and add `WHERE` conditions as needed according
to the
[BigQuery Standard SQL syntax](https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax).

In [9]:
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"

### Write model code.

We will use the same model code as in the
[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).

In [10]:
_trainer_module_file = 'penguin_trainer.py'

In [11]:
%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Writing penguin_trainer.py


Copy the module file to GCS which can be accessed from the pipeline components.
Because model training happens on GCP, we need to upload this model definition.

Otherwise, you might want to build a container image including the module file
and use the image to run the pipeline.

In [12]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-bigquery/".


### Write a pipeline definition

We will define a function to create a TFX pipeline. We need to use
`BigQueryExampleGen` which takes `query` as an argument. One more change from
the previous tutorial is that we need to pass `beam_pipeline_args` which is
passed to components when they are executed. We will use `beam_pipeline_args`
to pass additional parameters to BigQuery.


In [13]:
from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
  """Creates a TFX pipeline using BigQuery."""

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a file destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

## Run the pipeline on Vertex Pipelines.

We will use Vertex Pipelines to run the pipeline as we did in
[Simple TFX Pipeline for Vertex Pipelines Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_simple).


We also need to pass `beam_pipeline_args` for the BigQueryExampleGen. It
includes configs like the name of the GCP project and the temporary storage for
the BigQuery execution.

In [None]:
# docs_infra: no_execute
import os

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

The generated definition file can be submitted using kfp client.

In [None]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Now you can visit the link in the output above or visit 'Vertex AI > Pipelines'
in [Google Cloud Console](https://console.cloud.google.com/) to see the
progress.