// AvroReaderFactory.java
package com.fasterxml.jackson.dataformat.avro.deser;
import java.io.IOException;
import java.util.*;
import org.apache.avro.Schema;
import com.fasterxml.jackson.dataformat.avro.deser.ScalarDecoder.*;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
/**
* Helper class used for constructing a hierarchic reader for given
* (reader-) schema.
*/
public abstract class AvroReaderFactory
{
    // Stateless singleton decoders for scalar types that need no per-schema
    // configuration; safe to share across all factory instances and readers.
    protected final static ScalarDecoder READER_BOOLEAN = new BooleanDecoder();
    protected final static ScalarDecoder READER_BYTES = new BytesDecoder();
    protected final static ScalarDecoder READER_DOUBLE = new DoubleReader();
    protected final static ScalarDecoder READER_FLOAT = new FloatReader();
    protected final static ScalarDecoder READER_INT = new IntReader();
    protected final static ScalarDecoder READER_LONG = new LongReader();
    protected final static ScalarDecoder READER_NULL = new NullReader();
    protected final static ScalarDecoder READER_STRING = new StringReader();

    /**
     * To resolve cyclic types, need to keep track of resolved named
     * types. Record readers are registered here (keyed by full schema name)
     * <em>before</em> their field readers are populated, so a recursive
     * reference encountered while building fields picks up the
     * partially-initialized reader instead of recursing forever.
     */
    protected final TreeMap<String, AvroStructureReader> _knownReaders = new TreeMap<>();

    /*
    /**********************************************************************
    /* Public API: root methods to create reader
    /**********************************************************************
     */

    /**
     * Factory method for the simple case where the same schema was used for
     * writing and is used for reading (no schema resolution needed).
     */
    public static AvroStructureReader createFor(Schema schema) throws IOException {
        return new NonResolving().createReader(schema);
    }

    /**
     * Factory method for the resolving case: data was encoded with
     * {@code writerSchema} but should be decoded according to
     * {@code readerSchema}.
     */
    public static AvroStructureReader createFor(Schema writerSchema,
            Schema readerSchema) throws IOException {
        return new Resolving().createReader(writerSchema, readerSchema);
    }

    /*
    /**********************************************************************
    /* Public API: factory methods
    /**********************************************************************
     */

    /**
     * Creates a decoder for the given schema if (and only if) it is a scalar
     * type, or a union composed entirely of scalar types; returns {@code null}
     * for structured types (ARRAY, MAP, RECORD) so callers can fall back to
     * structured readers.
     *
     * @return Scalar decoder, or {@code null} if {@code type} is structured
     *
     * @throws IllegalStateException for schema types not recognized here
     */
    public ScalarDecoder createScalarValueDecoder(Schema type)
    {
        switch (type.getType()) {
        case BOOLEAN:
            return READER_BOOLEAN;
        case BYTES:
            return READER_BYTES;
        case DOUBLE:
            return READER_DOUBLE;
        case ENUM:
            return new EnumDecoder(AvroSchemaHelper.getFullName(type), type.getEnumSymbols());
        case FIXED:
            return new FixedDecoder(type.getFixedSize(), AvroSchemaHelper.getFullName(type));
        case FLOAT:
            return READER_FLOAT;
        case INT:
        {
            // Wrap in a type-id-carrying decoder only when schema declares one
            final String typeId = AvroSchemaHelper.getTypeId(type);
            return (typeId != null) ? new IntReader(typeId) : READER_INT;
        }
        case LONG:
            return READER_LONG;
        case NULL:
            return READER_NULL;
        case STRING:
        {
            final String typeId = AvroSchemaHelper.getTypeId(type);
            return (typeId != null) ? new StringReader(typeId) : READER_STRING;
        }
        case UNION:
            // Union is a "scalar union" if all the alternative types
            // are scalar. One common type is that of "nullable" one,
            // but general handling should work just fine.
            List<Schema> types = type.getTypes();
            {
                ScalarDecoder[] readers = new ScalarDecoder[types.size()];
                int i = 0;
                for (Schema schema : types) {
                    ScalarDecoder reader = createScalarValueDecoder(schema);
                    if (reader == null) { // non-scalar; no go
                        return null;
                    }
                    readers[i++] = reader;
                }
                return new ScalarUnionDecoder(readers);
            }
        case ARRAY: // ok to call just can't handle
        case MAP:
        case RECORD:
            return null;
        }
        // but others are not recognized
        throw new IllegalStateException("Unrecognized Avro Schema type: "+type.getType());
    }

    /*
    /**********************************************************************
    /* Factory methods for non-resolving cases, shared by sub-classes
    /**********************************************************************
     */

    /**
     * Method for creating a reader instance for specified type,
     * only using specific schema that was used to encoded data
     * ("writer schema").
     *<p>
     * Previously-built readers for named types are returned from
     * {@link #_knownReaders} to handle cyclic type references.
     */
    public AvroStructureReader createReader(Schema schema) throws IOException
    {
        AvroStructureReader reader = _knownReaders.get(AvroSchemaHelper.getFullName(schema));
        if (reader != null) {
            return reader;
        }
        switch (schema.getType()) {
        case ARRAY:
            return createArrayReader(schema);
        case MAP:
            return createMapReader(schema);
        case RECORD:
            return createRecordReader(schema);
        case UNION:
            return createUnionReader(schema);
        default:
            // for other types, we need wrappers
            return new ScalarDecoderWrapper(createScalarValueDecoder(schema));
        }
    }

    /**
     * Builds a reader for an ARRAY schema: scalar-element arrays use a
     * {@link ScalarDecoder}, others recurse via {@link #createReader}.
     */
    protected AvroStructureReader createArrayReader(Schema schema) throws IOException
    {
        Schema elementType = schema.getElementType();
        ScalarDecoder scalar = createScalarValueDecoder(elementType);
        String typeId = AvroSchemaHelper.getTypeId(schema);
        // Element type id: explicit schema property wins; otherwise derive
        // from the element schema itself
        String elementTypeId = schema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_ELEMENT_CLASS);
        if (elementTypeId == null) {
            elementTypeId = AvroSchemaHelper.getTypeId(elementType);
        }
        if (scalar != null) {
            // EnumSet has to know element type information up front; take advantage of the fact that the id resolver handles canonical IDs
            if (EnumSet.class.getName().equals(typeId)) {
                typeId += "<" + elementTypeId + ">";
            }
            return ArrayReader.construct(scalar, typeId, elementTypeId);
        }
        return ArrayReader.construct(createReader(elementType), typeId, elementTypeId);
    }

    /**
     * Builds a reader for a MAP schema; same scalar-vs-structured split as
     * {@link #createArrayReader}.
     */
    protected AvroStructureReader createMapReader(Schema schema) throws IOException
    {
        Schema elementType = schema.getValueType();
        ScalarDecoder dec = createScalarValueDecoder(elementType);
        String typeId = AvroSchemaHelper.getTypeId(schema);
        String keyTypeId = schema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_KEY_CLASS);
        // EnumMap requires value type information up front; take advantage of the fact that the id resolver handles canonical IDs
        if (EnumMap.class.getName().equals(typeId)) {
            typeId += "<" + keyTypeId + "," + Object.class.getName() + ">";
        }
        if (dec != null) {
            String valueTypeId = AvroSchemaHelper.getTypeId(elementType);
            return MapReader.construct(dec, typeId, keyTypeId, valueTypeId);
        }
        return MapReader.construct(createReader(elementType), typeId, keyTypeId);
    }

    /**
     * Builds a reader for a RECORD schema. The reader is registered in
     * {@link #_knownReaders} before field readers are created, so recursive
     * (cyclic) field types resolve to this same instance.
     */
    protected AvroStructureReader createRecordReader(Schema schema) throws IOException
    {
        final List<Schema.Field> fields = schema.getFields();
        AvroFieldReader[] fieldReaders = new AvroFieldReader[fields.size()];
        // register BEFORE populating fieldReaders: breaks recursion cycles
        RecordReader reader = new RecordReader.Std(fieldReaders, AvroSchemaHelper.getTypeId(schema));
        _knownReaders.put(AvroSchemaHelper.getFullName(schema), reader);
        int i = 0;
        for (Schema.Field field : fields) {
            fieldReaders[i++] = createFieldReader(field);
        }
        return reader;
    }

    /**
     * Builds a reader for a UNION schema: one sub-reader per alternative,
     * in schema declaration order.
     */
    protected AvroStructureReader createUnionReader(Schema schema) throws IOException
    {
        final List<Schema> types = schema.getTypes();
        AvroStructureReader[] typeReaders = new AvroStructureReader[types.size()];
        int i = 0;
        for (Schema type : types) {
            typeReaders[i++] = createReader(type);
        }
        return new UnionReader(typeReaders);
    }

    /**
     * Builds the reader for a single record field; scalar fields avoid the
     * structured-reader wrapper.
     */
    protected AvroFieldReader createFieldReader(Schema.Field field) throws IOException {
        final String name = field.name();
        final Schema type = field.schema();
        ScalarDecoder scalar = createScalarValueDecoder(type);
        if (scalar != null) {
            return scalar.asFieldReader(name, false);
        }
        return AvroFieldReader.construct(name, createReader(type));
    }

    /*
    /**********************************************************************
    /* Implementations
    /**********************************************************************
     */

    /**
     * Implementation used when no schema-resolution is needed, when we are using
     * same schema for reading as was used for writing.
     *<p>
     * All behavior comes from the base class; this subclass only makes the
     * factory instantiable.
     */
    private static class NonResolving extends AvroReaderFactory
    {
        protected NonResolving() { }
    }

    /**
     * Implementation used when schema-resolution is needed, when we are using
     * different schema for reading ("reader schema") than was used for
     * writing encoded data ("writer schema")
     */
    private static class Resolving extends AvroReaderFactory
    {
        protected Resolving() { }

        /**
         * Method for creating a reader instance for specified type.
         */
        public AvroStructureReader createReader(Schema writerSchema, Schema readerSchema)
            throws IOException
        {
            // NOTE: it is assumed writer-schema has been modified with aliases so
            // that the names are same, so we could use either name:
            AvroStructureReader reader = _knownReaders.get(AvroSchemaHelper.getFullName(readerSchema));
            if (reader != null) {
                return reader;
            }
            // but the type to decode must be writer-schema indicated one (but
            // also same as or promotable to reader-schema one)
            switch (writerSchema.getType()) {
            case ARRAY:
                return createArrayReader(writerSchema, readerSchema);
            case MAP:
                return createMapReader(writerSchema, readerSchema);
            case RECORD:
                return createRecordReader(writerSchema, readerSchema);
            case UNION:
                return createUnionReader(writerSchema, readerSchema);
            default:
                // for other types, we need wrappers
                return new ScalarDecoderWrapper(createScalarValueDecoder(writerSchema));
            }
        }

        /**
         * Resolving variant of array-reader construction: the writer schema
         * determines how elements are decoded; the reader schema supplies
         * type ids for Java type binding.
         */
        protected AvroStructureReader createArrayReader(Schema writerSchema, Schema readerSchema)
            throws IOException
        {
            readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
            Schema writerElementType = writerSchema.getElementType();
            ScalarDecoder scalar = createScalarValueDecoder(writerElementType);
            String typeId = AvroSchemaHelper.getTypeId(readerSchema);
            String elementTypeId = readerSchema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_ELEMENT_CLASS);
            // NOTE(review): unlike non-resolving createArrayReader, there is no
            // fallback to AvroSchemaHelper.getTypeId(elementType) when the
            // property is absent, nor EnumSet type-id augmentation — confirm
            // whether this asymmetry is intentional
            if (scalar != null) {
                return ArrayReader.construct(scalar, typeId, elementTypeId);
            }
            return ArrayReader.construct(createReader(writerElementType, readerSchema.getElementType()), typeId, elementTypeId);
        }

        /**
         * Resolving variant of map-reader construction.
         */
        protected AvroStructureReader createMapReader(Schema writerSchema, Schema readerSchema) throws IOException
        {
            readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
            Schema writerElementType = writerSchema.getValueType();
            ScalarDecoder dec = createScalarValueDecoder(writerElementType);
            String typeId = AvroSchemaHelper.getTypeId(readerSchema);
            String keyTypeId = readerSchema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_KEY_CLASS);
            if (dec != null) {
                // NOTE(review): reads the class property directly, whereas the
                // non-resolving variant uses AvroSchemaHelper.getTypeId(); and no
                // EnumMap type-id augmentation here — confirm intentional
                String valueTypeId = readerSchema.getValueType().getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS);
                return MapReader.construct(dec, typeId, keyTypeId, valueTypeId);
            }
            return MapReader.construct(createReader(writerElementType, readerSchema.getValueType()), typeId, keyTypeId);
        }

        /**
         * Resolving variant of record-reader construction: decodes fields in
         * writer-schema order, maps them to reader-schema fields by name,
         * skips writer-only fields, and appends defaulters for reader-only
         * fields (which consume no encoded input).
         */
        protected AvroStructureReader createRecordReader(Schema writerSchema, Schema readerSchema)
            throws IOException
        {
            readerSchema = _verifyMatchingStructure(readerSchema, writerSchema);
            // Ok, this gets bit more complicated: need to iterate over writer schema
            // (since that will be the order fields are decoded in!), but also
            // keep track of which reader fields are being produced.
            final List<Schema.Field> writerFields = writerSchema.getFields();
            // but first: find fields that only exist in reader-side and need defaults,
            // and remove those from
            Map<String,Schema.Field> readerFields = new HashMap<>();
            List<Schema.Field> defaultFields = new ArrayList<>();
            {
                Set<String> writerNames = new HashSet<>();
                for (Schema.Field f : writerFields) {
                    writerNames.add(f.name());
                }
                for (Schema.Field f : readerSchema.getFields()) {
                    String name = f.name();
                    if (writerNames.contains(name)) {
                        readerFields.put(name, f);
                    } else {
                        defaultFields.add(f);
                    }
                }
            }
            // note: despite changes, we will always have known number of field entities,
            // ones from writer schema -- some may skip, but there's entry there
            AvroFieldReader[] fieldReaders = new AvroFieldReader[writerFields.size()
                + defaultFields.size()];
            // register BEFORE populating fieldReaders: breaks recursion cycles
            RecordReader reader = new RecordReader.Resolving(fieldReaders, AvroSchemaHelper.getTypeId(readerSchema));
            // as per earlier, names should be the same
            _knownReaders.put(AvroSchemaHelper.getFullName(readerSchema), reader);
            int i = 0;
            for (Schema.Field writerField : writerFields) {
                Schema.Field readerField = readerFields.get(writerField.name());
                // need a skipper:
                fieldReaders[i++] = (readerField == null)
                    ? createFieldSkipper(writerField.name(),
                            writerField.schema())
                    : createFieldReader(readerField.name(),
                            writerField.schema(), readerField.schema());
            }
            // Any defaults to consider?
            if (!defaultFields.isEmpty()) {
                for (Schema.Field defaultField : defaultFields) {
                    AvroFieldReader fr = AvroFieldDefaulters.createDefaulter(defaultField.name(),
                            AvroSchemaHelper.objectToJsonNode(defaultField.defaultVal())
                    );
                    if (fr == null) {
                        throw new IllegalArgumentException("Unsupported default type: "+defaultField.schema().getType());
                    }
                    fieldReaders[i++] = fr;
                }
            }
            return reader;
        }

        /**
         * Resolving variant of union-reader construction.
         *<p>
         * NOTE(review): currently builds sub-readers from writer schema only
         * (via non-resolving {@code createReader}); reader schema is ignored
         * per the in-code TODO.
         */
        protected AvroStructureReader createUnionReader(Schema writerSchema, Schema readerSchema)
            throws IOException
        {
            final List<Schema> types = writerSchema.getTypes();
            AvroStructureReader[] typeReaders = new AvroStructureReader[types.size()];
            int i = 0;
            // !!! TODO: actual resolution !!!
            for (Schema type : types) {
                typeReaders[i++] = createReader(type);
            }
            return new UnionReader(typeReaders);
        }

        /**
         * Builds the reader for a record field present in both writer and
         * reader schemas; decoding follows the writer schema.
         */
        protected AvroFieldReader createFieldReader(String name,
                Schema writerSchema, Schema readerSchema) throws IOException
        {
            ScalarDecoder scalar = createScalarValueDecoder(writerSchema);
            if (scalar != null) {
                return scalar.asFieldReader(name, false);
            }
            return AvroFieldReader.construct(name,
                    createReader(writerSchema, readerSchema));
        }

        /**
         * Builds a skipping reader for a field that exists only in the
         * writer schema: its encoded value is consumed but discarded.
         */
        protected AvroFieldReader createFieldSkipper(String name,
                Schema writerSchema) throws IOException
        {
            ScalarDecoder scalar = createScalarValueDecoder(writerSchema);
            if (scalar != null) {
                return scalar.asFieldReader(name, true);
            }
            return AvroFieldReader.constructSkipper(name,
                    createReader(writerSchema));
        }

        /**
         * Helper method that verifies that the given reader schema is compatible
         * with specified writer schema type: either directly (same type), or
         * via latter being a union with compatible type. In latter case, type
         * (schema) within union that matches writer schema is returned instead
         * of containing union
         *
         * @return Reader schema that matches expected writer schema
         *
         * @throws IllegalStateException if reader schema neither matches the
         *   writer schema's type nor contains a union alternative of that type
         */
        private Schema _verifyMatchingStructure(Schema readerSchema, Schema writerSchema)
        {
            final Schema.Type expectedType = writerSchema.getType();
            Schema.Type actualType = readerSchema.getType();
            // Simple rules: if structures are the same (both arrays, both maps, both records),
            // fine, without yet verifying element types
            if (actualType == expectedType) {
                return readerSchema;
            }
            // Or, similarly, find the first structure of same type within union.
            // !!! 07-Feb-2017, tatu: Quite possibly we should do recursive match check here,
            //    in case there are multiple alternatives of same structured type.
            //    But since that is quite non-trivial let's wait for a good example of actual
            //    usage before tackling that.
            if (actualType == Schema.Type.UNION) {
                for (Schema sch : readerSchema.getTypes()) {
                    if (sch.getType() == expectedType) {
                        return sch;
                    }
                }
                throw new IllegalStateException(String.format(
                        "Mismatch between types: expected %s (name '%s'), encountered %s of %d types without match",
                        expectedType, writerSchema.getName(), actualType, readerSchema.getTypes().size()));
            }
            throw new IllegalStateException(String.format(
                    "Mismatch between types: expected %s (name '%s'), encountered %s",
                    expectedType, writerSchema.getName(), actualType));
        }
    }
}