AvroSchemaServiceManagerImpl.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.registry.avro;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.stereotype.Component;
/**
 * Default concrete implementation of {@link AvroSchemaServiceManager}.
 *
 * <p>Schemas are generated through Avro reflection ({@link ReflectData}), and datum
 * writers/readers are selected according to whether the Java type is a
 * {@link SpecificRecord}, a {@link GenericRecord}, or any other (reflect-mapped) class.
 *
 * <p>To substitute the default {@link org.apache.avro.Schema} generation with a custom
 * Avro schema generator, provide a custom bean definition of
 * {@link AvroSchemaServiceManager} and mark it as {@code @Primary} to override this
 * default implementation.
 *
 * @author Ish Mahajan
 *
 */
@Component
public class AvroSchemaServiceManagerImpl implements AvroSchemaServiceManager {

	protected final Log logger = LogFactory.getLog(this.getClass());

	/**
	 * Get the {@link Schema} for a class using Avro reflection.
	 * @param clazz {@link Class} for which schema generation is required
	 * @return the Avro schema for the given class
	 */
	@Override
	public Schema getSchema(Class<?> clazz) {
		return ReflectData.get().getSchema(clazz);
	}

	/**
	 * Get a {@link DatumWriter} matching the given type.
	 * <p>For {@link SpecificRecord} and reflect-mapped types the explicit schema is
	 * preferred and the class itself is used as a fallback when no schema is given.
	 * For {@link GenericRecord} types the schema is handed to the writer as-is,
	 * without a null check.
	 * @param type {@link Class} of java object which needs to be serialized
	 * @param schema {@link Schema} of object which needs to be serialized, may be
	 * {@code null}
	 * @return datum writer which can be used to write Avro payload
	 */
	// Raw constructors are required below: a Class<?> cannot be bound to the
	// writer's type parameter when the result must be a DatumWriter<Object>.
	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public DatumWriter<Object> getDatumWriter(Class<?> type, Schema schema) {
		// Guarded so the message is only concatenated when debug logging is on.
		if (this.logger.isDebugEnabled()) {
			this.logger.debug("Finding correct DatumWriter for type " + type.getName());
		}
		if (SpecificRecord.class.isAssignableFrom(type)) {
			if (schema != null) {
				return new SpecificDatumWriter<>(schema);
			}
			return new SpecificDatumWriter(type);
		}
		if (GenericRecord.class.isAssignableFrom(type)) {
			// NOTE(review): schema is not null-checked here; a generic writer has no
			// class to fall back on, so callers are presumably expected to pass one.
			return new GenericDatumWriter<>(schema);
		}
		if (schema != null) {
			return new ReflectDatumWriter<>(schema);
		}
		return new ReflectDatumWriter(type);
	}

	/**
	 * Get a {@link DatumReader} matching the given type.
	 * <p>For {@link SpecificRecord} and {@link GenericRecord} types the reader is built
	 * from whichever of the two schemas are available. A {@link GenericRecord} type
	 * with neither schema cannot be read and results in an exception. For every other
	 * type a reflect reader is created from the class; {@code readerSchema} is not
	 * consulted in that case.
	 * @param type {@link Class} of java object which needs to be de-serialized
	 * @param readerSchema {@link Schema} default schema of object which needs to be
	 * de-serialized, may be {@code null}
	 * @param writerSchema {@link Schema} writerSchema provided at run time, may be
	 * {@code null}
	 * @return datum reader which can be used to read Avro payload
	 * @throws MessageConversionException if no reader could be created because no
	 * schema is available for the type
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public DatumReader<Object> getDatumReader(Class<?> type, Schema readerSchema, Schema writerSchema) {
		DatumReader<Object> reader = null;
		if (SpecificRecord.class.isAssignableFrom(type)) {
			if (readerSchema == null) {
				// No explicit reader schema: start from the generated class and, when
				// known, register the schema the payload was written with.
				reader = new SpecificDatumReader(type);
				if (writerSchema != null) {
					reader.setSchema(writerSchema);
				}
			}
			else if (writerSchema != null) {
				reader = new SpecificDatumReader<>(writerSchema, readerSchema);
			}
			else {
				reader = new SpecificDatumReader<>(readerSchema);
			}
		}
		else if (GenericRecord.class.isAssignableFrom(type)) {
			if (readerSchema != null) {
				if (writerSchema != null) {
					reader = new GenericDatumReader<>(writerSchema, readerSchema);
				}
				else {
					reader = new GenericDatumReader<>(readerSchema);
				}
			}
			else if (writerSchema != null) {
				reader = new GenericDatumReader<>(writerSchema);
			}
			// Neither schema available: reader stays null and is rejected below.
		}
		else {
			// Reflect-mapped type: the reader is derived from the class itself.
			reader = new ReflectDatumReader(type);
			if (writerSchema != null) {
				reader.setSchema(writerSchema);
			}
		}
		if (reader == null) {
			throw new MessageConversionException("No schema can be inferred from type "
					+ type.getName() + " and no schema has been explicitly configured.");
		}
		return reader;
	}

	/**
	 * Read an object from a binary-encoded Avro payload.
	 * <p>The reader is obtained from
	 * {@link #getDatumReader(Class, Schema, Schema)}.
	 * @param clazz {@link Class} of java object which needs to be de-serialized
	 * @param payload {@code byte[]} serialized payload of object which needs to be
	 * de-serialized
	 * @param readerSchema {@link Schema} readerSchema of object which needs to be
	 * de-serialized
	 * @param writerSchema {@link Schema} writerSchema used while serializing the payload
	 * @return java object after reading Avro Payload
	 * @throws IOException is thrown in case of error
	 */
	@Override
	public Object readData(Class<? extends Object> clazz, byte[] payload, Schema readerSchema, Schema writerSchema)
			throws IOException {
		DatumReader<Object> reader = this.getDatumReader(clazz, readerSchema, writerSchema);
		Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
		return reader.read(null, decoder);
	}

}