AvroSchemaServiceManagerTests.java
/*
* Copyright 2017-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.avro;
import java.io.File;
import java.io.IOException;
import tools.jackson.databind.ObjectMapper;
import tools.jackson.dataformat.avro.AvroFactory;
import tools.jackson.dataformat.avro.AvroMapper;
import tools.jackson.dataformat.avro.AvroSchema;
import tools.jackson.dataformat.avro.schema.AvroSchemaGenerator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.stream.schema.avro.domain.FoodOrder;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaServiceManager;
import org.springframework.cloud.stream.schema.registry.avro.AvroSchemaServiceManagerImpl;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.util.MimeType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
/**
* @author Ish Mahajan
*/
class AvroSchemaServiceManagerTests {
private final Log logger = LogFactory.getLog(AvroSchemaServiceManagerTests.class);
@Test
public void withDefaultImplementation() throws IOException {
assertThatThrownBy(() -> {
AvroSchemaServiceManager defaultServiceManager = new AvroSchemaServiceManagerImpl();
Schema schema = defaultServiceManager.getSchema(FoodOrder.class);
FoodOrder foodOrder = new FoodOrder();
foodOrder.setRestaurant("Spring Kitchen");
foodOrder.setOrderDescription("avro makhani");
foodOrder.setCustomerAddress("world wide web");
File file = new File("target/foodorder.avro");
DatumWriter datumWriter = defaultServiceManager.getDatumWriter(foodOrder.getClass(), schema);
DataFileWriter<FoodOrder> dataFileWriter = new DataFileWriter<FoodOrder>(datumWriter);
dataFileWriter.create(schema, file);
dataFileWriter.append(foodOrder);
FoodOrder foodOrder2 = new FoodOrder();
dataFileWriter.append(foodOrder2);
dataFileWriter.close();
DatumReader userDatumReader = defaultServiceManager.getDatumReader(foodOrder.getClass(), schema, schema);
DataFileReader<FoodOrder> dataFileReader = new DataFileReader<FoodOrder>(file, userDatumReader);
FoodOrder foodOrderDeserialized = null;
while (dataFileReader.hasNext()) {
// Reuse user object by passing it to next(). This saves us from
// allocating and garbage collecting many objects for files with
// many items.
foodOrderDeserialized = dataFileReader.next(foodOrderDeserialized);
logger.info("De-serialised Successfully : " + foodOrderDeserialized);
}
}).isInstanceOf(DataFileWriter.AppendWriteException.class);
}
@Test
public void withCustomImplementation() throws IOException {
AvroSchemaServiceManager manager = new AvroSchemaServiceManager() {
@Override
public Schema getSchema(Class<?> clazz) {
ObjectMapper mapper = new ObjectMapper(new AvroFactory());
AvroSchemaGenerator gen = new AvroSchemaGenerator();
try {
mapper.acceptJsonFormatVisitor(FoodOrder.class, gen);
}
catch (Exception e) {
fail("Error while setting acceptJsonFormatVisitor {}", e);
}
AvroSchema schemaWrapper = gen.getGeneratedSchema();
return schemaWrapper.getAvroSchema();
}
@Override
public DatumWriter<Object> getDatumWriter(Class<?> type, Schema schema) {
return new AvroSchemaServiceManagerImpl().getDatumWriter(type, schema);
}
@Override
public DatumReader<Object> getDatumReader(Class<?> type, Schema schema, Schema writerSchema) {
return new AvroSchemaServiceManagerImpl().getDatumReader(type, schema, schema);
}
@Override
public Object readData(Class<? extends Object> targetClass, byte[] payload, Schema readerSchema,
Schema writerSchema) throws IOException {
ObjectMapper mapper = new ObjectMapper(new AvroFactory());
AvroSchemaGenerator gen = new AvroSchemaGenerator();
try {
mapper.acceptJsonFormatVisitor(targetClass, gen);
}
catch (Exception e) {
fail("Error while setting acceptJsonFormatVisitor {}", e);
}
return mapper.readerFor(targetClass)
.with(new AvroSchema(readerSchema))
.readValue(payload);
}
};
FoodOrder foodOrder1 = new FoodOrder();
foodOrder1.setRestaurant("Spring Kitchen");
foodOrder1.setOrderDescription("avro makhani");
foodOrder1.setCustomerAddress("world wide web");
FoodOrder foodOrder2 = new FoodOrder();
foodOrder2.setRestaurant("Spring Kitchen");
Schema schema = manager.getSchema(FoodOrder.class);
AvroMapper mapper = new AvroMapper();
byte[] payload1 = mapper.writer(new AvroSchema(schema)).writeValueAsBytes(foodOrder1);
byte[] payload2 = mapper.writer(new AvroSchema(schema)).writeValueAsBytes(foodOrder2);
foodOrder1 = (FoodOrder) manager.readData(foodOrder1.getClass(), payload1, schema, schema);
foodOrder2 = (FoodOrder) manager.readData(foodOrder1.getClass(), payload2, schema, schema);
assertThat(foodOrder2.getOrderDescription()).isNull();
assertThat(foodOrder2.getCustomerAddress()).isNull();
}
@Test
public void avroSchemaMessageConverter() {
MimeType mimeType = new MimeType("application", "avro");
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
AvroSchemaMessageConverter converter4 = new AvroSchemaMessageConverter(manager);
assertThat(mimeType).isEqualTo(converter4.getSupportedMimeTypes().get(0));
AvroSchemaMessageConverter converter5 =
new AvroSchemaMessageConverter(Lists.newArrayList(mimeType), manager);
Schema schema = manager.getSchema(FoodOrder.class);
converter5.setSchema(schema);
assertThat(mimeType).isEqualTo(converter5.getSupportedMimeTypes().get(0));
assertThat(schema).isEqualTo(converter5.getSchema());
}
@Test
public void avroSchemaMessageConverterException() {
assertThatThrownBy(() -> {
MimeType mimeType = new MimeType("application", "avro");
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
AvroSchemaMessageConverter converter =
new AvroSchemaMessageConverter(Lists.newArrayList(mimeType), manager);
converter.setSchemaLocation(new ByteArrayResource(new byte[2]) {
});
}).isInstanceOf(SchemaParseException.class);
}
}