DruidBrokerPageSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.druid;

import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.druid.DruidQueryGenerator.GeneratedDql;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;

import static com.facebook.presto.druid.DruidErrorCode.DRUID_BROKER_RESULT_ERROR;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.Float.floatToRawIntBits;
import static java.util.Objects.requireNonNull;

public class DruidBrokerPageSource
        implements ConnectorPageSource
{
    private static final ObjectMapper OBJECT_MAPPER = new JsonObjectMapperProvider().get();

    private final List<ColumnHandle> columnHandles;

    private boolean finished;
    private long readTimeNanos;
    private long completedBytes;
    private long completedPositions;
    private BufferedReader responseStream;
    private final PageBuilder pageBuilder;
    private List<Type> columnTypes;

    public DruidBrokerPageSource(
            GeneratedDql brokerDql,
            List<ColumnHandle> columnHandles,
            DruidClient druidClient)
    {
        requireNonNull(brokerDql, "broker is null");
        this.columnHandles = ImmutableList.copyOf(requireNonNull(columnHandles, "columnHandles is null"));
        requireNonNull(druidClient, "druid client is null");
        this.responseStream = new BufferedReader(new InputStreamReader(druidClient.getData(brokerDql.getDql())));
        List<DruidColumnHandle> handles = columnHandles.stream()
                .map(column -> (DruidColumnHandle) column)
                .collect(toImmutableList());
        this.columnTypes = handles.stream()
                .map(DruidColumnHandle::getColumnType)
                .collect(toImmutableList());
        this.pageBuilder = new PageBuilder(this.columnTypes);
    }

    @Override
    public long getCompletedBytes()
    {
        return completedBytes;
    }

    @Override
    public long getCompletedPositions()
    {
        return completedPositions;
    }

    @Override
    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    @Override
    public boolean isFinished()
    {
        return finished;
    }

    @Override
    public Page getNextPage()
    {
        if (finished) {
            return null;
        }

        long start = System.nanoTime();
        boolean columnHandlesHasErrorMessageField = columnHandles.stream().anyMatch(
                handle -> ((DruidColumnHandle) handle).getColumnName().equals("errorMessage"));
        try {
            String readLine;
            while ((readLine = responseStream.readLine()) != null) {
                // if read a blank line,it means read finish
                if (readLine.isEmpty()) {
                    finished = true;
                    break;
                }
                else {
                    JsonNode rootNode = OBJECT_MAPPER.readTree(readLine);
                    if (rootNode.has("errorMessage") && !columnHandlesHasErrorMessageField) {
                        throw new PrestoException(DRUID_BROKER_RESULT_ERROR, rootNode.findValue("errorMessage").asText());
                    }
                    for (int i = 0; i < columnHandles.size(); i++) {
                        Type type = columnTypes.get(i);
                        BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(i);
                        JsonNode value = rootNode.get(((DruidColumnHandle) columnHandles.get(i)).getColumnName());
                        if (value == null) {
                            blockBuilder.appendNull();
                            continue;
                        }
                        if (type instanceof BigintType) {
                            type.writeLong(blockBuilder, value.longValue());
                        }
                        else if (type instanceof DoubleType) {
                            type.writeDouble(blockBuilder, value.doubleValue());
                        }
                        else if (type instanceof RealType) {
                            type.writeLong(blockBuilder, floatToRawIntBits(value.floatValue()));
                        }
                        else if (type instanceof TimestampType) {
                            DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser()
                                    .withChronology(ISOChronology.getInstanceUTC())
                                    .withOffsetParsed();
                            DateTime dateTime = formatter.parseDateTime(value.textValue());
                            type.writeLong(blockBuilder, dateTime.getMillis());
                        }
                        else {
                            Slice slice = Slices.utf8Slice(value.textValue());
                            type.writeSlice(blockBuilder, slice);
                        }
                    }
                }
                pageBuilder.declarePosition();
                if (pageBuilder.isFull()) {
                    break;
                }
            }
            // if responseStream.readLine() is null, it means read finish
            if (readLine == null) {
                finished = true;
                return null;
            }

            // only return a page if the buffer is full or we are finishing
            if (pageBuilder.isEmpty() || (!finished && !pageBuilder.isFull())) {
                return null;
            }

            Page page = pageBuilder.build();
            completedPositions += page.getPositionCount();
            completedBytes += page.getSizeInBytes();
            pageBuilder.reset();
            return page;
        }
        catch (IOException e) {
            finished = true;
            throw new PrestoException(DRUID_BROKER_RESULT_ERROR, "Parse druid client response error", e);
        }
        finally {
            readTimeNanos += System.nanoTime() - start;
        }
    }

    @Override
    public long getSystemMemoryUsage()
    {
        return 0;
    }

    @Override
    public void close()
    {
        finished = true;
    }
}