PinotClusterInfoFetcher.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.pinot;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.StaticBodyGenerator;
import com.facebook.airlift.http.client.StringResponseHandler;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonCodecBinder;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.pinot.auth.PinotBrokerAuthenticationProvider;
import com.facebook.presto.pinot.auth.PinotControllerAuthenticationProvider;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.common.net.HttpHeaders;
import org.apache.pinot.spi.data.Schema;
import javax.inject.Inject;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.facebook.airlift.http.client.StringResponseHandler.createStringResponseHandler;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_HTTP_ERROR;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_INSTANCE;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNEXPECTED_RESPONSE;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.net.HttpHeaders.AUTHORIZATION;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.pinot.spi.utils.builder.TableNameBuilder.extractRawTableName;
public class PinotClusterInfoFetcher
{
private static final Logger log = Logger.get(PinotClusterInfoFetcher.class);
private static final String HTTPS_SCHEME = "https";
private static final String HTTP_SCHEME = "http";
private static final String APPLICATION_JSON = "application/json";
private static final Pattern BROKER_PATTERN = Pattern.compile("Broker_(.*)_(\\d+)");
private static final String GET_ALL_TABLES_API_TEMPLATE = "tables";
private static final String TABLE_INSTANCES_API_TEMPLATE = "tables/%s/instances";
private static final String TABLE_SCHEMA_API_TEMPLATE = "tables/%s/schema";
private static final String INSTANCE_API_TEMPLATE = "instances/%s";
private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s";
private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s";
private static final String TIME_BOUNDARY_NOT_FOUND_ERROR_CODE = "404";
private final PinotConfig pinotConfig;
private final PinotMetrics pinotMetrics;
private final HttpClient httpClient;
private final Ticker ticker = Ticker.systemTicker();
private final LoadingCache<String, List<String>> brokersForTableCache;
// In order to get the grpc port, we need to query Pinot controller based on Pinot instance name.
// When doing segment level query, we issue one query for every split (which contains the Pinot server to query and the segments to query).
// For non-grpc query, the query port is part of instance id (in the format of `Server_$HOSTNAME_$PORT`) to be parsed.
// For grpc, we need to query Pinot controller for Pinot instance then extract the grpc port.
// In the worst case scenario, assume one Pinot table has 100 segments, then one presto query will be split into one segment per split,
// which means, we will issue 100 queries to Pinot servers and 100 calls to fetch grpc port, hence the cache of instances here.
// The first query will go to Pinot controller to fetch instance config then extract the grpc port. This info will be cached for 2 minutes(default).
private final LoadingCache<String, Instance> instanceConfigCache;
private final JsonCodec<GetTables> tablesJsonCodec;
private final JsonCodec<BrokersForTable> brokersForTableJsonCodec;
private final JsonCodec<RoutingTables> routingTablesJsonCodec;
private final JsonCodec<RoutingTablesV2> routingTablesV2JsonCodec;
private final JsonCodec<TimeBoundary> timeBoundaryJsonCodec;
private final JsonCodec<Instance> instanceJsonCodec;
private final PinotControllerAuthenticationProvider controllerAuthenticationProvider;
private final PinotBrokerAuthenticationProvider brokerAuthenticationProvider;
@Inject
public PinotClusterInfoFetcher(
PinotConfig pinotConfig,
PinotMetrics pinotMetrics,
PinotControllerAuthenticationProvider controllerAuthenticationProvider,
PinotBrokerAuthenticationProvider brokerAuthenticationProvider,
@ForPinot HttpClient httpClient,
JsonCodec<GetTables> tablesJsonCodec,
JsonCodec<BrokersForTable> brokersForTableJsonCodec,
JsonCodec<RoutingTables> routingTablesJsonCodec,
JsonCodec<RoutingTablesV2> routingTablesV2JsonCodec,
JsonCodec<TimeBoundary> timeBoundaryJsonCodec,
JsonCodec<Instance> instanceJsonCodec)
{
this.pinotConfig = requireNonNull(pinotConfig, "pinotConfig is null");
this.pinotMetrics = requireNonNull(pinotMetrics, "pinotMetrics is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.tablesJsonCodec = requireNonNull(tablesJsonCodec, "json codec is null");
this.brokersForTableJsonCodec = requireNonNull(brokersForTableJsonCodec, "brokers for table json codec is null");
this.routingTablesJsonCodec = requireNonNull(routingTablesJsonCodec, "routing tables json codec is null");
this.routingTablesV2JsonCodec = requireNonNull(routingTablesV2JsonCodec, "routing tables v2 json codec is null");
this.timeBoundaryJsonCodec = requireNonNull(timeBoundaryJsonCodec, "time boundary json codec is null");
this.instanceJsonCodec = requireNonNull(instanceJsonCodec, "instance json codec is null");
final long cacheExpiryMs = pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS);
this.brokersForTableCache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpiryMs, TimeUnit.MILLISECONDS)
.build((CacheLoader.from(this::getAllBrokersForTable)));
this.instanceConfigCache = CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpiryMs, TimeUnit.MILLISECONDS)
.build((CacheLoader.from(this::getInstance)));
this.controllerAuthenticationProvider = controllerAuthenticationProvider;
this.brokerAuthenticationProvider = brokerAuthenticationProvider;
}
public static JsonCodecBinder addJsonBinders(JsonCodecBinder jsonCodecBinder)
{
jsonCodecBinder.bindJsonCodec(GetTables.class);
jsonCodecBinder.bindJsonCodec(BrokersForTable.InstancesInBroker.class);
jsonCodecBinder.bindJsonCodec(BrokersForTable.class);
jsonCodecBinder.bindJsonCodec(RoutingTables.class);
jsonCodecBinder.bindJsonCodec(RoutingTables.RoutingTableSnapshot.class);
jsonCodecBinder.bindJsonCodec(RoutingTablesV2.class);
jsonCodecBinder.bindJsonCodec(TimeBoundary.class);
jsonCodecBinder.bindJsonCodec(Instance.class);
return jsonCodecBinder;
}
public String doHttpActionWithHeaders(
Request.Builder requestBuilder,
Optional<String> requestBody,
Optional<String> rpcService)
{
requestBuilder = requestBuilder
.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON);
if (requestBody.isPresent()) {
requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
}
if (rpcService.isPresent()) {
requestBuilder
.setHeader(pinotConfig.getCallerHeaderParam(), pinotConfig.getCallerHeaderValue())
.setHeader(pinotConfig.getServiceHeaderParam(), rpcService.get());
}
if (requestBody.isPresent()) {
requestBuilder.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(requestBody.get(), StandardCharsets.UTF_8));
}
pinotConfig.getExtraHttpHeaders().forEach(requestBuilder::setHeader);
Request request = requestBuilder.build();
long startTime = ticker.read();
long duration;
StringResponseHandler.StringResponse response;
try {
response = httpClient.execute(request, createStringResponseHandler());
}
finally {
duration = ticker.read() - startTime;
}
pinotMetrics.monitorRequest(request, response, duration, TimeUnit.NANOSECONDS);
String responseBody = response.getBody();
if (PinotUtils.isValidPinotHttpResponseCode(response.getStatusCode())) {
return responseBody;
}
else {
throw new PinotException(
PINOT_HTTP_ERROR,
Optional.empty(),
String.format(
"Unexpected response status: %d for request %s to url %s, with headers %s, full response %s",
response.getStatusCode(),
requestBody.orElse(""),
request.getUri(),
request.getHeaders(),
responseBody));
}
}
private String sendHttpGetToController(String path)
{
final URI controllerPathUri = HttpUriBuilder
.uriBuilder()
.scheme(pinotConfig.isUseSecureConnection() ? HTTPS_SCHEME : HTTP_SCHEME)
.hostAndPort(HostAndPort.fromString(pinotConfig.getControllerUrl()))
.appendPath(path)
.build();
Request.Builder builder = Request.builder().prepareGet().setUri(controllerPathUri);
controllerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> builder.setHeader(AUTHORIZATION, token));
return doHttpActionWithHeaders(
builder,
Optional.empty(),
Optional.ofNullable(pinotConfig.getControllerRestService()));
}
private String sendHttpGetToBroker(String table, String path)
{
final String hostPort = pinotConfig.isUseProxy() ? pinotConfig.getControllerUrl() : getBrokerHost(table);
final URI brokerPathUri = HttpUriBuilder
.uriBuilder()
.scheme(pinotConfig.isUseSecureConnection() ? HTTPS_SCHEME : HTTP_SCHEME)
.hostAndPort(HostAndPort.fromString(hostPort))
.appendPath(path)
.build();
Request.Builder builder = Request.builder().prepareGet().setUri(brokerPathUri);
brokerAuthenticationProvider.getAuthenticationToken().ifPresent(token -> builder.setHeader(AUTHORIZATION, token));
return doHttpActionWithHeaders(
builder,
Optional.empty(),
Optional.empty());
}
public static class GetTables
{
private final List<String> tables;
@JsonCreator
public GetTables(@JsonProperty("tables") List<String> tables)
{
this.tables = tables;
}
public List<String> getTables()
{
return tables;
}
}
public List<String> getAllTables()
{
return tablesJsonCodec.fromJson(sendHttpGetToController(GET_ALL_TABLES_API_TEMPLATE)).getTables();
}
public Schema getTableSchema(String table)
throws Exception
{
String responseBody = sendHttpGetToController(String.format(TABLE_SCHEMA_API_TEMPLATE, table));
return Schema.fromString(responseBody);
}
public static class BrokersForTable
{
public static class InstancesInBroker
{
private final List<String> instances;
@JsonCreator
public InstancesInBroker(@JsonProperty("instances") List<String> instances)
{
this.instances = instances;
}
@JsonProperty("instances")
public List<String> getInstances()
{
return instances;
}
}
private final List<InstancesInBroker> brokers;
@JsonCreator
public BrokersForTable(@JsonProperty("brokers") List<InstancesInBroker> brokers)
{
this.brokers = brokers;
}
@JsonProperty("brokers")
public List<InstancesInBroker> getBrokers()
{
return brokers;
}
}
@VisibleForTesting
List<String> getAllBrokersForTable(String table)
{
String responseBody = sendHttpGetToController(String.format(TABLE_INSTANCES_API_TEMPLATE, table));
ArrayList<String> brokers = brokersForTableJsonCodec
.fromJson(responseBody)
.getBrokers()
.stream()
.flatMap(broker -> broker.getInstances().stream())
.distinct()
.map(brokerToParse -> {
Matcher matcher = BROKER_PATTERN.matcher(brokerToParse);
if (matcher.matches() && matcher.groupCount() == 2) {
return matcher.group(1) + ":" + matcher.group(2);
}
else {
throw new PinotException(
PINOT_UNABLE_TO_FIND_BROKER,
Optional.empty(),
String.format("Cannot parse %s in the broker instance", brokerToParse));
}
})
.collect(Collectors.toCollection(() -> new ArrayList<>()));
Collections.shuffle(brokers);
return ImmutableList.copyOf(brokers);
}
public String getBrokerHost(String table)
{
try {
List<String> brokers = brokersForTableCache.get(table);
if (brokers.isEmpty()) {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "No valid brokers found for " + table);
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
catch (ExecutionException e) {
Throwable throwable = e.getCause();
if (throwable instanceof PinotException) {
throw (PinotException) throwable;
}
else {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "Error when getting brokers for table " + table, throwable);
}
}
}
public static class RoutingTables
{
public static class RoutingTableSnapshot
{
private final String tableName;
private final List<Map<String, List<String>>> routingTableEntries;
@JsonCreator
public RoutingTableSnapshot(
@JsonProperty("tableName") String tableName,
@JsonProperty("routingTableEntries") List<Map<String, List<String>>> routingTableEntries)
{
this.tableName = requireNonNull(tableName, "table name is null");
this.routingTableEntries = requireNonNull(routingTableEntries, "routing table entries is null");
}
@JsonProperty("tableName")
public String getTableName()
{
return tableName;
}
@JsonProperty("routingTableEntries")
public List<Map<String, List<String>>> getRoutingTableEntries()
{
return routingTableEntries;
}
}
private final List<RoutingTableSnapshot> routingTableSnapshot;
@JsonCreator
public RoutingTables(@JsonProperty("routingTableSnapshot") List<RoutingTableSnapshot> routingTableSnapshot)
{
this.routingTableSnapshot = routingTableSnapshot;
}
public List<RoutingTableSnapshot> getRoutingTableSnapshot()
{
return routingTableSnapshot;
}
}
/**
* RoutingTableV2 is a full snapshot of segments in one table replica. It maintains a mapping from Table name
* (e.g. myTable_OFFLINE/myTable_REALTIME) to a map from Pinot Server to List of segments in that server to query)
*/
public static class RoutingTablesV2
{
private final Map<String, Map<String, List<String>>> routingTable;
@JsonCreator
public RoutingTablesV2(Map<String, Map<String, List<String>>> routingTable)
{
this.routingTable = routingTable;
}
public Map<String, Map<String, List<String>>> getRoutingTable()
{
return routingTable;
}
}
public Map<String, Map<String, List<String>>> getRoutingTableForTable(String tableName)
{
log.debug("Trying to get routingTable for %s from broker", tableName);
String responseBody = sendHttpGetToBroker(tableName, String.format(ROUTING_TABLE_API_TEMPLATE, tableName));
// New Pinot Broker API (>=0.3.0) directly return a valid routing table.
// Will always check with new API response format first, then fail over to old routing table format.
try {
return routingTablesV2JsonCodec.fromJson(responseBody).getRoutingTable();
}
catch (Exception e) {
return getRoutingTableV1(tableName, responseBody);
}
}
private Map<String, Map<String, List<String>>> getRoutingTableV1(String tableName, String responseBody)
{
ImmutableMap.Builder<String, Map<String, List<String>>> routingTableMap = ImmutableMap.builder();
routingTablesJsonCodec.fromJson(responseBody).getRoutingTableSnapshot().forEach(snapshot -> {
String tableNameWithType = snapshot.getTableName();
// Response could contain info for tableName that matches the original table by prefix.
// e.g. when table name is "table1", response could contain routingTable for "table1_staging"
if (!tableName.equals(extractRawTableName(tableNameWithType))) {
log.debug("Ignoring routingTable for %s", tableNameWithType);
}
else {
List<Map<String, List<String>>> routingTableEntriesList = snapshot.getRoutingTableEntries();
if (routingTableEntriesList.isEmpty()) {
throw new PinotException(
PINOT_UNEXPECTED_RESPONSE,
Optional.empty(),
String.format("Empty routingTableEntries for %s. RoutingTable: %s", tableName, responseBody));
}
// We are given multiple routing tables for a table, each with different segment to host assignments
// We pick one randomly, so that a retry may hit a different server
Map<String, List<String>> routingTableEntries = routingTableEntriesList.get(new Random().nextInt(routingTableEntriesList.size()));
ImmutableMap.Builder<String, List<String>> routingTableBuilder = ImmutableMap.builder();
routingTableEntries.forEach((host, segments) -> {
List<String> segmentsCopied = new ArrayList<>(segments);
Collections.shuffle(segmentsCopied);
routingTableBuilder.put(host, ImmutableList.copyOf(segmentsCopied));
});
routingTableMap.put(tableNameWithType, routingTableBuilder.build());
}
});
return routingTableMap.build();
}
@Override
public String toString()
{
return toStringHelper(this)
.add("pinotConfig", pinotConfig)
.toString();
}
public static class TimeBoundary
{
private final Optional<String> onlineTimePredicate;
private final Optional<String> offlineTimePredicate;
public TimeBoundary()
{
this(null, null);
}
@JsonCreator
public TimeBoundary(
@JsonProperty String timeColumn,
@JsonProperty String timeValue)
{
if (timeColumn != null && timeValue != null) {
offlineTimePredicate = Optional.of(format("%s < '%s'", timeColumn, timeValue));
onlineTimePredicate = Optional.of(format("%s >= '%s'", timeColumn, timeValue));
}
else {
onlineTimePredicate = Optional.empty();
offlineTimePredicate = Optional.empty();
}
}
public Optional<String> getOnlineTimePredicate()
{
return onlineTimePredicate;
}
public Optional<String> getOfflineTimePredicate()
{
return offlineTimePredicate;
}
}
public TimeBoundary getTimeBoundaryForTable(String table)
{
try {
String responseBody = sendHttpGetToBroker(table, String.format(TIME_BOUNDARY_API_TEMPLATE, table));
return timeBoundaryJsonCodec.fromJson(responseBody);
}
catch (Exception e) {
if ((e instanceof PinotException)) {
/** New Pinot broker will set response code 404 if time boundary is not set.
* This is backward incompatible with old version, as it returns an empty json object.
* In order to gracefully handle this, below check will extract response code and return empty json
* if not found time boundary.
*
* Sample error message:
* Unexpected response status: 404 for request to url http://127.0.0.1:8000/debug/timeBoundary/baseballStats, with headers ...
*/
String[] errorMessageSplits = e.getMessage().split(" ");
if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) {
return timeBoundaryJsonCodec.fromJson("{}");
}
}
throw e;
}
}
public static class Instance
{
private final String instanceName;
private final String hostName;
private final boolean enabled;
private final int port;
private final int grpcPort;
private final List<String> tags;
private final List<String> pools;
@JsonCreator
public Instance(
@JsonProperty String instanceName,
@JsonProperty String hostName,
@JsonProperty boolean enabled,
@JsonProperty int port,
@JsonProperty int grpcPort,
@JsonProperty List<String> tags,
@JsonProperty List<String> pools)
{
this.instanceName = instanceName;
this.hostName = hostName;
this.enabled = enabled;
this.port = port;
this.grpcPort = grpcPort;
this.tags = tags;
this.pools = pools;
}
@JsonProperty
public String getInstanceName()
{
return instanceName;
}
@JsonProperty
public String getHostName()
{
return hostName;
}
@JsonProperty
public boolean isEnabled()
{
return enabled;
}
@JsonProperty
public int getPort()
{
return port;
}
@JsonProperty
public int getGrpcPort()
{
return grpcPort;
}
@JsonProperty
public List<String> getTags()
{
return tags;
}
@JsonProperty
public List<String> getPools()
{
return pools;
}
}
public Instance getInstance(String instanceName)
{
try {
String responseBody = sendHttpGetToController(String.format(INSTANCE_API_TEMPLATE, instanceName));
return instanceJsonCodec.fromJson(responseBody);
}
catch (Exception throwable) {
throw new PinotException(PINOT_UNABLE_TO_FIND_INSTANCE, Optional.empty(), "Error when fetching instance configs for " + instanceName, throwable);
}
}
// Fetch grpc port from Pinot instance config.
public int getGrpcPort(String serverInstance)
{
try {
return instanceConfigCache.get(serverInstance).getGrpcPort();
}
catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof PinotException) {
throw (PinotException) cause;
}
throw new PinotException(
PINOT_UNABLE_TO_FIND_INSTANCE,
Optional.empty(),
"Error when getting instance config for " + serverInstance,
cause);
}
}
}