SerdeResolverUtilsTests.java
/*
* Copyright 2022-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.support.serializer.JacksonJsonSerde;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
/**
* Unit tests for {@link SerdeResolverUtils}.
*
* @author Chris Bono
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
class SerdeResolverUtilsTests {
@Nested
class ResolveForType {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.function.ineligible-definitions: sendToDlqAndContinue");
private final Serde<?> fallback = mock(Serde.class);
@Test
void returnsFallbackSerdeForWildcard() {
this.contextRunner
.withConfiguration(AutoConfigurations.of(SerdeResolverSimpleTestApp.class))
.run((context) -> {
ResolvableType wildcardType = ResolvableType.forClass(Serde.class).getGeneric(0);
assertThat(SerdeResolverUtils.resolveForType(context, wildcardType, fallback)).isSameAs(fallback);
});
}
@Test
void returnsSerdeBeanForMatchingType() {
this.contextRunner
.withConfiguration(AutoConfigurations.of(SerdeResolverSimpleTestApp.class))
.run((context) -> {
ResolvableType fooType = ResolvableType.forClass(Foo.class);
assertThat(SerdeResolverUtils.resolveForType(context, fooType, fallback)).isInstanceOf(FooSerde.class);
});
}
@Nested
class NoMatchingSerdeBeans {
@ParameterizedTest
@MethodSource("kafkaStreamsBuiltInTypes")
void returnsStandardSerdeForKafkaStreamsBuiltInType(Class<?> builtInType, Serde<?> expectedBuiltInSerde) {
contextRunner.run((context) ->
assertThat(SerdeResolverUtils.resolveForType(context, ResolvableType.forClass(builtInType), fallback))
.isInstanceOf(expectedBuiltInSerde.getClass()));
}
static Stream<Arguments> kafkaStreamsBuiltInTypes() {
return Stream.of(
arguments(String.class, Serdes.String()),
arguments(Short.class, Serdes.Short()),
arguments(Integer.class, Serdes.Integer()),
arguments(Long.class, Serdes.Long()),
arguments(Float.class, Serdes.Float()),
arguments(Double.class, Serdes.Double()),
arguments(byte[].class, Serdes.ByteArray()),
arguments(ByteBuffer.class, Serdes.ByteBuffer()),
arguments(Bytes.class, Serdes.Bytes()),
arguments(UUID.class, Serdes.UUID())
);
}
@Nested
class ForNonKafkaStreamsBuiltInType {
@Test
void returnsFallbackSerdeWhenValidFallbackSpecified() {
contextRunner.run((context) ->
assertThat(SerdeResolverUtils.resolveForType(context, ResolvableType.forClass(Foo.class), fallback))
.isSameAs(fallback));
}
@ParameterizedTest
@MethodSource("invalidFallbackSerdeProvider")
void returnsJsonSerdeWhenInvalidFallbackSpecified(Serde<?> invalidFallback) {
contextRunner.run((context) ->
assertThat(SerdeResolverUtils.resolveForType(context, ResolvableType.forClass(Foo.class), invalidFallback))
.isInstanceOf(JacksonJsonSerde.class));
}
static Stream<Serde<?>> invalidFallbackSerdeProvider() {
return Stream.of(
Serdes.String(),
Serdes.Short(),
Serdes.Integer(),
Serdes.Long(),
Serdes.Float(),
Serdes.Double(),
Serdes.ByteArray(),
Serdes.ByteBuffer(),
Serdes.Bytes(),
Serdes.UUID()
);
}
@Test
void returnsJsonSerdeWhenFallbackNotSpecified() {
contextRunner.run((context ->
assertThat(SerdeResolverUtils.resolveForType(context, ResolvableType.forClass(Foo.class), null))
.isInstanceOf(JacksonJsonSerde.class)));
}
@Test
void returnsFallbackSerdeForJavaLangObject() {
// This is an edge case as the only way to get to the JsonSerde step in the 1st place is when
// no fallback is specified or a fallback is specified but its invalid (for a built-in type).
// We will use the 'fallback is not specified' scenario for this test.
contextRunner.run((context) ->
assertThat(SerdeResolverUtils.resolveForType(context, ResolvableType.forClass(Object.class), null))
.isNull());
}
}
}
}
@Nested
class BeanNamesForMatchingSerdes {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.function.ineligible-definitions: sendToDlqAndContinue");
@Test
void returnsNoSerdesForWildcardType() {
this.contextRunner.withUserConfiguration(SerdeResolverSimpleTestApp.class)
.run((context) -> {
ResolvableType wildcardType = ResolvableType.forClass(Serde.class).getGeneric(0);
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, wildcardType)).isEmpty();
});
}
/**
*
* Verify that {@link SerdeResolverUtils#beanNamesForMatchingSerdes} returns the proper serdes in the proper order
* for the following grid:
* <p><br>
* <i>NOTE:</i> {@code GE = GenericEvent}
* <p><pre>{@code
* ------------------------------------------------------------------
* KStream type | Serde type
* ------------------------------------------------------------------
* | GE<Date> | GE<? extends Date> | GE<?> | GE
* ------------------------------------------------------------------
* GE<Date> | 1 | - | - | -
* GE<? extends Date> | 2 | 1 | - | -
* GE<?> | 4 | 2 | 1 | 3
* GE | 1 | - | - | 2
* ------------------------------------------------------------------
* }</pre>
* <p><br>
* <i>NOTE:</i> On the last row, one might expect the {@code GE} serde to be the top match with the {@code GE}
* kstream. However, that is not the case because {@code GE} is a parameterized type and therefore when
* specified as a raw type (without any type info) it resolves to {@code GE<?>} which throws off the ordering.
* A best practice is to specify the type info (even if it is wildcard) for KStream parameterized types.
*/
@Test
@Disabled
void returnsProperlyOrderedSerdesForSimpleGenericTypes() {
ResolvableType geDate = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<Date>>() { });
ResolvableType geBounded = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<? extends Date>>() { });
ResolvableType geWildcard = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<?>>() { });
ResolvableType geRaw = ResolvableType.forRawClass(GenericEvent.class);
this.contextRunner.withUserConfiguration(SerdeResolverSimpleTestApp.class)
.run((context) -> {
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geDate))
.containsExactly(
"geDateSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geBounded))
.containsExactly(
"geDateBoundedSerde",
"geDateSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geWildcard))
.containsExactly(
"geWildcardSerde",
"geDateBoundedSerde",
"geRawSerde",
"geDateSerde",
"geStringSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geRaw))
.containsExactly(
"geDateSerde",
"geStringSerde",
"geRawSerde");
});
}
/**
* Verify that {@link SerdeResolverUtils#beanNamesForMatchingSerdes} returns the proper serdes in the proper order
* for the following grid:
* <p><br>
* <i>NOTE:</i> {@code GE = GenericEvent}
* <p><pre>{@code
* -------------------------------------------------------------------------------------------------------------------
* KStream type | Serde type
* -------------------------------------------------------------------------------------------------------------------
* | GE<F<D>> | GE<F<?D>> | GE<F<?>> | GE<F> | GE<?F<D>> | GE<?F<?D>> | GE<?F<?>> | GE<?F> | GE<?> | GE
* -------------------------------------------------------------------------------------------------------------------
* GE<F<D>> | 1 | - | - | - | - | - | - | - | - | -
* GE<F<?D>> | 2 | 1 | - | - | - | - | - | - | - | -
* GE<F<?>> | 4 | 3 | 1 | 2 | - | - | - | - | - | -
* GE<F> | 2 | - | - | 1 | - | - | - | - | - | -
* GE<?F<D>> | 2 | - | - | - | 1 | - | - | - | - | -
* GE<?F<?D>> | 3 | 4 | - | - | 2 | 1 | - | - | - | -
* GE<?F<?>> | 7 | 8 | 5 | 6 | 4 | 3 | 1 | 2 | - | -
* GE<?F> | 4 | - | - | 3 | 2 | - | - | 1 | - | -
* GE<?> | 7 | 8 | 9 | 10 | 2 | 3 | 4 | 5 | 1 | 6
* GE | 1 | 2 | 3 | 4 | - | - | - | - | - | 5
* -------------------------------------------------------------------------------------------------------------------
* }</pre>
* <p><br>
* <i>NOTE:</i> On the last row, one might expect the {@code GE} serde to be the top match with the {@code GE}
* kstream. However, that is not the case because {@code GE} is a parameterized type and therefore when
* specified as a raw type (without any type info) it resolves to {@code GE<?>} which throws off the ordering.
* A best practice is to specify the type info (even if it is wildcard) for KStream parameterized types.
*/
@Test
@Disabled
void returnsProperlyOrderedSerdesForComplexGenericTypes() {
ResolvableType geFooDate = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<Foo<Date>>>() { });
ResolvableType geFooDateBounded = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<Foo<? extends Date>>>() { });
ResolvableType geFooWildcard = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<Foo<?>>>() { });
ResolvableType geFooRaw = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<Foo>>() { });
ResolvableType geFooBoundedDate = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<? extends Foo<Date>>>() { });
ResolvableType geFooBoundedDateBounded = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<? extends Foo<? extends Date>>>() { });
ResolvableType geFooBoundedWildcard = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<? extends Foo<?>>>() { });
ResolvableType geFooBoundedRaw = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<? extends Foo>>() { });
ResolvableType geWildcard = ResolvableType.forType(new ParameterizedTypeReference<GenericEvent<?>>() { });
ResolvableType geRaw = ResolvableType.forRawClass(GenericEvent.class);
this.contextRunner.withUserConfiguration(SerdeResolverComplexTestApp.class)
.run((context) -> {
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooDate))
.containsExactly(
"geFooDateSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooDateBounded))
.containsExactly(
"geFooDateBoundedSerde",
"geFooDateSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooWildcard))
.containsExactly(
"geFooWildcardSerde",
"geFooRawSerde",
"geFooDateBoundedSerde",
"geFooDateSerde",
"geFooStringSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooRaw))
.containsExactly(
"geFooRawSerde",
"geFooDateSerde",
"geFooStringSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooBoundedDate))
.containsExactly(
"geFooBoundedDateSerde",
"geFooDateSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooBoundedDateBounded))
.containsExactly(
"geFooBoundedDateBoundedSerde",
"geFooBoundedDateSerde",
"geFooDateSerde",
"geFooDateBoundedSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooBoundedWildcard))
.containsExactly(
"geFooBoundedWildcardSerde",
"geFooBoundedRawSerde",
"geFooBoundedDateBoundedSerde",
"geFooBoundedDateSerde",
"geFooBoundedStringSerde",
"geFooWildcardSerde",
"geFooRawSerde",
"geFooDateSerde",
"geFooDateBoundedSerde",
"geFooStringSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geFooBoundedRaw))
.containsExactly(
"geFooBoundedRawSerde",
"geFooBoundedDateSerde",
"geFooBoundedStringSerde",
"geFooRawSerde",
"geFooDateSerde",
"geFooStringSerde");
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geWildcard))
.containsExactly(
"geWildcardSerde",
"geFooBoundedDateSerde",
"geFooBoundedDateBoundedSerde",
"geFooBoundedStringSerde",
"geFooBoundedWildcardSerde",
"geFooBoundedRawSerde",
"geRawSerde",
"geFooDateSerde",
"geFooDateBoundedSerde",
"geFooStringSerde",
"geFooWildcardSerde",
"geFooRawSerde");
// One might expect geRawSerde to win in order, but it does not because GE is a parameterized type and
// therefore GE resolves to GE<?> which throws off the ordering. Bottom line, for parameterized types
// be sure to specify a type (even if it's wildcard) in the KStream<T> definition.
assertThat(SerdeResolverUtils.beanNamesForMatchingSerdes(context, geRaw))
.containsExactly(
"geFooDateSerde",
"geFooDateBoundedSerde",
"geFooStringSerde",
"geFooWildcardSerde",
"geFooRawSerde",
"geRawSerde");
});
}
}
static class GenericEventSerde<T> implements Serde<GenericEvent<? extends T>> {
private final String name;
GenericEventSerde(String name) {
this.name = name;
}
@Override
public Serializer<GenericEvent<? extends T>> serializer() {
return null;
}
@Override
public Deserializer<GenericEvent<? extends T>> deserializer() {
return null;
}
@Override
public String toString() {
return "GenericEventSerde(" + name + ")";
}
}
static class GenericEvent<T> { }
static class FooSerde implements Serde<Foo> {
@Override
public Serializer<Foo> serializer() {
return null;
}
@Override
public Deserializer<Foo> deserializer() {
return null;
}
}
static class Foo<T> { }
@EnableAutoConfiguration
static class SerdeResolverSimpleTestApp {
@Bean
public Serde<GenericEvent<Date>> geDateSerde() {
return new GenericEventSerde("geDateSerde");
}
@Bean
public Serde<GenericEvent<? extends Date>> geDateBoundedSerde() {
return new GenericEventSerde("geDateBoundedSerde");
}
@Bean
public Serde<GenericEvent<String>> geStringSerde() {
return new GenericEventSerde("geStringSerde");
}
@Bean
public Serde<GenericEvent<?>> geWildcardSerde() {
return new GenericEventSerde("geWildcardSerde");
}
@Bean
public Serde<GenericEvent> geRawSerde() {
return new GenericEventSerde("geRawSerde");
}
@Bean
public Serde<?> widlcardSerde() {
return Serdes.Void();
}
@Bean
public Serde<Foo> fooSerde() {
return new FooSerde();
}
}
@EnableAutoConfiguration
static class SerdeResolverComplexTestApp {
@Bean
public Serde<GenericEvent<Foo<Date>>> geFooDateSerde() {
return new GenericEventSerde("geFooDateSerde");
}
@Bean
public Serde<GenericEvent<Foo<? extends Date>>> geFooDateBoundedSerde() {
return new GenericEventSerde("geFooDateBoundedSerde");
}
@Bean
public Serde<GenericEvent<Foo<String>>> geFooStringSerde() {
return new GenericEventSerde("geFooStringSerde");
}
@Bean
public Serde<GenericEvent<Foo<?>>> geFooWildcardSerde() {
return new GenericEventSerde("geFooWildcardSerde");
}
@Bean
public Serde<GenericEvent<Foo>> geFooRawSerde() {
return new GenericEventSerde("geFooRawSerde");
}
@Bean
public Serde<GenericEvent<? extends Foo<Date>>> geFooBoundedDateSerde() {
return new GenericEventSerde("geFooBoundedDateSerde");
}
@Bean
public Serde<GenericEvent<? extends Foo<? extends Date>>> geFooBoundedDateBoundedSerde() {
return new GenericEventSerde("geFooBoundedDateBoundedSerde");
}
@Bean
public Serde<GenericEvent<? extends Foo<String>>> geFooBoundedStringSerde() {
return new GenericEventSerde("geFooBoundedStringSerde");
}
@Bean
public Serde<GenericEvent<? extends Foo<?>>> geFooBoundedWildcardSerde() {
return new GenericEventSerde("geFooBoundedWildcardSerde");
}
@Bean
public Serde<GenericEvent<? extends Foo>> geFooBoundedRawSerde() {
return new GenericEventSerde("geFooBoundedRawSerde");
}
@Bean
public Serde<GenericEvent<?>> geWildcardSerde() {
return new GenericEventSerde("geWildcardSerde");
}
@Bean
public Serde<GenericEvent> geRawSerde() {
return new GenericEventSerde("geRawSerde");
}
@Bean
public Serde<?> widlcardSerde() {
return Serdes.Void();
}
@Bean
public Serde<Foo> fooSerde() {
return new FooSerde();
}
}
}