PinotSplitManager.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.pinot;
import com.facebook.presto.pinot.query.PinotQueryGenerator;
import com.facebook.presto.pinot.query.PinotQueryGenerator.GeneratedPinotQuery;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.Iterables;
import org.apache.pinot.spi.config.table.TableType;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
import static com.facebook.presto.pinot.PinotSplit.createBrokerSplit;
import static com.facebook.presto.pinot.PinotSplit.createSegmentSplit;
import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.TABLE_NAME_SUFFIX_TEMPLATE;
import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.TIME_BOUNDARY_FILTER_TEMPLATE;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
public class PinotSplitManager
implements ConnectorSplitManager
{
private static final String REALTIME_SUFFIX = "_" + TableType.REALTIME;
private static final String OFFLINE_SUFFIX = "_" + TableType.OFFLINE;
private final String connectorId;
private final PinotConnection pinotPrestoConnection;
@Inject
public PinotSplitManager(ConnectorId connectorId, PinotConnection pinotPrestoConnection)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.pinotPrestoConnection = requireNonNull(pinotPrestoConnection, "pinotPrestoConnection is null");
}
protected ConnectorSplitSource generateSplitForBrokerBasedScan(PinotQueryGenerator.GeneratedPinotQuery brokerPinotQuery, List<PinotColumnHandle> expectedColumnHandles)
{
return new FixedSplitSource(singletonList(createBrokerSplit(connectorId, expectedColumnHandles, brokerPinotQuery)));
}
protected ConnectorSplitSource generateSplitsForSegmentBasedScan(
PinotTableLayoutHandle pinotLayoutHandle,
ConnectorSession session,
List<PinotColumnHandle> expectedColumnHandles)
{
PinotTableHandle tableHandle = pinotLayoutHandle.getTable();
String tableName = tableHandle.getTableName();
Map<String, Map<String, List<String>>> routingTable;
routingTable = pinotPrestoConnection.getRoutingTable(tableName);
List<ConnectorSplit> splits = new ArrayList<>();
if (!routingTable.isEmpty()) {
GeneratedPinotQuery segmentPinotQuery = tableHandle.getPinotQuery().orElseThrow(() -> new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected to find realtime and offline pinot query in " + tableHandle));
PinotClusterInfoFetcher.TimeBoundary timeBoundary = new PinotClusterInfoFetcher.TimeBoundary(null, null);
if (routingTable.containsKey(tableName + REALTIME_SUFFIX) && routingTable.containsKey(tableName + OFFLINE_SUFFIX)) {
timeBoundary = pinotPrestoConnection.getTimeBoundary(tableName);
}
String realtime = getSegmentPinotQuery(segmentPinotQuery, REALTIME_SUFFIX, timeBoundary.getOnlineTimePredicate());
String offline = getSegmentPinotQuery(segmentPinotQuery, OFFLINE_SUFFIX, timeBoundary.getOfflineTimePredicate());
generateSegmentSplits(splits, expectedColumnHandles, routingTable, tableName, "_REALTIME", session, realtime);
generateSegmentSplits(splits, expectedColumnHandles, routingTable, tableName, "_OFFLINE", session, offline);
}
Collections.shuffle(splits);
return new FixedSplitSource(splits);
}
private String getSegmentPinotQuery(GeneratedPinotQuery basePinotQuery, String suffix, Optional<String> timePredicate)
{
String pinotQuery = basePinotQuery.getQuery().replace(TABLE_NAME_SUFFIX_TEMPLATE, suffix);
if (timePredicate.isPresent()) {
String tp = timePredicate.get();
pinotQuery = pinotQuery.replace(TIME_BOUNDARY_FILTER_TEMPLATE, basePinotQuery.isHaveFilter() ? " AND " + tp : " WHERE " + tp);
}
else {
pinotQuery = pinotQuery.replace(TIME_BOUNDARY_FILTER_TEMPLATE, "");
}
return pinotQuery;
}
protected void generateSegmentSplits(
List<ConnectorSplit> splits,
List<PinotColumnHandle> expectedColumnHandles,
Map<String, Map<String, List<String>>> routingTable,
String tableName,
String tableNameSuffix,
ConnectorSession session,
String pinotQuery)
{
final String finalTableName = tableName + tableNameSuffix;
int segmentsPerSplitConfigured = PinotSessionProperties.getNumSegmentsPerSplit(session);
for (String routingTableName : routingTable.keySet()) {
if (!routingTableName.equalsIgnoreCase(finalTableName)) {
continue;
}
Map<String, List<String>> hostToSegmentsMap = routingTable.get(routingTableName);
hostToSegmentsMap.forEach((host, segments) -> {
int numSegmentsInThisSplit = Math.min(segments.size(), segmentsPerSplitConfigured);
// segments is already shuffled
Iterables.partition(segments, numSegmentsInThisSplit).forEach(
segmentsForThisSplit -> splits.add(
createSegmentSplit(connectorId, pinotQuery, expectedColumnHandles, segmentsForThisSplit, host, getGrpcPort(host))));
});
}
}
private int getGrpcPort(String host)
{
return pinotPrestoConnection.getGrpcPort(host);
}
public static class QueryNotAdequatelyPushedDownException
extends PinotException
{
private final String connectorId;
private final ConnectorTableHandle connectorTableHandle;
public QueryNotAdequatelyPushedDownException(PinotErrorCode errorCode, ConnectorTableHandle connectorTableHandle, String connectorId)
{
super(requireNonNull(errorCode, "error code is null"), Optional.empty(), "Query uses unsupported expressions that cannot be pushed into Pinot.");
this.connectorId = requireNonNull(connectorId, "connector id is null");
this.connectorTableHandle = requireNonNull(connectorTableHandle, "connector table handle is null");
}
@Override
public String getMessage()
{
return super.getMessage() + String.format(" table: %s:%s", connectorId, connectorTableHandle);
}
}
@Override
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transactionHandle,
ConnectorSession session,
ConnectorTableLayoutHandle layout,
SplitSchedulingContext splitSchedulingContext)
{
PinotTableLayoutHandle pinotLayoutHandle = (PinotTableLayoutHandle) layout;
PinotTableHandle pinotTableHandle = pinotLayoutHandle.getTable();
Supplier<PrestoException> errorSupplier = () -> new QueryNotAdequatelyPushedDownException(PinotErrorCode.PINOT_PUSH_DOWN_QUERY_NOT_PRESENT, pinotTableHandle, connectorId);
if (!pinotTableHandle.getForBroker().orElseThrow(errorSupplier)) {
if (PinotSessionProperties.isForbidSegmentQueries(session)) {
throw errorSupplier.get();
}
return generateSplitsForSegmentBasedScan(pinotLayoutHandle, session, pinotTableHandle.getExpectedColumnHandles().orElseThrow(errorSupplier));
}
else {
return generateSplitForBrokerBasedScan(pinotTableHandle.getPinotQuery().orElseThrow(errorSupplier), pinotTableHandle.getExpectedColumnHandles().orElseThrow(errorSupplier));
}
}
}