AvroMessageConverterSerializationTests.java
/*
* Copyright 2017-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.serialization;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import example.avro.Command;
import example.avro.Email;
import example.avro.PushNotification;
import example.avro.Sms;
import example.avro.User;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.restclient.RestTemplateBuilder;
import org.springframework.cache.support.NoOpCacheManager;
import org.springframework.cloud.stream.schema.registry.EnableSchemaRegistryServer;
import org.springframework.cloud.stream.schema.registry.SchemaReference;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaRegistryClientMessageConverter;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaServiceManager;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaServiceManagerImpl;
import org.springframework.cloud.stream.schema.registry.avro.DefaultSubjectNamingStrategy;
import org.springframework.cloud.stream.schema.registry.client.DefaultSchemaRegistryClient;
import org.springframework.cloud.stream.schema.registry.client.SchemaRegistryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Vinicius Carvalho
* @author Sercan Karaoglu
* @author Soby Chacko
*/
class AvroMessageConverterSerializationTests {
Pattern versionedSchema = Pattern.compile(
"application/" + "vnd" + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro");
Log logger = LogFactory.getLog(getClass());
private ConfigurableApplicationContext schemaRegistryServerContext;
private RestTemplateBuilder restTemplateBuilder;
public static Command notification() {
Command messageToSend = getCommandToSend();
messageToSend.setType("notification");
PushNotification pushNotification = new PushNotification();
pushNotification.setArn("google");
pushNotification.setText("hello");
messageToSend.setPayload(pushNotification);
return messageToSend;
}
public static Command sms() {
Command messageToSend = getCommandToSend();
messageToSend.setType("sms");
Sms sms = new Sms();
sms.setPhoneNumber("6141231212");
sms.setText("hello");
messageToSend.setPayload(sms);
return messageToSend;
}
public static Command email() {
Command messageToSend = getCommandToSend();
messageToSend.setType("email");
Email email = new Email();
email.setAddressTo("sercan");
email.setText("hello");
email.setTitle("hi");
messageToSend.setPayload(email);
return messageToSend;
}
public static Command getCommandToSend() {
Command messageToSend = new Command();
messageToSend.setCorrelationId("abc");
return messageToSend;
}
@BeforeEach
public void setup() {
this.schemaRegistryServerContext = SpringApplication.run(
ServerApplication.class, "--spring.main.allow-bean-definition-overriding=true");
restTemplateBuilder = this.schemaRegistryServerContext.getBean(RestTemplateBuilder.class);
}
@AfterEach
public void tearDown() {
this.schemaRegistryServerContext.close();
}
@Test
public void schemaImport() throws Exception {
SchemaRegistryClient client = new DefaultSchemaRegistryClient(restTemplateBuilder);
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager(), manager);
converter.setSubjectNamingStrategy(new DefaultSubjectNamingStrategy());
converter.setDynamicSchemaGenerationEnabled(false);
converter.setSchemaLocations(this.schemaRegistryServerContext
.getResources("classpath:schemas/Command.avsc"));
converter.setSchemaImports(this.schemaRegistryServerContext
.getResources("classpath:schemas/imports/*.avsc"));
converter.afterPropertiesSet();
Command notification = notification();
Message specificMessage = converter.toMessage(notification,
new MutableMessageHeaders(Collections.<String, Object>emptyMap()));
Object o = converter.fromMessage(specificMessage, Command.class);
assertThat(o).isEqualTo(notification)
.as("Serialization issue when use schema-imports");
}
@Test
public void sourceWriteSameVersion() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name", "joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient(restTemplateBuilder);
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager(), manager);
converter.setSubjectNamingStrategy(new DefaultSubjectNamingStrategy());
converter.setDynamicSchemaGenerationEnabled(false);
converter.afterPropertiesSet();
Message specificMessage = converter.toMessage(specificRecord,
new MutableMessageHeaders(Collections.<String, Object>emptyMap()),
MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference specificRef = extractSchemaReference(MimeTypeUtils.parseMimeType(
specificMessage.getHeaders().get("contentType").toString()));
Message genericMessage = converter.toMessage(genericRecord,
new MutableMessageHeaders(Collections.<String, Object>emptyMap()),
MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference genericRef = extractSchemaReference(MimeTypeUtils.parseMimeType(
genericMessage.getHeaders().get("contentType").toString()));
assertThat(specificRef).isEqualTo(genericRef);
assertThat(genericRef.getVersion()).isEqualTo(1);
}
public void testOriginalContentTypeHeaderOnly() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name", "joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient(restTemplateBuilder);
client.register("user", "avro", v1.toString());
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager(), manager);
converter.setDynamicSchemaGenerationEnabled(false);
converter.afterPropertiesSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
writer.write(specificRecord, encoder);
encoder.flush();
Message source = MessageBuilder.withPayload(baos.toByteArray())
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_OCTET_STREAM)
.build();
Object converted = converter.fromMessage(source, User.class);
assertThat(converted).isNotNull();
assertThat(specificRecord.getName().toString())
.isEqualTo(((User) converted).getName().toString());
}
private SchemaReference extractSchemaReference(MimeType mimeType) {
SchemaReference schemaReference = null;
Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString());
if (schemaMatcher.find()) {
String subject = schemaMatcher.group(1);
Integer version = Integer.parseInt(schemaMatcher.group(2));
schemaReference = new SchemaReference(subject, version,
AvroSchemaRegistryClientMessageConverter.AVRO_FORMAT);
}
return schemaReference;
}
@EnableAutoConfiguration
@Configuration
@EnableSchemaRegistryServer
public static class ServerApplication {
public static void main(String[] args) {
SpringApplication.run(ServerApplication.class, args);
}
}
}