// AvroGenerator.java

package com.fasterxml.jackson.dataformat.avro;

import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.core.base.GeneratorBase;
import com.fasterxml.jackson.core.io.IOContext;
import com.fasterxml.jackson.core.util.JacksonFeatureSet;
import com.fasterxml.jackson.dataformat.avro.apacheimpl.ApacheCodecRecycler;
import com.fasterxml.jackson.dataformat.avro.ser.AvroWriteContext;
import com.fasterxml.jackson.dataformat.avro.ser.EncodedDatum;

public class AvroGenerator extends GeneratorBase
{
    /**
     * Enumeration that defines all togglable features for Avro generators.
     */
    public enum Feature
        implements FormatFeature // since 2.7
    {
        /**
         * Feature that can be disabled to prevent Avro from buffering any more
         * data than absolutely necessary.
         * This affects buffering by the underlying codec: when enabled, a buffered
         * {@link BinaryEncoder} is constructed; when disabled, a "direct" (unbuffered)
         * one is used instead.
         * Note that disabling buffering is likely to reduce performance if the underlying
         * output is unbuffered.
         *<p>
         * Enabled by default to preserve the existing behavior.
         *
         * @since 2.7
         */
        AVRO_BUFFERING(true),

        /**
         * Feature that tells Avro to write data in file format (i.e. including the schema
         * with the data) rather than the RPC format, which is otherwise the default.
         *<p>
         * NOTE: the reader side has to be aware of this distinction as well, since possible
         * inclusion of this header is not 100% reliably auto-detectable (while the header has
         * a distinct marker, "raw" Avro content has no limitations and could theoretically
         * start with the same preamble from data).
         *
         * @since 2.9
         */
        AVRO_FILE_OUTPUT(false)
        ;

        protected final boolean _defaultState;
        protected final int _mask;

        /**
         * Method that calculates bit set (flags) of all features that
         * are enabled by default.
         *
         * @return Bit mask with the bit of every default-enabled feature set
         */
        public static int collectDefaults()
        {
            int flags = 0;
            for (Feature f : values()) {
                if (f.enabledByDefault()) {
                    flags |= f.getMask();
                }
            }
            return flags;
        }

        Feature(boolean defaultState) {
            _defaultState = defaultState;
            // Each feature owns one bit, derived from its declaration position
            _mask = (1 << ordinal());
        }

        @Override
        public boolean enabledByDefault() { return _defaultState; }

        @Override
        public int getMask() { return _mask; }

        @Override
        public boolean enabledIn(int flags) { return (flags & _mask) != 0; }
    }

    /*
    /**********************************************************
    /* Configuration
    /**********************************************************
     */

    /**
     * Shared factory used to construct (or re-configure recycled) binary encoders.
     *
     * @since 2.16
     */
    protected final static EncoderFactory ENCODER_FACTORY = EncoderFactory.get();

    /**
     * Recycler the encoder was acquired from; set to {@code null} once
     * buffers have been released (see {@link #_releaseBuffers()}).
     *
     * @since 2.16
     */
    protected ApacheCodecRecycler _apacheCodecRecycler;

    /**
     * Constraints (such as maximum nesting depth) applied while writing.
     *
     * @since 2.16
     */
    protected final StreamWriteConstraints _streamWriteConstraints;

    /**
     * Bit flag composed of bits that indicate which
     * {@link AvroGenerator.Feature}s
     * are enabled.
     */
    protected int _formatFeatures;

    /**
     * Schema most recently assigned via {@link #setSchema(AvroSchema)}, if any.
     */
    protected AvroSchema _rootSchema;

    /*
    /**********************************************************
    /* Output state
    /**********************************************************
     */

    final protected OutputStream _output;

    /**
     * Reference to the root context since that is needed for serialization
     */
    protected AvroWriteContext _rootContext;

    /**
     * Current context
     */
    protected AvroWriteContext _avroContext;

    /**
     * Encoder constructed in the constructor; reused in case of writing
     * root-value sequences. Set to {@code null} when released back to the recycler.
     */
    protected BinaryEncoder _encoder;

    /**
     * Flag that is set when the whole content is complete, can
     * be output.
     */
    protected boolean _complete;

    /*
    /**********************************************************
    /* Life-cycle
    /**********************************************************
     */

    /**
     * Constructs a generator writing to the given stream. The encoder is acquired from
     * {@code apacheCodecRecycler} (for reuse) and configured as buffered or direct
     * depending on {@link Feature#AVRO_BUFFERING} in {@code avroFeatures}.
     * Until a schema is assigned, the current context is a "null" context.
     *
     * @param ctxt I/O context (also source of {@link StreamWriteConstraints})
     * @param jsonFeatures Bit mask of enabled {@link JsonGenerator.Feature}s
     * @param avroFeatures Bit mask of enabled {@link Feature}s
     * @param apacheCodecRecycler Recycler to acquire the encoder from and release it to
     * @param codec Object codec to use, if any
     * @param output Stream to write encoded content to
     */
    public AvroGenerator(IOContext ctxt, int jsonFeatures, int avroFeatures,
            ApacheCodecRecycler apacheCodecRecycler,
            ObjectCodec codec, OutputStream output)
        throws IOException
    {
        super(jsonFeatures, codec, ctxt);
        _streamWriteConstraints = ctxt.streamWriteConstraints();
        _formatFeatures = avroFeatures;
        _output = output;
        _avroContext = AvroWriteContext.nullContext();

        _apacheCodecRecycler = apacheCodecRecycler;
        final boolean buffering = isEnabled(Feature.AVRO_BUFFERING);
        BinaryEncoder encoderToReuse = _apacheCodecRecycler.acquireEncoder();
        _encoder = buffering
                ? ENCODER_FACTORY.binaryEncoder(output, encoderToReuse)
                : ENCODER_FACTORY.directBinaryEncoder(output, encoderToReuse);
    }

    /**
     * Assigns the Avro schema to use and (re)creates the root write context for it.
     * Assigning the same schema instance again is a no-op.
     *
     * @param schema Schema to use for subsequent output
     */
    public void setSchema(AvroSchema schema)
    {
        if (_rootSchema == schema) {
            return;
        }
        _rootSchema = schema;
        // start with temporary root...
        _avroContext = _rootContext = AvroWriteContext.createRootContext(this,
                schema.getAvroSchema(), _encoder);
    }

    @Override
    public StreamWriteConstraints streamWriteConstraints() {
        return _streamWriteConstraints;
    }

    /*
    /**********************************************************
    /* Versioned
    /**********************************************************
     */

    @Override
    public Version version() {
        return PackageVersion.VERSION;
    }

    /*
    /**********************************************************
    /* Overridden methods, configuration
    /**********************************************************
     */

    /**
     * No-op: pretty-printing does not apply to Avro binary output, so this
     * simply returns this generator unchanged.
     */
    @Override
    public AvroGenerator useDefaultPrettyPrinter() {
        return this;
    }

    /**
     * Not relevant, as binary formats typically have no indentation;
     * the given pretty printer is ignored.
     */
    @Override
    public AvroGenerator setPrettyPrinter(PrettyPrinter pp) {
        return this;
    }

    @Override
    public Object getOutputTarget() {
        return _output;
    }

    @Override
    public JsonStreamContext getOutputContext() {
        return _avroContext;
    }

    /**
     * Unfortunately we have no visibility into buffering Avro codec does;
     * and need to return <code>-1</code> to reflect that lack of knowledge.
     */
    @Override
    public int getOutputBuffered() {
        return -1;
    }

    @Override public AvroSchema getSchema() {
        return _rootSchema;
    }

    /**
     * Assigns the schema to use; only {@link AvroSchema} instances are accepted.
     *
     * @param schema Schema to use; must be a non-null {@link AvroSchema}
     *
     * @throws IllegalArgumentException if {@code schema} is null or not an {@link AvroSchema}
     */
    @Override
    public void setSchema(FormatSchema schema)
    {
        if (!(schema instanceof AvroSchema)) {
            // `instanceof` is false for null as well: describe that case explicitly
            // instead of failing with an NPE from `schema.getClass()`
            final String typeDesc = (schema == null) ? "null" : schema.getClass().getName();
            throw new IllegalArgumentException("Can not use FormatSchema of type "
                    +typeDesc);
        }
        setSchema((AvroSchema) schema);
    }

    /*
    /**********************************************************
    /* Public API, capability introspection methods
    /**********************************************************
     */

    @Override
    public boolean canUseSchema(FormatSchema schema) {
        return (schema instanceof AvroSchema);
    }

    // 26-Nov-2019, tatu: [dataformats-binary#179] needed this; could
    //   only add in 2.11
    @Override // since 2.11
    public boolean canWriteBinaryNatively() { return true; }

    @Override // @since 2.12
    public JacksonFeatureSet<StreamWriteCapability> getWriteCapabilities() {
        return DEFAULT_BINARY_WRITE_CAPABILITIES;
    }

    /*
    /**********************************************************
    /* Extended API, configuration
    /**********************************************************
     */

    public AvroGenerator enable(Feature f) {
        _formatFeatures |= f.getMask();
        return this;
    }

    public AvroGenerator disable(Feature f) {
        _formatFeatures &= ~f.getMask();
        return this;
    }

    public final boolean isEnabled(Feature f) {
        return (_formatFeatures & f.getMask()) != 0;
    }

    public AvroGenerator configure(Feature f, boolean state) {
        if (state) {
            enable(f);
        } else {
            disable(f);
        }
        return this;
    }

    /**
     * Updates the format feature bits selected by {@code mask} to those in {@code values}.
     *<p>
     * NOTE: only the stored flags change; the encoder is constructed once in the
     * constructor, so toggling {@link Feature#AVRO_BUFFERING} here has no effect on it.
     */
    @Override
    public JsonGenerator overrideFormatFeatures(int values, int mask) {
        int oldF = _formatFeatures;
        int newF = (_formatFeatures & ~mask) | (values & mask);

        if (oldF != newF) {
            _formatFeatures = newF;
            // 22-Oct-2015, tatu: Actually, there is no way to change buffering details at
            //   this point. If change needs to be dynamic, encoder construction would have to change.
        }
        return this;
    }

    /*
    /**********************************************************************
    /* Overridden methods; writing field names
    /**********************************************************************
     */

    // And then methods overridden to make final, streamline some aspects...

    @Override
    public final void writeFieldName(String name) throws IOException
    {
        _avroContext.writeFieldName(name);
    }

    @Override
    public final void writeFieldName(SerializableString name)
        throws IOException
    {
        _avroContext.writeFieldName(name.getValue());
    }

    /*
    /**********************************************************
    /* Public API: low-level I/O
    /**********************************************************
     */

    /**
     * Flushes the underlying {@link OutputStream} if
     * {@link JsonGenerator.Feature#FLUSH_PASSED_TO_STREAM} is enabled.
     * NOTE: only {@code _output} is flushed here; the encoder is not touched.
     */
    @Override
    public final void flush() throws IOException {
        if (isEnabled(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)) {
            _output.flush();
        }
    }

    /**
     * Closes this generator. In order:
     *<ol>
     * <li>if {@link JsonGenerator.Feature#AUTO_CLOSE_JSON_CONTENT} is enabled, closes
     *   any still-open Arrays/Objects;</li>
     * <li>completes the root value if not yet complete;</li>
     * <li>closes the output (if resource-managed or
     *   {@link JsonGenerator.Feature#AUTO_CLOSE_TARGET} is enabled) or otherwise flushes it
     *   (if {@link JsonGenerator.Feature#FLUSH_PASSED_TO_STREAM} is enabled).</li>
     *</ol>
     * Releasing the encoder/recycler and marking the generator closed is done in a
     * {@code finally} block, so that pooled codec objects are returned even when one
     * of the steps above fails (which is common when closing as a result of an
     * earlier exception).
     */
    @Override
    public void close() throws IOException
    {
        if (isClosed()) {
            return;
        }
        try {
            if (isEnabled(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT)) {
                AvroWriteContext ctxt;
                while ((ctxt = _avroContext) != null) {
                    if (ctxt.inArray()) {
                        writeEndArray();
                    } else if (ctxt.inObject()) {
                        writeEndObject();
                    } else {
                        break;
                    }
                }
            }
            // May need to finalize...
            /* 18-Nov-2014, tatu: Since this method is (a) often called as a result of an exception,
             *   and (b) quite likely to cause an exception of its own, need to work around
             *   combination of problems; one part being to catch non-IOExceptions; something that
             *   is usually NOT done. Partly this is because Avro codec is leaking low-level exceptions
             *   such as NPE.
             */
            if (!_complete) {
                try {
                    _complete();
                } catch (IOException e) {
                    throw e;
                } catch (Exception e) {
                    throw new JsonGenerationException("Failed to close AvroGenerator: ("
                            +e.getClass().getName()+"): "+e.getMessage(), e, this);
                }
            }
            if (_output != null) {
                if (_ioContext.isResourceManaged() || isEnabled(JsonGenerator.Feature.AUTO_CLOSE_TARGET)) {
                    _output.close();
                } else if (isEnabled(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)) {
                    // If we can't close it, we should at least flush
                    _output.flush();
                }
            }
        } finally {
            // Internal buffer(s) generator has can now be released as well; done even
            // on failure so the encoder goes back to the recycler and we end up closed
            _releaseBuffers();
            super.close();
        }
    }

    /*
    /**********************************************************
    /* Public API: structural output
    /**********************************************************
     */

    /**
     * Starts an Array: creates a child array context of the current context,
     * validates nesting depth against {@link StreamWriteConstraints} and clears
     * the "content complete" flag.
     */
    @Override
    public final void writeStartArray() throws IOException {
        _avroContext = _avroContext.createChildArrayContext(null);
        streamWriteConstraints().validateNestingDepth(_avroContext.getNestingDepth());
        _complete = false;
    }

    /**
     * Ends the current Array; if that returns output to the root level,
     * the root value is completed.
     */
    @Override
    public final void writeEndArray() throws IOException
    {
        if (!_avroContext.inArray()) {
            _reportError("Current context not Array but "+_avroContext.typeDesc());
        }
        _avroContext = _avroContext.getParent();
        if (_avroContext.inRoot() && !_complete) {
            _complete();
        }
    }

    /**
     * Starts an Object: creates a child object context of the current context,
     * validates nesting depth against {@link StreamWriteConstraints} and clears
     * the "content complete" flag.
     */
    @Override
    public final void writeStartObject() throws IOException {
        _avroContext = _avroContext.createChildObjectContext(null);
        streamWriteConstraints().validateNestingDepth(_avroContext.getNestingDepth());
        _complete = false;
    }

    /**
     * Same as {@link #writeStartObject()} but passes {@code forValue} on to the
     * child object context.
     */
    @Override
    public void writeStartObject(Object forValue) throws IOException {
        _avroContext = _avroContext.createChildObjectContext(forValue);
        streamWriteConstraints().validateNestingDepth(_avroContext.getNestingDepth());
        _complete = false;
    }

    /**
     * Ends the current Object (failing if a field name was written without a value);
     * if that returns output to the root level, the root value is completed.
     */
    @Override
    public final void writeEndObject() throws IOException
    {
        if (!_avroContext.inObject()) {
            _reportError("Current context not Object but "+_avroContext.typeDesc());
        }
        if (!_avroContext.canClose()) {
            _reportError("Can not write END_OBJECT after writing FIELD_NAME but not value");
        }
        _avroContext = _avroContext.getParent();

        if (_avroContext.inRoot() && !_complete) {
            _complete();
        }
    }

    /*
    /**********************************************************
    /* Output method implementations, textual
    /**********************************************************
     */

    /**
     * Writes a String value; {@code null} is written as a null value.
     */
    @Override
    public void writeString(String text) throws IOException
    {
        if (text == null) {
            writeNull();
            return;
        }
        _avroContext.writeString(text);
    }

    /**
     * Writes a String value from a char array segment; a {@code null} array is
     * written as a null value (consistent with {@link #writeString(String)}).
     */
    @Override
    public void writeString(char[] text, int offset, int len) throws IOException {
        if (text == null) {
            writeNull();
            return;
        }
        writeString(new String(text, offset, len));
    }

    /**
     * Writes the value of a {@link SerializableString} (via {@code getValue()},
     * as {@link #writeFieldName(SerializableString)} does); {@code null} is written
     * as a null value.
     */
    @Override
    public final void writeString(SerializableString sstr) throws IOException {
        writeString((sstr == null) ? null : sstr.getValue());
    }

    @Override
    public void writeRawUTF8String(byte[] text, int offset, int len) throws IOException {
        _reportUnsupportedOperation();
    }

    /**
     * Decodes the given UTF-8 byte segment and writes it as a String value;
     * a {@code null} array is written as a null value.
     */
    @Override
    public final void writeUTF8String(byte[] text, int offset, int len) throws IOException {
        if (text == null) {
            writeNull();
            return;
        }
        writeString(new String(text, offset, len, StandardCharsets.UTF_8));
    }

    /*
    /**********************************************************
    /* Output method implementations, unprocessed ("raw")
    /**********************************************************
     */

    /**
     * Writes pre-encoded {@link EncodedDatum} values directly through the current
     * context; any other object is handled by the base implementation.
     */
    @Override
    public void writeEmbeddedObject(Object object) throws IOException {
        if (object instanceof EncodedDatum) {
            _avroContext.writeValue(object);
            return;
        }
        super.writeEmbeddedObject(object);
    }

    // Raw output is not supported by this binary format:

    @Override
    public void writeRaw(String text) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRaw(String text, int offset, int len) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRaw(char[] text, int offset, int len) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRaw(char c) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRawValue(String text) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRawValue(String text, int offset, int len) throws IOException {
        _reportUnsupportedOperation();
    }

    @Override
    public void writeRawValue(char[] text, int offset, int len) throws IOException {
        _reportUnsupportedOperation();
    }

    /*
    /**********************************************************
    /* Output method implementations, base64-encoded binary
    /**********************************************************
     */

    /**
     * Writes binary data natively (the Base64 variant is ignored);
     * {@code null} data is written as a null value.
     */
    @Override
    public void writeBinary(Base64Variant b64variant, byte[] data, int offset, int len) throws IOException
    {
        if (data == null) {
            writeNull();
            return;
        }
        _avroContext.writeBinary(data, offset, len);
    }

    /*
    /**********************************************************
    /* Output method implementations, primitive
    /**********************************************************
     */

    @Override
    public void writeBoolean(boolean state) throws IOException {
        _avroContext.writeValue(state ? Boolean.TRUE : Boolean.FALSE);
    }

    @Override
    public void writeNull() throws IOException {
        _avroContext.writeNull();
    }

    @Override
    public void writeNumber(int i) throws IOException {
        _avroContext.writeValue(i);
    }

    @Override
    public void writeNumber(long l) throws IOException {
        _avroContext.writeValue(l);
    }

    @Override
    public void writeNumber(BigInteger v) throws IOException
    {
        if (v == null) {
            writeNull();
            return;
        }
        _avroContext.writeValue(v);
    }

    @Override
    public void writeNumber(double d) throws IOException {
        _avroContext.writeValue(d);
    }

    @Override
    public void writeNumber(float f) throws IOException {
        _avroContext.writeValue(f);
    }

    @Override
    public void writeNumber(BigDecimal dec) throws IOException
    {
        if (dec == null) {
            writeNull();
            return;
        }
        _avroContext.writeValue(dec);
    }

    /**
     * Only {@code null} is supported (written as a null value); any textual
     * number representation results in {@link UnsupportedOperationException}.
     */
    @Override
    public void writeNumber(String encodedValue) throws IOException {
        /* 08-Mar-2016, tatu: Looks like this may need to be supported, eventually,
         *   for things like floating-point (Decimal) types. But, for now,
         *   let's at least handle null.
         */
        if (encodedValue == null) {
            writeNull();
            return;
        }
        throw new UnsupportedOperationException("Can not write 'untyped' numbers");
    }

    /*
    /**********************************************************
    /* Implementations for methods from base class
    /**********************************************************
     */

    /**
     * Not expected to be called: value writes in this class go directly to the
     * current {@link AvroWriteContext}, so reaching this is an internal error.
     */
    @Override
    protected final void _verifyValueWrite(String typeMsg) throws IOException {
        _throwInternal();
    }

    /**
     * Returns the encoder to the recycler and the recycler to its pool.
     * Safe to call more than once: the recycler reference is cleared on first call.
     */
    @Override
    protected void _releaseBuffers() {
        // no super implementation to call
        ApacheCodecRecycler recycler = _apacheCodecRecycler;
        if (recycler != null) {
            _apacheCodecRecycler = null;
            BinaryEncoder e = _encoder;
            if (e != null) {
                _encoder = null;
                recycler.release(e);
            }
            recycler.releaseToPool();
        }
    }

    /*
    /**********************************************************
    /* Helper methods
    /**********************************************************
     */

    /**
     * Marks content as complete and lets the root context (if a schema has been
     * assigned and thereby a root context created) complete its output.
     */
    protected void _complete() throws IOException
    {
        _complete = true;

        // add defensive coding here but only because this often gets triggered due
        // to forced closure resulting from another exception; so, we typically
        // do not want to hide the original problem...
        // First one sanity check, for a (relatively?) common case
        if (_rootContext != null) {
            _rootContext.complete();
        }
    }
}