SpillableGroupedTopNBuilder.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.spiller.Spiller;
import com.facebook.presto.spiller.SpillerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.openjdk.jol.info.ClassLayout;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
import static com.facebook.presto.operator.SpillingUtils.checkSpillSucceeded;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.blocked;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.finished;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.needsMoreData;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.ofResult;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;
/**
 * A {@link GroupedTopNBuilder} that can move its buffered state to disk.
 * <p>
 * Input pages are accumulated in an "input" {@link InMemoryGroupedTopNBuilder}. When memory is revoked,
 * or when {@link #buildResult()} cannot migrate the input builder's memory into user memory, the input
 * builder's hash-sorted intermediate result is handed to a {@link Spiller} and a fresh input builder takes
 * its place. If anything was spilled, the final result is produced by merge-sorting every spilled stream
 * with {@link MergeHashSort} and re-processing the merged pages through "output" in-memory builders, each
 * of which is drained into a result batch once its estimated size reaches {@code unspillMemoryLimit}.
 */
public class SpillableGroupedTopNBuilder
        implements GroupedTopNBuilder
{
    private static final long INSTANCE_SIZE = ClassLayout.parseClass(SpillableGroupedTopNBuilder.class).instanceSize();

    private final Supplier<InMemoryGroupedTopNBuilder> inputInMemoryGroupedTopNBuilderSupplier;
    private final Supplier<InMemoryGroupedTopNBuilder> outputInMemoryGroupedTopNBuilderSupplier;
    // Provides the future returned as "blocked" when the output builder cannot finish processing a page.
    private final Supplier<ListenableFuture<?>> memoryWaitingFutureSupplier;
    private final SpillerFactory spillerFactory;
    private final List<Type> sourceTypes;
    private final List<Type> partitionTypes;
    private final List<Integer> partitionChannels;

    // Receives input pages; replaced by a fresh instance after every spill.
    private InMemoryGroupedTopNBuilder inputInMemoryGroupedTopNBuilder;
    // Null until the merge phase starts (getFinalResult); replaced after every emitted batch.
    private InMemoryGroupedTopNBuilder outputInMemoryGroupedTopNBuilder;

    private final LocalMemoryContext localUserMemoryContext;
    private final LocalMemoryContext localRevocableMemoryContext;
    private final AggregatedMemoryContext aggregatedMemoryContextForMerge;
    private final AggregatedMemoryContext aggregatedMemoryContextForSpill;
    private final DriverYieldSignal driverYieldSignal;
    private final SpillContext spillContext;
    // Estimated size at which an output builder is drained into a result batch during the merge phase.
    private final long unspillMemoryLimit;

    // Created lazily on the first spill.
    private Optional<Spiller> spiller = Optional.empty();
    // Future of the most recently started spill; already completed when no spill has been started.
    private ListenableFuture<?> spillInProgress = immediateFuture(null);

    public SpillableGroupedTopNBuilder(
            List<Type> sourceTypes,
            List<Type> partitionTypes,
            List<Integer> partitionChannels,
            Supplier<InMemoryGroupedTopNBuilder> inputInMemoryGroupedTopNBuilderSupplier,
            Supplier<InMemoryGroupedTopNBuilder> outputInMemoryGroupedTopNBuilderSupplier,
            Supplier<ListenableFuture<?>> memoryWaitingFutureSupplier,
            long unspillMemoryLimit,
            LocalMemoryContext localUserMemoryContext,
            LocalMemoryContext localRevocableMemoryContext,
            AggregatedMemoryContext aggregatedMemoryContextForMerge,
            AggregatedMemoryContext aggregatedMemoryContextForSpill,
            SpillContext spillContext,
            DriverYieldSignal driverYieldSignal,
            SpillerFactory spillerFactory)
    {
        this.inputInMemoryGroupedTopNBuilderSupplier = requireNonNull(inputInMemoryGroupedTopNBuilderSupplier, "inputInMemoryGroupedTopNBuilderSupplier cannot be null");
        this.outputInMemoryGroupedTopNBuilderSupplier = requireNonNull(outputInMemoryGroupedTopNBuilderSupplier, "outputInMemoryGroupedTopNBuilderSupplier cannot be null");
        this.memoryWaitingFutureSupplier = requireNonNull(memoryWaitingFutureSupplier, "memoryWaitingFutureSupplier cannot be null");
        this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory cannot be null");
        this.sourceTypes = requireNonNull(sourceTypes, "sourceTypes cannot be null");
        this.partitionTypes = requireNonNull(partitionTypes, "partitionTypes cannot be null");
        this.partitionChannels = requireNonNull(partitionChannels, "partitionChannels cannot be null");
        this.localUserMemoryContext = requireNonNull(localUserMemoryContext, "localUserMemoryContext cannot be null");
        this.localRevocableMemoryContext = requireNonNull(localRevocableMemoryContext, "localRevocableMemoryContext cannot be null");
        this.aggregatedMemoryContextForMerge = requireNonNull(aggregatedMemoryContextForMerge, "aggregatedMemoryContextForMerge cannot be null");
        this.aggregatedMemoryContextForSpill = requireNonNull(aggregatedMemoryContextForSpill, "aggregatedMemoryContextForSpill cannot be null");
        this.driverYieldSignal = requireNonNull(driverYieldSignal, "driverYieldSignal cannot be null");
        this.spillContext = requireNonNull(spillContext, "spillContext cannot be null");
        // Primitive: a null check is impossible (requireNonNull would only autobox the value).
        this.unspillMemoryLimit = unspillMemoryLimit;

        // Create the first input builder only after every argument has been validated,
        // so a failed construction does not leave an unreferenced builder behind.
        initializeInputInMemoryGroupedTopNBuilder();
    }

    /**
     * Adds {@code page} to the current input builder.
     *
     * @throws IllegalStateException if a previously started spill has not finished yet
     */
    public Work<?> processPage(Page page)
    {
        checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished");
        return inputInMemoryGroupedTopNBuilder.processPage(page);
    }

    /**
     * Returns {@code true} if the last spill is done, {@code false} if it is still running.
     * A finished spill is passed to {@code checkSpillSucceeded} so a spill failure surfaces early.
     */
    private boolean hasPreviousSpillCompletedSuccessfully()
    {
        if (spillInProgress.isDone()) {
            // check for exception from previous spill for early failure
            checkSpillSucceeded(spillInProgress);
            return true;
        }
        return false;
    }

    /**
     * Builds the final result.
     * <p>
     * If nothing was spilled and the input builder is empty or its memory can be migrated to user memory,
     * the input builder's result is returned directly. Otherwise the remaining input is spilled and the
     * result is produced by merging all spilled streams.
     */
    @Override
    public WorkProcessor<Page> buildResult()
    {
        // spill could be in progress.
        checkSpillSucceeded(spillInProgress);

        // Convert revocable memory to user memory as returned Iterator holds on to memory so we no longer can revoke.
        if (!spiller.isPresent()) {
            if (inputInMemoryGroupedTopNBuilder.isEmpty() || inputInMemoryGroupedTopNBuilder.migrateMemoryContext(localUserMemoryContext)) {
                // we were able to successfully move to userMemory, so we can now safely return the result
                return inputInMemoryGroupedTopNBuilder.buildResult();
            }
        }

        // Spill the remaining collected input
        // TODO: Possible Optimization here is to not spill the last remaining buffered input
        // and instead do a memory+disk sort merge. SpillableHashAggregationBuilder does this
        checkSpillSucceeded(spillToDisk());
        verify(inputInMemoryGroupedTopNBuilder.isEmpty());
        updateMemoryReservations();

        // Collect all spill streams to merge-sort
        List<WorkProcessor<Page>> sortedPageStreams = ImmutableList.<WorkProcessor<Page>>builder()
                .addAll(spiller.get().getSpills().stream()
                        .map(WorkProcessor::fromIterator)
                        .collect(toImmutableList()))
                .build();

        // Sort-Merge the rows and produce group-by-group output
        return getFinalResult(sortedPageStreams);
    }

    @Override
    public GroupByHash getGroupByHash()
    {
        return inputInMemoryGroupedTopNBuilder.getGroupByHash();
    }

    /**
     * Returns {@code true} when neither the input builder nor the output builder holds data.
     * The output builder does not exist until the merge phase starts, and a missing one counts as empty.
     */
    @Override
    public boolean isEmpty()
    {
        return inputInMemoryGroupedTopNBuilder.isEmpty()
                && (outputInMemoryGroupedTopNBuilder == null || outputInMemoryGroupedTopNBuilder.isEmpty());
    }

    /**
     * Returns the estimated retained size of this object plus the current input builder.
     */
    @Override
    public long getEstimatedSizeInBytes()
    {
        return INSTANCE_SIZE + inputInMemoryGroupedTopNBuilder.getEstimatedSizeInBytes();
    }

    /**
     * Updates the memory reservations of the input builder and, if present, the output builder.
     * Returns the first of their futures that is not yet done, or a completed future when both are done.
     */
    @Override
    public ListenableFuture<?> updateMemoryReservations()
    {
        // Both builders are updated before either future is inspected.
        ListenableFuture<?> inputBuilderFuture = inputInMemoryGroupedTopNBuilder.updateMemoryReservations();
        ListenableFuture<?> outputBuilderFuture = null;
        if (outputInMemoryGroupedTopNBuilder != null) {
            outputBuilderFuture = outputInMemoryGroupedTopNBuilder.updateMemoryReservations();
        }

        if (!inputBuilderFuture.isDone()) {
            return inputBuilderFuture;
        }
        if (outputBuilderFuture != null && !outputBuilderFuture.isDone()) {
            return outputBuilderFuture;
        }
        return Futures.immediateFuture(null);
    }

    /**
     * Closes both builders and the spiller (when present) and zeroes both local memory contexts.
     * Any {@link IOException} raised while closing is rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void close()
    {
        try (Closer closer = Closer.create()) {
            if (inputInMemoryGroupedTopNBuilder != null) {
                closer.register(inputInMemoryGroupedTopNBuilder::close);
            }
            if (outputInMemoryGroupedTopNBuilder != null) {
                closer.register(outputInMemoryGroupedTopNBuilder::close);
            }
            spiller.ifPresent(closer::register);
            closer.register(() -> localUserMemoryContext.setBytes(0));
            closer.register(() -> localRevocableMemoryContext.setBytes(0));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts spilling the current input builder, unless there is nothing revocable to spill.
     *
     * @return the future of the started spill, or {@code NOT_BLOCKED} if no spill was started
     * @throws IllegalStateException if a previous spill is still in progress
     */
    public ListenableFuture<?> startMemoryRevoke()
    {
        checkState(spillInProgress.isDone());
        if (inputInMemoryGroupedTopNBuilder.isEmpty() || localRevocableMemoryContext.getBytes() == 0) {
            // All revocable memory has been released in buildResult method.
            // At this point, InMemoryGroupedTopNBuilder is no longer accepting any input so no point in spilling.
            return NOT_BLOCKED;
        }

        spillToDisk();
        return spillInProgress;
    }

    /**
     * Completes a revocation started by {@link #startMemoryRevoke()}: commits the spiller (if one exists)
     * and refreshes memory reservations.
     */
    public void finishMemoryRevoke()
    {
        if (spiller.isPresent()) {
            checkState(spillInProgress.isDone());
            verify(inputInMemoryGroupedTopNBuilder.isEmpty());
            spiller.get().commit();
        }
        updateMemoryReservations();
    }

    /**
     * Merges the spilled, hash-sorted page streams and re-processes the merged pages through output
     * builders. A batch is emitted whenever the output builder's estimated size reaches
     * {@code unspillMemoryLimit}, and once more for any leftover rows after the merged input ends.
     */
    private WorkProcessor<Page> getFinalResult(List<WorkProcessor<Page>> sortedPageStreams)
    {
        MergeHashSort mergeHashSort = new MergeHashSort(aggregatedMemoryContextForMerge);
        WorkProcessor<Page> mergedSortedPages = mergeHashSort.merge(
                partitionTypes,
                partitionChannels,
                sourceTypes,
                sortedPageStreams,
                driverYieldSignal);

        initializeOutputInMemoryGroupedTopNBuilder();

        // Create final result by re-processing the sorted stream page-at-a-time through a GroupedTopNBuilder
        return mergedSortedPages.flatTransform(new WorkProcessor.Transformation<Page, WorkProcessor<Page>>()
        {
            @Override
            public WorkProcessor.TransformationState<WorkProcessor<Page>> process(Optional<Page> inputPageOptional)
            {
                boolean inputIsPresent = inputPageOptional.isPresent();
                if (!inputIsPresent && outputInMemoryGroupedTopNBuilder.isEmpty()) {
                    // no more pages and builder is empty
                    return finished();
                }

                if (inputIsPresent) {
                    Page inputPage = inputPageOptional.get();
                    boolean done = outputInMemoryGroupedTopNBuilder.processPage(inputPage).process();
                    if (!done) {
                        return blocked(memoryWaitingFutureSupplier.get());
                    }
                    // Keep accumulating until the output builder is large enough to emit a batch.
                    if (outputInMemoryGroupedTopNBuilder.getEstimatedSizeInBytes() < unspillMemoryLimit) {
                        return needsMoreData();
                    }
                }

                // We can produce output after every input page, because input pages do not have
                // hash values that span multiple pages (guaranteed by MergeHashSort)
                //
                // iterator to extract existing context out of builder
                WorkProcessor<Page> result = outputInMemoryGroupedTopNBuilder.buildResult();
                // initialize new builder
                initializeOutputInMemoryGroupedTopNBuilder();
                // NOTE(review): the second argument presumably tells the processor whether to request the
                // next input page; it is false once the input is exhausted so the leftover rows are flushed
                // and the next call sees an empty builder and finishes.
                return ofResult(result, inputIsPresent);
            }
        });
    }

    /**
     * Starts spilling the current input builder's hash-sorted content, creating the spiller on first use,
     * and replaces the input builder with a fresh one.
     *
     * @return the future of the started spill (also stored in {@code spillInProgress})
     */
    private ListenableFuture<?> spillToDisk()
    {
        if (!spiller.isPresent()) {
            spiller = Optional.of(spillerFactory.create(
                    sourceTypes,
                    spillContext,
                    aggregatedMemoryContextForSpill));
        }

        // start spilling process with current content of the inMemoryGroupedTopNBuilder builder...
        spillInProgress = spiller.get().spill(inputInMemoryGroupedTopNBuilder.buildHashSortedIntermediateResult());
        // ... and immediately create new inMemoryGroupedTopNBuilder so effectively memory ownership
        // over inMemoryGroupedTopNBuilder is transferred from this thread to a spilling thread
        initializeInputInMemoryGroupedTopNBuilder();

        return spillInProgress;
    }

    // Closes the current input builder (if any) and installs a fresh one from the supplier.
    private void initializeInputInMemoryGroupedTopNBuilder()
    {
        if (inputInMemoryGroupedTopNBuilder != null) {
            inputInMemoryGroupedTopNBuilder.close();
        }
        inputInMemoryGroupedTopNBuilder = inputInMemoryGroupedTopNBuilderSupplier.get();
    }

    // Closes the current output builder (if any) and installs a fresh one from the supplier.
    private void initializeOutputInMemoryGroupedTopNBuilder()
    {
        if (outputInMemoryGroupedTopNBuilder != null) {
            outputInMemoryGroupedTopNBuilder.close();
        }
        outputInMemoryGroupedTopNBuilder = outputInMemoryGroupedTopNBuilderSupplier.get();
    }

    /**
     * Not supported: this builder only produces fully merged results via {@link #buildResult()}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Iterator<Page> buildHashSortedIntermediateResult()
    {
        throw new UnsupportedOperationException("SpillableGroupedTopNBuilder does not support buildHashSortedIntermediateResult");
    }

    @VisibleForTesting
    protected InMemoryGroupedTopNBuilder getInputInMemoryGroupedTopNBuilder()
    {
        return inputInMemoryGroupedTopNBuilder;
    }
}