/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.druid.ingestion;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.druid.DruidConfig;
import com.facebook.presto.druid.metadata.DruidColumnInfo;
import com.facebook.presto.druid.metadata.DruidColumnType;
import com.facebook.presto.spi.PrestoException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.inject.Inject;

import java.io.IOException;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.druid.DruidErrorCode.DRUID_DEEP_STORAGE_ERROR;
import static java.util.Objects.requireNonNull;

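/**
 * Writes Presto {@link Page}s to Druid deep storage as gzip-compressed JSON
 * data files that a Druid batch ingestion task can consume.
 *
 * <p>Illustrative usage (the config, page, and table handle are assumed to be
 * supplied by the connector, not constructed here):
 * <pre>{@code
 * DruidPageWriter writer = new DruidPageWriter(druidConfig);
 * Path dataFile = writer.append(page, tableHandle, new Path("/druid/staging"));
 * }</pre>
 */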
public class DruidPageWriter
{
    public static final JsonFactory JSON_FACTORY = new JsonFactory();
    public static final String DATA_FILE_EXTENSION = ".json.gz";

    private final Configuration hadoopConfiguration;
    private final DruidConfig druidConfig;

    @Inject
    public DruidPageWriter(DruidConfig druidConfig)
    {
        this.druidConfig = requireNonNull(druidConfig, "druidConfig is null");
        this.hadoopConfiguration = druidConfig.readHadoopConfiguration();
    }

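    /**
     * Serializes the given page as a single gzipped JSON file named with a
     * random UUID under {@code dataPath}, and returns the path of the new file.
     */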
    public Path append(Page page, DruidIngestionTableHandle tableHandle, Path dataPath)
    {
        Path dataFile = new Path(dataPath, UUID.randomUUID() + DATA_FILE_EXTENSION);
        try {
            FileSystem fileSystem = dataFile.getFileSystem(hadoopConfiguration);
            try (FSDataOutputStream outputStream = fileSystem.create(dataFile);
                    GZIPOutputStream zipOutputStream = new GZIPOutputStream(outputStream);
                    JsonGenerator jsonGen = JSON_FACTORY.createGenerator(zipOutputStream)) {
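                // Emit one JSON object per row, with one field per column of the table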
                for (int position = 0; position < page.getPositionCount(); position++) {
                    jsonGen.writeStartObject();
                    for (int channel = 0; channel < page.getChannelCount(); channel++) {
                        DruidColumnInfo column = tableHandle.getColumns().get(channel);
                        Block block = page.getBlock(channel);
                        jsonGen.writeFieldName(column.getColumnName());
                        writeFieldValue(jsonGen, column.getDataType(), block, position);
                    }
                    jsonGen.writeEndObject();
                }
            }
            return dataFile;
        }
        catch (IOException e) {
            throw new PrestoException(DRUID_DEEP_STORAGE_ERROR, "Ingestion failed on " + tableHandle.getTableName(), e);
        }
    }

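    /**
     * Writes a single field value, dispatching on the Druid column type to the
     * matching Presto type accessor for the block.
     */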
    private void writeFieldValue(JsonGenerator jsonGen, DruidColumnType dataType, Block block, int position)
            throws IOException
    {
        switch (dataType) {
            case VARCHAR:
            case OTHER:
                // Complex Druid column types (e.g. hyperUnique, approxHistogram) are serialized as strings
                jsonGen.writeString(VARCHAR.getSlice(block, position).toStringUtf8());
                return;
            case BIGINT:
            case TIMESTAMP:
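                // Timestamps share BIGINT's underlying long (epoch millisecond) representation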
                jsonGen.writeNumber(BIGINT.getLong(block, position));
                return;
            case FLOAT:
            case DOUBLE:
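                // Both FLOAT and DOUBLE columns are read through the Presto DOUBLE accessor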
                jsonGen.writeNumber(DOUBLE.getDouble(block, position));
                return;
            default:
                throw new IllegalArgumentException("Unsupported Druid column type: " + dataType);
        }
    }
}