AvroSchemaRegistryClientMessageConverter.java
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.registry.avro;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cache.support.NoOpCacheManager;
import org.springframework.cloud.stream.schema.registry.ParsedSchema;
import org.springframework.cloud.stream.schema.registry.SchemaNotFoundException;
import org.springframework.cloud.stream.schema.registry.SchemaReference;
import org.springframework.cloud.stream.schema.registry.SchemaRegistrationResponse;
import org.springframework.cloud.stream.schema.registry.client.SchemaRegistryClient;
import org.springframework.core.io.Resource;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.ObjectUtils;
/**
* A {@link org.springframework.messaging.converter.MessageConverter} for Apache Avro,
* with the ability to publish and retrieve schemas stored in a schema server, allowing
* for schema evolution in applications. The supported content types are in the form
* `application/*+avro`.
* <p>
* During the conversion to a message, the converter will set the 'contentType' header to
* 'application/[prefix].[subject].v[version]+avro', where:
*
* <li>
* <ul>
* <i>prefix</i> is a configurable prefix (default 'vnd');
* </ul>
* <ul>
* <i>subject</i> is a subject derived from the type of the outgoing object - typically
* the class name;
* </ul>
* <ul>
* <i>version</i> is the schema version for the given subject;
* </ul>
* </li>
* <p>
* When converting from a message, the converter will parse the content-type and use it to
* fetch and cache the writer schema using the provided {@link SchemaRegistryClient}.
*
* @author Marius Bogoevici
* @author Vinicius Carvalho
* @author Oleg Zhurakousky
* @author Sercan Karaoglu
* @author Ish Mahajan
*/
public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessageConverter
implements InitializingBean {
/**
* Avro format defined in the Mime type.
*/
public static final String AVRO_FORMAT = "avro";
/**
* Pattern for validating the prefix to be used in the publised subtype.
*/
public static final Pattern PREFIX_VALIDATION_PATTERN = Pattern.compile("[\\p{Alnum}]");
/**
* Spring Cloud Stream schema property prefix.
*/
public static final String CACHE_PREFIX = "org.springframework.cloud.stream.schema";
/**
* Property for reflection cache.
*/
public static final String REFLECTION_CACHE_NAME = CACHE_PREFIX + ".reflectionCache";
/**
* Property for schema cache.
*/
public static final String SCHEMA_CACHE_NAME = CACHE_PREFIX + ".schemaCache";
/**
* Property for reference cache.
*/
public static final String REFERENCE_CACHE_NAME = CACHE_PREFIX + ".referenceCache";
/**
* Default Mime type for Avro.
*/
public static final MimeType DEFAULT_AVRO_MIME_TYPE = new MimeType("application", "*+" + AVRO_FORMAT);
private static final AvroSchemaServiceManager defaultAvroSchemaServiceManager =
new AvroSchemaServiceManagerImpl();
private final CacheManager cacheManager;
protected Resource[] schemaImports = new Resource[]{};
private Pattern versionedSchema;
private boolean dynamicSchemaGenerationEnabled;
private Schema readerSchema;
private Resource[] schemaLocations;
private SchemaRegistryClient schemaRegistryClient;
private String prefix = "vnd";
private String subjectNamePrefix;
private SubjectNamingStrategy subjectNamingStrategy;
private boolean ignoreSchemaRegistryServer;
/**
* Creates a new instance, configuring it with {@link SchemaRegistryClient} and
* {@link CacheManager}.
*
* @param schemaRegistryClient the {@link SchemaRegistryClient} used to interact with
* the schema registry server.
* @param cacheManager instance of {@link CacheManager} to cache parsed schemas. If
* caching is not required use {@link NoOpCacheManager}
* @param manager instance of {@link AvroSchemaServiceManager} to manage schemas.
*/
public AvroSchemaRegistryClientMessageConverter(
SchemaRegistryClient schemaRegistryClient, CacheManager cacheManager, AvroSchemaServiceManager manager) {
super(Collections.singletonList(DEFAULT_AVRO_MIME_TYPE), manager);
Assert.notNull(schemaRegistryClient, "cannot be null");
Assert.notNull(cacheManager, "'cacheManager' cannot be null");
Assert.notNull(manager, "'avroSchemaServiceManager' cannot be null");
this.schemaRegistryClient = schemaRegistryClient;
this.cacheManager = cacheManager;
}
public boolean isDynamicSchemaGenerationEnabled() {
return this.dynamicSchemaGenerationEnabled;
}
/**
* Allows the converter to generate and register schemas automatically. If set to
* false, it only allows the converter to use pre-registered schemas. Default 'true'.
*
* @param dynamicSchemaGenerationEnabled true if dynamic schema generation is enabled
*/
public void setDynamicSchemaGenerationEnabled(boolean dynamicSchemaGenerationEnabled) {
this.dynamicSchemaGenerationEnabled = dynamicSchemaGenerationEnabled;
}
/**
* A set of locations where the converter can load schemas from. Schemas provided at
* these locations will be registered automatically.
*
* @param schemaLocations array of locations
*/
public void setSchemaLocations(Resource[] schemaLocations) {
Assert.notEmpty(schemaLocations, "cannot be empty");
this.schemaLocations = schemaLocations;
}
/**
* A set of schema locations where should be imported first. Schemas provided at these
* locations will be reference, thus they should not reference each other.
*
* @param schemaImports array of schema imports
*/
public void setSchemaImports(Resource[] schemaImports) {
this.schemaImports = schemaImports;
}
/**
* Set the prefix to be used in the published subtype. Default 'vnd'.
*
* @param prefix prefix to be set
*/
public void setPrefix(String prefix) {
Assert.hasText(prefix, "Prefix cannot be empty");
Assert.isTrue(!PREFIX_VALIDATION_PATTERN.matcher(this.prefix).matches(), "Invalid prefix:" + this.prefix);
this.prefix = prefix;
}
public void setReaderSchema(Resource readerSchema) {
Assert.notNull(readerSchema, "cannot be null");
try {
this.readerSchema = parseSchema(readerSchema);
}
catch (IOException e) {
throw new BeanInitializationException("Cannot initialize reader schema", e);
}
}
public void setSubjectNamingStrategy(SubjectNamingStrategy subjectNamingStrategy) {
this.subjectNamingStrategy = subjectNamingStrategy;
}
public void setSubjectNamePrefix(String subjectNamePrefix) {
this.subjectNamePrefix = subjectNamePrefix;
}
@Override
public void afterPropertiesSet() {
this.versionedSchema = Pattern.compile("application/" + this.prefix
+ "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+" + AVRO_FORMAT);
Stream.of(this.schemaImports, this.schemaLocations)
.filter(arr -> !ObjectUtils.isEmpty(arr))
.distinct()
.peek(resources -> {
if (this.logger.isInfoEnabled()) {
this.logger.info("Scanning avro schema resources on classpath");
this.logger.info("Parsing " + this.schemaImports.length + " schemas");
}
})
.flatMap(Arrays::stream)
.forEach(resource -> {
try {
Schema schema = parseSchema(resource);
if (schema.getType().equals(Schema.Type.UNION)) {
schema.getTypes().forEach(innerSchema -> registerSchema(resource, innerSchema));
}
else {
registerSchema(resource, schema);
}
}
catch (IOException e) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Failed to parse schema at " + resource.getFilename(), e);
}
}
});
if (this.cacheManager instanceof NoOpCacheManager) {
this.logger.warn("Schema caching is effectively disabled "
+ "since configured cache manager is a NoOpCacheManager. If this was not "
+ "the intention, please provide the appropriate instance of CacheManager "
+ "(i.e., ConcurrentMapCacheManager).");
}
}
protected String toSubject(String subjectNamePrefix, Schema schema) {
return this.subjectNamingStrategy.toSubject(subjectNamePrefix, schema);
}
@Override
protected boolean supports(Class<?> clazz) {
// we support all types
return true;
}
@Override
protected boolean supportsMimeType(MessageHeaders headers) {
if (super.supportsMimeType(headers)) {
return true;
}
MimeType mimeType = getContentTypeResolver().resolve(headers);
return DEFAULT_AVRO_MIME_TYPE.includes(mimeType);
}
@Override
protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers,
MimeType hintedContentType) {
Schema schema;
schema = extractSchemaForWriting(payload);
ParsedSchema parsedSchema = this.getCache(REFERENCE_CACHE_NAME).get(schema, ParsedSchema.class);
if (parsedSchema == null) {
parsedSchema = new ParsedSchema(schema);
this.getCache(REFERENCE_CACHE_NAME).putIfAbsent(schema, parsedSchema);
}
if (parsedSchema.getRegistration() == null && !this.ignoreSchemaRegistryServer) {
SchemaRegistrationResponse response = this.schemaRegistryClient.register(toSubject(this.subjectNamePrefix, schema),
AVRO_FORMAT, parsedSchema.getRepresentation());
parsedSchema.setRegistration(response);
}
if (!this.ignoreSchemaRegistryServer) {
SchemaReference schemaReference = parsedSchema.getRegistration().getSchemaReference();
DirectFieldAccessor dfa = new DirectFieldAccessor(headers);
@SuppressWarnings("unchecked")
Map<String, Object> _headers = (Map<String, Object>) dfa.getPropertyValue("headers");
_headers.put(MessageHeaders.CONTENT_TYPE, "application/" + this.prefix + "." + schemaReference.getSubject()
+ ".v" + schemaReference.getVersion() + "+" + AVRO_FORMAT);
}
return schema;
}
@Override
protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) {
SchemaReference schemaReference = extractSchemaReference(mimeType);
if (schemaReference != null) {
ParsedSchema parsedSchema = this.getCache(REFERENCE_CACHE_NAME).get(schemaReference, ParsedSchema.class);
if (parsedSchema == null) {
String schemaContent = this.schemaRegistryClient.fetch(schemaReference);
if (schemaContent != null) {
Schema schema = new Schema.Parser().parse(schemaContent);
parsedSchema = new ParsedSchema(schema);
this.getCache(REFERENCE_CACHE_NAME).putIfAbsent(schemaReference, parsedSchema);
}
}
if (parsedSchema != null) {
return parsedSchema.getSchema();
}
}
return this.readerSchema;
}
@Override
protected Schema resolveReaderSchemaForDeserialization(Class<?> targetClass) {
return this.readerSchema;
}
private Schema extractSchemaForWriting(Object payload) {
Schema schema = null;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Obtaining schema for class " + payload.getClass());
}
if (GenericContainer.class.isAssignableFrom(payload.getClass())) {
schema = ((GenericContainer) payload).getSchema();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Avro type detected, using schema from object");
}
}
else {
schema = this.getCache(REFLECTION_CACHE_NAME).get(payload.getClass().getName(), Schema.class);
if (schema == null) {
if (!isDynamicSchemaGenerationEnabled()) {
throw new SchemaNotFoundException(String.format(
"No schema found in the local cache for %s, and dynamic schema generation is not enabled",
payload.getClass()));
}
else {
schema = super.avroSchemaServiceManager().getSchema(payload.getClass());
}
this.getCache(REFLECTION_CACHE_NAME).put(payload.getClass().getName(), schema);
}
}
return schema;
}
private void registerSchema(Resource schemaLocation, Schema schema) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Resource " + schemaLocation.getFilename() + " parsed into schema "
+ schema.getNamespace() + "." + schema.getName());
}
this.schemaRegistryClient.register(toSubject(this.subjectNamePrefix, schema), AVRO_FORMAT, schema.toString());
if (this.logger.isInfoEnabled()) {
this.logger.info("Schema " + schema.getName() + " registered with id " + schema);
}
this.getCache(REFLECTION_CACHE_NAME).put(schema.getNamespace() + "." + schema.getName(), schema);
}
private SchemaReference extractSchemaReference(MimeType mimeType) {
SchemaReference schemaReference = null;
Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString());
if (schemaMatcher.find()) {
String subject = schemaMatcher.group(1);
Integer version = Integer.parseInt(schemaMatcher.group(2));
schemaReference = new SchemaReference(subject, version, AVRO_FORMAT);
}
return schemaReference;
}
private Cache getCache(String name) {
Cache cache = this.cacheManager.getCache(name);
Assert.notNull(cache, "Cache by the name '" + name + "' is not present in this CacheManager - '"
+ this.cacheManager + "'. Typically caches are auto-created by the CacheManagers. "
+ "Consider reporting it as an issue to the developer of this CacheManager.");
return cache;
}
public void setIgnoreSchemaRegistryServer(boolean ignoreSchemaRegistryServer) {
this.ignoreSchemaRegistryServer = ignoreSchemaRegistryServer;
}
}