KafkaConnectorConfig.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.kafka;
import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier;
import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.util.List;
import static com.google.common.collect.ImmutableList.toImmutableList;
public class KafkaConnectorConfig
{
/**
* Timeout to connect to Kafka.
*/
private Duration kafkaConnectTimeout = Duration.valueOf("10s");
/**
* The schema name to use in the connector.
*/
private String defaultSchema = "default";
/**
* Whether internal columns are shown in table metadata or not. Default is no.
*/
private boolean hideInternalColumns = true;
/**
* Maximum number of records per poll()
*/
private int maxPollRecords = 500;
/**
* Maximum number of bytes from one partition per poll()
*/
private int maxPartitionFetchBytes = 1024 * 1024;
/**
* The table description supplier to use, default is FILE
*/
private String tableDescriptionSupplier = FileTableDescriptionSupplier.NAME;
/**
* The kafka cluster metadata supplier to use, default is FILE
*/
private String clusterMetadataSupplier = FileKafkaClusterMetadataSupplier.NAME;
/**
* The resourceConfigFiles to use for additional properties, default is empty
*/
private List<File> resourceConfigFiles = ImmutableList.of();
@NotNull
public String getDefaultSchema()
{
return defaultSchema;
}
@Config("kafka.default-schema")
public KafkaConnectorConfig setDefaultSchema(String defaultSchema)
{
this.defaultSchema = defaultSchema;
return this;
}
@MinDuration("1s")
public Duration getKafkaConnectTimeout()
{
return kafkaConnectTimeout;
}
@Config("kafka.connect-timeout")
public KafkaConnectorConfig setKafkaConnectTimeout(String kafkaConnectTimeout)
{
this.kafkaConnectTimeout = Duration.valueOf(kafkaConnectTimeout);
return this;
}
public int getMaxPollRecords()
{
return maxPollRecords;
}
@Config("kafka.max-poll-records")
public KafkaConnectorConfig setMaxPollRecords(int maxPollRecords)
{
this.maxPollRecords = maxPollRecords;
return this;
}
public int getMaxPartitionFetchBytes()
{
return maxPartitionFetchBytes;
}
@Config("kafka.max-partition-fetch-bytes")
public KafkaConnectorConfig setMaxPartitionFetchBytes(int maxPartitionFetchBytes)
{
this.maxPartitionFetchBytes = maxPartitionFetchBytes;
return this;
}
@NotNull
public String getTableDescriptionSupplier()
{
return tableDescriptionSupplier;
}
@Config("kafka.table-description-supplier")
public KafkaConnectorConfig setTableDescriptionSupplier(String tableDescriptionSupplier)
{
this.tableDescriptionSupplier = tableDescriptionSupplier;
return this;
}
@NotNull
public String getClusterMetadataSupplier()
{
return clusterMetadataSupplier;
}
@Config("kafka.cluster-metadata-supplier")
public KafkaConnectorConfig setClusterMetadataSupplier(String clusterMetadataSupplier)
{
this.clusterMetadataSupplier = clusterMetadataSupplier;
return this;
}
public boolean isHideInternalColumns()
{
return hideInternalColumns;
}
@Config("kafka.hide-internal-columns")
public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
{
this.hideInternalColumns = hideInternalColumns;
return this;
}
@NotNull
public List<File> getResourceConfigFiles()
{
return resourceConfigFiles;
}
@Config("kafka.config.resources")
@ConfigDescription("Optional config files")
public KafkaConnectorConfig setResourceConfigFiles(String files)
{
this.resourceConfigFiles = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(files).stream()
.map(File::new)
.collect(toImmutableList());
return this;
}
}