PinotBrokerPageSource.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.pinot;
import com.facebook.airlift.http.client.Request;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.JsonType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TinyintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.pinot.auth.PinotBrokerAuthenticationProvider;
import com.facebook.presto.pinot.query.PinotQueryGenerator;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.pinot.spi.utils.BytesUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_REQUEST_GENERATOR_FAILURE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNAUTHENTICATED_EXCEPTION;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNEXPECTED_RESPONSE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static com.facebook.presto.pinot.PinotUtils.doWithRetries;
import static com.facebook.presto.pinot.PinotUtils.parseDouble;
import static com.facebook.presto.pinot.PinotUtils.parseTimestamp;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.net.HttpHeaders.AUTHORIZATION;
import static java.lang.Boolean.parseBoolean;
import static java.lang.Long.parseLong;
import static java.util.Objects.requireNonNull;
/**
 * {@link ConnectorPageSource} that sends a generated SQL query to a Pinot broker, or to the
 * configured controller proxy when {@code useProxy} is enabled. It parses the JSON response and
 * materializes every returned row into a single {@link Page}.
 */
public class PinotBrokerPageSource
        implements ConnectorPageSource
{
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Presto type families that setValue(Type, BlockBuilder, String) is able to populate.
    private static final List<Class<?>> SUPPORTED_PRESTO_COLUMN_TYPE_CLASSES = ImmutableList.of(
            FixedWidthType.class,
            VarcharType.class,
            JsonType.class,
            VarbinaryType.class);
    private static final String REQUEST_PAYLOAD_KEY = "sql";
    private static final String QUERY_URL_TEMPLATE = "%s://%s/query/sql";
    // Pinot exception errorCode that this class reports as an authentication failure.
    private static final int PINOT_AUTHENTICATION_ERROR_CODE = 180;

    private final PinotQueryGenerator.GeneratedPinotQuery brokerSql;
    private final List<PinotColumnHandle> expectedHandles;
    protected final PinotConfig pinotConfig;
    protected final List<PinotColumnHandle> columnHandles;
    protected final PinotClusterInfoFetcher clusterInfoFetcher;
    protected final ConnectorSession session;
    protected final ObjectMapper objectMapper;
    protected final PinotBrokerAuthenticationProvider brokerAuthenticationProvider;
    protected boolean finished;
    protected long readTimeNanos;
    protected long completedBytes;

    /**
     * @param pinotConfig connector configuration (proxy usage, controller URL, TLS)
     * @param session session used for the retry count and authentication token
     * @param brokerSql the generated Pinot SQL query and its expected column indices
     * @param columnHandles the column handles requested by the scan (copied)
     * @param expectedHandles the column handles the generated SQL was planned for
     * @param clusterInfoFetcher used to resolve broker hosts and execute the HTTP request
     * @param objectMapper used to parse the broker response
     * @param brokerAuthenticationProvider supplies the optional Authorization header value
     */
    public PinotBrokerPageSource(
            PinotConfig pinotConfig,
            ConnectorSession session,
            PinotQueryGenerator.GeneratedPinotQuery brokerSql,
            List<PinotColumnHandle> columnHandles,
            List<PinotColumnHandle> expectedHandles,
            PinotClusterInfoFetcher clusterInfoFetcher,
            ObjectMapper objectMapper,
            PinotBrokerAuthenticationProvider brokerAuthenticationProvider)
    {
        this.pinotConfig = requireNonNull(pinotConfig, "pinot config is null");
        this.clusterInfoFetcher = requireNonNull(clusterInfoFetcher, "cluster info fetcher is null");
        this.columnHandles = ImmutableList.copyOf(columnHandles);
        this.session = requireNonNull(session, "session is null");
        this.objectMapper = requireNonNull(objectMapper, "object mapper is null");
        this.brokerAuthenticationProvider = brokerAuthenticationProvider;
        this.expectedHandles = requireNonNull(expectedHandles, "expected handles is null");
        this.brokerSql = requireNonNull(brokerSql, "broker is null");
    }

    /**
     * Appends one JSON cell to {@code blockBuilder} as a value of {@code type}.
     *
     * <p>Does nothing when {@code blockBuilder} is null; that is how columns discarded by the scan are skipped.
     * A missing or JSON-null value appends a null. For an {@link ArrayType}, each array element is
     * written into a nested block entry. Every other value is converted through its text form.
     *
     * @throws IllegalStateException if {@code type} is an array type but {@code value} is not a JSON array
     */
    protected void setValue(Type type, BlockBuilder blockBuilder, JsonNode value)
    {
        if (blockBuilder == null) {
            return;
        }
        if (value == null || value.isNull()) {
            blockBuilder.appendNull();
            return;
        }
        if (type instanceof ArrayType) {
            checkState(value.isArray(), "Expected a JSON array for type %s", type);
            Type elementType = ((ArrayType) type).getElementType();
            BlockBuilder childBuilder = blockBuilder.beginBlockEntry();
            for (JsonNode element : value) {
                setValue(elementType, childBuilder, asText(element));
            }
            blockBuilder.closeEntry();
        }
        else {
            setValue(type, blockBuilder, asText(value));
        }
    }

    /**
     * Parses {@code value} according to {@code type} and appends it to {@code blockBuilder}. It also
     * adds the written size to {@link #completedBytes}.
     *
     * <p>Does nothing when {@code blockBuilder} is null. A null {@code value} appends a null.
     *
     * @throws PinotException with {@code PINOT_UNSUPPORTED_COLUMN_TYPE} if {@code type} cannot be populated
     */
    protected void setValue(Type type, BlockBuilder blockBuilder, String value)
    {
        if (blockBuilder == null) {
            return;
        }
        if (value == null) {
            blockBuilder.appendNull();
            return;
        }
        if (!isTypeSupportInPinot(type)) {
            throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported");
        }
        if (type instanceof FixedWidthType) {
            completedBytes += ((FixedWidthType) type).getFixedSize();
            // Integral types go through parseDouble so that numeric text such as "3.0" is accepted.
            if (type instanceof BigintType) {
                type.writeLong(blockBuilder, parseDouble(value).longValue());
            }
            else if (type instanceof IntegerType) {
                blockBuilder.writeInt(parseDouble(value).intValue());
            }
            else if (type instanceof TinyintType) {
                blockBuilder.writeByte(parseDouble(value).byteValue());
            }
            else if (type instanceof SmallintType) {
                blockBuilder.writeShort(parseDouble(value).shortValue());
            }
            else if (type instanceof BooleanType) {
                type.writeBoolean(blockBuilder, parseBoolean(value));
            }
            else if (type instanceof DecimalType || type instanceof DoubleType) {
                // NOTE(review): DecimalType implementations may not support writeDouble — confirm
                // that decimal columns are never produced for this page source.
                type.writeDouble(blockBuilder, parseDouble(value));
            }
            else if (type instanceof TimestampType) {
                type.writeLong(blockBuilder, parseTimestamp(value));
            }
            else if (type instanceof DateType) {
                type.writeLong(blockBuilder, parseLong(value));
            }
            else {
                throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported");
            }
        }
        else if (type instanceof VarbinaryType) {
            // The Pinot broker returns bytes as a hex string, so decode it back to raw bytes.
            byte[] bytes = BytesUtils.toBytes(value);
            type.writeSlice(blockBuilder, Slices.wrappedBuffer(bytes));
            completedBytes += bytes.length;
        }
        else {
            Slice slice = Slices.utf8Slice(value);
            blockBuilder.writeBytes(slice, 0, slice.length()).closeEntry();
            completedBytes += slice.length();
        }
    }

    // True when type is an instance of one of SUPPORTED_PRESTO_COLUMN_TYPE_CLASSES.
    private boolean isTypeSupportInPinot(Type type)
    {
        for (Class<?> clazz : SUPPORTED_PRESTO_COLUMN_TYPE_CLASSES) {
            if (clazz.isInstance(type)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public long getCompletedBytes()
    {
        return completedBytes;
    }

    @Override
    public long getCompletedPositions()
    {
        return 0; // not available
    }

    @Override
    public long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    @Override
    public boolean isFinished()
    {
        return finished;
    }

    /**
     * Runs the broker query on the first call and returns every result row as one page. Later
     * calls return null. Time spent here is added to {@link #readTimeNanos}.
     */
    @Override
    public Page getNextPage()
    {
        if (finished) {
            return null;
        }
        long start = System.nanoTime();
        try {
            BlockAndTypeBuilder blockAndTypeBuilder = buildBlockAndTypeBuilder(columnHandles, brokerSql);
            int counter = issueQueryAndPopulate(
                    brokerSql,
                    Collections.unmodifiableList(blockAndTypeBuilder.getColumnBlockBuilders()),
                    Collections.unmodifiableList(blockAndTypeBuilder.getColumnTypes()));
            PageBuilder pageBuilder = blockAndTypeBuilder.getPageBuilder();
            pageBuilder.declarePositions(counter);
            Page page = pageBuilder.build();
            // TODO: Implement chunking if the result set is ginormous
            finished = true;
            return page;
        }
        finally {
            readTimeNanos += System.nanoTime() - start;
        }
    }

    /**
     * Writes each JSON row of {@code rows} into the block builders, one builder per result column.
     *
     * @throws PinotException with {@code PINOT_UNEXPECTED_RESPONSE} if a row is missing or has
     *         fewer cells than there are block builders
     */
    protected void setRows(String query, List<BlockBuilder> blockBuilders, List<Type> types, JsonNode rows)
    {
        for (int rowNumber = 0; rowNumber < rows.size(); ++rowNumber) {
            JsonNode result = rows.get(rowNumber);
            if (result == null || result.size() < blockBuilders.size()) {
                throw new PinotException(
                        PINOT_UNEXPECTED_RESPONSE,
                        Optional.of(query),
                        String.format("Expected row of %d columns", blockBuilders.size()));
            }
            for (int columnNumber = 0; columnNumber < blockBuilders.size(); columnNumber++) {
                setValue(types.get(columnNumber), blockBuilders.get(columnNumber), result.get(columnNumber));
            }
        }
    }

    /**
     * Validates the parts of a broker response that are shared by all queries.
     *
     * @throws PinotException with {@code PINOT_INSUFFICIENT_SERVER_RESPONSE} if either server count
     *         is missing or fewer servers responded than were queried
     * @throws PinotException with {@code PINOT_UNAUTHENTICATED_EXCEPTION} if the first reported
     *         exception carries the authentication error code
     * @throws PinotException with {@code PINOT_EXCEPTION} for any other reported exception
     */
    protected static void handleCommonResponse(String pinotQuery, JsonNode jsonBody)
    {
        JsonNode numServersResponded = jsonBody.get("numServersResponded");
        JsonNode numServersQueried = jsonBody.get("numServersQueried");
        if (numServersQueried == null || numServersResponded == null || numServersQueried.asInt() > numServersResponded.asInt()) {
            // Either count may be absent here, so format the counts null-safely. Otherwise building
            // the message would throw a NullPointerException instead of the intended PinotException.
            throw new PinotException(
                    PINOT_INSUFFICIENT_SERVER_RESPONSE,
                    Optional.of(pinotQuery),
                    String.format("Only %s out of %s servers responded for query %s", formatServerCount(numServersResponded), formatServerCount(numServersQueried), pinotQuery));
        }
        JsonNode exceptions = jsonBody.get("exceptions");
        if (exceptions != null && exceptions.isArray() && exceptions.size() > 0) {
            // path() returns a MissingNode (asInt() == 0) rather than null when errorCode is absent.
            if (exceptions.get(0).path("errorCode").asInt() == PINOT_AUTHENTICATION_ERROR_CODE) {
                throw new PinotException(
                        PINOT_UNAUTHENTICATED_EXCEPTION,
                        Optional.empty(),
                        "Query authentication failed.");
            }
            // Pinot is known to return exceptions with benign errorcodes like 200
            // so we treat any exception as an error
            throw new PinotException(
                    PINOT_EXCEPTION,
                    Optional.of(pinotQuery),
                    String.format("Query %s encountered exception %s", pinotQuery, exceptions.get(0)));
        }
    }

    // Renders a server-count response field for error messages; "unknown" when the field is missing.
    private static String formatServerCount(JsonNode count)
    {
        return count == null ? "unknown" : String.valueOf(count.asInt());
    }

    /**
     * Converts a JSON node to text. Arrays become {@link Arrays#toString(Object[])} of their
     * elements' text, JSON null becomes null, and any other value node becomes {@link JsonNode#asText()}.
     *
     * @throws IllegalStateException if {@code node} is neither an array nor a value node
     */
    protected static String asText(JsonNode node)
    {
        if (node.isArray()) {
            String[] results = new String[node.size()];
            for (int i = 0; i < node.size(); i++) {
                results[i] = asText(node.get(i));
            }
            return Arrays.toString(results);
        }
        checkState(node.isValueNode(), "Expected a JSON value node but got %s", node);
        return node.isNull() ? null : node.asText();
    }

    @Override
    public long getSystemMemoryUsage()
    {
        return 0;
    }

    @Override
    public void close()
    {
        finished = true;
    }

    /**
     * POSTs {@code pinotQuery} to the query endpoint and populates the block builders from the
     * response. The request is retried according to the session's Pinot retry count.
     *
     * @return the number of rows written
     */
    protected int issueQueryAndPopulate(
            PinotQueryGenerator.GeneratedPinotQuery pinotQuery,
            List<BlockBuilder> blockBuilders,
            List<Type> types)
    {
        return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), (retryNumber) -> {
            String queryHost;
            Optional<String> rpcService;
            if (pinotConfig.isUseProxy()) {
                queryHost = pinotConfig.getControllerUrl();
                rpcService = Optional.ofNullable(pinotConfig.getRestProxyServiceForQuery());
            }
            else {
                queryHost = clusterInfoFetcher.getBrokerHost(pinotQuery.getTable());
                rpcService = Optional.empty();
            }
            Request.Builder builder = Request.Builder
                    .preparePost()
                    .setUri(URI.create(String.format(QUERY_URL_TEMPLATE, pinotConfig.isUseSecureConnection() ? "https" : "http", queryHost)));
            brokerAuthenticationProvider.getAuthenticationToken(session).ifPresent(token -> builder.setHeader(AUTHORIZATION, token));
            String body = clusterInfoFetcher.doHttpActionWithHeaders(builder, Optional.of(getRequestPayload(pinotQuery)), rpcService);
            return populateFromQueryResults(pinotQuery, blockBuilders, types, body);
        });
    }

    /**
     * Serializes the query as the JSON request body {@code {"sql": <query>}}.
     *
     * @throws PinotException with {@code PINOT_REQUEST_GENERATOR_FAILURE} if serialization fails
     */
    public static String getRequestPayload(PinotQueryGenerator.GeneratedPinotQuery pinotQuery)
    {
        ImmutableMap<String, String> pinotRequest = ImmutableMap.of(REQUEST_PAYLOAD_KEY, pinotQuery.getQuery());
        try {
            return OBJECT_MAPPER.writeValueAsString(pinotRequest);
        }
        catch (JsonProcessingException e) {
            throw new PinotException(
                    PINOT_REQUEST_GENERATOR_FAILURE,
                    Optional.of(pinotQuery.getQuery()),
                    "Unable to Jsonify request: " + Arrays.toString(pinotRequest.entrySet().toArray()),
                    e);
        }
    }

    /**
     * Parses a broker response JSON string, validates it, and writes its rows into the block builders.
     *
     * @return the number of rows written, or 0 when the response has no {@code resultTable}
     * @throws PinotException with {@code PINOT_UNEXPECTED_RESPONSE} if the response cannot be parsed
     *         or its schema/rows are missing or too narrow; other codes come from
     *         {@link #handleCommonResponse(String, JsonNode)}
     */
    @VisibleForTesting
    public int populateFromQueryResults(
            PinotQueryGenerator.GeneratedPinotQuery pinotQuery,
            List<BlockBuilder> blockBuilders,
            List<Type> types,
            String responseJsonString)
    {
        String sql = pinotQuery.getQuery();
        JsonNode jsonBody;
        try {
            jsonBody = objectMapper.readTree(responseJsonString);
        }
        catch (IOException e) {
            throw new PinotException(PINOT_UNEXPECTED_RESPONSE, Optional.of(sql), "Couldn't parse response", e);
        }
        handleCommonResponse(sql, jsonBody);
        JsonNode resultTable = jsonBody.get("resultTable");
        if (resultTable == null) {
            return 0;
        }
        JsonNode dataSchema = resultTable.get("dataSchema");
        if (dataSchema == null) {
            throw new PinotException(
                    PINOT_UNEXPECTED_RESPONSE,
                    Optional.of(sql),
                    "Expected data schema in the response");
        }
        JsonNode columnDataTypes = dataSchema.get("columnDataTypes");
        JsonNode columnNames = dataSchema.get("columnNames");
        if (columnDataTypes == null
                || !columnDataTypes.isArray()
                || columnDataTypes.size() < blockBuilders.size()) {
            throw new PinotException(
                    PINOT_UNEXPECTED_RESPONSE,
                    Optional.of(sql),
                    String.format("ColumnDataTypes and results expected for %s, expected %d columnDataTypes but got %d", sql, blockBuilders.size(), columnDataTypes == null ? 0 : columnDataTypes.size()));
        }
        if (columnNames == null
                || !columnNames.isArray()
                || columnNames.size() < blockBuilders.size()) {
            throw new PinotException(
                    PINOT_UNEXPECTED_RESPONSE,
                    Optional.of(sql),
                    String.format("ColumnNames and results expected for %s, expected %d columnNames but got %d", sql, blockBuilders.size(), columnNames == null ? 0 : columnNames.size()));
        }
        JsonNode rows = resultTable.get("rows");
        // Without this check a resultTable lacking "rows" would fail with a NullPointerException in setRows.
        if (rows == null || !rows.isArray()) {
            throw new PinotException(
                    PINOT_UNEXPECTED_RESPONSE,
                    Optional.of(sql),
                    String.format("Expected rows array in the response for %s", sql));
        }
        setRows(sql, blockBuilders, types, rows);
        return rows.size();
    }

    /**
     * Builds the page builder for the scan's {@code columnHandles} and one (block builder, type)
     * pair per column returned by the generated SQL.
     *
     * <p>The SQL was generated for {@code expectedHandles}. Later optimizers may prune or reorder
     * the handles the scan actually needs. Each Pinot result column is therefore mapped to its
     * position in {@code columnHandles}. Result columns with no such position get a null builder
     * and a null type, so their values are discarded. With SQL, group-by fields may appear before
     * the selection list.
     */
    @VisibleForTesting
    public BlockAndTypeBuilder buildBlockAndTypeBuilder(List<PinotColumnHandle> columnHandles,
            PinotQueryGenerator.GeneratedPinotQuery brokerSql)
    {
        List<Type> expectedTypes = columnHandles.stream()
                .map(PinotColumnHandle::getDataType)
                .collect(Collectors.toList());
        PageBuilder pageBuilder = new PageBuilder(expectedTypes);
        // handleMapping[i] is the index in columnHandles of expectedHandles[i], or -1 if the scan does not need it.
        int[] handleMapping = new int[expectedHandles.size()];
        for (int i = 0; i < handleMapping.length; ++i) {
            handleMapping[i] = columnHandles.indexOf(expectedHandles.get(i));
        }
        List<BlockBuilder> columnBlockBuilders = new ArrayList<>();
        List<Type> columnTypes = new ArrayList<>();
        for (int expectedColumnIndex : brokerSql.getExpectedColumnIndices()) {
            // columnIndex is this column's position in the current scan. A negative value means the
            // expectedColumnIndex'th column returned by Pinot can be discarded.
            int columnIndex = expectedColumnIndex >= 0 ? handleMapping[expectedColumnIndex] : -1;
            columnBlockBuilders.add(columnIndex >= 0 ? pageBuilder.getBlockBuilder(columnIndex) : null);
            columnTypes.add(columnIndex >= 0 ? expectedTypes.get(columnIndex) : null);
        }
        return new BlockAndTypeBuilder(pageBuilder, columnBlockBuilders, columnTypes);
    }

    /**
     * Holds a page builder together with parallel lists of per-result-column block builders and
     * types. An entry in either list may be null, meaning that result column is discarded.
     */
    public static class BlockAndTypeBuilder
    {
        private final PageBuilder pageBuilder;
        private final List<BlockBuilder> columnBlockBuilders;
        private final List<Type> columnTypes;

        public BlockAndTypeBuilder(PageBuilder pageBuilder, List<BlockBuilder> columnBlockBuilders, List<Type> columnTypes)
        {
            this.pageBuilder = pageBuilder;
            this.columnBlockBuilders = columnBlockBuilders;
            this.columnTypes = columnTypes;
        }

        public PageBuilder getPageBuilder()
        {
            return pageBuilder;
        }

        public List<BlockBuilder> getColumnBlockBuilders()
        {
            return columnBlockBuilders;
        }

        public List<Type> getColumnTypes()
        {
            return columnTypes;
        }
    }
}