TestDynamicLifespanScheduler.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.scheduler.group;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.ScheduleResult;
import com.facebook.presto.execution.scheduler.SourceScheduler;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.stream.IntStream;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static org.testng.Assert.assertEquals;
public class TestDynamicLifespanScheduler
{
private static final int BUCKET_COUNT = 10;
private static final int TASK_COUNT = 2;
private static final InternalNode node1 = getInternalNode("1");
private static final InternalNode node2 = getInternalNode("2");
private static final InternalNode node3 = getInternalNode("3");
@Test
public void testSchedule()
{
LifespanScheduler lifespanScheduler = getLifespanScheduler();
TestingSourceScheduler sourceScheduler = new TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
}
}
@Test
public void testRetry()
{
LifespanScheduler lifespanScheduler = getLifespanScheduler();
TestingSourceScheduler sourceScheduler = new TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(ImmutableList.of(sourceScheduler.getLastStartedLifespans().get(1)));
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
lifespanScheduler.onTaskFailed(0, ImmutableList.of(sourceScheduler));
assertEquals(sourceScheduler.getLastRewoundLifespans().size(), 1);
sourceScheduler.getLastRewoundLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 1);
sourceScheduler.getLastStartedLifespans().clear();
}
}
@Test(timeOut = 10_000)
public void testRetryLastLifespan()
{
LifespanScheduler lifespanScheduler = getLifespanScheduler();
TestingSourceScheduler sourceScheduler = new TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
for (int i = 0; i < BUCKET_COUNT / TASK_COUNT - 1; i++) {
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
sourceScheduler.getLastStartedLifespans().clear();
lifespanScheduler.schedule(sourceScheduler);
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
}
lifespanScheduler.onLifespanExecutionFinished(ImmutableList.of(sourceScheduler.getLastStartedLifespans().get(1)));
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
lifespanScheduler.onTaskFailed(0, ImmutableList.of(sourceScheduler));
assertEquals(sourceScheduler.getLastRewoundLifespans().size(), 1);
sourceScheduler.getLastRewoundLifespans().clear();
lifespanScheduler.schedule(sourceScheduler);
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 1);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
}
@Test
public void testAffinitySchedule()
{
BucketNodeMap bucketNodeMap = new DynamicBucketNodeMap(
split -> ((TestDynamicLifespanScheduler.TestSplit) split.getConnectorSplit()).getBucketNumber(),
BUCKET_COUNT,
ImmutableList.of(node1, node2, node1, node2, node1, node2, node1, node2, node1, node2));
LifespanScheduler lifespanScheduler = getAffinityLifespanScheduler(bucketNodeMap);
TestDynamicLifespanScheduler.TestingSourceScheduler sourceScheduler = new TestDynamicLifespanScheduler.TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
}
}
@Test
public void testAffinityRetry()
{
BucketNodeMap bucketNodeMap = new DynamicBucketNodeMap(
split -> ((TestDynamicLifespanScheduler.TestSplit) split.getConnectorSplit()).getBucketNumber(),
BUCKET_COUNT,
ImmutableList.of(node1, node2, node1, node2, node1, node2, node1, node2, node1, node2));
LifespanScheduler lifespanScheduler = getAffinityLifespanScheduler(bucketNodeMap);
TestDynamicLifespanScheduler.TestingSourceScheduler sourceScheduler = new TestDynamicLifespanScheduler.TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(ImmutableList.of(sourceScheduler.getLastStartedLifespans().get(1)));
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
lifespanScheduler.onTaskFailed(0, ImmutableList.of(sourceScheduler));
assertEquals(sourceScheduler.getLastRewoundLifespans().size(), 1);
sourceScheduler.getLastRewoundLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 1);
sourceScheduler.getLastStartedLifespans().clear();
}
}
@Test
public void testAffinityScheduleLocality()
{
BucketNodeMap bucketNodeMap = new DynamicBucketNodeMap(
split -> ((TestDynamicLifespanScheduler.TestSplit) split.getConnectorSplit()).getBucketNumber(),
BUCKET_COUNT,
ImmutableList.of(node1, node3, node1, node3, node1, node3, node1, node3, node1, node3));
LifespanScheduler lifespanScheduler = getAffinityLifespanScheduler(bucketNodeMap);
TestDynamicLifespanScheduler.TestingSourceScheduler sourceScheduler = new TestDynamicLifespanScheduler.TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
assertEquals(bucketNodeMap.getAssignedNode(0).get(), node1);
// bucket 1 is already scheduled, thus its assignedNode is changed
assertEquals(bucketNodeMap.getAssignedNode(1).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(2).get(), node1);
// bucket 3 is not scheduled yet, thus its assignedNode remains
assertEquals(bucketNodeMap.getAssignedNode(3).get(), node3);
assertEquals(bucketNodeMap.getAssignedNode(4).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(5).get(), node3);
assertEquals(bucketNodeMap.getAssignedNode(6).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(7).get(), node3);
assertEquals(bucketNodeMap.getAssignedNode(8).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(9).get(), node3);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
}
assertEquals(bucketNodeMap.getAssignedNode(0).get(), node1);
// bucket 1 is already scheduled, thus its assignedNode is changed
assertEquals(bucketNodeMap.getAssignedNode(1).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(2).get(), node1);
// bucket 3 is not scheduled yet, thus its assignedNode remains
assertEquals(bucketNodeMap.getAssignedNode(3).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(4).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(5).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(6).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(7).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(8).get(), node1);
assertEquals(bucketNodeMap.getAssignedNode(9).get(), node2);
}
@Test
public void testAffinityScheduleFailedLocality()
{
BucketNodeMap bucketNodeMap = new DynamicBucketNodeMap(
split -> ((TestDynamicLifespanScheduler.TestSplit) split.getConnectorSplit()).getBucketNumber(),
BUCKET_COUNT,
ImmutableList.of(node1, node2, node1, node2, node1, node2, node1, node2, node1, node2));
LifespanScheduler lifespanScheduler = getAffinityLifespanScheduler(bucketNodeMap);
TestDynamicLifespanScheduler.TestingSourceScheduler sourceScheduler = new TestDynamicLifespanScheduler.TestingSourceScheduler();
lifespanScheduler.scheduleInitial(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(ImmutableList.of(sourceScheduler.getLastStartedLifespans().get(1)));
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 2);
sourceScheduler.getLastStartedLifespans().clear();
lifespanScheduler.onTaskFailed(0, ImmutableList.of(sourceScheduler));
assertEquals(sourceScheduler.getLastRewoundLifespans().size(), 1);
sourceScheduler.getLastRewoundLifespans().clear();
while (!lifespanScheduler.allLifespanExecutionFinished()) {
lifespanScheduler.schedule(sourceScheduler);
lifespanScheduler.onLifespanExecutionFinished(sourceScheduler.getLastStartedLifespans());
assertEquals(sourceScheduler.getLastStartedLifespans().size(), 1);
sourceScheduler.getLastStartedLifespans().clear();
}
assertEquals(bucketNodeMap.getAssignedNode(0).get(), node2);
// bucket 1 is already scheduled, thus its assignedNode is changed
assertEquals(bucketNodeMap.getAssignedNode(1).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(2).get(), node2);
// bucket 3 is not scheduled yet, thus its assignedNode remains
assertEquals(bucketNodeMap.getAssignedNode(3).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(4).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(5).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(6).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(7).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(8).get(), node2);
assertEquals(bucketNodeMap.getAssignedNode(9).get(), node2);
}
private static LifespanScheduler getAffinityLifespanScheduler(BucketNodeMap bucketNodeMap)
{
return new DynamicLifespanScheduler(
bucketNodeMap,
ImmutableList.of(node1, node2),
IntStream.range(0, 10).mapToObj(TestDynamicLifespanScheduler.TestPartitionHandle::new).collect(toImmutableList()),
OptionalInt.of(1));
}
private static LifespanScheduler getLifespanScheduler()
{
return new DynamicLifespanScheduler(
new DynamicBucketNodeMap(split -> ((TestSplit) split.getConnectorSplit()).getBucketNumber(), BUCKET_COUNT),
ImmutableList.of(getInternalNode("1"), getInternalNode("2")),
IntStream.range(0, 10).mapToObj(TestPartitionHandle::new).collect(toImmutableList()),
OptionalInt.of(1));
}
private static InternalNode getInternalNode(String id)
{
return new InternalNode(id, URI.create(id), new NodeVersion("test"), false);
}
private static class TestSplit
implements ConnectorSplit
{
private final int bucketNumber;
private TestSplit(int bucketNumber)
{
this.bucketNumber = bucketNumber;
}
public int getBucketNumber()
{
return bucketNumber;
}
@Override
public NodeSelectionStrategy getNodeSelectionStrategy()
{
return HARD_AFFINITY;
}
@Override
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of();
}
@Override
public Object getInfo()
{
return this;
}
}
private static class TestPartitionHandle
extends ConnectorPartitionHandle
{
private final int bucket;
public TestPartitionHandle(int bucket)
{
this.bucket = bucket;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TestPartitionHandle)) {
return false;
}
TestPartitionHandle that = (TestPartitionHandle) o;
return bucket == that.bucket;
}
@Override
public int hashCode()
{
return Objects.hash(bucket);
}
}
public class TestingSourceScheduler
implements SourceScheduler
{
private final List<Lifespan> lastStartedLifespans = new ArrayList<>();
private final List<Lifespan> lastRewoundLifespans = new ArrayList<>();
public ScheduleResult schedule()
{
throw new UnsupportedOperationException();
}
@Override
public void close()
{
throw new UnsupportedOperationException();
}
@Override
public PlanNodeId getPlanNodeId()
{
throw new UnsupportedOperationException();
}
@Override
public void startLifespan(Lifespan lifespan, ConnectorPartitionHandle partitionHandle)
{
lastStartedLifespans.add(lifespan);
}
@Override
public void rewindLifespan(Lifespan lifespan, ConnectorPartitionHandle partitionHandle)
{
lastRewoundLifespans.add(lifespan);
}
@Override
public List<Lifespan> drainCompletelyScheduledLifespans()
{
throw new UnsupportedOperationException();
}
@Override
public void notifyAllLifespansFinishedExecution()
{
throw new UnsupportedOperationException();
}
public List<Lifespan> getLastStartedLifespans()
{
return lastStartedLifespans;
}
public List<Lifespan> getLastRewoundLifespans()
{
return lastRewoundLifespans;
}
}
}