StreamsBuilderFactoryManager.java
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.boot.web.server.context.WebServerApplicationContext;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;
import org.springframework.lang.NonNull;
/**
 * Iterates through all {@link StreamsBuilderFactoryBean}s known to the
 * {@link KafkaStreamsBindingInformationCatalogue} and starts them. As each one completes
 * starting, it is registered with the {@link KafkaStreamsRegistry} so that the associated
 * KafkaStreams object can be looked up later (for example by {@link InteractiveQueryService}).
 *
 * <p>This {@link SmartLifecycle} class ensures that the bean created from it is started very
 * late through the bootstrap process by using a phase value close to
 * Integer.MAX_VALUE (see {@link #getPhase()}). This is to guarantee that the
 * {@link StreamsBuilderFactoryBean} on a function with multiple bindings is only started
 * after all the binding phases have completed successfully.
 *
 * <p>{@link #start()}, {@link #stop()} and {@link #isRunning()} are synchronized on this
 * instance.
 *
 * @author Soby Chacko
 */
public class StreamsBuilderFactoryManager implements SmartLifecycle {

	private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;

	private final KafkaStreamsRegistry kafkaStreamsRegistry;

	private final KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics;

	private final KafkaStreamsMicrometerListener listener;

	private volatile boolean running;

	private final KafkaProperties kafkaProperties;

	/**
	 * Create a manager for the factory beans tracked by the given catalogue.
	 *
	 * @param kafkaStreamsBindingInformationCatalogue source of the factory beans, their
	 * consumer properties and the DLQ producer factories
	 * @param kafkaStreamsRegistry registry that receives each started factory bean
	 * @param kafkaStreamsBinderMetrics optional metrics support; may be {@code null}
	 * @param listener optional listener added to every factory bean on start; may be {@code null}
	 * @param kafkaProperties optional Boot Kafka properties consulted for auto-startup;
	 * may be {@code null}
	 */
	StreamsBuilderFactoryManager(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
			KafkaStreamsRegistry kafkaStreamsRegistry,
			KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics,
			KafkaStreamsMicrometerListener listener,
			KafkaProperties kafkaProperties) {
		this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
		this.kafkaStreamsRegistry = kafkaStreamsRegistry;
		this.kafkaStreamsBinderMetrics = kafkaStreamsBinderMetrics;
		this.listener = listener;
		this.kafkaProperties = kafkaProperties;
	}

	/**
	 * Auto-startup is enabled when no {@link KafkaProperties} were supplied, otherwise it
	 * follows the streams auto-startup setting of those properties.
	 *
	 * @return {@code true} if this manager should be started automatically
	 */
	@Override
	public boolean isAutoStartup() {
		return this.kafkaProperties == null || this.kafkaProperties.getStreams().isAutoStartup();
	}

	/**
	 * Stop this manager and then notify the caller through the callback.
	 *
	 * <p>The callback is always invoked, even when {@link #stop()} throws, so that the caller
	 * waiting on it is not left blocked until its shutdown timeout expires. The
	 * exception from {@link #stop()} still propagates.
	 *
	 * @param callback invoked once the stop attempt has finished
	 */
	@Override
	public void stop(@NonNull Runnable callback) {
		try {
			stop();
		}
		finally {
			callback.run();
		}
	}

	/**
	 * Start every {@link StreamsBuilderFactoryBean} from the catalogue unless at least one of
	 * its consumer bindings has auto-startup disabled, and register each started bean with
	 * the {@link KafkaStreamsRegistry}. Does nothing if already running.
	 *
	 * @throws KafkaException if anything goes wrong while preparing or starting the beans
	 */
	@Override
	public synchronized void start() {
		if (this.running) {
			return;
		}
		try {
			Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue
					.getStreamsBuilderFactoryBeans();
			for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
				if (this.listener != null) {
					streamsBuilderFactoryBean.addListener(this.listener);
				}
				// By default, we shut down the client if there is an uncaught exception in the application.
				// Users can override this by customizing SBFB. See this issue for more details:
				// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1110
				streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception ->
						StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);

				// NOTE(review): assumes the catalogue holds a consumer-properties entry for every
				// factory bean; a missing entry surfaces below as a KafkaException wrapping a
				// NullPointerException - confirm that this is the intended behaviour.
				final Map<StreamsBuilderFactoryBean, List<ConsumerProperties>> bindingServicePropertiesPerSbfb =
						this.kafkaStreamsBindingInformationCatalogue.getConsumerPropertiesPerSbfb();
				final List<ConsumerProperties> consumerProperties =
						bindingServicePropertiesPerSbfb.get(streamsBuilderFactoryBean);
				final boolean autoStartupDisabledOnAtLeastOneConsumerBinding = consumerProperties.stream()
						.anyMatch(consumerProperties1 -> !consumerProperties1.isAutoStartup());

				if (streamsBuilderFactoryBean instanceof SmartInitializingSingleton) {
					((SmartInitializingSingleton) streamsBuilderFactoryBean).afterSingletonsInstantiated();
				}
				// Starting the stream, unless a binding opted out of auto-startup.
				if (!autoStartupDisabledOnAtLeastOneConsumerBinding) {
					streamsBuilderFactoryBean.start();
					this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
				}
			}
			if (this.kafkaStreamsBinderMetrics != null) {
				this.kafkaStreamsBinderMetrics.addMetrics(streamsBuilderFactoryBeans);
			}
			this.running = true;
		}
		catch (Exception ex) {
			throw new KafkaException("Could not start stream: ", ex);
		}
	}

	/**
	 * Stop every {@link StreamsBuilderFactoryBean} from the catalogue and destroy all DLQ
	 * producer factories. Does nothing if not running.
	 *
	 * <p>Every cleanup step is attempted even if an earlier one fails, so that a single
	 * failing factory bean does not leave the remaining streams running or the DLQ producers
	 * open. The running flag is always reset.
	 *
	 * @throws IllegalStateException wrapping the first failure encountered; any further
	 * failures are attached to it as suppressed exceptions
	 */
	@Override
	public synchronized void stop() {
		if (!this.running) {
			return;
		}
		IllegalStateException failure = null;
		try {
			failure = stopStreamsBuilderFactoryBeans(failure);
			failure = destroyDlqProducerFactories(failure);
		}
		finally {
			this.running = false;
		}
		if (failure != null) {
			throw failure;
		}
	}

	/**
	 * Detach the listener from, and stop, each factory bean; failures are accumulated
	 * instead of aborting the loop.
	 */
	private IllegalStateException stopStreamsBuilderFactoryBeans(IllegalStateException failure) {
		IllegalStateException result = failure;
		try {
			Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue
					.getStreamsBuilderFactoryBeans();
			for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
				try {
					// start() only adds the listener when it is non-null, so mirror that here.
					if (this.listener != null) {
						streamsBuilderFactoryBean.removeListener(this.listener);
					}
					streamsBuilderFactoryBean.stop();
				}
				catch (Exception ex) {
					result = accumulate(result, ex);
				}
			}
		}
		catch (Exception ex) {
			// Looking up the factory beans itself failed.
			result = accumulate(result, ex);
		}
		return result;
	}

	/**
	 * Destroy each DLQ producer factory; failures are accumulated instead of aborting the loop.
	 */
	private IllegalStateException destroyDlqProducerFactories(IllegalStateException failure) {
		IllegalStateException result = failure;
		try {
			for (ProducerFactory<byte[], byte[]> dlqProducerFactory
					: this.kafkaStreamsBindingInformationCatalogue.getDlqProducerFactories()) {
				try {
					((DisposableBean) dlqProducerFactory).destroy();
				}
				catch (Exception ex) {
					result = accumulate(result, ex);
				}
			}
		}
		catch (Exception ex) {
			// Looking up the producer factories itself failed.
			result = accumulate(result, ex);
		}
		return result;
	}

	/**
	 * Wrap the first failure in an {@link IllegalStateException}; attach later ones as suppressed.
	 */
	private static IllegalStateException accumulate(IllegalStateException failure, Exception ex) {
		if (failure == null) {
			return new IllegalStateException(ex);
		}
		failure.addSuppressed(ex);
		return failure;
	}

	/**
	 * @return {@code true} once {@link #start()} has completed successfully and
	 * {@link #stop()} has not been called since
	 */
	@Override
	public synchronized boolean isRunning() {
		return this.running;
	}

	/**
	 * @return a phase one below {@link WebServerApplicationContext#GRACEFUL_SHUTDOWN_PHASE}
	 */
	@Override
	public int getPhase() {
		return WebServerApplicationContext.GRACEFUL_SHUTDOWN_PHASE - 1;
	}

}