/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.druid.metadata;

import com.facebook.presto.spi.PrestoException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static com.facebook.presto.druid.DruidErrorCode.DRUID_METADATA_ERROR;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

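/**
 * Jackson-bound view of a Druid segment descriptor as served by the Druid
 * coordinator's metadata API. The {@code loadSpec} map describes where the
 * segment lives in deep storage; its keys vary by storage backend. An
 * illustrative (not authoritative) S3 loadSpec:
 * <pre>
 * {"type": "s3_zip", "bucket": "my-bucket", "key": "druid/segments/.../index.zip", "S3Schema": "s3n"}
 * </pre>
 */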
public class DruidSegmentInfo
{
    private static final String DEEP_STORAGE_TYPE_KEY = "type";
    private static final String DEEP_STORAGE_S3_SCHEMA_KEY = "S3Schema";
    private static final String DEEP_STORAGE_BUCKET_KEY = "bucket";
    private static final String DEEP_STORAGE_PATH_KEY = "path";
    private static final String DEEP_STORAGE_KEY_KEY = "key";
    private static final String S3A_SCHEMA = "s3a";
    private static final String S3N_SCHEMA = "s3n";

    private final String dataSource;
    private final String version;
    private final Optional<Map<String, String>> loadSpecification;
    private final Optional<Map<String, Object>> shardSpecification;
    private final Integer binaryVersion;
    private final long size;

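    /**
     * Deep storage backends understood by this connector, keyed by the value
     * Druid reports under the loadSpec {@code "type"} entry.
     */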
    public enum DeepStorageType
    {
        HDFS("hdfs"),
        S3("s3_zip"),
        GCS("google"),
        LOCAL("local");

        private final String type;

        DeepStorageType(String type)
        {
            this.type = type;
        }

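        /**
         * Resolves a loadSpec {@code "type"} value to its enum constant,
         * matching case-insensitively.
         */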
        static DeepStorageType fromType(String type)
        {
            for (DeepStorageType deepStorageType : DeepStorageType.values()) {
                if (deepStorageType.type.equalsIgnoreCase(type)) {
                    return deepStorageType;
                }
            }
            throw new IllegalArgumentException("Unknown deep storage type: " + type);
        }
    }

    @JsonCreator
    public DruidSegmentInfo(
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("version") String version,
            @JsonProperty("loadSpec") Optional<Map<String, String>> loadSpecification,
            @JsonProperty("shardSpec") Optional<Map<String, Object>> shardSpecification,
            @JsonProperty("binaryVersion") Integer binaryVersion,
            @JsonProperty("size") long size)
    {
        this.dataSource = requireNonNull(dataSource, "dataSource is null");
        this.version = requireNonNull(version, "version is null");
        this.loadSpecification = requireNonNull(loadSpecification, "loadSpecification is null");
        this.shardSpecification = requireNonNull(shardSpecification, "shardSpecification is null");
        this.binaryVersion = requireNonNull(binaryVersion, "binaryVersion is null");
        this.size = size;
    }

    @JsonProperty
    public String getDataSource()
    {
        return dataSource;
    }

    @JsonProperty
    public String getVersion()
    {
        return version;
    }

    @JsonProperty
    public Optional<Map<String, String>> getLoadSpecification()
    {
        return loadSpecification;
    }

    @JsonProperty
    public Optional<Map<String, Object>> getShardSpecification()
    {
        return shardSpecification;
    }

    @JsonProperty
    public Integer getBinaryVersion()
    {
        return binaryVersion;
    }

    @JsonProperty
    public long getSize()
    {
        return size;
    }

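    /**
     * Returns the deep storage backend named in the segment's loadSpec,
     * failing if the loadSpec is missing or names an unknown backend.
     */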
    public DeepStorageType getDeepStorageType()
    {
        Map<String, String> loadSpec = getLoadSpecification()
                .orElseThrow(() -> new PrestoException(DRUID_METADATA_ERROR, format("Malformed segment loadSpecification: %s", getLoadSpecification())));
        return DeepStorageType.fromType(loadSpec.get(DEEP_STORAGE_TYPE_KEY));
    }

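    /**
     * Rebuilds the deep storage URI of the segment file from the
     * backend-specific loadSpec entries; for example, an S3 loadSpec with
     * bucket {@code b} and key {@code k} yields {@code s3n://b/k}.
     */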
    public URI getDeepStoragePath()
    {
        Map<String, String> loadSpec = getLoadSpecification()
                .orElseThrow(() -> new PrestoException(DRUID_METADATA_ERROR, format("Malformed segment loadSpecification: %s", getLoadSpecification())));
        String type = loadSpec.get(DEEP_STORAGE_TYPE_KEY);
        URI segmentLocURI;
        try {
            switch (DeepStorageType.fromType(type)) {
                case S3:
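                    // Druid advertises the Hadoop filesystem to use under "S3Schema"; fall back to s3n when the entry is absent or unrecognized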
                    String s3Schema = S3A_SCHEMA.equals(loadSpec.get(DEEP_STORAGE_S3_SCHEMA_KEY)) ? S3A_SCHEMA : S3N_SCHEMA;
                    segmentLocURI = URI.create(format("%s://%s/%s", s3Schema, loadSpec.get(DEEP_STORAGE_BUCKET_KEY), loadSpec.get(DEEP_STORAGE_KEY_KEY)));
                    break;
                case HDFS:
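                    // the HDFS loadSpec path is expected to already be a fully qualified URI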
                    segmentLocURI = URI.create(loadSpec.get(DEEP_STORAGE_PATH_KEY));
                    break;
                case GCS:
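                    // encode the object name, which the GCS loadSpec is assumed to store unencoded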
                    segmentLocURI = URI.create(format(
                            "gs://%s/%s",
                            loadSpec.get(DEEP_STORAGE_BUCKET_KEY),
                            URLEncoder.encode(loadSpec.get(DEEP_STORAGE_PATH_KEY), "UTF-8")));
                    break;
                case LOCAL:
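                    // the multi-argument URI constructor quotes characters that are illegal in a URI path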
                    segmentLocURI = new URI("file", null, loadSpec.get(DEEP_STORAGE_PATH_KEY), null, null);
                    break;
                default:
                    throw new PrestoException(DRUID_METADATA_ERROR, format("Unsupported segment filesystem: %s", type));
            }
        }
        catch (URISyntaxException | UnsupportedEncodingException e) {
            throw new PrestoException(DRUID_METADATA_ERROR, e);
        }
        return segmentLocURI;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(dataSource, version, loadSpecification, shardSpecification, binaryVersion, size);
    }

    @Override
    public boolean equals(Object obj)
    {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        DruidSegmentInfo that = (DruidSegmentInfo) obj;
        return Objects.equals(this.dataSource, that.dataSource) &&
                Objects.equals(this.version, that.version) &&
                Objects.equals(this.loadSpecification, that.loadSpecification) &&
                Objects.equals(this.shardSpecification, that.shardSpecification) &&
                Objects.equals(this.binaryVersion, that.binaryVersion) &&
                this.size == that.size;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("dataSource", dataSource)
                .add("version", version)
                .add("loadSpecification", loadSpecification)
                .add("shardSpecification", shardSpecification)
                .add("binaryVersion", binaryVersion)
                .add("size", size)
                .toString();
    }
}