TestResourceGroups.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.resourceGroups;
import com.facebook.presto.execution.MockManagedQueryExecution;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroup.RootInternalResourceGroup;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.QueryStateInfo;
import com.facebook.presto.server.ResourceGroupInfo;
import com.facebook.presto.spi.ConnectorId;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.commons.math3.distribution.BinomialDistribution;
import org.testng.annotations.Test;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import static com.facebook.airlift.testing.Assertions.assertBetweenInclusive;
import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.airlift.testing.Assertions.assertLessThan;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_QUEUE;
import static com.facebook.presto.spi.resourceGroups.ResourceGroupState.CAN_RUN;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.FAIR;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.QUERY_PRIORITY;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.WEIGHTED;
import static com.facebook.presto.spi.resourceGroups.SchedulingPolicy.WEIGHTED_FAIR;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Collections.reverse;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TestResourceGroups
{
@Test(timeOut = 10_000)
public void testQueueFull()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(1);
root.setHardConcurrencyLimit(1);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
root.run(query3);
assertEquals(query3.getState(), FAILED);
assertEquals(query3.getThrowable().getMessage(), "Too many queued queries for \"root\"");
}
@Test(timeOut = 10_000)
public void testFairEligibility()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(4);
root.setHardConcurrencyLimit(1);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(4);
group1.setHardConcurrencyLimit(1);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(4);
group2.setHardConcurrencyLimit(1);
InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true);
group3.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group3.setMaxQueuedQueries(4);
group3.setHardConcurrencyLimit(1);
MockManagedQueryExecution query1a = new MockManagedQueryExecution(0);
query1a.startWaitingForPrerequisites();
group1.run(query1a);
assertEquals(query1a.getState(), RUNNING);
MockManagedQueryExecution query1b = new MockManagedQueryExecution(0);
query1b.startWaitingForPrerequisites();
group1.run(query1b);
assertEquals(query1b.getState(), QUEUED);
MockManagedQueryExecution query2a = new MockManagedQueryExecution(0);
query2a.startWaitingForPrerequisites();
group2.run(query2a);
assertEquals(query2a.getState(), QUEUED);
MockManagedQueryExecution query2b = new MockManagedQueryExecution(0);
query2b.startWaitingForPrerequisites();
group2.run(query2b);
assertEquals(query2b.getState(), QUEUED);
MockManagedQueryExecution query3a = new MockManagedQueryExecution(0);
query3a.startWaitingForPrerequisites();
group3.run(query3a);
assertEquals(query3a.getState(), QUEUED);
query1a.complete();
root.processQueuedQueries();
// 2a and not 1b should have started, as group1 was not eligible to start a second query
assertEquals(query1b.getState(), QUEUED);
assertEquals(query2a.getState(), RUNNING);
assertEquals(query2b.getState(), QUEUED);
assertEquals(query3a.getState(), QUEUED);
query2a.complete();
root.processQueuedQueries();
assertEquals(query3a.getState(), RUNNING);
assertEquals(query2b.getState(), QUEUED);
assertEquals(query1b.getState(), QUEUED);
query3a.complete();
root.processQueuedQueries();
assertEquals(query1b.getState(), RUNNING);
assertEquals(query2b.getState(), QUEUED);
}
@Test
public void testSetSchedulingPolicy()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(4);
root.setHardConcurrencyLimit(1);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(4);
group1.setHardConcurrencyLimit(2);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(4);
group2.setHardConcurrencyLimit(2);
MockManagedQueryExecution query1a = new MockManagedQueryExecution(0);
query1a.startWaitingForPrerequisites();
group1.run(query1a);
assertEquals(query1a.getState(), RUNNING);
MockManagedQueryExecution query1b = new MockManagedQueryExecution(0);
query1b.startWaitingForPrerequisites();
group1.run(query1b);
assertEquals(query1b.getState(), QUEUED);
MockManagedQueryExecution query1c = new MockManagedQueryExecution(0);
query1c.startWaitingForPrerequisites();
group1.run(query1c);
assertEquals(query1c.getState(), QUEUED);
MockManagedQueryExecution query2a = new MockManagedQueryExecution(0);
query2a.startWaitingForPrerequisites();
group2.run(query2a);
assertEquals(query2a.getState(), QUEUED);
assertEquals(root.getInfo().getNumEligibleSubGroups(), 2);
assertEquals(root.getOrCreateSubGroup("1", true).getQueuedQueries(), 2);
assertEquals(root.getOrCreateSubGroup("2", true).getQueuedQueries(), 1);
assertEquals(root.getSchedulingPolicy(), FAIR);
root.setSchedulingPolicy(QUERY_PRIORITY);
assertEquals(root.getInfo().getNumEligibleSubGroups(), 2);
assertEquals(root.getOrCreateSubGroup("1", true).getQueuedQueries(), 2);
assertEquals(root.getOrCreateSubGroup("2", true).getQueuedQueries(), 1);
assertEquals(root.getSchedulingPolicy(), QUERY_PRIORITY);
assertEquals(root.getOrCreateSubGroup("1", true).getSchedulingPolicy(), QUERY_PRIORITY);
assertEquals(root.getOrCreateSubGroup("2", true).getSchedulingPolicy(), QUERY_PRIORITY);
}
@Test(timeOut = 10_000)
public void testFairQueuing()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(4);
root.setHardConcurrencyLimit(1);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(4);
group1.setHardConcurrencyLimit(2);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(4);
group2.setHardConcurrencyLimit(2);
MockManagedQueryExecution query1a = new MockManagedQueryExecution(0);
query1a.startWaitingForPrerequisites();
group1.run(query1a);
assertEquals(query1a.getState(), RUNNING);
MockManagedQueryExecution query1b = new MockManagedQueryExecution(0);
query1b.startWaitingForPrerequisites();
group1.run(query1b);
assertEquals(query1b.getState(), QUEUED);
MockManagedQueryExecution query1c = new MockManagedQueryExecution(0);
query1c.startWaitingForPrerequisites();
group1.run(query1c);
assertEquals(query1c.getState(), QUEUED);
MockManagedQueryExecution query2a = new MockManagedQueryExecution(0);
query2a.startWaitingForPrerequisites();
group2.run(query2a);
assertEquals(query2a.getState(), QUEUED);
query1a.complete();
root.processQueuedQueries();
// 1b and not 2a should have started, as it became queued first and group1 was eligible to run more
assertEquals(query1b.getState(), RUNNING);
assertEquals(query1c.getState(), QUEUED);
assertEquals(query2a.getState(), QUEUED);
// 2a and not 1c should have started, as all eligible sub groups get fair sharing
query1b.complete();
root.processQueuedQueries();
assertEquals(query2a.getState(), RUNNING);
assertEquals(query1c.getState(), QUEUED);
}
@Test(timeOut = 10_000)
public void testMemoryLimit()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, BYTE));
root.setMaxQueuedQueries(4);
root.setHardConcurrencyLimit(3);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(2);
query1.startWaitingForPrerequisites();
root.run(query1);
// Process the group to refresh stats
root.processQueuedQueries();
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
root.run(query3);
assertEquals(query3.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), RUNNING);
}
@Test
public void testSubgroupMemoryLimit()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(10, BYTE));
root.setMaxQueuedQueries(4);
root.setHardConcurrencyLimit(3);
InternalResourceGroup subgroup = root.getOrCreateSubGroup("subgroup", true);
subgroup.setSoftMemoryLimit(new DataSize(1, BYTE));
subgroup.setMaxQueuedQueries(4);
subgroup.setHardConcurrencyLimit(3);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(2);
query1.startWaitingForPrerequisites();
subgroup.run(query1);
// Process the group to refresh stats
root.processQueuedQueries();
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
subgroup.run(query2);
assertEquals(query2.getState(), QUEUED);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
subgroup.run(query3);
assertEquals(query3.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), RUNNING);
}
@Test(timeOut = 10_000)
public void testSoftCpuLimit()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, BYTE));
root.setSoftCpuLimit(new Duration(1, SECONDS));
root.setHardCpuLimit(new Duration(2, SECONDS));
root.setCpuQuotaGenerationMillisPerSecond(2000);
root.setMaxQueuedQueries(1);
root.setHardConcurrencyLimit(2);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(1, "query_id", 1, new Duration(1, SECONDS));
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), RUNNING);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
root.run(query3);
assertEquals(query3.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), QUEUED);
root.generateCpuQuota(2);
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), RUNNING);
}
@Test(timeOut = 10_000)
public void testPerWorkerQueryLimit()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setWorkersPerQueryLimit(5);
root.setMaxQueuedQueries(2);
root.setHardConcurrencyLimit(2);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
root.run(query3);
assertEquals(query3.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), QUEUED);
query2.complete();
root.processQueuedQueries();
assertEquals(query3.getState(), RUNNING);
}
@Test(timeOut = 10_000)
public void testPerWorkerQueryLimitMultipleGroups()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setWorkersPerQueryLimit(5);
root.setMaxQueuedQueries(5);
root.setHardConcurrencyLimit(2);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setWorkersPerQueryLimit(10);
group1.setMaxQueuedQueries(2);
group1.setHardConcurrencyLimit(2);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setWorkersPerQueryLimit(10);
group2.setMaxQueuedQueries(2);
group2.setHardConcurrencyLimit(2);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
group1.run(query1);
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
group2.run(query2);
assertEquals(query2.getState(), QUEUED);
MockManagedQueryExecution query3 = new MockManagedQueryExecution(0);
query3.startWaitingForPrerequisites();
group1.run(query3);
assertEquals(query3.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
assertEquals(query3.getState(), QUEUED);
query2.complete();
root.processQueuedQueries();
assertEquals(query3.getState(), RUNNING);
}
@Test(timeOut = 10_000)
public void testHardCpuLimit()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, BYTE));
root.setHardCpuLimit(new Duration(1, SECONDS));
root.setCpuQuotaGenerationMillisPerSecond(2000);
root.setMaxQueuedQueries(1);
root.setHardConcurrencyLimit(1);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(1, "query_id", 1, new Duration(2, SECONDS));
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), RUNNING);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), QUEUED);
root.generateCpuQuota(2);
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
}
@Test(timeOut = 10_000)
public void testPriorityScheduling()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(100);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(QUERY_PRIORITY);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(100);
group1.setHardConcurrencyLimit(1);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(100);
group2.setHardConcurrencyLimit(1);
SortedMap<Integer, MockManagedQueryExecution> queries = new TreeMap<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
int priority;
do {
priority = random.nextInt(1_000_000) + 1;
}
while (queries.containsKey(priority));
MockManagedQueryExecution query = new MockManagedQueryExecution(0, "query_id", priority);
if (random.nextBoolean()) {
group1.run(query);
}
else {
group2.run(query);
}
queries.put(priority, query);
}
root.setHardConcurrencyLimit(1);
List<MockManagedQueryExecution> orderedQueries = new ArrayList<>(queries.values());
reverse(orderedQueries);
for (MockManagedQueryExecution query : orderedQueries) {
root.processQueuedQueries();
assertEquals(query.getState(), RUNNING);
query.complete();
}
}
@Test(timeOut = 20_000)
public void testWeightedScheduling()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(4);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(WEIGHTED);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(2);
group1.setHardConcurrencyLimit(2);
group1.setSoftConcurrencyLimit(2);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(2);
group2.setHardConcurrencyLimit(2);
group2.setSoftConcurrencyLimit(2);
group2.setSchedulingWeight(2);
Set<MockManagedQueryExecution> group1Queries = fillGroupTo(group1, ImmutableSet.of(), 2);
Set<MockManagedQueryExecution> group2Queries = fillGroupTo(group2, ImmutableSet.of(), 2);
root.setHardConcurrencyLimit(1);
int group2Ran = 0;
for (int i = 0; i < 1000; i++) {
for (Iterator<MockManagedQueryExecution> iterator = group1Queries.iterator(); iterator.hasNext(); ) {
MockManagedQueryExecution query = iterator.next();
if (query.getState() == RUNNING) {
query.complete();
iterator.remove();
}
}
group2Ran += completeGroupQueries(group2Queries);
root.processQueuedQueries();
group1Queries = fillGroupTo(group1, group1Queries, 2);
group2Queries = fillGroupTo(group2, group2Queries, 2);
}
// group1 has a weight of 1 and group2 has a weight of 2, so group2 should account for (2 / (1 + 2)) of the queries.
// since this is stochastic, we check that the result of 1000 trials are 2/3 with 99.9999% confidence
BinomialDistribution binomial = new BinomialDistribution(1000, 2.0 / 3.0);
int lowerBound = binomial.inverseCumulativeProbability(0.000001);
int upperBound = binomial.inverseCumulativeProbability(0.999999);
assertLessThan(group2Ran, upperBound);
assertGreaterThan(group2Ran, lowerBound);
}
@Test(timeOut = 30_000)
public void testWeightedFairScheduling()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(50);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(WEIGHTED_FAIR);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(50);
group1.setHardConcurrencyLimit(2);
group1.setSoftConcurrencyLimit(2);
group1.setSchedulingWeight(1);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(50);
group2.setHardConcurrencyLimit(2);
group2.setSoftConcurrencyLimit(2);
group2.setSchedulingWeight(2);
Set<MockManagedQueryExecution> group1Queries = fillGroupTo(group1, ImmutableSet.of(), 4);
Set<MockManagedQueryExecution> group2Queries = fillGroupTo(group2, ImmutableSet.of(), 4);
root.setHardConcurrencyLimit(3);
int group1Ran = 0;
int group2Ran = 0;
for (int i = 0; i < 1000; i++) {
group1Ran += completeGroupQueries(group1Queries);
group2Ran += completeGroupQueries(group2Queries);
root.processQueuedQueries();
group1Queries = fillGroupTo(group1, group1Queries, 4);
group2Queries = fillGroupTo(group2, group2Queries, 4);
}
// group1 has a weight of 1 and group2 has a weight of 2, so group2 should account for (2 / (1 + 2)) * 3000 queries.
assertBetweenInclusive(group1Ran, 995, 1000);
assertBetweenInclusive(group2Ran, 1995, 2000);
}
@Test(timeOut = 10_000)
public void testWeightedFairSchedulingEqualWeights()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(50);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(WEIGHTED_FAIR);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(50);
group1.setHardConcurrencyLimit(2);
group1.setSoftConcurrencyLimit(2);
group1.setSchedulingWeight(1);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(50);
group2.setHardConcurrencyLimit(2);
group2.setSoftConcurrencyLimit(2);
group2.setSchedulingWeight(1);
InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true);
group3.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group3.setMaxQueuedQueries(50);
group3.setHardConcurrencyLimit(2);
group3.setSoftConcurrencyLimit(2);
group3.setSchedulingWeight(2);
Set<MockManagedQueryExecution> group1Queries = fillGroupTo(group1, ImmutableSet.of(), 4);
Set<MockManagedQueryExecution> group2Queries = fillGroupTo(group2, ImmutableSet.of(), 4);
Set<MockManagedQueryExecution> group3Queries = fillGroupTo(group3, ImmutableSet.of(), 4);
root.setHardConcurrencyLimit(4);
int group1Ran = 0;
int group2Ran = 0;
int group3Ran = 0;
for (int i = 0; i < 1000; i++) {
group1Ran += completeGroupQueries(group1Queries);
group2Ran += completeGroupQueries(group2Queries);
group3Ran += completeGroupQueries(group3Queries);
root.processQueuedQueries();
group1Queries = fillGroupTo(group1, group1Queries, 4);
group2Queries = fillGroupTo(group2, group2Queries, 4);
group3Queries = fillGroupTo(group3, group3Queries, 4);
}
// group 3 should run approximately 2x the number of queries of 1 and 2
BinomialDistribution binomial = new BinomialDistribution(4000, 1.0 / 4.0);
int lowerBound = binomial.inverseCumulativeProbability(0.000001);
int upperBound = binomial.inverseCumulativeProbability(0.999999);
assertBetweenInclusive(group1Ran, lowerBound, upperBound);
assertBetweenInclusive(group2Ran, lowerBound, upperBound);
assertBetweenInclusive(group3Ran, 2 * lowerBound, 2 * upperBound);
}
@Test(timeOut = 20_000)
public void testWeightedFairSchedulingNoStarvation()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(50);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(WEIGHTED_FAIR);
InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true);
group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group1.setMaxQueuedQueries(50);
group1.setHardConcurrencyLimit(2);
group1.setSoftConcurrencyLimit(2);
group1.setSchedulingWeight(1);
InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true);
group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
group2.setMaxQueuedQueries(50);
group2.setHardConcurrencyLimit(2);
group2.setSoftConcurrencyLimit(2);
group2.setSchedulingWeight(2);
Set<MockManagedQueryExecution> group1Queries = fillGroupTo(group1, ImmutableSet.of(), 4);
Set<MockManagedQueryExecution> group2Queries = fillGroupTo(group2, ImmutableSet.of(), 4);
root.setHardConcurrencyLimit(1);
int group1Ran = 0;
for (int i = 0; i < 2000; i++) {
group1Ran += completeGroupQueries(group1Queries);
completeGroupQueries(group2Queries);
root.processQueuedQueries();
group1Queries = fillGroupTo(group1, group1Queries, 4);
group2Queries = fillGroupTo(group2, group2Queries, 4);
}
assertEquals(group1Ran, 1000);
assertEquals(group1Ran, 1000);
}
@Test
public void testGetInfo()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(40);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
root.setSchedulingPolicy(WEIGHTED);
InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true);
rootA.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootA.setMaxQueuedQueries(20);
rootA.setHardConcurrencyLimit(2);
InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true);
rootB.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootB.setMaxQueuedQueries(20);
rootB.setHardConcurrencyLimit(2);
rootB.setSchedulingWeight(2);
rootB.setSchedulingPolicy(QUERY_PRIORITY);
InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true);
rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAX.setMaxQueuedQueries(10);
rootAX.setHardConcurrencyLimit(10);
InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true);
rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAY.setMaxQueuedQueries(10);
rootAY.setHardConcurrencyLimit(10);
InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true);
rootBX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootBX.setMaxQueuedQueries(10);
rootBX.setHardConcurrencyLimit(10);
InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true);
rootBY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootBY.setMaxQueuedQueries(10);
rootBY.setHardConcurrencyLimit(10);
// Queue 40 queries (= maxQueuedQueries (40) + maxRunningQueries (0))
Set<MockManagedQueryExecution> queries = fillGroupTo(rootAX, ImmutableSet.of(), 10, false);
queries.addAll(fillGroupTo(rootAY, ImmutableSet.of(), 10, false));
queries.addAll(fillGroupTo(rootBX, ImmutableSet.of(), 10, true));
queries.addAll(fillGroupTo(rootBY, ImmutableSet.of(), 10, true));
ResourceGroupInfo info = root.getInfo();
assertEquals(info.getNumRunningQueries(), 0);
assertEquals(info.getNumQueuedQueries(), 40);
// root.maxRunningQueries = 4, root.a.maxRunningQueries = 2, root.b.maxRunningQueries = 2. Will have 4 queries running and 36 left queued.
root.setHardConcurrencyLimit(4);
root.processQueuedQueries();
info = root.getInfo();
assertEquals(info.getNumRunningQueries(), 4);
assertEquals(info.getNumQueuedQueries(), 36);
// Complete running queries
Iterator<MockManagedQueryExecution> iterator = queries.iterator();
while (iterator.hasNext()) {
MockManagedQueryExecution query = iterator.next();
if (query.getState() == RUNNING) {
query.complete();
iterator.remove();
}
}
// 4 more queries start running, 32 left queued.
root.processQueuedQueries();
info = root.getInfo();
assertEquals(info.getNumRunningQueries(), 4);
assertEquals(info.getNumQueuedQueries(), 32);
// root.maxRunningQueries = 10, root.a.maxRunningQueries = 2, root.b.maxRunningQueries = 2. Still only have 4 running queries and 32 left queued.
root.setHardConcurrencyLimit(10);
root.processQueuedQueries();
info = root.getInfo();
assertEquals(info.getNumRunningQueries(), 4);
assertEquals(info.getNumQueuedQueries(), 32);
// root.maxRunningQueries = 10, root.a.maxRunningQueries = 2, root.b.maxRunningQueries = 10. Will have 10 running queries and 26 left queued.
rootB.setHardConcurrencyLimit(10);
root.processQueuedQueries();
info = root.getInfo();
assertEquals(info.getNumRunningQueries(), 10);
assertEquals(info.getNumQueuedQueries(), 26);
}
@Test
public void testGetResourceGroupStateInfo()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, GIGABYTE));
root.setMaxQueuedQueries(40);
root.setHardConcurrencyLimit(10);
root.setSchedulingPolicy(WEIGHTED);
InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true);
rootA.setSoftMemoryLimit(new DataSize(10, MEGABYTE));
rootA.setMaxQueuedQueries(20);
rootA.setHardConcurrencyLimit(0);
InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true);
rootB.setSoftMemoryLimit(new DataSize(5, MEGABYTE));
rootB.setMaxQueuedQueries(20);
rootB.setHardConcurrencyLimit(1);
rootB.setSchedulingWeight(2);
rootB.setSchedulingPolicy(QUERY_PRIORITY);
InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true);
rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAX.setMaxQueuedQueries(10);
rootAX.setHardConcurrencyLimit(10);
InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true);
rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAY.setMaxQueuedQueries(10);
rootAY.setHardConcurrencyLimit(10);
Set<MockManagedQueryExecution> queries = fillGroupTo(rootAX, ImmutableSet.of(), 5, false);
queries.addAll(fillGroupTo(rootAY, ImmutableSet.of(), 5, false));
queries.addAll(fillGroupTo(rootB, ImmutableSet.of(), 10, true));
ResourceGroupInfo rootInfo = root.getResourceGroupInfo(true, true, false);
assertEquals(rootInfo.getId(), root.getId());
assertEquals(rootInfo.getState(), CAN_RUN);
assertEquals(rootInfo.getSoftMemoryLimit(), root.getSoftMemoryLimit());
assertEquals(rootInfo.getMemoryUsage(), new DataSize(0, BYTE));
assertEquals(rootInfo.getSubGroups().size(), 2);
assertGroupInfoEquals(rootInfo.getSubGroups().get(0), rootA.getInfo());
assertEquals(rootInfo.getSubGroups().get(0).getId(), rootA.getId());
assertEquals(rootInfo.getSubGroups().get(0).getState(), CAN_QUEUE);
assertEquals(rootInfo.getSubGroups().get(0).getSoftMemoryLimit(), rootA.getSoftMemoryLimit());
assertEquals(rootInfo.getSubGroups().get(0).getHardConcurrencyLimit(), rootA.getHardConcurrencyLimit());
assertEquals(rootInfo.getSubGroups().get(0).getMaxQueuedQueries(), rootA.getMaxQueuedQueries());
assertEquals(rootInfo.getSubGroups().get(0).getNumEligibleSubGroups(), 2);
assertEquals(rootInfo.getSubGroups().get(0).getNumRunningQueries(), 0);
assertEquals(rootInfo.getSubGroups().get(0).getNumQueuedQueries(), 10);
assertGroupInfoEquals(rootInfo.getSubGroups().get(1), rootB.getInfo());
assertEquals(rootInfo.getSubGroups().get(1).getId(), rootB.getId());
assertEquals(rootInfo.getSubGroups().get(1).getState(), CAN_QUEUE);
assertEquals(rootInfo.getSubGroups().get(1).getSoftMemoryLimit(), rootB.getSoftMemoryLimit());
assertEquals(rootInfo.getSubGroups().get(1).getHardConcurrencyLimit(), rootB.getHardConcurrencyLimit());
assertEquals(rootInfo.getSubGroups().get(1).getMaxQueuedQueries(), rootB.getMaxQueuedQueries());
assertEquals(rootInfo.getSubGroups().get(1).getNumEligibleSubGroups(), 0);
assertEquals(rootInfo.getSubGroups().get(1).getNumRunningQueries(), 1);
assertEquals(rootInfo.getSubGroups().get(1).getNumQueuedQueries(), 9);
assertEquals(rootInfo.getSoftConcurrencyLimit(), root.getSoftConcurrencyLimit());
assertEquals(rootInfo.getHardConcurrencyLimit(), root.getHardConcurrencyLimit());
assertEquals(rootInfo.getMaxQueuedQueries(), root.getMaxQueuedQueries());
assertEquals(rootInfo.getNumQueuedQueries(), 19);
assertEquals(rootInfo.getRunningQueries().size(), 1);
QueryStateInfo queryInfo = rootInfo.getRunningQueries().get(0);
assertEquals(queryInfo.getResourceGroupId(), Optional.of(rootB.getId()));
}
@Test
public void testGetStaticResourceGroupInfo()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, GIGABYTE));
root.setMaxQueuedQueries(100);
root.setHardConcurrencyLimit(10);
root.setSchedulingPolicy(WEIGHTED);
InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true);
rootA.setSoftMemoryLimit(new DataSize(10, MEGABYTE));
rootA.setMaxQueuedQueries(100);
rootA.setHardConcurrencyLimit(0);
InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true);
rootB.setSoftMemoryLimit(new DataSize(5, MEGABYTE));
rootB.setMaxQueuedQueries(100);
rootB.setHardConcurrencyLimit(1);
rootB.setSchedulingWeight(2);
rootB.setSchedulingPolicy(QUERY_PRIORITY);
// x is a dynamic resource group
InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", false);
rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAX.setMaxQueuedQueries(10);
rootAX.setHardConcurrencyLimit(10);
InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true);
rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAY.setMaxQueuedQueries(10);
rootAY.setHardConcurrencyLimit(10);
for (int i = 0; i < 10; i++) {
InternalResourceGroup subGroup = rootAX.getOrCreateSubGroup("ax" + i, false);
subGroup.setSoftMemoryLimit(new DataSize(i, MEGABYTE));
subGroup.setMaxQueuedQueries(10);
subGroup.setHardConcurrencyLimit(10);
}
for (int i = 0; i < 10; i++) {
fillGroupTo(rootAX.getOrCreateSubGroup("ax" + i, true), ImmutableSet.of(), 1, false);
}
fillGroupTo(rootAY, ImmutableSet.of(), 5, false);
fillGroupTo(rootB, ImmutableSet.of(), 10, true);
ResourceGroupInfo rootInfo = root.getResourceGroupInfo(false, false, true);
assertEquals(rootInfo.getId(), root.getId());
assertNotNull(rootInfo.getSubGroups());
assertEquals(rootInfo.getSubGroups().size(), 2);
Optional<ResourceGroupInfo> rootAInfo = getResourceGroupInfoForId(rootA, rootInfo);
assertTrue(rootAInfo.isPresent());
assertNotNull(rootAInfo.get().getSubGroups());
assertEquals(rootAInfo.get().getSubGroups().size(), 1);
Optional<ResourceGroupInfo> rootAXInfo = getResourceGroupInfoForId(rootAX, rootAInfo.get());
// dynamic resource groups should not be returned.
assertFalse(rootAXInfo.isPresent());
Optional<ResourceGroupInfo> rootAYInfo = getResourceGroupInfoForId(rootAY, rootAInfo.get());
assertTrue(rootAYInfo.isPresent());
assertNotNull(rootAYInfo.get().getSubGroups());
assertEquals(rootAYInfo.get().getSubGroups().size(), 0);
Optional<ResourceGroupInfo> rootBInfo = getResourceGroupInfoForId(rootB, rootInfo);
assertTrue(rootBInfo.isPresent());
assertNotNull(rootBInfo.get().getSubGroups());
assertEquals(rootBInfo.get().getSubGroups().size(), 0);
}
private Optional<ResourceGroupInfo> getResourceGroupInfoForId(InternalResourceGroup rootA, ResourceGroupInfo rootInfo)
{
assertNotNull(rootInfo.getSubGroups());
for (ResourceGroupInfo subGroup : rootInfo.getSubGroups()) {
if (subGroup.getId().equals(rootA.getId())) {
return Optional.of(subGroup);
}
}
return Optional.empty();
}
@Test
public void testGetBlockedQueuedQueries()
{
RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager());
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(40);
// Start with zero capacity, so that nothing starts running until we've added all the queries
root.setHardConcurrencyLimit(0);
InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true);
rootA.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootA.setMaxQueuedQueries(20);
rootA.setHardConcurrencyLimit(8);
InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true);
rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAX.setMaxQueuedQueries(10);
rootAX.setHardConcurrencyLimit(8);
InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true);
rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootAY.setMaxQueuedQueries(10);
rootAY.setHardConcurrencyLimit(5);
InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true);
rootB.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootB.setMaxQueuedQueries(20);
rootB.setHardConcurrencyLimit(8);
InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true);
rootBX.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootBX.setMaxQueuedQueries(10);
rootBX.setHardConcurrencyLimit(8);
InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true);
rootBY.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
rootBY.setMaxQueuedQueries(10);
rootBY.setHardConcurrencyLimit(5);
// Queue 40 queries (= maxQueuedQueries (40) + maxRunningQueries (0))
Set<MockManagedQueryExecution> queries = fillGroupTo(rootAX, ImmutableSet.of(), 10, false);
queries.addAll(fillGroupTo(rootAY, ImmutableSet.of(), 10, false));
queries.addAll(fillGroupTo(rootBX, ImmutableSet.of(), 10, true));
queries.addAll(fillGroupTo(rootBY, ImmutableSet.of(), 10, true));
assertEquals(root.getWaitingQueuedQueries(), 16);
assertEquals(rootA.getWaitingQueuedQueries(), 13);
assertEquals(rootAX.getWaitingQueuedQueries(), 10);
assertEquals(rootAY.getWaitingQueuedQueries(), 10);
assertEquals(rootB.getWaitingQueuedQueries(), 13);
assertEquals(rootBX.getWaitingQueuedQueries(), 10);
assertEquals(rootBY.getWaitingQueuedQueries(), 10);
root.setHardConcurrencyLimit(20);
root.processQueuedQueries();
assertEquals(root.getWaitingQueuedQueries(), 0);
assertEquals(rootA.getWaitingQueuedQueries(), 5);
assertEquals(rootAX.getWaitingQueuedQueries(), 6);
assertEquals(rootAY.getWaitingQueuedQueries(), 6);
assertEquals(rootB.getWaitingQueuedQueries(), 5);
assertEquals(rootBX.getWaitingQueuedQueries(), 6);
assertEquals(rootBY.getWaitingQueuedQueries(), 6);
}
private static int completeGroupQueries(Set<MockManagedQueryExecution> groupQueries)
{
int groupRan = 0;
for (Iterator<MockManagedQueryExecution> iterator = groupQueries.iterator(); iterator.hasNext(); ) {
MockManagedQueryExecution query = iterator.next();
if (query.getState() == RUNNING) {
query.complete();
iterator.remove();
groupRan++;
}
}
return groupRan;
}
private static Set<MockManagedQueryExecution> fillGroupTo(InternalResourceGroup group, Set<MockManagedQueryExecution> existingQueries, int count)
{
return fillGroupTo(group, existingQueries, count, false);
}
private static Set<MockManagedQueryExecution> fillGroupTo(InternalResourceGroup group, Set<MockManagedQueryExecution> existingQueries, int count, boolean queryPriority)
{
int existingCount = existingQueries.size();
Set<MockManagedQueryExecution> queries = new HashSet<>(existingQueries);
for (int i = 0; i < count - existingCount; i++) {
MockManagedQueryExecution query = new MockManagedQueryExecution(
0,
group.getId().toString().replace(".", "") + Integer.toString(i),
queryPriority ? i + 1 : 1,
new Duration(0, MILLISECONDS),
group.getId());
queries.add(query);
group.run(query);
}
return queries;
}
private static void assertGroupInfoEquals(ResourceGroupInfo actual, ResourceGroupInfo expected)
{
assertTrue(actual.getSchedulingWeight() == expected.getSchedulingWeight() &&
actual.getSoftConcurrencyLimit() == expected.getSoftConcurrencyLimit() &&
actual.getHardConcurrencyLimit() == expected.getHardConcurrencyLimit() &&
actual.getMaxQueuedQueries() == expected.getMaxQueuedQueries() &&
actual.getNumQueuedQueries() == expected.getNumQueuedQueries() &&
actual.getNumRunningQueries() == expected.getNumRunningQueries() &&
actual.getNumEligibleSubGroups() == expected.getNumEligibleSubGroups() &&
Objects.equals(actual.getId(), expected.getId()) &&
actual.getState() == expected.getState() &&
actual.getSchedulingPolicy() == expected.getSchedulingPolicy() &&
Objects.equals(actual.getSoftMemoryLimit(), expected.getSoftMemoryLimit()) &&
Objects.equals(actual.getMemoryUsage(), expected.getMemoryUsage()));
}
private InternalNodeManager createNodeManager()
{
InMemoryNodeManager internalNodeManager = new InMemoryNodeManager();
internalNodeManager.addNode(
new ConnectorId("dummy"),
new InternalNode(
"1",
URI.create("local://localhost:123/1"),
OptionalInt.empty(),
"1",
false,
false,
false,
false));
internalNodeManager.addNode(
new ConnectorId("dummy"),
new InternalNode(
"2",
URI.create("local://localhost:456/1"),
OptionalInt.of(2),
"1",
false,
false,
false,
false));
internalNodeManager.addNode(
new ConnectorId("dummy"),
new InternalNode(
"3",
URI.create("local://localhost:789/2"),
OptionalInt.of(3),
"1",
false,
false,
false,
false));
return internalNodeManager;
}
}