RecordVisitor.java

package tools.jackson.dataformat.avro.schema;

import java.util.*;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.reflect.AvroMeta;
import org.apache.avro.reflect.AvroSchema;

import tools.jackson.databind.*;
import tools.jackson.databind.introspect.AnnotatedClass;
import tools.jackson.databind.jsonFormatVisitors.JsonFormatVisitable;
import tools.jackson.databind.jsonFormatVisitors.JsonObjectFormatVisitor;
import tools.jackson.databind.jsontype.NamedType;
import tools.jackson.databind.ser.BeanPropertyWriter;
import tools.jackson.dataformat.avro.AvroFactory;
import tools.jackson.dataformat.avro.AvroWriteFeature;
import tools.jackson.dataformat.avro.annotation.AvroDecimal;
import tools.jackson.dataformat.avro.annotation.AvroFixedSize;
import tools.jackson.dataformat.avro.ser.CustomEncodingSerializer;

public class RecordVisitor
    extends JsonObjectFormatVisitor.Base
    implements SchemaBuilder
{
    /**
     * Java type for which this visitor builds an Avro schema.
     */
    protected final JavaType _type;

    /**
     * Wrapper passed to the constructor; used to create child wrappers when visiting
     * sub-types and properties, and to register the resulting schema for {@code _type}.
     */
    protected final VisitorFormatWrapperImpl _visitorWrapper;

    /**
     * Tracks if the schema for this record has been overridden (by an annotation or other means),
     * and calls to the {@code property} and {@code optionalProperty} methods should be ignored.
     */
    protected final boolean _overridden;

    /**
     * Snapshot of {@code AvroWriteFeature.ADD_NULL_AS_DEFAULT_VALUE_IN_SCHEMA} taken at construction:
     * when enabled, fields whose schema is a union containing {@code null} and that have no
     * explicit default get a {@code null} default value.
     */
    protected final boolean _cfgAddNullDefaults;

    /**
     * When the Avro schema for this JavaType ({@code _type}) results in a UNION of multiple Avro types,
     * {@code _typeSchema} keeps track of which Avro type in the UNION represents this JavaType
     * ({@code _type}), so that the fields of this JavaType can be assigned to the right Avro type
     * by {@code builtAvroSchema()}.
     *<br>
     * Example:
     * <pre>
     *   @JsonSubTypes({
     *     @JsonSubTypes.Type(value = Apple.class),
     *     @JsonSubTypes.Type(value = Orange.class) })
     *   class Fruit {}
     *
     *   class Apple extends Fruit {}
     *   class Orange extends Fruit {}
     * </pre>
     * When {@code _type = Fruit.class}, then {@code _avroSchema} of Fruit.class is a union of the
     * Fruit record, Apple record and Orange record schemas:
     * <pre>
     *   [
     *     { name: Fruit, type: record, fields: [..] },  &lt;--- _typeSchema points here
     *     { name: Apple, type: record, fields: [..] },
     *     { name: Orange, type: record, fields: [..] }
     *   ]
     * </pre>
     * and {@code _typeSchema} points to the record schema of Fruit.class itself (without sub-types).
     * When there are no sub-types, {@code _typeSchema} and {@code _avroSchema} are the same object.
     *<p>
     * FIXME: {@code _typeSchema} is {@code null} exactly when {@code _overridden} is {@code true},
     * therefore {@code (_overridden == true)} could be replaced with {@code (_typeSchema == null)},
     * but that might be considered an API change because {@code _overridden} has protected access.
     *
     * @since 2.19.1
     */
    private final Schema _typeSchema;

    /**
     * Schema returned by {@code builtAvroSchema()}: either the schema parsed from a class-level
     * {@code @AvroSchema} override, a union of this type and its sub-types, or {@code _typeSchema} itself.
     */
    protected final Schema _avroSchema;

    /**
     * Fields collected by the {@code property} / {@code optionalProperty} callbacks; assigned to
     * {@code _typeSchema} by {@code builtAvroSchema()}.
     */
    protected final List<Schema.Field> _fields = new ArrayList<>();

    /**
     * Prepares schema generation for {@code type}.
     *<p>
     * If the class carries an {@code @AvroSchema} annotation, the annotation value is parsed and used
     * as-is (the visitor is then marked as overridden). Otherwise a record schema is initialized for
     * the type, a class-level {@code @AvroMeta} property is attached if present, and — when the
     * annotation introspector reports sub-types — a union of this type (if concrete) and the schemas
     * of all sub-types is built. The resulting schema is finally registered with the schemas held by
     * {@code visitorWrapper}.
     *
     * @param ctxt serialization context; its token stream factory is cast to {@code AvroFactory}
     * @param type Java type to build the schema for
     * @param visitorWrapper wrapper used to create child visitors and to register the schema
     */
    public RecordVisitor(SerializationContext ctxt, JavaType type,
            VisitorFormatWrapperImpl visitorWrapper)
    {
        super(ctxt);
        _type = type;
        _visitorWrapper = visitorWrapper;

        final AvroFactory factory = (AvroFactory) ctxt.tokenStreamFactory();
        _cfgAddNullDefaults = factory.isEnabled(AvroWriteFeature.ADD_NULL_AS_DEFAULT_VALUE_IN_SCHEMA);

        final SerializationConfig config = ctxt.getConfig();

        // 12-Oct-2019, tatu: VERY important: only get direct annotations, not for supertypes --
        //   otherwise there's infinite loop awaiting for... some reason. Other parts of code
        //   should probably check for loops but bit hard for me to fix as I did not author
        //   code in question (so may be unaware of some nuances)
        final AnnotatedClass classAnnotations = ctxt.introspectDirectClassAnnotations(_type);
        final AnnotationIntrospector introspector = ctxt.getAnnotationIntrospector();

        // Check if the schema for this record is overridden
        final AvroSchema schemaOverride = classAnnotations.getAnnotation(AvroSchema.class);
        if (schemaOverride != null) {
            _avroSchema = AvroSchemaHelper.parseJsonSchema(schemaOverride.value());
            _overridden = true;
            _typeSchema = null;
        } else {
            // If the Avro schema for this _type ends up as a UNION, we still need to know
            // which record schema the collected fields belong to
            _typeSchema = AvroSchemaHelper.initializeRecordSchema(config, _type, classAnnotations);
            _overridden = false;

            final AvroMeta classMeta = classAnnotations.getAnnotation(AvroMeta.class);
            if (classMeta != null) {
                _typeSchema.addProp(classMeta.key(), classMeta.value());
            }

            final List<NamedType> declaredSubTypes = introspector.findSubtypes(config, classAnnotations);
            if (declaredSubTypes == null || declaredSubTypes.isEmpty()) {
                _avroSchema = _typeSchema;
            } else {
                final List<Schema> branches = collectUnionBranches(ctxt, declaredSubTypes);
                _avroSchema = Schema.createUnion(deduplicateByReference(branches));
            }
        }
        _visitorWrapper.getSchemas().addSchema(type, _avroSchema);
    }

    /**
     * Collects, in order, the schemas that make up the union for a type with sub-types:
     * this type's own record schema first (only if the type is concrete), followed by the
     * schema of each sub-type; sub-type schemas that are unions themselves are flattened.
     * The result may contain the same schema instance more than once.
     */
    private List<Schema> collectUnionBranches(SerializationContext ctxt, List<NamedType> subTypes)
    {
        // Guards against processing the same class (including _type itself) more than once,
        // which would otherwise lead to an endless loop
        final Set<Class<?>> visitedClasses = new HashSet<>();
        visitedClasses.add(_type.getRawClass());

        // At this point calculating hashCode for _typeSchema fails with
        // NPE because RecordSchema.fields is NULL
        // (see org.apache.avro.Schema.RecordSchema#computeHash).
        // Therefore, the collection must not be a HashSet (or any other type
        // using hashCode() for equality check).
        // ArrayList ensures that ordering of subTypes is preserved.
        final List<Schema> branches = new ArrayList<>();
        if (_type.isConcrete()) {
            branches.add(_typeSchema);
        }

        for (NamedType named : subTypes) {
            final Class<?> subClass = named.getType();
            if (visitedClasses.add(subClass)) {
                final ValueSerializer<?> subSerializer = ctxt.findValueSerializer(subClass);
                final VisitorFormatWrapperImpl childWrapper = _visitorWrapper.createChildWrapper();
                subSerializer.acceptJsonFormatVisitor(childWrapper, ctxt.constructType(subClass));

                final Schema subSchema = childWrapper.getAvroSchema();
                if (subSchema.getType() == Type.UNION) {
                    // Nested unions are not allowed: add each member individually
                    branches.addAll(subSchema.getTypes());
                } else {
                    branches.add(subSchema);
                }
            }
        }
        return branches;
    }

    /**
     * Returns a new list holding each distinct {@link Schema} instance of {@code schemas} once,
     * in order of first occurrence. Instances are compared by reference ({@code ==}), so
     * {@code equals()} / {@code hashCode()} of the schemas are never invoked.
     *
     * @param schemas schemas to filter; not modified
     * @return new mutable list without repeated references
     */
    private static List<Schema> deduplicateByReference(List<Schema> schemas) {
        // Identity-keyed map: membership is decided by reference, not by Schema.equals()
        final Map<Schema, Boolean> seenByIdentity = new IdentityHashMap<>();
        final List<Schema> unique = new ArrayList<>();

        for (Schema candidate : schemas) {
            // put() returns null only the first time a given reference is stored
            if (seenByIdentity.put(candidate, Boolean.TRUE) == null) {
                unique.add(candidate);
            }
        }
        return unique;
    }

    /**
     * Finishes schema construction and returns the schema for the visited type.
     * Unless the schema was overridden, the fields collected so far are assigned to the
     * record schema of this type before returning.
     *
     * @return the overriding schema, the union schema, or the record schema of this type
     */
    @Override
    public Schema builtAvroSchema() {
        if (_overridden) {
            return _avroSchema;
        }
        // Visiting is assumed to be complete at this point, so the fields can be attached
        _typeSchema.setFields(_fields);
        return _avroSchema;
    }

    /*
    /**********************************************************************
    /* JsonObjectFormatVisitor implementation
    /**********************************************************************
     */

    /**
     * Records a required bean property as a field of the record schema.
     * Does nothing if the schema of this record has been overridden.
     */
    @Override
    public void property(BeanProperty writer)
    {
        if (!_overridden) {
            final Schema.Field requiredField = schemaFieldForWriter(writer, false);
            _fields.add(requiredField);
        }
    }

    /**
     * Records a required, explicitly named property: the schema is obtained by letting
     * {@code handler} visit a child wrapper, and the field is added without documentation
     * and without a default value. Does nothing if the schema of this record has been overridden.
     */
    @Override
    public void property(String name, JsonFormatVisitable handler,
            JavaType type)
    {
        if (!_overridden) {
            final VisitorFormatWrapperImpl childWrapper = _visitorWrapper.createChildWrapper();
            handler.acceptJsonFormatVisitor(childWrapper, type);
            _fields.add(new Schema.Field(name, childWrapper.getAvroSchema(), null, (Object) null));
        }
    }

    /**
     * Records an optional bean property as a field of the record schema.
     * Does nothing if the schema of this record has been overridden.
     */
    @Override
    public void optionalProperty(BeanProperty writer) {
        if (!_overridden) {
            final Schema.Field optionalField = schemaFieldForWriter(writer, true);
            _fields.add(optionalField);
        }
    }

    /**
     * Records an optional, explicitly named property: the schema is obtained by letting
     * {@code handler} visit a child wrapper and, unless {@code type} is primitive, is combined
     * with {@code null} into a union. The field is added without documentation and without
     * a default value. Does nothing if the schema of this record has been overridden.
     */
    @Override
    public void optionalProperty(String name, JsonFormatVisitable handler,
            JavaType type)
    {
        if (!_overridden) {
            final VisitorFormatWrapperImpl childWrapper = _visitorWrapper.createChildWrapper();
            handler.acceptJsonFormatVisitor(childWrapper, type);
            final Schema visitedSchema = childWrapper.getAvroSchema();
            // Primitive values are never absent, so only non-primitives become nullable
            final Schema fieldSchema = type.isPrimitive()
                    ? visitedSchema
                    : AvroSchemaHelper.unionWithNull(visitedSchema);
            _fields.add(new Schema.Field(name, fieldSchema, null, (Object) null));
        }
    }

    /*
    /**********************************************************************
    /* Internal methods
    /**********************************************************************
     */

    /**
     * Builds the Avro record field for a single bean property.
     *<p>
     * The field schema is determined in this order:
     *<ol>
     * <li>a property-level {@code @AvroSchema} annotation, parsed and used as-is;</li>
     * <li>{@code @AvroFixedSize} (fixed schema) and, if logical types are enabled,
     *     {@code @AvroDecimal} (decimal logical type on the fixed schema, or on {@code bytes});</li>
     * <li>otherwise the schema produced by visiting the property's serializer.</li>
     *</ol>
     * For cases 2 and 3, an optional non-primitive property is additionally wrapped in a union
     * with {@code null}. The default value comes from the property metadata; if none is defined,
     * null-defaults are enabled and the schema is a union containing {@code null}, a {@code null}
     * default is induced. In either case the union is reordered, where needed, so that the branch
     * matching the default comes first. {@code @AvroMeta} and property aliases are copied onto the field.
     *
     * @param prop property to create the field for
     * @param optional whether the property was reported as optional
     * @return field describing {@code prop}
     */
    protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional)
    {
        Schema writerSchema = null;
        // Check if schema for property is overridden
        AvroSchema schemaOverride = prop.getAnnotation(AvroSchema.class);
        if (schemaOverride != null) {
            Schema.Parser parser = new Schema.Parser();
            writerSchema = parser.parse(schemaOverride.value());
        } else {
            AvroFixedSize fixedSize = prop.getAnnotation(AvroFixedSize.class);
            if (fixedSize != null) {
                writerSchema = Schema.createFixed(fixedSize.typeName(), null, fixedSize.typeNamespace(), fixedSize.size());
            }
            if (_visitorWrapper.isLogicalTypesEnabled()) {
                AvroDecimal avroDecimal = prop.getAnnotation(AvroDecimal.class);
                if (avroDecimal != null) {
                    // Decimal is layered on the fixed schema if one was requested, else on bytes
                    if (writerSchema == null) {
                        writerSchema = Schema.create(Type.BYTES);
                    }
                    writerSchema = LogicalTypes.decimal(avroDecimal.precision(), avroDecimal.scale())
                            .addToSchema(writerSchema);
                }
            }
            if (writerSchema == null) {
                ValueSerializer<?> ser = null;

                // 23-Nov-2012, tatu: Ideally shouldn't need to do this but...
                if (prop instanceof BeanPropertyWriter) {
                    BeanPropertyWriter bpw = (BeanPropertyWriter) prop;
                    ser = bpw.getSerializer();
                    // 2-Mar-2017, bryan: AvroEncode annotation expects to have the schema used directly
                    optional = optional && !(ser instanceof CustomEncodingSerializer); // Don't modify schema
                }
                final SerializationContext prov = getContext();
                if (ser == null) {
                    if (prov == null) {
                        throw DatabindException.from(prov, "SerializationContext missing for RecordVisitor");
                    }
                    ser = prov.findPrimaryPropertySerializer(prop.getType(), prop);
                }
                VisitorFormatWrapperImpl visitorWrapper = _visitorWrapper.createChildWrapper();
                ser.acceptJsonFormatVisitor(visitorWrapper, prop.getType());
                writerSchema = visitorWrapper.getAvroSchema();
            }

            // 23-Nov-2012, tatu: Actually let's also assume that primitive type values
            //   are required, as Jackson does not distinguish whether optional has been
            //   defined, or is merely the default setting.
            if (optional && !prop.getType().isPrimitive()) {
                writerSchema = AvroSchemaHelper.unionWithNull(writerSchema);
            }
        }
        JsonNode defaultValue = AvroSchemaHelper.parseDefaultValue(prop.getMetadata().getDefaultValue());

        // [dataformats-binary#145]: induce `null` default value, if compatible.
        // Position of the `null` branch, looked up only when a null default may be induced.
        Integer nullBranchIndex = null;
        if (_cfgAddNullDefaults
                && (defaultValue == null)
                && writerSchema.getType() == Type.UNION) {
            nullBranchIndex = writerSchema.getIndexNamed(Type.NULL.getName());
        }
        if (nullBranchIndex != null) {
            defaultValue = AvroSchemaHelper.nullNode();
            // The default of a union must match its first branch. Unions built by this class
            // already list `null` first, but overridden or serializer-provided unions may not:
            // reorder those the same way as for explicit defaults.
            if (nullBranchIndex.intValue() != 0) {
                writerSchema = reorderUnionToMatchDefaultType(writerSchema, defaultValue);
            }
        } else {
            writerSchema = reorderUnionToMatchDefaultType(writerSchema, defaultValue);
        }
        Schema.Field field = new Schema.Field(prop.getName(), writerSchema, prop.getMetadata().getDescription(),
                AvroSchemaHelper.jsonNodeToObject(defaultValue));

        AvroMeta meta = prop.getAnnotation(AvroMeta.class);
        if (meta != null) {
            field.addProp(meta.key(), meta.value());
        }
        for (PropertyName alias : prop.findAliases(getContext().getConfig())) {
            field.addAlias(alias.getSimpleName());
        }
        return field;
    }

    /**
     * Moves the union branch that corresponds to a default value to the front of the union.
     * A union schema with a default value must always list the branch matching the default first,
     * or Avro will print a warning complaining that the default value is not compatible.
     *<p>
     * The branch is chosen from the JSON shape of {@code defaultValue}: array, map (else the first
     * record), boolean, null, bytes, double (else float), long (else int), string (else the first enum).
     * Properties of the original union are copied to the reordered one.
     *
     * @param schema
     *     Schema to reorder; If {@code null} or not a {@code UNION}, then it is returned unmodified.
     * @param defaultValue
     *     Default value to match with the union; if {@code null}, {@code schema} is returned unmodified.
     *
     * @return A new union whose first branch matches the type of {@code defaultValue}; if no branch
     * matches, {@code schema} is returned unmodified.
     */
    protected Schema reorderUnionToMatchDefaultType(Schema schema, JsonNode defaultValue) {
        final boolean candidate = (schema != null)
                && (defaultValue != null)
                && (schema.getType() == Type.UNION);
        if (!candidate) {
            return schema;
        }
        final List<Schema> branches = new ArrayList<>(schema.getTypes());
        final Integer preferred = findBranchForDefault(schema, branches, defaultValue);
        if (preferred == null) {
            return schema;
        }
        // Pull the matching branch out and re-insert it at the head
        final Schema head = branches.remove(preferred.intValue());
        branches.add(0, head);

        final Map<String, Object> originalProps = schema.getObjectProps();
        final Schema reordered = Schema.createUnion(branches);
        // copy any properties over
        for (Map.Entry<String, Object> entry : originalProps.entrySet()) {
            reordered.addProp(entry.getKey(), entry.getValue());
        }
        return reordered;
    }

    /**
     * Determines the index of the union branch that fits the JSON shape of {@code defaultValue},
     * or {@code null} if the union has no fitting branch.
     */
    private static Integer findBranchForDefault(Schema union, List<Schema> branches, JsonNode defaultValue) {
        if (defaultValue.isArray()) {
            return union.getIndexNamed(Type.ARRAY.getName());
        }
        if (defaultValue.isObject()) {
            final Integer mapIndex = union.getIndexNamed(Type.MAP.getName());
            if (mapIndex != null) {
                return mapIndex;
            }
            return firstBranchOfType(branches, Type.RECORD);
        }
        if (defaultValue.isBoolean()) {
            return union.getIndexNamed(Type.BOOLEAN.getName());
        }
        if (defaultValue.isNull()) {
            return union.getIndexNamed(Type.NULL.getName());
        }
        if (defaultValue.isBinary()) {
            return union.getIndexNamed(Type.BYTES.getName());
        }
        if (defaultValue.isFloatingPointNumber()) {
            final Integer doubleIndex = union.getIndexNamed(Type.DOUBLE.getName());
            if (doubleIndex != null) {
                return doubleIndex;
            }
            return union.getIndexNamed(Type.FLOAT.getName());
        }
        if (defaultValue.isIntegralNumber()) {
            final Integer longIndex = union.getIndexNamed(Type.LONG.getName());
            if (longIndex != null) {
                return longIndex;
            }
            return union.getIndexNamed(Type.INT.getName());
        }
        if (defaultValue.isString()) {
            final Integer stringIndex = union.getIndexNamed(Type.STRING.getName());
            if (stringIndex != null) {
                return stringIndex;
            }
            return firstBranchOfType(branches, Type.ENUM);
        }
        return null;
    }

    /**
     * Returns the index of the first branch with the given Avro type, or {@code null} if there is none.
     */
    private static Integer firstBranchOfType(List<Schema> branches, Type wanted) {
        for (int i = 0; i < branches.size(); i++) {
            if (branches.get(i).getType() == wanted) {
                return i;
            }
        }
        return null;
    }
}