TempStorageSingleStreamSpillerFactory.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spiller;
import com.facebook.presto.CompressionCodec;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.SpillContext;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.spiller.SpillCipher;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.storage.TempStorageManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Optional;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class TempStorageSingleStreamSpillerFactory
implements SingleStreamSpillerFactory
{
private final TempStorageManager tempStorageManager;
private final ListeningExecutorService executor;
private final PagesSerdeFactory serdeFactory;
private final SpillerStats spillerStats;
private final boolean spillEncryptionEnabled;
private final String tempStorageName;
@Inject
public TempStorageSingleStreamSpillerFactory(
TempStorageManager tempStorageManager,
BlockEncodingSerde blockEncodingSerde,
SpillerStats spillerStats,
FeaturesConfig featuresConfig,
NodeSpillConfig nodeSpillConfig)
{
this(
tempStorageManager,
listeningDecorator(newFixedThreadPool(
requireNonNull(featuresConfig, "featuresConfig is null").getSpillerThreads(),
daemonThreadsNamed("binary-spiller-%s"))),
blockEncodingSerde,
spillerStats,
requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").getSpillCompressionCodec(),
requireNonNull(nodeSpillConfig, "nodeSpillConfig is null").isSpillEncryptionEnabled(),
requireNonNull(featuresConfig, "featuresConfig is null").getSpillerTempStorage());
}
@VisibleForTesting
TempStorageSingleStreamSpillerFactory(
TempStorageManager tempStorageManager,
ListeningExecutorService executor,
BlockEncodingSerde blockEncodingSerde,
SpillerStats spillerStats,
CompressionCodec spillCompressionEnabled,
boolean spillEncryptionEnabled,
String tempStorageName)
{
this.tempStorageManager = requireNonNull(tempStorageManager, "tempStorageManager is null");
this.serdeFactory = new PagesSerdeFactory(requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"), spillCompressionEnabled);
this.executor = requireNonNull(executor, "executor is null");
this.spillerStats = requireNonNull(spillerStats, "spillerStats can not be null");
this.spillEncryptionEnabled = spillEncryptionEnabled;
this.tempStorageName = requireNonNull(tempStorageName, "tempStorageName is null");
}
@PreDestroy
public void destroy()
{
executor.shutdownNow();
}
@Override
public SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext)
{
Optional<SpillCipher> spillCipher = Optional.empty();
if (spillEncryptionEnabled) {
spillCipher = Optional.of(new AesSpillCipher());
}
PagesSerde serde = serdeFactory.createPagesSerdeForSpill(spillCipher);
return new TempStorageSingleStreamSpiller(
tempStorageManager.getTempStorage(tempStorageName),
serde,
executor,
spillerStats,
spillContext,
memoryContext,
spillCipher);
}
}