KafkaStreamsConsumerProperties.java

/*
 * Copyright 2017-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.properties;

import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.DeserializationExceptionHandler;

/**
 * Extended properties for Kafka Streams consumer.
 *
 * @author Marius Bogoevici
 * @author Soby Chacko
 * @author Gihong Park
 */
public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {

	private String applicationId;

	/**
	 * Key serde specified per binding.
	 */
	private String keySerde;

	/**
	 * Value serde specified per binding.
	 */
	private String valueSerde;

	/**
	 * Materialized as a KeyValueStore.
	 */
	private String materializedAs;

	/**
	 * Disable caching for materialized KTable.
	 */
	private boolean cachingDisabled;

	/**
	 * Disable logging for materialized KTable.
	 */
	private boolean loggingDisabled;

	/**
	 * Per input binding deserialization handler.
	 */
	private DeserializationExceptionHandler deserializationExceptionHandler;

	/**
	 * {@link org.apache.kafka.streams.processor.TimestampExtractor} bean name to use for this consumer.
	 */
	private String timestampExtractorBeanName;

	/**
	 * Comma separated list of supported event types for this binding.
	 */
	private String eventTypes;

	/**
	 * Record level header key for event type.
	 * If the default value is overridden, then that is expected on each record header if eventType based
	 * routing is enabled on this binding (by setting eventTypes).
	 */
	private String eventTypeHeaderKey = "event_type";

	/**
	 * Custom name for the source component from which the processor is consuming from.
	 */
	private String consumedAs;

	/**
	 * When event type based routing is enabled, the binder uses the byte[] Serde by default.
	 * Use this property to override this default behavior by forcing the binder to use the configured or inferred Serde.
	 */
	private boolean useConfiguredSerdeWhenRoutingEvents;

	public String getApplicationId() {
		return this.applicationId;
	}

	public void setApplicationId(String applicationId) {
		this.applicationId = applicationId;
	}

	public String getKeySerde() {
		return this.keySerde;
	}

	public void setKeySerde(String keySerde) {
		this.keySerde = keySerde;
	}

	public String getValueSerde() {
		return this.valueSerde;
	}

	public void setValueSerde(String valueSerde) {
		this.valueSerde = valueSerde;
	}

	public String getMaterializedAs() {
		return this.materializedAs;
	}

	public void setMaterializedAs(String materializedAs) {
		this.materializedAs = materializedAs;
	}

	public boolean isCachingDisabled() {
		return this.cachingDisabled;
	}

	public void setCachingDisabled(boolean cachingDisabled) {
		this.cachingDisabled = cachingDisabled;
	}

	public boolean isLoggingDisabled() {
		return this.loggingDisabled;
	}

	public void setLoggingDisabled(boolean loggingDisabled) {
		this.loggingDisabled = loggingDisabled;
	}

	public String getTimestampExtractorBeanName() {
		return timestampExtractorBeanName;
	}

	public void setTimestampExtractorBeanName(String timestampExtractorBeanName) {
		this.timestampExtractorBeanName = timestampExtractorBeanName;
	}

	public DeserializationExceptionHandler getDeserializationExceptionHandler() {
		return deserializationExceptionHandler;
	}

	public void setDeserializationExceptionHandler(DeserializationExceptionHandler deserializationExceptionHandler) {
		this.deserializationExceptionHandler = deserializationExceptionHandler;
	}

	public String getEventTypes() {
		return eventTypes;
	}

	public void setEventTypes(String eventTypes) {
		this.eventTypes = eventTypes;
	}

	public String getEventTypeHeaderKey() {
		return this.eventTypeHeaderKey;
	}

	public void setEventTypeHeaderKey(String eventTypeHeaderKey) {
		this.eventTypeHeaderKey = eventTypeHeaderKey;
	}

	public String getConsumedAs() {
		return consumedAs;
	}

	public void setConsumedAs(String consumedAs) {
		this.consumedAs = consumedAs;
	}

	public boolean isUseConfiguredSerdeWhenRoutingEvents() {
		return this.useConfiguredSerdeWhenRoutingEvents;
	}

	public void setUseConfiguredSerdeWhenRoutingEvents(boolean useConfiguredSerdeWhenRoutingEvents) {
		this.useConfiguredSerdeWhenRoutingEvents = useConfiguredSerdeWhenRoutingEvents;
	}
}