// KafkaBinderConfigurationProperties.java
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.kafka.autoconfigure.KafkaConnectionDetails;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties.CompressionType;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.expression.Expression;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Configuration properties for the Kafka binder. The properties in this class are
* prefixed with <b>spring.cloud.stream.kafka.binder</b>.
*
* @author David Turanski
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gary Russell
* @author Rafal Zukowski
* @author Aldo Sinanaj
* @author Lukasz Kaminski
* @author Chukwubuikem Ume-Ugwa
* @author Nico Heller
* @author Norbert Gyurian
* @author Boini Srinivas
* @author Felix Schultze
*/
public class KafkaBinderConfigurationProperties {
// Connection string assumed when brokers/defaultBrokerPort are left at their defaults.
private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";
private final Log logger = LogFactory.getLog(getClass());
// Transaction-related settings (spring.cloud.stream.kafka.binder.transaction.*).
private final Transaction transaction = new Transaction();
// Metrics-related settings (spring.cloud.stream.kafka.binder.metrics.*).
private final Metrics metrics = new Metrics();
// Boot-autoconfigured Kafka properties; never null (asserted in the constructor).
private final KafkaProperties kafkaProperties;
// Boot-autoconfigured connection details; may be null when none are registered.
private final KafkaConnectionDetails kafkaConnectionDetails;
/**
 * Arbitrary kafka properties that apply to both producers and consumers.
 */
private Map<String, String> configuration = new HashMap<>();
/**
 * Arbitrary kafka consumer properties.
 */
private Map<String, String> consumerProperties = new HashMap<>();
/**
 * Arbitrary kafka producer properties.
 */
private Map<String, String> producerProperties = new HashMap<>();
// Broker hosts; entries without an explicit ":port" get defaultBrokerPort appended
// when the connection string is built (see toConnectionString).
private String[] brokers = new String[] { "localhost" };
// Port appended to broker entries that do not specify one.
private String defaultBrokerPort = "9092";
// Header names; consumed by the binder outside this class — usage not visible here.
private String[] headers = new String[] {};
// Topic provisioning flags (defaults: create=true, alter/addPartitions=false).
private boolean autoCreateTopics = true;
private boolean autoAlterTopics;
private boolean autoAddPartitions;
// Health-indicator behavior when a partition has no leader; default true.
private boolean considerDownWhenAnyPartitionHasNoLeader = true;
// Producer acks setting; default "all".
private String requiredAcks = "all";
// Replication factor for provisioned topics; -1 presumably defers to the broker default — confirm in the provisioner.
private short replicationFactor = -1;
// Minimum number of partitions for provisioned topics.
private int minPartitionCount = 1;
/**
 * Time to wait to get partition information in seconds; default 60.
 */
private int healthTimeout = 60;
// Optional JAAS login module configuration; null when not configured.
private JaasLoginModuleConfiguration jaas;
/**
 * The bean name of a custom header mapper to use instead of a
 * {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}.
 */
private String headerMapperBeanName;
/**
 * Time between retries after AuthorizationException is caught in
 * the ListenerContainer; default is null which disables retries.
 * For more info see: {@link org.springframework.kafka.listener.ConsumerProperties#setAuthorizationExceptionRetryInterval(java.time.Duration)}
 */
private Duration authorizationExceptionRetryInterval;
/**
 * When a certificate store location is not given as a local file system path, then the binder
 * converts the path to {@link org.springframework.core.io.Resource} resource, then copies
 * the resource from the original location to a location on
 * the filesystem. If this value is set, then this location is used, otherwise, the
 * certificate file is copied to the directory returned by java.io.tmpdir.
 */
private String certificateStoreDirectory;
/**
 * Enable Micrometer observation registry across all the bindings in the binder.
 */
private boolean enableObservation;
/**
 * Schema registry ssl configuration properties.
 * These keys are always propagated from {@link #configuration} into the merged
 * consumer/producer configurations, even though Kafka itself does not list them.
 */
private final String[] schemaRegistryProperties = new String[]{"schema.registry.url",
        "schema.registry.ssl.keystore.location", "schema.registry.ssl.keystore.password",
        "schema.registry.ssl.truststore.location", "schema.registry.ssl.truststore.password",
        "schema.registry.ssl.key.password"};
/**
 * Consumer group.id of the Kafka consumer in
 * {@link org.springframework.cloud.stream.binder.kafka.common.AbstractKafkaBinderHealthIndicator} that is used
 * for querying metadata from the broker (such as metadata information about the topics).
 */
private String healthIndicatorConsumerGroup;
/**
 * Earlier, @Autowired on this constructor was necessary for all the properties to be discovered
 * and bound when running as a native application. However, now that Spring Boot fixed the underlying
 * issue, we are removing the @Autowired from the constructor. See these Boot issues for more details.
 * https://github.com/spring-projects/spring-boot/issues/34507
 * https://github.com/spring-projects/spring-boot/issues/35564
 *
 * @param kafkaProperties Spring Kafka properties autoconfigured by Spring Boot; must not be null
 * @param kafkaConnectionDetails Kafka connection details autoconfigured by Spring Boot; may be absent
 */
public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties, ObjectProvider<KafkaConnectionDetails> kafkaConnectionDetails) {
    Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null");
    this.kafkaProperties = kafkaProperties;
    // Connection details are optional; the field stays null when none are registered.
    this.kafkaConnectionDetails = kafkaConnectionDetails.getIfAvailable();
}
// Boot's Kafka properties, used as the base for the merged configurations below.
public KafkaProperties getKafkaProperties() {
    return this.kafkaProperties;
}
// Transaction settings bound under the "transaction" prefix.
public Transaction getTransaction() {
    return this.transaction;
}
// Metrics settings bound under the "metrics" prefix.
public Metrics getMetrics() {
    return metrics;
}
/**
 * Returns the comma-separated broker connection string, appending the default
 * port to any broker entry that lacks one.
 * <p>Before building the string, any certificate store locations configured as
 * non-local resources (classpath, http, ...) are copied onto the local file
 * system and the configuration is rewritten to point at the copies. Kafka can
 * only read certificate stores from the local file system; see
 * https://issues.apache.org/jira/browse/KAFKA-7685 and
 * https://cwiki.apache.org/confluence/display/KAFKA/KIP-398%3A+Support+reading+trust+store+from+classpath
 * @return the formatted broker connection string
 */
public String getKafkaConnectionString() {
    // Side effect first: relocate certs and rewrite this.configuration if needed.
    moveCertsToFileSystemIfNecessary();
    final String connectionString = toConnectionString(this.brokers, this.defaultBrokerPort);
    return connectionString;
}
/**
 * Relocates every configured certificate store (broker first, then schema
 * registry) that is not already a local file. Any failure is rethrown as an
 * {@link IllegalStateException}.
 */
private void moveCertsToFileSystemIfNecessary() {
    final String[] storeProperties = {
            "ssl.truststore.location",
            "ssl.keystore.location",
            "schema.registry.ssl.truststore.location",
            "schema.registry.ssl.keystore.location" };
    try {
        for (String storeProperty : storeProperties) {
            moveCertsIfApplicable(storeProperty);
        }
    }
    catch (Exception e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Copies the certificate store named by {@code storeProperty} to the local file
 * system when its configured location is set but is not an existing local file,
 * then rewrites the configuration entry to the absolute path of the copy.
 * @param storeProperty configuration key holding the store location
 * @throws IOException if copying the resource fails
 */
private void moveCertsIfApplicable(String storeProperty) throws IOException {
    final String storeLocation = this.configuration.get(storeProperty);
    if (!StringUtils.hasText(storeLocation) || checkIfFileExists(storeLocation)) {
        // Nothing to do: the location is unset, or it already is a local file.
        return;
    }
    final String fileSystemLocation = moveCertToFileSystem(storeLocation, this.certificateStoreDirectory);
    // Point the configuration at the absolute filesystem path of the copy.
    this.configuration.put(storeProperty, fileSystemLocation);
}
/**
 * Reports whether {@code path} denotes an existing regular file on the local
 * file system. Locations that cannot even be parsed as a path (e.g. classpath:
 * or http: URLs) simply yield {@code false}.
 */
private boolean checkIfFileExists(String path) {
    try {
        final Path candidate = Paths.get(path);
        return Files.isRegularFile(candidate);
    }
    catch (Exception e) {
        // Unparsable path — treat as "not a local file" rather than failing.
        return false;
    }
}
/**
 * Copies the resource at {@code resourceLocation} to the local file system and
 * returns the absolute path of the copy.
 * <p>The copy goes into {@code fileSystemLocation} when that is an existing,
 * writable directory; otherwise (including when it is empty) the system temp
 * directory ({@code java.io.tmpdir}) is used, with a warning when an explicit
 * location had to be ignored.
 * @param resourceLocation location of the original resource (classpath, http, ...)
 * @param fileSystemLocation target directory, or null/empty for the temp dir
 * @return absolute path of the copied file
 * @throws IOException if reading the resource or writing the copy fails
 */
private String moveCertToFileSystem(String resourceLocation, String fileSystemLocation) throws IOException {
    final String tempDir = System.getProperty("java.io.tmpdir");
    Resource resource = new DefaultResourceLoader().getResource(resourceLocation);
    final String filename = resource.getFilename();
    // Resource.getFilename() may return null (e.g. for resources without a path);
    // fail fast with a clear message instead of an NPE inside Paths.get().
    Assert.notNull(filename, "Unable to determine a file name for resource: " + resourceLocation);
    File targetFile;
    if (StringUtils.hasText(fileSystemLocation)) {
        final Path path = Paths.get(fileSystemLocation);
        if (!Files.exists(path) || !Files.isDirectory(path) || !Files.isWritable(path)) {
            logger.warn("The filesystem location to move the cert files (" + fileSystemLocation + ") " +
                    "is not found, is not a directory, or is not writable. The system temp folder (java.io.tmpdir) will be used instead.");
            targetFile = new File(Paths.get(tempDir, filename).toString());
        }
        else {
            // the given location is verified to be a writable directory.
            targetFile = new File(Paths.get(fileSystemLocation, filename).toString());
        }
    }
    else {
        // No explicit directory configured; default to the system temp folder.
        targetFile = new File(Paths.get(tempDir, filename).toString());
    }
    try (InputStream inputStream = resource.getInputStream()) {
        Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }
    return targetFile.getAbsolutePath();
}
// The connection string considered "default" by getConfigurationWithBootstrapServer.
public String getDefaultKafkaConnectionString() {
    return DEFAULT_KAFKA_CONNECTION_STRING;
}
public String[] getHeaders() {
    return this.headers;
}
public String[] getBrokers() {
    return this.brokers;
}
// Varargs setter so brokers can be bound from a comma-separated property value.
public void setBrokers(String... brokers) {
    this.brokers = brokers;
}
public void setDefaultBrokerPort(String defaultBrokerPort) {
    this.defaultBrokerPort = defaultBrokerPort;
}
public void setHeaders(String... headers) {
    this.headers = headers;
}
/**
 * Converts an array of host values to a comma-separated String, appending the
 * default port value to every host that does not already specify one.
 * @param hosts host names, each optionally carrying a ":port" suffix
 * @param defaultPort port to append when a host has none; may be empty/null
 * @return formatted connection string
 */
private String toConnectionString(String[] hosts, String defaultPort) {
    // Only append when a default port was actually supplied.
    final boolean appendPort = StringUtils.hasText(defaultPort);
    String[] formattedHosts = new String[hosts.length];
    for (int i = 0; i < hosts.length; i++) {
        String host = hosts[i];
        formattedHosts[i] = (appendPort && !host.contains(":"))
                ? host + ":" + defaultPort
                : host;
    }
    return StringUtils.arrayToCommaDelimitedString(formattedHosts);
}
public String getRequiredAcks() {
    return this.requiredAcks;
}
public void setRequiredAcks(String requiredAcks) {
    this.requiredAcks = requiredAcks;
}
public short getReplicationFactor() {
    return this.replicationFactor;
}
public void setReplicationFactor(short replicationFactor) {
    this.replicationFactor = replicationFactor;
}
public int getMinPartitionCount() {
    return this.minPartitionCount;
}
public void setMinPartitionCount(int minPartitionCount) {
    this.minPartitionCount = minPartitionCount;
}
public int getHealthTimeout() {
    return this.healthTimeout;
}
public void setHealthTimeout(int healthTimeout) {
    this.healthTimeout = healthTimeout;
}
public boolean isAutoCreateTopics() {
    return this.autoCreateTopics;
}
public void setAutoCreateTopics(boolean autoCreateTopics) {
    this.autoCreateTopics = autoCreateTopics;
}
public boolean isAutoAlterTopics() {
    return autoAlterTopics;
}
public void setAutoAlterTopics(boolean autoAlterTopics) {
    this.autoAlterTopics = autoAlterTopics;
}
public boolean isAutoAddPartitions() {
    return this.autoAddPartitions;
}
public void setAutoAddPartitions(boolean autoAddPartitions) {
    this.autoAddPartitions = autoAddPartitions;
}
public Map<String, String> getConfiguration() {
    return this.configuration;
}
// NOTE(review): unlike setConsumerProperties/setProducerProperties below, this
// setter accepts null without an Assert — confirm whether that is intentional.
public void setConfiguration(Map<String, String> configuration) {
    this.configuration = configuration;
}
public Map<String, String> getConsumerProperties() {
    return this.consumerProperties;
}
public void setConsumerProperties(Map<String, String> consumerProperties) {
    Assert.notNull(consumerProperties, "'consumerProperties' cannot be null");
    this.consumerProperties = consumerProperties;
}
public Map<String, String> getProducerProperties() {
    return this.producerProperties;
}
public void setProducerProperties(Map<String, String> producerProperties) {
    Assert.notNull(producerProperties, "'producerProperties' cannot be null");
    this.producerProperties = producerProperties;
}
/**
 * Merge boot consumer properties, general properties from
 * {@link #setConfiguration(Map)} that apply to consumers, properties from
 * {@link #setConsumerProperties(Map)}, in that order.
 * @return the merged properties (unmodifiable).
 */
public Map<String, Object> mergedConsumerConfiguration() {
    Map<String, Object> consumerConfiguration = new HashMap<>(this.kafkaProperties.buildConsumerProperties());
    if (this.kafkaConnectionDetails != null) {
        consumerConfiguration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getConsumer().getBootstrapServers());
    }
    // Copy configured binder properties that apply to consumers; schema registry
    // properties are also allowed to propagate to the consumer configuration.
    // The set of known consumer config names is loop-invariant, so look it up once.
    Set<String> consumerConfigNames = ConsumerConfig.configNames();
    for (Map.Entry<String, String> configurationEntry : this.configuration.entrySet()) {
        if (consumerConfigNames.contains(configurationEntry.getKey())
                || ObjectUtils.containsElement(this.schemaRegistryProperties, configurationEntry.getKey())) {
            consumerConfiguration.put(configurationEntry.getKey(), configurationEntry.getValue());
        }
    }
    consumerConfiguration.putAll(this.consumerProperties);
    // Drop settings the binder manages itself (enable.auto.commit, group.id).
    filterStreamManagedConfiguration(consumerConfiguration);
    // Override Spring Boot bootstrap server setting if left to default with the value
    // configured in the binder
    return getConfigurationWithBootstrapServer(consumerConfiguration,
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
/**
 * Merge boot producer properties, general properties from
 * {@link #setConfiguration(Map)} that apply to producers, properties from
 * {@link #setProducerProperties(Map)}, in that order.
 * @return the merged properties (unmodifiable).
 */
public Map<String, Object> mergedProducerConfiguration() {
    Map<String, Object> producerConfiguration = new HashMap<>(this.kafkaProperties.buildProducerProperties());
    if (this.kafkaConnectionDetails != null) {
        producerConfiguration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaConnectionDetails.getProducer().getBootstrapServers());
    }
    // Copy configured binder properties that apply to producers; schema registry
    // properties are also allowed to propagate.
    // The set of known producer config names is loop-invariant, so look it up once.
    Set<String> producerConfigNames = ProducerConfig.configNames();
    for (Map.Entry<String, String> configurationEntry : this.configuration.entrySet()) {
        if (producerConfigNames.contains(configurationEntry.getKey())
                || ObjectUtils.containsElement(this.schemaRegistryProperties, configurationEntry.getKey())) {
            producerConfiguration.put(configurationEntry.getKey(), configurationEntry.getValue());
        }
    }
    producerConfiguration.putAll(this.producerProperties);
    // Override Spring Boot bootstrap server setting if left to default with the value
    // configured in the binder
    return getConfigurationWithBootstrapServer(producerConfiguration,
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
/**
 * Removes consumer settings the binder manages itself, logging a warning for
 * each dropped entry: {@code enable.auto.commit=true} (offsets are committed by
 * the binder) and {@code group.id} (the group comes from the binding).
 * @param configuration mutable merged consumer configuration
 */
private void filterStreamManagedConfiguration(Map<String, Object> configuration) {
    String autoCommitKey = ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
    if (configuration.containsKey(autoCommitKey)
            && "true".equalsIgnoreCase(configuration.get(autoCommitKey).toString())) {
        logger.warn(constructIgnoredConfigMessage(autoCommitKey)
                + autoCommitKey + "=true is not supported by the Kafka binder");
        configuration.remove(autoCommitKey);
    }
    String groupIdKey = ConsumerConfig.GROUP_ID_CONFIG;
    if (configuration.containsKey(groupIdKey)) {
        logger.warn(constructIgnoredConfigMessage(groupIdKey)
                + "Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify "
                + "the group instead of " + groupIdKey);
        configuration.remove(groupIdKey);
    }
}
// Builds the common prefix for warnings about dropped user-supplied settings.
private String constructIgnoredConfigMessage(String config) {
    return "Ignoring provided value(s) for '" + config + "'. ";
}
/**
 * Applies the binder's broker connection string to the given configuration
 * unless Boot already supplied bootstrap servers AND the binder itself is still
 * on the default connection string.
 * @param configuration mutable merged configuration
 * @param bootstrapServersConfig the consumer/producer bootstrap-servers key
 * @return an unmodifiable view of the configuration
 */
private Map<String, Object> getConfigurationWithBootstrapServer(
        Map<String, Object> configuration, String bootstrapServersConfig) {
    final String kafkaConnectionString = getKafkaConnectionString();
    // Use the shared constant instead of repeating the "localhost:9092" literal.
    if (ObjectUtils.isEmpty(configuration.get(bootstrapServersConfig))
            || !DEFAULT_KAFKA_CONNECTION_STRING.equals(kafkaConnectionString)) {
        configuration.put(bootstrapServersConfig, kafkaConnectionString);
    }
    return Collections.unmodifiableMap(configuration);
}
public JaasLoginModuleConfiguration getJaas() {
    return this.jaas;
}
public void setJaas(JaasLoginModuleConfiguration jaas) {
    this.jaas = jaas;
}
public String getHeaderMapperBeanName() {
    return this.headerMapperBeanName;
}
public void setHeaderMapperBeanName(String headerMapperBeanName) {
    this.headerMapperBeanName = headerMapperBeanName;
}
// Null (the default) disables retries after an AuthorizationException.
public Duration getAuthorizationExceptionRetryInterval() {
    return authorizationExceptionRetryInterval;
}
public void setAuthorizationExceptionRetryInterval(Duration authorizationExceptionRetryInterval) {
    this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
}
public boolean isConsiderDownWhenAnyPartitionHasNoLeader() {
    return this.considerDownWhenAnyPartitionHasNoLeader;
}
public void setConsiderDownWhenAnyPartitionHasNoLeader(boolean considerDownWhenAnyPartitionHasNoLeader) {
    this.considerDownWhenAnyPartitionHasNoLeader = considerDownWhenAnyPartitionHasNoLeader;
}
// Directory used by moveCertToFileSystem; falls back to java.io.tmpdir when unset.
public String getCertificateStoreDirectory() {
    return this.certificateStoreDirectory;
}
public void setCertificateStoreDirectory(String certificateStoreDirectory) {
    this.certificateStoreDirectory = certificateStoreDirectory;
}
public boolean isEnableObservation() {
    return this.enableObservation;
}
public void setEnableObservation(boolean enableObservation) {
    this.enableObservation = enableObservation;
}
public String getHealthIndicatorConsumerGroup() {
    return healthIndicatorConsumerGroup;
}
public void setHealthIndicatorConsumerGroup(String healthIndicatorConsumerGroup) {
    this.healthIndicatorConsumerGroup = healthIndicatorConsumerGroup;
}
/**
 * Domain class that models transaction capabilities in Kafka.
 */
public static class Transaction {
    // Producer settings applied to the binder's transactional producer.
    private final CombinedProducerProperties producer = new CombinedProducerProperties();
    // NOTE(review): presumably used as the Kafka transactional.id prefix —
    // consumed by the binder outside this class; confirm there.
    private String transactionIdPrefix;
    public String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }
    public void setTransactionIdPrefix(String transactionIdPrefix) {
        this.transactionIdPrefix = transactionIdPrefix;
    }
    public CombinedProducerProperties getProducer() {
        return this.producer;
    }
}
/**
 * A combination of {@link ProducerProperties} and {@link KafkaProducerProperties} so
 * that common and kafka-specific properties can be set for the transactional
 * producer.
 * <p>This class is a pure delegation facade: every accessor forwards to one of
 * the two wrapped property objects; no state is held here directly.
 *
 * @since 2.1
 */
public static class CombinedProducerProperties {
    private final ProducerProperties producerProperties = new ProducerProperties();
    private final KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
    // ---- delegated to the common ProducerProperties ----
    public Expression getPartitionKeyExpression() {
        return this.producerProperties.getPartitionKeyExpression();
    }
    public void setPartitionKeyExpression(Expression partitionKeyExpression) {
        this.producerProperties.setPartitionKeyExpression(partitionKeyExpression);
    }
    public boolean isPartitioned() {
        return this.producerProperties.isPartitioned();
    }
    public Expression getPartitionSelectorExpression() {
        return this.producerProperties.getPartitionSelectorExpression();
    }
    public void setPartitionSelectorExpression(
            Expression partitionSelectorExpression) {
        this.producerProperties
                .setPartitionSelectorExpression(partitionSelectorExpression);
    }
    public @Min(value = 1, message = "Partition count should be greater than zero.") int getPartitionCount() {
        return this.producerProperties.getPartitionCount();
    }
    public void setPartitionCount(int partitionCount) {
        this.producerProperties.setPartitionCount(partitionCount);
    }
    public String[] getRequiredGroups() {
        return this.producerProperties.getRequiredGroups();
    }
    public void setRequiredGroups(String... requiredGroups) {
        this.producerProperties.setRequiredGroups(requiredGroups);
    }
    // Bean-validation hooks; validation logic lives in ProducerProperties.
    public @AssertTrue(message = "Partition key expression and partition key extractor class properties "
            + "are mutually exclusive.") boolean isValidPartitionKeyProperty() {
        return this.producerProperties.isValidPartitionKeyProperty();
    }
    public @AssertTrue(message = "Partition selector class and partition selector expression "
            + "properties are mutually exclusive.") boolean isValidPartitionSelectorProperty() {
        return this.producerProperties.isValidPartitionSelectorProperty();
    }
    public HeaderMode getHeaderMode() {
        return this.producerProperties.getHeaderMode();
    }
    public void setHeaderMode(HeaderMode headerMode) {
        this.producerProperties.setHeaderMode(headerMode);
    }
    public boolean isUseNativeEncoding() {
        return this.producerProperties.isUseNativeEncoding();
    }
    public void setUseNativeEncoding(boolean useNativeEncoding) {
        this.producerProperties.setUseNativeEncoding(useNativeEncoding);
    }
    public boolean isErrorChannelEnabled() {
        return this.producerProperties.isErrorChannelEnabled();
    }
    public void setErrorChannelEnabled(boolean errorChannelEnabled) {
        this.producerProperties.setErrorChannelEnabled(errorChannelEnabled);
    }
    public String getPartitionKeyExtractorName() {
        return this.producerProperties.getPartitionKeyExtractorName();
    }
    public void setPartitionKeyExtractorName(String partitionKeyExtractorName) {
        this.producerProperties
                .setPartitionKeyExtractorName(partitionKeyExtractorName);
    }
    public String getPartitionSelectorName() {
        return this.producerProperties.getPartitionSelectorName();
    }
    public void setPartitionSelectorName(String partitionSelectorName) {
        this.producerProperties.setPartitionSelectorName(partitionSelectorName);
    }
    // ---- delegated to the Kafka-specific KafkaProducerProperties ----
    public int getBufferSize() {
        return this.kafkaProducerProperties.getBufferSize();
    }
    public void setBufferSize(int bufferSize) {
        this.kafkaProducerProperties.setBufferSize(bufferSize);
    }
    public @NotNull CompressionType getCompressionType() {
        return this.kafkaProducerProperties.getCompressionType();
    }
    public void setCompressionType(CompressionType compressionType) {
        this.kafkaProducerProperties.setCompressionType(compressionType);
    }
    public boolean isSync() {
        return this.kafkaProducerProperties.isSync();
    }
    public void setSync(boolean sync) {
        this.kafkaProducerProperties.setSync(sync);
    }
    public int getBatchTimeout() {
        return this.kafkaProducerProperties.getBatchTimeout();
    }
    public void setBatchTimeout(int batchTimeout) {
        this.kafkaProducerProperties.setBatchTimeout(batchTimeout);
    }
    public Expression getMessageKeyExpression() {
        return this.kafkaProducerProperties.getMessageKeyExpression();
    }
    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.kafkaProducerProperties.setMessageKeyExpression(messageKeyExpression);
    }
    public String[] getHeaderPatterns() {
        return this.kafkaProducerProperties.getHeaderPatterns();
    }
    public void setHeaderPatterns(String[] headerPatterns) {
        this.kafkaProducerProperties.setHeaderPatterns(headerPatterns);
    }
    public Map<String, String> getConfiguration() {
        return this.kafkaProducerProperties.getConfiguration();
    }
    public void setConfiguration(Map<String, String> configuration) {
        this.kafkaProducerProperties.setConfiguration(configuration);
    }
    public KafkaTopicProperties getTopic() {
        return this.kafkaProducerProperties.getTopic();
    }
    public void setTopic(KafkaTopicProperties topic) {
        this.kafkaProducerProperties.setTopic(topic);
    }
    // Direct access to the wrapped Kafka-specific properties object.
    public KafkaProducerProperties getExtension() {
        return this.kafkaProducerProperties;
    }
}
/**
 * Settings controlling the binder's offset-lag metrics computation.
 */
public static class Metrics {
    /**
     * When set to true, the offset lag metric of each consumer topic is computed whenever the metric is queried (default: true).
     * When set to false only the periodically calculated offset lag is used.
     * See {@link #offsetLagMetricsInterval} for more information.
     */
    private boolean defaultOffsetLagMetricsEnabled = true;
    /**
     * The interval in which the offset lag for each consumer topic is computed (default: 60 seconds).
     * This value is used whenever {@link #defaultOffsetLagMetricsEnabled} is disabled or its computation is taking too long.
     */
    private Duration offsetLagMetricsInterval = Duration.ofSeconds(60);
    public boolean isDefaultOffsetLagMetricsEnabled() {
        return defaultOffsetLagMetricsEnabled;
    }
    public void setDefaultOffsetLagMetricsEnabled(boolean defaultOffsetLagMetricsEnabled) {
        this.defaultOffsetLagMetricsEnabled = defaultOffsetLagMetricsEnabled;
    }
    public Duration getOffsetLagMetricsInterval() {
        return offsetLagMetricsInterval;
    }
    public void setOffsetLagMetricsInterval(Duration offsetLagMetricsInterval) {
        this.offsetLagMetricsInterval = offsetLagMetricsInterval;
    }
}
}