{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "tghWegsjhpkt" }, "source": [ "##### Copyright 2021 The TensorFlow Authors.\n" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "cellView": "form", "execution": { "iopub.execute_input": "2024-04-30T10:54:42.088450Z", "iopub.status.busy": "2024-04-30T10:54:42.087893Z", "iopub.status.idle": "2024-04-30T10:54:42.091689Z", "shell.execute_reply": "2024-04-30T10:54:42.091107Z" }, "id": "rSGJWC5biBiG" }, "outputs": [], "source": [ "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": { "id": "j-Iyf5gv5oBq" }, "source": [ "# Preprocess data with TensorFlow Transform\n", "***The Feature Engineering Component of TensorFlow Extended (TFX)***" ] }, { "cell_type": "markdown", "metadata": { "id": "S5ST8dI25wbA" }, "source": [ "Note: We recommend running this tutorial in a Colab notebook, with no setup required! Just click \"Run in Google Colab\".\n", "\n", "
\n",
"![]() | \n",
"\n",
"![]() | \n",
"\n",
"![]() | \n",
"\n",
"![]() | \n",
"
tf.Transform
) can be used to preprocess data using exactly the same code for both training a model and serving inferences in production.\n",
"\n",
"TensorFlow Transform is a library for preprocessing input data for TensorFlow, including creating features that require a full pass over the training dataset. For example, using TensorFlow Transform you could:\n",
"\n",
"* Normalize an input value by using the mean and standard deviation\n",
"* Convert strings to integers by generating a vocabulary over all of the input values\n",
"* Convert floats to integers by assigning them to buckets, based on the observed data distribution\n",
"\n",
"TensorFlow has built-in support for manipulations on a single example or a batch of examples. `tf.Transform` extends these capabilities to support full passes over the entire training dataset.\n",
"\n",
"The output of `tf.Transform` is exported as a TensorFlow graph which you can use for both training and serving. Using the same graph for both training and serving can prevent skew, since the same transformations are applied in both stages."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6c8lD3uQm8m5"
},
"source": [
"### Upgrade Pip\n",
"\n",
"To avoid upgrading Pip in a system when running locally, check to make sure that we're running in Colab. Local systems can of course be upgraded separately."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:42.095539Z",
"iopub.status.busy": "2024-04-30T10:54:42.095039Z",
"iopub.status.idle": "2024-04-30T10:54:42.102449Z",
"shell.execute_reply": "2024-04-30T10:54:42.101837Z"
},
"id": "EmiQXNLZm8z-"
},
"outputs": [],
"source": [
"try:\n",
" import colab\n",
" !pip install --upgrade pip\n",
"except:\n",
" pass"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "hiBxgnc-m8-X"
},
"source": [
"### Install TensorFlow Transform"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:42.105729Z",
"iopub.status.busy": "2024-04-30T10:54:42.105191Z",
"iopub.status.idle": "2024-04-30T10:54:47.408819Z",
"shell.execute_reply": "2024-04-30T10:54:47.407695Z"
},
"id": "j2CTKbMNm9I4"
},
"outputs": [],
"source": [
"!pip install -q -U tensorflow_transform"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:47.413337Z",
"iopub.status.busy": "2024-04-30T10:54:47.413043Z",
"iopub.status.idle": "2024-04-30T10:54:47.540569Z",
"shell.execute_reply": "2024-04-30T10:54:47.539986Z"
},
"id": "R0mXLOJR_-dv"
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/tmpfs/tmp/ipykernel_192169/639106435.py:3: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html\n",
" import pkg_resources\n"
]
}
],
"source": [
"# This cell is only necessary because packages were installed while python was\n",
"# running.\n",
"import pkg_resources\n",
"import importlib\n",
"importlib.reload(pkg_resources)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import pathlib\n",
"import pprint\n",
"import tempfile\n",
"\n",
"import tensorflow as tf\n",
"import tensorflow_transform as tft\n",
"import tensorflow_transform.beam as tft_beam\n",
"from tensorflow_transform.tf_metadata import dataset_metadata\n",
"from tensorflow_transform.tf_metadata import schema_utils"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data: Create some dummy data\n",
"We'll create some simple dummy data for our simple example:\n",
"\n",
"* `raw_data` is the initial raw data that we're going to preprocess\n",
"* `raw_data_metadata` contains the schema that tells us the types of each of the columns in `raw_data`. In this case, it's very simple."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"raw_data = [\n",
"    {'x': 1, 'y': 1, 's': 'hello'},\n",
"    {'x': 2, 'y': 2, 's': 'world'},\n",
"    {'x': 3, 'y': 3, 's': 'hello'}\n",
"]\n",
"\n",
"raw_data_metadata = dataset_metadata.DatasetMetadata(\n",
"    schema_utils.schema_from_feature_spec({\n",
"        'y': tf.io.FixedLenFeature([], tf.float32),\n",
"        'x': tf.io.FixedLenFeature([], tf.float32),\n",
"        's': tf.io.FixedLenFeature([], tf.string),\n",
"    }))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The Preprocessing Function\n",
"The _preprocessing function_ is the most important concept of `tf.Transform`. The preprocessing function is a logical description of a transformation of the dataset. The preprocessing function accepts and returns a dictionary of tensors, where a tensor means `Tensor` or `SparseTensor`. There are two main groups of API calls that typically form the heart of a preprocessing function:"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Zadh6MXLS3eD"
},
"source": [
"1. **TensorFlow Ops:** Any function that accepts and returns tensors, which usually means TensorFlow ops. These add TensorFlow operations to the graph that transforms raw data into transformed data one feature vector at a time. These will run for every example, during both training and serving.\n",
"2. **Tensorflow Transform Analyzers/Mappers:** Any of the analyzers/mappers provided by tf.Transform. These also accept and return tensors, and typically contain a combination of Tensorflow ops and Beam computation, but unlike TensorFlow ops they only run in the Beam pipeline during analysis requiring a full pass over the entire training dataset. The Beam computation runs only once, (prior to training, during analysis), and typically make a full pass over the entire training dataset. They create `tf.constant` tensors, which are added to your graph. For example, `tft.min` computes the minimum of a tensor over the training dataset.\n",
"\n",
"Caution: When you apply your preprocessing function to serving inferences, the constants that were created by analyzers during training do not change. If your data has trend or seasonality components, plan accordingly.\n",
"\n",
"Note: The `preprocessing_fn` is not directly callable. This means that\n",
"calling `preprocessing_fn(raw_data)` will not work. Instead, it must\n",
"be passed to the Transform Beam API as shown in the following cells."
]
},
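{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the two groups concrete before the real example below, here is a minimal sketch (the particular op/analyzer pairing is illustrative):\n",
"\n",
"```\n",
"def preprocessing_fn(inputs):\n",
"  x = inputs['x']\n",
"  # TensorFlow op: runs on every example, during both training and serving.\n",
"  x_squared = x * x\n",
"  # Analyzer: tft.min runs once in the Beam pipeline over the whole training\n",
"  # dataset, and its result is baked into the graph as a tf.constant.\n",
"  x_shifted = x - tft.min(x)\n",
"  return {'x_squared': x_squared, 'x_shifted': x_shifted}\n",
"```"
]
},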
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:50.828844Z",
"iopub.status.busy": "2024-04-30T10:54:50.828604Z",
"iopub.status.idle": "2024-04-30T10:54:50.833282Z",
"shell.execute_reply": "2024-04-30T10:54:50.832622Z"
},
"id": "H2wANNF_2dCR"
},
"outputs": [],
"source": [
"def preprocessing_fn(inputs):\n",
" \"\"\"Preprocess input columns into transformed columns.\"\"\"\n",
" x = inputs['x']\n",
" y = inputs['y']\n",
" s = inputs['s']\n",
" x_centered = x - tft.mean(x)\n",
" y_normalized = tft.scale_to_0_1(y)\n",
" s_integerized = tft.compute_and_apply_vocabulary(s)\n",
" x_centered_times_y_normalized = (x_centered * y_normalized)\n",
" return {\n",
" 'x_centered': x_centered,\n",
" 'y_normalized': y_normalized,\n",
" 's_integerized': s_integerized,\n",
" 'x_centered_times_y_normalized': x_centered_times_y_normalized,\n",
" }"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cSl9qyTCbBKR"
},
"source": [
"## Syntax\n",
"\n",
"You're almost ready to put everything together and use Apache Beam to run it.\n",
"\n",
"Apache Beam uses a special syntax to define and invoke transforms. For example, in this line:\n",
"\n",
"```\n",
"result = pass_this | 'name this step' >> to_this_call\n",
"```\n",
"\n",
"The method `to_this_call` is being invoked and passed the object called `pass_this`, and this operation will be referred to as `name this step` in a stack trace. The result of the call to `to_this_call` is returned in `result`. You will often see stages of a pipeline chained together like this:\n",
"\n",
"```\n",
"result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()\n",
"```\n",
"\n",
"and since that started with a new pipeline, you can continue like this:\n",
"\n",
"```\n",
"next_result = result | 'doing more stuff' >> another_function()\n",
"```"
]
},
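{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a concrete, runnable version of that pattern (plain Apache Beam, independent of `tf.Transform`; the step names and the lambda are just for illustration):\n",
"\n",
"```\n",
"import apache_beam as beam\n",
"\n",
"with beam.Pipeline() as pipeline:\n",
"  _ = (\n",
"      pipeline\n",
"      | 'create inputs' >> beam.Create([1, 2, 3])\n",
"      | 'square them' >> beam.Map(lambda x: x * x)\n",
"      | 'print results' >> beam.Map(print))\n",
"```"
]
},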
{
"cell_type": "markdown",
"metadata": {
"id": "5kLDSxOQ8xgg"
},
"source": [
"## Putting it all together\n",
"Now we're ready to transform our data. We'll use Apache Beam with a direct runner, and supply three inputs:\n",
"\n",
"1. `raw_data` - The raw input data that we created above\n",
"2. `raw_data_metadata` - The schema for the raw data\n",
"3. `preprocessing_fn` - The function that we created to do our transformation"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:50.836781Z",
"iopub.status.busy": "2024-04-30T10:54:50.836181Z",
"iopub.status.idle": "2024-04-30T10:54:50.840277Z",
"shell.execute_reply": "2024-04-30T10:54:50.839685Z"
},
"id": "mAF9w7RTZU7c"
},
"outputs": [],
"source": [
"def main(output_dir):\n",
" # Ignore the warnings\n",
" with tft_beam.Context(temp_dir=tempfile.mkdtemp()):\n",
" transformed_dataset, transform_fn = ( # pylint: disable=unused-variable\n",
" (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(\n",
" preprocessing_fn))\n",
"\n",
" transformed_data, transformed_metadata = transformed_dataset # pylint: disable=unused-variable\n",
"\n",
" # Save the transform_fn to the output_dir\n",
" _ = (\n",
" transform_fn\n",
" | 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir))\n",
"\n",
" return transformed_data, transformed_metadata"
]
},
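{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: `tft_beam.AnalyzeAndTransformDataset` fuses the analysis and transformation phases. It is equivalent to (though usually more efficient than) running them separately, sketched here with the same inputs as above:\n",
"\n",
"```\n",
"transform_fn = (\n",
"    (raw_data, raw_data_metadata) | tft_beam.AnalyzeDataset(preprocessing_fn))\n",
"transformed_dataset = (\n",
"    ((raw_data, raw_data_metadata), transform_fn) | tft_beam.TransformDataset())\n",
"```"
]
},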
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:50.843342Z",
"iopub.status.busy": "2024-04-30T10:54:50.843076Z",
"iopub.status.idle": "2024-04-30T10:54:58.761420Z",
"shell.execute_reply": "2024-04-30T10:54:58.760752Z"
},
"id": "zZPQl0X19ni2"
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch).\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:absl:You are outputting instance dicts from `TransformDataset` which will not provide optimal performance. Consider setting `output_record_batches=True` to upgrade to the TFXIO format (Apache Arrow RecordBatch). Encoding functionality in this module works with both formats.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:']\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:struct2tensor is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:struct2tensor is not available.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_decision_forests is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_decision_forests is not available.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_text is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_text is not available.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:struct2tensor is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:struct2tensor is not available.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_decision_forests is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_decision_forests is not available.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_text is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:tensorflow:tensorflow_text is not available.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:']\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"Raw data:\n",
"[{'s': 'hello', 'x': 1, 'y': 1},\n",
" {'s': 'world', 'x': 2, 'y': 2},\n",
" {'s': 'hello', 'x': 3, 'y': 3}]\n",
"\n",
"Transformed data:\n",
"[{'s_integerized': 0,\n",
" 'x_centered': -1.0,\n",
" 'x_centered_times_y_normalized': -0.0,\n",
" 'y_normalized': 0.0},\n",
" {'s_integerized': 1,\n",
" 'x_centered': 0.0,\n",
" 'x_centered_times_y_normalized': 0.0,\n",
" 'y_normalized': 0.5},\n",
" {'s_integerized': 0,\n",
" 'x_centered': 1.0,\n",
" 'x_centered_times_y_normalized': 1.0,\n",
" 'y_normalized': 1.0}]\n"
]
}
],
"source": [
"output_dir = pathlib.Path(tempfile.mkdtemp())\n",
"\n",
"transformed_data, transformed_metadata = main(str(output_dir))\n",
"\n",
"print('\\nRaw data:\\n{}\\n'.format(pprint.pformat(raw_data)))\n",
"print('Transformed data:\\n{}'.format(pprint.pformat(transformed_data)))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NO6LyTneNndy"
},
"source": [
"## Is this the right answer?\n",
"Previously, we used `tf.Transform` to do this:\n",
"```\n",
"x_centered = x - tft.mean(x)\n",
"y_normalized = tft.scale_to_0_1(y)\n",
"s_integerized = tft.compute_and_apply_vocabulary(s)\n",
"x_centered_times_y_normalized = (x_centered * y_normalized)\n",
"```\n",
"\n",
"* **x_centered** - With input of `[1, 2, 3]` the mean of x is 2, and we subtract it from x to center our x values at 0. So our result of `[-1.0, 0.0, 1.0]` is correct.\n",
"* **y_normalized** - We wanted to scale our y values between 0 and 1. Our input was `[1, 2, 3]` so our result of `[0.0, 0.5, 1.0]` is correct.\n",
"* **s_integerized** - We wanted to map our strings to indexes in a vocabulary, and there were only 2 words in our vocabulary (\"hello\" and \"world\"). So with input of `[\"hello\", \"world\", \"hello\"]` our result of `[0, 1, 0]` is correct. Since \"hello\" occurs most frequently in this data, it will be the first entry in the vocabulary.\n",
"* **x_centered_times_y_normalized** - We wanted to create a new feature by crossing `x_centered` and `y_normalized` using multiplication. Note that this multiplies the results, not the original values, and our new result of `[-0.0, 0.0, 1.0]` is correct."
]
},
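{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick back-of-the-envelope check of those values in plain Python (independent of the pipeline above):\n",
"\n",
"```\n",
"x = [1, 2, 3]\n",
"y = [1, 2, 3]\n",
"mean_x = sum(x) / len(x)                                      # 2.0\n",
"x_centered = [v - mean_x for v in x]                          # [-1.0, 0.0, 1.0]\n",
"y_normalized = [(v - min(y)) / (max(y) - min(y)) for v in y]  # [0.0, 0.5, 1.0]\n",
"print([c * n for c, n in zip(x_centered, y_normalized)])      # [-0.0, 0.0, 1.0]\n",
"```"
]
},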
{
"cell_type": "markdown",
"metadata": {
"id": "dXw790Sr8Jws"
},
"source": [
"## Use the resulting `transform_fn`"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:58.764879Z",
"iopub.status.busy": "2024-04-30T10:54:58.764628Z",
"iopub.status.idle": "2024-04-30T10:54:58.927126Z",
"shell.execute_reply": "2024-04-30T10:54:58.926303Z"
},
"id": "We4Mafrq8id6"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total 8\r\n",
"drwxr-xr-x 4 kbuilder kbuilder 4096 Apr 30 10:54 transform_fn\r\n",
"drwxr-xr-x 2 kbuilder kbuilder 4096 Apr 30 10:54 transformed_metadata\r\n"
]
}
],
"source": [
"!ls -l {output_dir}"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "SoaaAXxk_vWP"
},
"source": [
"The `transform_fn/` directory contains a `tf.saved_model` implementing with all the constants tensorflow-transform analysis results built into the graph. \n",
"\n",
"It is possible to load this directly with `tf.saved_model.load`, but this not easy to use:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"execution": {
"iopub.execute_input": "2024-04-30T10:54:58.931682Z",
"iopub.status.busy": "2024-04-30T10:54:58.930998Z",
"iopub.status.idle": "2024-04-30T10:54:59.018550Z",
"shell.execute_reply": "2024-04-30T10:54:59.017890Z"
},
"id": "cz8dqFW6ANJQ"
},
"outputs": [
{
"data": {
"text/plain": [
"