DynamicBucketNodeMap.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.scheduler.group;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.InternalNodeInfo;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.Split;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
public class DynamicBucketNodeMap
extends BucketNodeMap
{
private final int bucketCount;
private final Int2ObjectMap<InternalNodeInfo> bucketToNodeInfo = new Int2ObjectOpenHashMap<>();
private final boolean hasInitialMap;
public DynamicBucketNodeMap(ToIntFunction<Split> splitToBucket, int bucketCount)
{
super(splitToBucket);
checkArgument(bucketCount > 0, "bucketCount must be positive");
this.bucketCount = bucketCount;
hasInitialMap = false;
}
public DynamicBucketNodeMap(ToIntFunction<Split> splitToBucket, int bucketCount, List<InternalNode> bucketToPreferredNode)
{
super(splitToBucket);
checkArgument(bucketCount > 0, "bucketCount must be positive");
checkArgument(bucketToPreferredNode.size() == bucketCount, "bucketToPreferredNode size must be equal to bucketCount");
for (int bucketNumber = 0; bucketNumber < bucketCount; bucketNumber++) {
bucketToNodeInfo.put(bucketNumber, new InternalNodeInfo(bucketToPreferredNode.get(bucketNumber), true));
}
this.bucketCount = bucketCount;
this.hasInitialMap = true;
}
@Override
public Optional<InternalNode> getAssignedNode(int bucketedId)
{
if (!bucketToNodeInfo.containsKey(bucketedId)) {
return Optional.empty();
}
return Optional.of(bucketToNodeInfo.get(bucketedId).getInternalNode());
}
@Override
public boolean isBucketCacheable(int bucketedId)
{
if (!bucketToNodeInfo.containsKey(bucketedId)) {
return false;
}
return bucketToNodeInfo.get(bucketedId).isCacheable();
}
@Override
public int getBucketCount()
{
return bucketCount;
}
@Override
public void assignOrUpdateBucketToNode(int bucketedId, InternalNode node, boolean cacheable)
{
checkArgument(bucketedId >= 0 && bucketedId < bucketCount);
requireNonNull(node, "node is null");
bucketToNodeInfo.put(bucketedId, new InternalNodeInfo(node, cacheable));
}
@Override
public boolean isDynamic()
{
return true;
}
@Override
public boolean hasInitialMap()
{
return hasInitialMap;
}
@Override
public Optional<List<InternalNode>> getBucketToNode()
{
if (bucketToNodeInfo.isEmpty()) {
return Optional.empty();
}
return Optional.of(bucketToNodeInfo.values().stream().map(InternalNodeInfo::getInternalNode).collect(Collectors.toList()));
}
}