KafkaStreamsRegistry.java

/*
 * Copyright 2018-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

import org.springframework.kafka.config.StreamsBuilderFactoryBean;

/**
 * An internal registry for holding {@link KafkaStreams} objects maintained through
 * {@link StreamsBuilderFactoryManager}.
 *
 * @author Soby Chacko
 */
public class KafkaStreamsRegistry {

	private final Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new ConcurrentHashMap<>();

	private final Set<KafkaStreams> kafkaStreams = ConcurrentHashMap.newKeySet();

	Set<KafkaStreams> getKafkaStreams() {
		Set<KafkaStreams> currentlyRunningKafkaStreams = new HashSet<>();
		for (KafkaStreams ks : this.kafkaStreams) {
			final StreamsBuilderFactoryBean streamsBuilderFactoryBean = streamsBuilderFactoryBeanMap.get(ks);
			if (streamsBuilderFactoryBean.isRunning()) {
				currentlyRunningKafkaStreams.add(ks);
			}
		}
		return currentlyRunningKafkaStreams;
	}

	/**
	 * Register the {@link KafkaStreams} object created in the application.
	 * @param streamsBuilderFactoryBean {@link StreamsBuilderFactoryBean}
	 */
	void registerKafkaStreams(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
		final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
		this.kafkaStreams.add(kafkaStreams);
		this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean);
	}

	void unregisterKafkaStreams(KafkaStreams kafkaStreams) {
		this.kafkaStreams.remove(kafkaStreams);
		this.streamsBuilderFactoryBeanMap.remove(kafkaStreams);
	}

	/**
	 *
	 * @param kafkaStreams {@link KafkaStreams} object
	 * @return Corresponding {@link StreamsBuilderFactoryBean}.
	 */
	StreamsBuilderFactoryBean streamBuilderFactoryBean(KafkaStreams kafkaStreams) {
		return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
	}

	public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
		final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
				.stream()
				.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean.isRunning() && Objects.requireNonNull(streamsBuilderFactoryBean
						.getStreamsConfiguration()).getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
						.equals(applicationId))
				.findFirst();
		return first.orElse(null);
	}

	public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
		return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
	}

}