DruidIngestTask.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.druid.ingestion;

import com.facebook.airlift.json.JsonCodec;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.fs.Path;

import java.util.List;
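
/**
 * JSON model of a Druid native batch ingestion task of type "index_parallel".
 * Instances are assembled through the Builder and serialized with toJson()
 * for submission to the Druid Overlord's task endpoint
 * (POST /druid/indexer/v1/task).
 *
 * Illustrative usage; the data source, column, and path names below are made up:
 *
 * <pre>{@code
 * DruidIngestTask task = new DruidIngestTask.Builder()
 *         .withDataSource("events")
 *         .withTimestampColumn("ts")
 *         .withDimensions(ImmutableList.of(new DruidIngestDimension("string", "country")))
 *         .withInputSource(new Path("hdfs://namenode/tmp/ingest"), ImmutableList.of("/tmp/ingest/part-00000.json.gz"))
 *         .withAppendToExisting(false)
 *         .build();
 * String taskJson = task.toJson();
 * }</pre>
 */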
public class DruidIngestTask
{
public static final String TASK_TYPE_INDEX_PARALLEL = "index_parallel";
public static final String INPUT_FORMAT_JSON = "json";
public static final String DEFAULT_INPUT_FILE_FILTER = "*.json.gz";
private final String type;
private final DruidIngestSpec spec;
private DruidIngestTask(String type, DruidIngestSpec spec)
{
this.type = type;
this.spec = spec;
}
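/**
 * Fluent builder that collects the ingestion parameters and, in build(),
 * nests them into the dataSchema/ioConfig layout Druid expects.
 */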
public static class Builder
{
private String dataSource;
private String timestampColumn;
private List<DruidIngestDimension> dimensions;
private DruidIngestInputSource inputSource;
private boolean appendToExisting;
public Builder withDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public Builder withTimestampColumn(String timestampColumn)
{
this.timestampColumn = timestampColumn;
return this;
}
public Builder withDimensions(List<DruidIngestDimension> dimensions)
{
this.dimensions = dimensions;
return this;
}
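/**
 * Derives the Druid input source from the scheme of the base directory:
 * "file" becomes Druid's "local" input source, which scans baseDir for files
 * matching DEFAULT_INPUT_FILE_FILTER, while "hdfs" becomes the "hdfs" input
 * source, which reads the explicitly listed data files.
 */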
public Builder withInputSource(Path baseDir, List<String> dataFileList)
{
String scheme = baseDir.toUri().getScheme();
// A scheme-less path (for example, a bare relative path) would make the switch
// below throw a NullPointerException, so reject it up front with a clear message.
if (scheme == null) {
throw new IllegalArgumentException("Ingestion base directory has no URI scheme: " + baseDir);
}
switch (scheme) {
case "file":
inputSource = new DruidIngestLocalInput("local", baseDir.toString(), DEFAULT_INPUT_FILE_FILTER);
break;
case "hdfs":
inputSource = new DruidIngestHDFSInput("hdfs", dataFileList);
break;
default:
throw new IllegalArgumentException("Unsupported ingestion input source: " + scheme);
}
return this;
}
public Builder withAppendToExisting(boolean appendToExisting)
{
this.appendToExisting = appendToExisting;
return this;
}
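/**
 * Assembles the collected pieces into the nested spec of an "index_parallel"
 * (native batch) ingestion task.
 */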
public DruidIngestTask build()
{
DruidIngestDataSchema dataSchema = new DruidIngestDataSchema(
dataSource,
new DruidIngestTimestampSpec(timestampColumn),
new DruidIngestDimensionsSpec(dimensions));
DruidIngestIOConfig ioConfig = new DruidIngestIOConfig(
TASK_TYPE_INDEX_PARALLEL,
inputSource,
new DruidIngestInputFormat(INPUT_FORMAT_JSON),
appendToExisting);
DruidIngestSpec spec = new DruidIngestSpec(dataSchema, ioConfig);
return new DruidIngestTask(TASK_TYPE_INDEX_PARALLEL, spec);
}
}
@JsonProperty("type")
public String getType()
{
return type;
}
@JsonProperty("spec")
public DruidIngestSpec getSpec()
{
return spec;
}
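// Creating a fresh codec per call is acceptable here: a task is serialized
// once per ingestion request, so this is not a hot path.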
public String toJson()
{
return JsonCodec.jsonCodec(DruidIngestTask.class).toJson(this);
}
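/**
 * The "spec" object of the task JSON. No tuningConfig is emitted, so Druid
 * falls back to its tuning defaults.
 */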
public static class DruidIngestSpec
{
private final DruidIngestDataSchema dataSchema;
private final DruidIngestIOConfig ioConfig;
public DruidIngestSpec(DruidIngestDataSchema dataSchema, DruidIngestIOConfig ioConfig)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
}
@JsonProperty("dataSchema")
public DruidIngestDataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty("ioConfig")
public DruidIngestIOConfig getIoConfig()
{
return ioConfig;
}
}
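/**
 * Mirrors Druid's dataSchema: the target datasource name plus how to locate
 * the timestamp and which columns to index as dimensions.
 */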
public static class DruidIngestDataSchema
{
private final String dataSource;
private final DruidIngestTimestampSpec timestampSpec;
private final DruidIngestDimensionsSpec dimensionsSpec;
public DruidIngestDataSchema(String dataSource, DruidIngestTimestampSpec timestampSpec, DruidIngestDimensionsSpec dimensionsSpec)
{
this.dataSource = dataSource;
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
}
@JsonProperty("dataSource")
public String getDataSource()
{
return dataSource;
}
@JsonProperty("timestampSpec")
public DruidIngestTimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty("dimensionsSpec")
public DruidIngestDimensionsSpec getDimensionsSpec()
{
return dimensionsSpec;
}
}
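/**
 * Mirrors Druid's timestampSpec. Only the column is set; the timestamp format
 * is left to Druid's default ("auto").
 */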
public static class DruidIngestTimestampSpec
{
private final String column;
public DruidIngestTimestampSpec(String column)
{
this.column = column;
}
@JsonProperty("column")
public String getColumn()
{
return column;
}
}
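/**
 * Mirrors Druid's dimensionsSpec, listing the dimension columns of the datasource.
 */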
public static class DruidIngestDimensionsSpec
{
private final List<DruidIngestDimension> dimensions;
public DruidIngestDimensionsSpec(List<DruidIngestDimension> dimensions)
{
this.dimensions = dimensions;
}
@JsonProperty("dimensions")
public List<DruidIngestDimension> getDimensions()
{
return dimensions;
}
}
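/**
 * A single dimension entry: a Druid dimension type (for example "string",
 * "long", or "double") and the column name.
 */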
public static class DruidIngestDimension
{
private final String type;
private final String name;
public DruidIngestDimension(String type, String name)
{
this.type = type;
this.name = name;
}
@JsonProperty("type")
public String getType()
{
return type;
}
@JsonProperty("name")
public String getName()
{
return name;
}
}
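/**
 * Mirrors Druid's ioConfig: where the input lives, how to parse it, and whether
 * to append to existing segments (appendToExisting = true) or overwrite them
 * (false, which is also the Druid default).
 */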
public static class DruidIngestIOConfig
{
private final String type;
private final DruidIngestInputSource inputSource;
private final DruidIngestInputFormat inputFormat;
private final boolean appendToExisting;
public DruidIngestIOConfig(
String type,
DruidIngestInputSource inputSource,
DruidIngestInputFormat inputFormat,
boolean appendToExisting)
{
this.type = type;
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.appendToExisting = appendToExisting;
}
@JsonProperty("type")
public String getType()
{
return type;
}
@JsonProperty("inputSource")
public DruidIngestInputSource getInputSource()
{
return inputSource;
}
@JsonProperty("inputFormat")
public DruidIngestInputFormat getInputFormat()
{
return inputFormat;
}
@JsonProperty("appendToExisting")
public boolean isAppendToExisting()
{
return appendToExisting;
}
}
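/**
 * Marker interface for the polymorphic "inputSource" object; each implementation
 * carries its own "type" discriminator field.
 */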
public interface DruidIngestInputSource
{
}
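/**
 * Druid's "local" input source: reads files under baseDir that match the
 * wildcard filter.
 */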
public static class DruidIngestLocalInput
implements DruidIngestInputSource
{
private final String type;
private final String baseDir;
private final String filter;
public DruidIngestLocalInput(String type, String baseDir, String filter)
{
this.type = type;
this.baseDir = baseDir;
this.filter = filter;
}
@JsonProperty("type")
public String getType()
{
return type;
}
@JsonProperty("baseDir")
public String getBaseDir()
{
return baseDir;
}
@JsonProperty("filter")
public String getFilter()
{
return filter;
}
}
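/**
 * Druid's "hdfs" input source: reads the explicitly listed HDFS paths.
 */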
public static class DruidIngestHDFSInput
implements DruidIngestInputSource
{
private final String type;
private final List<String> paths;
public DruidIngestHDFSInput(String type, List<String> paths)
{
this.type = type;
this.paths = paths;
}
@JsonProperty("type")
public String getType()
{
return type;
}
@JsonProperty("paths")
public List<String> getPaths()
{
return paths;
}
}
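/**
 * Mirrors Druid's inputFormat object; the builder always uses the "json" format.
 */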
public static class DruidIngestInputFormat
{
private final String type;
public DruidIngestInputFormat(String type)
{
this.type = type;
}
@JsonProperty("type")
public String getType()
{
return type;
}
}
}