ProducerProperties.java

/*
 * Copyright 2016-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder;

import java.time.Duration;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import tools.jackson.core.JacksonException;
import tools.jackson.core.JsonGenerator;
import tools.jackson.databind.SerializationContext;
import tools.jackson.databind.annotation.JsonSerialize;

import org.springframework.expression.Expression;
import tools.jackson.databind.ser.std.StdSerializer;

/**
 * Common producer properties.
 *
 * @author Marius Bogoevici
 * @author Ilayaperumal Gopinathan
 * @author Gary Russell
 * @author Oleg Zhurakousky
 */
@JsonInclude(Include.NON_DEFAULT)
public class ProducerProperties {

	/**
	 * Binding name for this producer binding.
	 */
	private String bindingName;

	/**
	 * Signals if this producer needs to be started automatically. Default: true
	 */
	private boolean autoStartup = true;

	@JsonSerialize(using = ExpressionSerializer.class)
	private Expression partitionKeyExpression;

	/**
	 * The name of the bean that implements {@link PartitionKeyExtractorStrategy}\. Used
	 * to extract a key used to compute the partition id (see 'partitionSelector*') <br>
	 * Mutually exclusive with 'partitionKeyExpression'.
	 */
	private String partitionKeyExtractorName;

	/**
	 * The name of the bean that implements {@link PartitionSelectorStrategy}\. Used to
	 * determine partition id based on partition key (see 'partitionKeyExtractor*'). <br>
	 * Mutually exclusive with 'partitionSelectorExpression'.
	 */
	private String partitionSelectorName;

	@JsonSerialize(using = ExpressionSerializer.class)
	private Expression partitionSelectorExpression;

	private int partitionCount = 1;

	private String[] requiredGroups = new String[] {};

	private HeaderMode headerMode;

	private boolean useNativeEncoding = false;

	private boolean errorChannelEnabled = false;

	private PollerProperties poller;

	private boolean dynamicPartitionUpdatesEnabled = false;

	public String getBindingName() {
		return bindingName;
	}

	/**
	 * This method is not intended as a configuration property to set by the applications.
	 * Therefore, we are not providing a proper setter method for this.
	 * @param bindingName binding name populated by the framework.
	 */
	public void populateBindingName(String bindingName) {
		this.bindingName = bindingName;
	}

	public Expression getPartitionKeyExpression() {
		return this.partitionKeyExpression;
	}

	public void setPartitionKeyExpression(Expression partitionKeyExpression) {
		this.partitionKeyExpression = partitionKeyExpression;
	}

	public boolean isPartitioned() {
		return this.partitionKeyExpression != null
				|| this.partitionKeyExtractorName != null;
	}

	public Expression getPartitionSelectorExpression() {
		return this.partitionSelectorExpression;
	}

	public void setPartitionSelectorExpression(Expression partitionSelectorExpression) {
		this.partitionSelectorExpression = partitionSelectorExpression;
	}

	public int getPartitionCount() {
		return this.partitionCount;
	}

	public void setPartitionCount(int partitionCount) {
		this.partitionCount = partitionCount;
	}

	public String[] getRequiredGroups() {
		return this.requiredGroups;
	}

	public void setRequiredGroups(String... requiredGroups) {
		this.requiredGroups = requiredGroups;
	}

	public boolean isValidPartitionKeyProperty() {
		return this.partitionKeyExpression == null;
	}

	public boolean isValidPartitionSelectorProperty() {
		return this.partitionSelectorExpression == null;
	}

	public HeaderMode getHeaderMode() {
		return this.headerMode;
	}

	public void setHeaderMode(HeaderMode headerMode) {
		this.headerMode = headerMode;
	}

	public boolean isUseNativeEncoding() {
		return this.useNativeEncoding;
	}

	public void setUseNativeEncoding(boolean useNativeEncoding) {
		this.useNativeEncoding = useNativeEncoding;
	}

	public boolean isErrorChannelEnabled() {
		return this.errorChannelEnabled;
	}

	public void setErrorChannelEnabled(boolean errorChannelEnabled) {
		this.errorChannelEnabled = errorChannelEnabled;
	}

	public String getPartitionKeyExtractorName() {
		return this.partitionKeyExtractorName;
	}

	public void setPartitionKeyExtractorName(String partitionKeyExtractorName) {
		this.partitionKeyExtractorName = partitionKeyExtractorName;
	}

	public String getPartitionSelectorName() {
		return this.partitionSelectorName;
	}

	public void setPartitionSelectorName(String partitionSelectorName) {
		this.partitionSelectorName = partitionSelectorName;
	}

	public boolean isAutoStartup() {
		return this.autoStartup;
	}

	public void setAutoStartup(boolean autoStartup) {
		this.autoStartup = autoStartup;
	}

	public PollerProperties getPoller() {
		return poller;
	}

	public void setPoller(PollerProperties poller) {
		this.poller = poller;
	}

	/**
	 * Returns status of property dynamicPartitionUpdatesEnabled.
	 * @return true if dynamic updates should are enabled otherwise false
	 */
	public boolean isDynamicPartitionUpdatesEnabled() {
		return dynamicPartitionUpdatesEnabled;
	}

	/**
	 * A flag which enables/disables partition count updates during runtime. Disabled by default.
	 * Depends on binder if supported or not.
	 * Currently only supported by kafka binder (see 'Partitioning with the Kafka Binder' documentation for details)
	 * @param enabled true if dynamic updates should be enabled otherwise false
	 */
	public void setDynamicPartitionUpdatesEnabled(boolean enabled) {
		this.dynamicPartitionUpdatesEnabled = enabled;
	}

	/**
	 * @deprecated since 5.0.2 in favor of top level class in the same package
	 */
	@Deprecated
	public static class ExpressionSerializer extends StdSerializer<Expression> {
		public ExpressionSerializer() {
			super(Expression.class);
		}
		public ExpressionSerializer(Class<Expression> t) {
			super(t);
		}
		@Override
		public void serialize(Expression expression, JsonGenerator jsonGenerator, SerializationContext provider) throws JacksonException {
			if (expression != null) {
				jsonGenerator.writeString(expression.getExpressionString());
			}
		}
	}

	public static class PollerProperties {

		private Duration fixedDelay = Duration.ofMillis(1000);

		private long maxMessagesPerPoll = 1L;

		private String cron;

		private Duration initialDelay = Duration.ofMillis(0);

		public long getMaxMessagesPerPoll() {
			return maxMessagesPerPoll;
		}

		public void setMaxMessagesPerPoll(long maxMessagesPerPoll) {
			this.maxMessagesPerPoll = maxMessagesPerPoll;
		}

		public String getCron() {
			return cron;
		}

		public void setCron(String cron) {
			this.cron = cron;
		}

		public Duration getInitialDelay() {
			return initialDelay;
		}

		public void setInitialDelay(Duration initialDelay) {
			this.initialDelay = initialDelay;
		}

		public Duration getFixedDelay() {
			return fixedDelay;
		}

		public void setFixedDelay(Duration fixedDelay) {
			this.fixedDelay = fixedDelay;
		}
	}

}