##### Copyright 2020 The TensorFlow IO Authors.

In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Avro Dataset API

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/io/tutorials/avro"><img src="https://www.tensorflow.org/images/tf_logo_32px.png" />View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/io/blob/master/docs/tutorials/avro.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/io/blob/master/docs/tutorials/avro.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
      <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/io/docs/tutorials/avro.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

## Overview

The objective of the Avro Dataset API is to load Avro-formatted data natively into TensorFlow as a <a target="_blank" href="https://www.tensorflow.org/api_docs/python/tf/data/Dataset">TensorFlow dataset</a>. Avro is a data serialization system similar to Protocol Buffers. It is widely used in Apache Hadoop, where it provides both a serialization format for persistent data and a wire format for communication between Hadoop nodes. Avro data is stored in a row-oriented, compact binary format. It relies on a schema, which is written in JSON and, in this tutorial, stored as a separate file. For the specification of the Avro format and schema declaration, please refer to <a target="_blank" href="https://avro.apache.org/docs/current/spec.html">the official manual</a>.


## Setup package


### Install the required tensorflow-io package

In [2]:
!pip install tensorflow-io

Collecting tensorflow-io
  Using cached tensorflow_io-0.18.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (24.1 MB)
Collecting tensorflow-io-gcs-filesystem==0.18.0
  Using cached tensorflow_io_gcs_filesystem-0.18.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (2.5 MB)
Installing collected packages: tensorflow-io-gcs-filesystem, tensorflow-io
Successfully installed tensorflow-io-0.18.0 tensorflow-io-gcs-filesystem-0.18.0


### Import packages

In [3]:
import tensorflow as tf
import tensorflow_io as tfio


### Validate tf and tfio imports

In [4]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.18.0
tensorflow version: 2.5.0


## Usage

### Explore the dataset

For the purpose of this tutorial, let's download the sample Avro dataset. 


Download a sample Avro file:

In [5]:
!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
!ls -l train.avro

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268


100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255


-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro


Download the corresponding schema file of the sample Avro file:

In [6]:
!curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
!ls -l train.avsc

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247


100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780


-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc


In the above example, a test Avro dataset was created based on the MNIST dataset. The original MNIST dataset in TFRecord format is generated from <a target="_blank" href="https://www.tensorflow.org/datasets/api_docs/python/tfds/load">TF named dataset</a>. However, the MNIST dataset is too large for a demo, so for simplicity most of it was trimmed and only the first few records were kept. In addition, the `image` field of the original MNIST dataset was trimmed further and mapped to the `features` field in Avro. As a result, the Avro file `train.avro` has 4 records, each with 3 fields: `features`, an array of int; `label`, an int or null; and `dataType`, an enum. The steps below decode and print `train.avro` (note that <a target="_blank" href="https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro">the original Avro data file</a> is not human readable, since Avro is a compact binary format):


Install the package required to read Avro files:


In [7]:
!pip install avro


Collecting avro
  Downloading avro-1.10.2.tar.gz (68 kB)
Building wheels for collected packages: avro
  Building wheel for avro (setup.py) ... done
  Created wheel for avro: filename=avro-1.10.2-py3-none-any.whl size=96832 sha256=e22345a9d1a1b98242b044b16bf7b74bd98eba112e6fccd7e0a4081429555d9d
  Stored in directory: /home/kbuilder/.cache/pip/wheels/e7/93/e8/7e16388beb0837cbfb9065ff9d3fe33e4111a3f4bedea1c2c6
Successfully built avro
Installing collected packages: avro
Successfully installed avro-1.10.2


To read and print an Avro file in a human-readable format:


In [8]:
from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    """Print up to max_record_num decoded records (all records if None)."""
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count += 1
            print(record)
            # Stop once the requested number of records has been printed.
            if max_record_num is not None and record_count == max_record_num:
                break

print_avro(avro_file='train.avro')


{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}


The schema of `train.avro` is stored in `train.avsc`, a JSON-formatted file.
To view `train.avsc`:


In [9]:
def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')


{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

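The structure of the schema printed above can also be summarized programmatically. The following is a minimal sketch using only the standard `json` module; the schema is inlined as a string so the block runs without `train.avsc`, and `describe_type` and `summarize_schema` are hypothetical helper names introduced here for illustration, not part of any Avro or TensorFlow IO API.

```python
import json

# Copy of the schema printed above, inlined so the sketch runs on its own.
SCHEMA_JSON = """
{
    "type": "record",
    "name": "ImageDataset",
    "fields": [
        {"name": "features", "type": {"type": "array", "items": "int"}},
        {"name": "label", "type": ["int", "null"]},
        {"name": "dataType", "type": {"type": "enum", "name": "dataTypes",
                                      "symbols": ["TRAINING", "VALIDATION"]}}
    ]
}
"""


def describe_type(avro_type):
    """Return a short description of an Avro type declaration (hypothetical helper)."""
    if isinstance(avro_type, str):        # primitive type such as "int"
        return avro_type
    if isinstance(avro_type, list):       # union such as ["int", "null"]
        return "union of " + ", ".join(describe_type(t) for t in avro_type)
    if avro_type["type"] == "array":      # array with an "items" type
        return "array of " + describe_type(avro_type["items"])
    if avro_type["type"] == "enum":       # enum with a list of symbols
        return "enum " + "/".join(avro_type["symbols"])
    return avro_type["type"]


def summarize_schema(schema_json):
    """Map each field name of a record schema to a description of its type."""
    schema = json.loads(schema_json)
    return {field["name"]: describe_type(field["type"]) for field in schema["fields"]}


summary = summarize_schema(SCHEMA_JSON)
for name, description in summary.items():
    print(f"{name}: {description}")
```

This only covers the type constructs that appear in `train.avsc` (primitives, unions, arrays, and enums); nested records and maps would need additional cases.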

### Prepare the dataset


Load `train.avro` as a TensorFlow dataset with the Avro dataset API:


In [10]:
features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")


SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

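To make the printed tensors easier to read, the sketch below rebuilds the same values in plain Python from the four decoded records shown earlier. It is only an illustration of the layout (sparse `indices`/`values`/`dense_shape` for `features`, a default of -100 for a null `label`, byte strings for `dataType`), not how the Avro reader is implemented; `to_sparse_components` and `batch_to_columns` are hypothetical names.

```python
# The four decoded records printed by print_avro earlier in this tutorial.
RECORDS = [
    {"features": [0, 0, 0, 1, 4], "label": None, "dataType": "TRAINING"},
    {"features": [0, 0], "label": 2, "dataType": "TRAINING"},
    {"features": [0], "label": 3, "dataType": "VALIDATION"},
    {"features": [1], "label": 4, "dataType": "VALIDATION"},
]


def to_sparse_components(rows):
    """Encode variable-length rows as (indices, values, dense_shape) lists."""
    indices, values = [], []
    for row_index, row in enumerate(rows):
        for col_index, value in enumerate(row):
            indices.append([row_index, col_index])
            values.append(value)
    # The dense shape is [number of rows, length of the longest row].
    dense_shape = [len(rows), max((len(row) for row in rows), default=0)]
    return indices, values, dense_shape


def batch_to_columns(batch, label_default=-100):
    """Turn a list of records into per-feature columns (hypothetical helper)."""
    return {
        "features[*]": to_sparse_components([r["features"] for r in batch]),
        # A null label is replaced by the default value.
        "label": [label_default if r["label"] is None else r["label"] for r in batch],
        # String-like values appear as byte strings in the printed tensors.
        "dataType": [r["dataType"].encode("utf-8") for r in batch],
    }


first_batch = batch_to_columns(RECORDS[:3])   # first 3 records (batch_size=3)
second_batch = batch_to_columns(RECORDS[3:])  # the remaining record

indices, values, dense_shape = first_batch["features[*]"]
print(indices)
print(values, dense_shape)
print(first_batch["label"], first_batch["dataType"])
```

Comparing the printed lists with the `SparseTensor` output above shows the same indices, values, and dense shape for the first batch.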

The `make_avro_record_dataset` example above converts `train.avro` into a TensorFlow dataset. Each element of the dataset is a dictionary whose keys are the feature names and whose values are the converted sparse or dense tensors.
For example, it converts the `features`, `label`, and `dataType` fields to a VarLenFeature (SparseTensor), a FixedLenFeature (DenseTensor), and a FixedLenFeature (DenseTensor) respectively. Since `batch_size` is 3, it coerces 3 records from `train.avro` into one element of the resulting dataset.
For the first record in `train.avro`, whose label is null, the Avro reader replaces it with the specified default value (-100).
In this example, there are 4 records in total in `train.avro`. Since the batch size is 3, the resulting dataset contains 2 elements, the last of which has a batch size of 1. However, the user can also drop the last batch if it is smaller than the batch size by enabling `drop_final_batch`. For example:


In [11]:
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)


{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

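The effect of `drop_final_batch` on the number and size of batches can be reasoned about with simple arithmetic. The sketch below is plain Python and does not call TensorFlow IO; `batch_sizes` is a hypothetical helper that only mirrors the batching behaviour described in this section.

```python
def batch_sizes(num_records, batch_size, drop_final_batch=False):
    """Return the size of each batch produced from num_records records."""
    full_batches, remainder = divmod(num_records, batch_size)
    sizes = [batch_size] * full_batches
    # A smaller final batch is kept unless drop_final_batch is enabled.
    if remainder and not drop_final_batch:
        sizes.append(remainder)
    return sizes


# train.avro has 4 records and the examples use batch_size=3.
print(batch_sizes(4, 3))                          # [3, 1]
print(batch_sizes(4, 3, drop_final_batch=True))   # [3]
```

With 4 records and a batch size of 3, this gives two batches by default and a single batch of 3 records when the final batch is dropped, which matches the outputs of the two cells above.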

One can also increase `num_parallel_reads` to expedite Avro data processing by increasing the parallelism of Avro parsing and reading.


In [12]:
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)


{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}


For detailed usage of `make_avro_record_dataset`, please refer to <a target="_blank" href="https://www.tensorflow.org/io/api_docs/python/tfio/experimental/columnar/make_avro_record_dataset">API doc</a>.


### Train tf.keras models with Avro dataset

Now let's walk through an end-to-end example of training a tf.keras model with an Avro dataset based on the MNIST dataset.


Load `train.avro` as a TensorFlow dataset with the Avro dataset API:


In [13]:
features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)


Define a simple keras model: 


In [14]:
def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()


### Train the keras model with Avro dataset:


In [15]:
model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)


Consider rewriting this model with the Functional API.


<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

The Avro dataset can parse and coerce any Avro data into TensorFlow tensors, including records in records, maps, arrays, branches, and enumerations. The parsing information is passed into the Avro dataset implementation as a map where:

- keys encode how to parse the data
- values encode how to coerce the data into TensorFlow tensors, deciding the primitive type (e.g. bool, int, long, float, double, string) as well as the tensor type (e.g. sparse or dense).

A listing of TensorFlow's parser types (Table 1) and the coercion of primitive types (Table 2) is provided below.

Table 1: The supported TensorFlow parser types:

TensorFlow Parser Types|TensorFlow Tensors|Explanation
----|----|------
tf.io.FixedLenFeature([], tf.int32)|dense tensor|Parse a fixed-length feature; that is, all rows have the same constant number of elements, e.g. just one element, or an array that always has the same number of elements in each row
tf.io.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50])|sparse tensor|Parse a sparse feature where each row has a variable-length list of indices and values. The 'index_key' identifies the indices. The 'value_key' identifies the value. The 'dtype' is the data type. The 'size' is the expected maximum index value for each index entry
tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int64)|sparse tensor|Parse a variable-length feature; that is, each data row can have a variable number of elements, e.g. the 1st row has 5 elements, the 2nd row has 7 elements

Table 2: The supported conversions from Avro types to TensorFlow types:

Avro Primitive Type|TensorFlow Primitive Type
----|----
boolean: a binary value|tf.bool
bytes: a sequence of 8-bit unsigned bytes|tf.string
double: double precision 64-bit IEEE floating point number|tf.float64
enum: enumeration type|tf.string using the symbol name
float: single precision 32-bit IEEE floating point number|tf.float32
int: 32-bit signed integer|tf.int32
long: 64-bit signed integer|tf.int64
null: no value|uses default value
string: unicode character sequence|tf.string

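Table 2 can be expressed as a small lookup, which may help when writing the `features` map for a new schema. This is a sketch in plain Python: the dtype names are kept as strings so it runs without TensorFlow, `tf_dtype_for` is a hypothetical helper, and treating a union with `null` as the dtype of its non-null branch is an assumption based on how `label` is declared earlier in this tutorial.

```python
# Avro primitive type -> TensorFlow dtype name, as listed in Table 2.
# "null" is omitted because it is handled through the feature's default value.
AVRO_TO_TF_DTYPE = {
    "boolean": "tf.bool",
    "bytes": "tf.string",
    "double": "tf.float64",
    "enum": "tf.string",
    "float": "tf.float32",
    "int": "tf.int32",
    "long": "tf.int64",
    "string": "tf.string",
}


def tf_dtype_for(avro_type):
    """Return the TensorFlow dtype name for an Avro type declaration (hypothetical helper)."""
    if isinstance(avro_type, list):
        # Assumption: a union such as ["int", "null"] uses its single non-null branch.
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError(f"unsupported union in this sketch: {avro_type}")
        return tf_dtype_for(non_null[0])
    if isinstance(avro_type, dict):
        if avro_type["type"] == "array":
            # The dtype of an array is the dtype of its items.
            return tf_dtype_for(avro_type["items"])
        return tf_dtype_for(avro_type["type"])
    return AVRO_TO_TF_DTYPE[avro_type]


# The three field types declared in train.avsc.
print(tf_dtype_for({"type": "array", "items": "int"}))
print(tf_dtype_for(["int", "null"]))
print(tf_dtype_for({"type": "enum", "name": "dataTypes",
                    "symbols": ["TRAINING", "VALIDATION"]}))
```

For the fields of `train.avsc`, the results agree with the dtypes used in the `features` map earlier: `tf.int32` for `features` and `label`, and `tf.string` for `dataType`.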

A comprehensive set of examples of Avro dataset API is provided within <a target="_blank" href="https://github.com/tensorflow/io/blob/master/tests/test_parse_avro.py#L437">the tests</a>.
