TestResourceManagerClusterStateProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
import com.facebook.presto.execution.resourceGroups.ResourceGroupSpecInfo;
import com.facebook.presto.memory.MemoryInfo;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.BasicQueryStats;
import com.facebook.presto.server.NodeStatus;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.execution.QueryState.DISPATCHING;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.FINISHED;
import static com.facebook.presto.execution.QueryState.FINISHING;
import static com.facebook.presto.execution.QueryState.PLANNING;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.QueryState.STARTING;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_RESOURCES;
import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
public class TestResourceManagerClusterStateProvider
{
@Test(timeOut = 15_000)
public void testQueryInfo()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
assertEquals(provider.getClusterQueries(), ImmutableList.of());
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
long query4Sequence = 0;
provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), query1Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), query2Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), query3Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), query4Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 2);
provider.registerQueryHeartbeat("node1", createQueryInfo("1", RUNNING), query1Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", FINISHING), query2Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 2);
// Update query 2 to FINISHED to verify this is now completed in the resource manager
provider.registerQueryHeartbeat("node1", createQueryInfo("2", FINISHED), query2Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 3);
// Mix in queries from another coordinator
provider.registerQueryHeartbeat("node2", createQueryInfo("1", QUEUED), query1Sequence++);
provider.registerQueryHeartbeat("node2", createQueryInfo("2", RUNNING), query2Sequence++);
provider.registerQueryHeartbeat("node2", createQueryInfo("3", FINISHED), query3Sequence++);
provider.registerQueryHeartbeat("node2", createQueryInfo("4", FAILED), query4Sequence++);
assertQueryInfos(provider.getClusterQueries(), 8, 5);
// Expire completed queries
Thread.sleep(SECONDS.toMillis(5));
assertQueryInfos(provider.getClusterQueries(), 8, 5);
// Expire all queries
Thread.sleep(SECONDS.toMillis(5));
assertQueryInfos(provider.getClusterQueries(), 0, 0);
}
@Test(timeOut = 15_000)
public void testOutOfOrderUpdatesIgnored()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
assertEquals(provider.getClusterQueries(), ImmutableList.of());
provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), 1);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", FINISHED), 2);
assertQueryInfos(provider.getClusterQueries(), 2, 1);
provider.registerQueryHeartbeat("node1", createQueryInfo("1", FINISHED), 0);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), 1);
assertQueryInfos(provider.getClusterQueries(), 2, 1);
provider.registerQueryHeartbeat("node1", createQueryInfo("1", FINISHED), 2);
assertQueryInfos(provider.getClusterQueries(), 2, 2);
}
@Test(timeOut = 15_000)
public void testResourceGroups()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node3", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("50s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("local"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node1"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node2"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node3"));
assertEquals(provider.getClusterQueries(), ImmutableList.of());
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
long query4Sequence = 0;
long query5Sequence = 0;
long query6Sequence = 0;
provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED, "rg1", GENERAL_POOL), query1Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING, "rg2", GENERAL_POOL), query2Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHING, "rg3", GENERAL_POOL), query3Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("4", FINISHED, "rg4", GENERAL_POOL), query4Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("5", FAILED, "rg5", GENERAL_POOL), query5Sequence++);
assertResourceGroups(provider, "node1", 0);
assertResourceGroups(provider, "node2", 3);
// Add an existing leaf node from another node
provider.registerQueryHeartbeat("node3", createQueryInfo("6", QUEUED, "rg6", GENERAL_POOL), query6Sequence++);
assertResourceGroups(provider, "node1", 1);
assertResourceGroups(provider, "node2", 4);
assertResourceGroups(provider, "node3", 3);
// Expire running queries
Thread.sleep(SECONDS.toMillis(5));
assertResourceGroups(provider, "node1", 0);
assertResourceGroups(provider, "node2", 0);
assertResourceGroups(provider, "node3", 0);
}
@Test(timeOut = 15_000)
public void testResourceGroupsMerged()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node3", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node4", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node5", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node6", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("50s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("local"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node1"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node2"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node3"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node4"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node5"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node6"));
assertEquals(provider.getClusterQueries(), ImmutableList.of());
provider.registerQueryHeartbeat("node1", createQueryInfo("1", WAITING_FOR_PREREQUISITES, "rg4", GENERAL_POOL), 0);
assertTrue(provider.getClusterResourceGroups("node1").isEmpty());
assertResourceGroup(provider, "node2", "rg4", 0, 0, DataSize.valueOf("1MB"));
provider.registerQueryHeartbeat("node2", createQueryInfo("2", QUEUED, "rg4", GENERAL_POOL), 0);
assertResourceGroup(provider, "node1", "rg4", 1, 0, DataSize.valueOf("1MB"));
assertResourceGroup(provider, "node2", "rg4", 0, 0, DataSize.valueOf("1MB"));
assertResourceGroup(provider, "node3", "rg4", 1, 0, DataSize.valueOf("2MB"));
provider.registerQueryHeartbeat("node3", createQueryInfo("3", RUNNING, "rg4", GENERAL_POOL), 0);
assertResourceGroup(provider, "node1", "rg4", 1, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node2", "rg4", 0, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node3", "rg4", 1, 0, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 1, DataSize.valueOf("3MB"));
provider.registerQueryHeartbeat("node4", createQueryInfo("4", FINISHED, "rg4", GENERAL_POOL), 0);
assertResourceGroup(provider, "node1", "rg4", 1, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node2", "rg4", 0, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node3", "rg4", 1, 0, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 1, DataSize.valueOf("3MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 1, DataSize.valueOf("3MB"));
provider.registerQueryHeartbeat("node5", createQueryInfo("5", FAILED, "rg4", GENERAL_POOL), 0);
assertResourceGroup(provider, "node1", "rg4", 1, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node2", "rg4", 0, 1, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node3", "rg4", 1, 0, DataSize.valueOf("2MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 1, DataSize.valueOf("3MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 1, DataSize.valueOf("3MB"));
assertResourceGroup(provider, "node5", "rg4", 1, 1, DataSize.valueOf("3MB"));
// Add queries which are in non-terminal states other than RUNNING and QUEUED
provider.registerQueryHeartbeat("node1", createQueryInfo("6", WAITING_FOR_PREREQUISITES, "rg4", GENERAL_POOL), 0);
provider.registerQueryHeartbeat("node2", createQueryInfo("7", WAITING_FOR_RESOURCES, "rg4", GENERAL_POOL), 0);
provider.registerQueryHeartbeat("node3", createQueryInfo("8", DISPATCHING, "rg4", GENERAL_POOL), 0);
provider.registerQueryHeartbeat("node4", createQueryInfo("9", PLANNING, "rg4", GENERAL_POOL), 0);
provider.registerQueryHeartbeat("node5", createQueryInfo("10", STARTING, "rg4", GENERAL_POOL), 0);
provider.registerQueryHeartbeat("node6", createQueryInfo("11", FINISHING, "rg4", GENERAL_POOL), 0);
assertResourceGroup(provider, "node1", "rg4", 1, 6, DataSize.valueOf("7MB"));
assertResourceGroup(provider, "node2", "rg4", 0, 5, DataSize.valueOf("7MB"));
assertResourceGroup(provider, "node3", "rg4", 1, 4, DataSize.valueOf("7MB"));
assertResourceGroup(provider, "node4", "rg4", 1, 5, DataSize.valueOf("8MB"));
assertResourceGroup(provider, "node5", "rg4", 1, 5, DataSize.valueOf("8MB"));
assertResourceGroup(provider, "node6", "rg4", 1, 5, DataSize.valueOf("8MB"));
// Expire running queries
Thread.sleep(SECONDS.toMillis(5));
assertTrue(provider.getClusterResourceGroups("node1").isEmpty());
assertTrue(provider.getClusterResourceGroups("node2").isEmpty());
assertTrue(provider.getClusterResourceGroups("node3").isEmpty());
assertTrue(provider.getClusterResourceGroups("node4").isEmpty());
assertTrue(provider.getClusterResourceGroups("node5").isEmpty());
assertTrue(provider.getClusterResourceGroups("node6").isEmpty());
}
@Test
public void testNonLeafResourceGroupsMerged()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node3", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node4", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node5", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("node6", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("50s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("local"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node1"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node2"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node3"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node4"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node5"));
provider.registerNodeHeartbeat(createCoordinatorNodeStatus("node6"));
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
long query4Sequence = 0;
long query5Sequence = 0;
long query6Sequence = 0;
long query7Sequence = 0;
long query8Sequence = 0;
long query9Sequence = 0;
assertEquals(provider.getClusterQueries(), ImmutableList.of());
provider.registerQueryHeartbeat("node1", createQueryInfo("1", WAITING_FOR_PREREQUISITES, "root.rg4", GENERAL_POOL), query1Sequence++);
assertTrue(provider.getClusterResourceGroups("node1").isEmpty());
assertResourceGroup(provider, "node2", "root.rg4", 0, 0, DataSize.valueOf("1MB"));
provider.registerQueryHeartbeat("node2", createQueryInfo("2", QUEUED, "root.rg4", GENERAL_POOL), query2Sequence++);
assertResourceGroup(provider, "node1", "root.rg4", 1, 0, DataSize.valueOf("1MB"));
assertNonLeafResourceGroup(provider, "node1", "root", 0, 0, 1, 0);
assertResourceGroup(provider, "node2", "root.rg4", 0, 0, DataSize.valueOf("1MB"));
assertNonLeafResourceGroup(provider, "node2", "root", 0, 0, 0, 0);
assertResourceGroup(provider, "node3", "root.rg4", 1, 0, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node3", "root", 0, 0, 1, 0);
provider.registerQueryHeartbeat("node3", createQueryInfo("3", RUNNING, "root.rg4", GENERAL_POOL), query3Sequence++);
assertResourceGroup(provider, "node1", "root.rg4", 1, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node1", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node2", "root.rg4", 0, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node2", "root", 0, 0, 0, 1);
assertResourceGroup(provider, "node3", "root.rg4", 1, 0, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node3", "root", 0, 0, 1, 0);
assertResourceGroup(provider, "node4", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node4", "root", 0, 0, 1, 1);
provider.registerQueryHeartbeat("node4", createQueryInfo("4", FINISHED, "root.rg4", GENERAL_POOL), query4Sequence++);
assertResourceGroup(provider, "node1", "root.rg4", 1, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node1", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node2", "root.rg4", 0, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node2", "root", 0, 0, 0, 1);
assertResourceGroup(provider, "node3", "root.rg4", 1, 0, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node3", "root", 0, 0, 1, 0);
assertResourceGroup(provider, "node4", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node4", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node5", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node5", "root", 0, 0, 1, 1);
provider.registerQueryHeartbeat("node5", createQueryInfo("5", FAILED, "root.rg4", GENERAL_POOL), query5Sequence++);
assertResourceGroup(provider, "node1", "root.rg4", 1, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node1", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node2", "root.rg4", 0, 1, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node2", "root", 0, 0, 0, 1);
assertResourceGroup(provider, "node3", "root.rg4", 1, 0, DataSize.valueOf("2MB"));
assertNonLeafResourceGroup(provider, "node3", "root", 0, 0, 1, 0);
assertResourceGroup(provider, "node4", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node4", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node5", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node5", "root", 0, 0, 1, 1);
assertResourceGroup(provider, "node6", "root.rg4", 1, 1, DataSize.valueOf("3MB"));
assertNonLeafResourceGroup(provider, "node6", "root", 0, 0, 1, 1);
// Add queries which are in non-terminal states other than RUNNING and QUEUED
provider.registerQueryHeartbeat("node1", createQueryInfo("6", WAITING_FOR_PREREQUISITES, "root.rg4", GENERAL_POOL), query5Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("7", WAITING_FOR_RESOURCES, "root.rg4", GENERAL_POOL), query5Sequence++);
provider.registerQueryHeartbeat("node2", createQueryInfo("8", DISPATCHING, "root.rg4", GENERAL_POOL), query6Sequence++);
provider.registerQueryHeartbeat("node3", createQueryInfo("9", PLANNING, "root.rg4", GENERAL_POOL), query7Sequence++);
provider.registerQueryHeartbeat("node4", createQueryInfo("10", STARTING, "root.rg4", GENERAL_POOL), query8Sequence++);
provider.registerQueryHeartbeat("node5", createQueryInfo("11", FINISHING, "root.rg4", GENERAL_POOL), query9Sequence++);
assertResourceGroup(provider, "node1", "root.rg4", 1, 5, DataSize.valueOf("6MB"));
assertNonLeafResourceGroup(provider, "node1", "root", 0, 0, 1, 5);
assertResourceGroup(provider, "node2", "root.rg4", 0, 5, DataSize.valueOf("7MB"));
assertNonLeafResourceGroup(provider, "node2", "root", 0, 0, 0, 5);
assertResourceGroup(provider, "node3", "root.rg4", 1, 4, DataSize.valueOf("7MB"));
assertNonLeafResourceGroup(provider, "node3", "root", 0, 0, 1, 4);
assertResourceGroup(provider, "node4", "root.rg4", 1, 5, DataSize.valueOf("8MB"));
assertNonLeafResourceGroup(provider, "node4", "root", 0, 0, 1, 5);
assertResourceGroup(provider, "node5", "root.rg4", 1, 5, DataSize.valueOf("8MB"));
assertNonLeafResourceGroup(provider, "node5", "root", 0, 0, 1, 5);
assertResourceGroup(provider, "node6", "root.rg4", 1, 6, DataSize.valueOf("9MB"));
assertNonLeafResourceGroup(provider, "node6", "root", 0, 0, 1, 6);
// Expire running queries
Thread.sleep(SECONDS.toMillis(5));
nodeManager.refreshNodes();
assertTrue(provider.getClusterResourceGroups("node1").isEmpty());
assertTrue(provider.getClusterResourceGroups("node2").isEmpty());
assertTrue(provider.getClusterResourceGroups("node3").isEmpty());
assertTrue(provider.getClusterResourceGroups("node4").isEmpty());
assertTrue(provider.getClusterResourceGroups("node5").isEmpty());
assertTrue(provider.getClusterResourceGroups("node6").isEmpty());
}
@Test(timeOut = 15_000)
public void testClusterMemoryPoolInfo()
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addNode(new ConnectorId("x"), new InternalNode("nodeId1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("nodeId2", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("nodeId3", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
nodeManager.addNode(new ConnectorId("x"), new InternalNode("nodeId4", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("4s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
// Memory pool starts off empty
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 0, 0, 0, 0, 0, Optional.empty());
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
// Create a node and heartbeat to the resource manager
provider.registerNodeHeartbeat(createNodeStatus("nodeId", GENERAL_POOL, createMemoryPoolInfo(100, 2, 1)));
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 0, 0, 100, 2, 1, Optional.empty());
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
// Register a query and heartbeat that to the resource manager
provider.registerQueryHeartbeat("nodeId1", createQueryInfo("1", QUEUED, "rg4", GENERAL_POOL), query1Sequence++);
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 1, 0, 100, 2, 1, Optional.of("1"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
// Create another node and heartbeat to the resource manager
provider.registerNodeHeartbeat(createNodeStatus("nodeId2", GENERAL_POOL, createMemoryPoolInfo(1000, 20, 10)));
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 1, 0, 1100, 22, 11, Optional.of("1"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
// Create a blocked node and heartbeat to the resource manager
provider.registerNodeHeartbeat(createNodeStatus("nodeId3", GENERAL_POOL, createMemoryPoolInfo(1, 2, 3)));
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 1, 1, 1101, 24, 14, Optional.of("1"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
// Create a node that has only reserved pool allocations
provider.registerNodeHeartbeat(createNodeStatus("nodeId4", RESERVED_POOL, createMemoryPoolInfo(5, 3, 2)));
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 1, 1, 1101, 24, 14, Optional.of("1"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 5, 3, 2, Optional.empty());
// Add a larger query and verify that the largest query is updated
provider.registerQueryHeartbeat("nodeId2", createQueryInfo("2", RUNNING, "rg4", GENERAL_POOL, DataSize.valueOf("25GB").toBytes()), query2Sequence++);
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 2, 1, 1101, 24, 14, Optional.of("2"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 5, 3, 2, Optional.empty());
// Adding a larger reserved pool query does not affect largest query in general pool
provider.registerQueryHeartbeat("nodeId1", createQueryInfo("3", RUNNING, "rg4", RESERVED_POOL, DataSize.valueOf("50GB").toBytes()), query3Sequence++);
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 2, 1, 1101, 24, 14, Optional.of("2"));
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 1, 0, 5, 3, 2, Optional.empty());
// Expire nodes
Thread.sleep(SECONDS.toMillis(5));
// All nodes expired, memory pools emptied
assertMemoryPoolMap(provider, 2, GENERAL_POOL, 0, 0, 0, 0, 0, Optional.empty());
assertMemoryPoolMap(provider, 2, RESERVED_POOL, 0, 0, 0, 0, 0, Optional.empty());
}
@DataProvider(name = "resourceRuntimeHeartbeat")
public static Object[][] resourceRuntimeHeartbeatTestData()
{
return new Object[][] {
{ImmutableMap.of(
"node1", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user1"))
.addRunningQueries(2)
.addQueuedQueries(3)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build()),
"node2", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user2"))
.addRunningQueries(5)
.addQueuedQueries(100)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build())
), 18},
{ImmutableMap.of(
"node1", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user1"))
.addRunningQueries(2)
.addQueuedQueries(3)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build()),
"node2", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user1"))
.addRunningQueries(5)
.addQueuedQueries(100)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build())
), 13}
};
}
@Test(timeOut = 15_000, dataProvider = "resourceRuntimeHeartbeat")
public void testAdjustedQueueSize(Map<String, List<ResourceGroupRuntimeInfo>> nodeHeartBeats, int expectedAdjustedQueueSize)
throws Exception
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeHeartBeats.keySet().stream().forEach(nodeIdentifier ->
nodeManager.addNode(new ConnectorId("x"), new InternalNode(nodeIdentifier, URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true)));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("4s"), Duration.valueOf("0s"), Duration.valueOf("20s"), true, newSingleThreadScheduledExecutor());
nodeHeartBeats.entrySet().stream().forEach(entry ->
provider.registerResourceGroupRuntimeHeartbeat(entry.getKey(), entry.getValue()));
Thread.sleep(SECONDS.toMillis(5));
assertEquals(provider.getAdjustedQueueSize(), expectedAdjustedQueueSize);
}
@Test(timeOut = 20_000)
public void testWorkerMemoryInfo()
throws Exception
{
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(new InMemoryNodeManager(), createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
assertWorkerMemoryInfo(provider, 0);
provider.registerNodeHeartbeat(createNodeStatus("nodeId", GENERAL_POOL, createMemoryPoolInfo(100, 2, 1)));
assertWorkerMemoryInfo(provider, 1);
provider.registerNodeHeartbeat(createNodeStatus("nodeId2", GENERAL_POOL, createMemoryPoolInfo(200, 20, 10)));
assertWorkerMemoryInfo(provider, 2);
// Node expiration timeout was set as 4 seconds
// Waiting for the expiration to cross the threshold + adding buffer to avoid flakiness
Thread.sleep(SECONDS.toMillis(10));
assertWorkerMemoryInfo(provider, 0);
}
@Test(timeOut = 15_000)
public void testShuttingDownCoordinatorHeartbeat()
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addShuttingDownNode(new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
assertEquals(provider.getClusterQueries(), ImmutableList.of());
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
long query4Sequence = 0;
provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), query1Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), query2Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), query3Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), query4Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 2);
provider.registerQueryHeartbeat("node1", createQueryInfo("1", RUNNING), query1Sequence++);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", FINISHING), query2Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 2);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", FINISHED), query2Sequence++);
assertQueryInfos(provider.getClusterQueries(), 4, 3);
}
@Test
public void testRunningTaskCount()
{
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
nodeManager.addShuttingDownNode(new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true));
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), Duration.valueOf("4s"), true, newSingleThreadScheduledExecutor());
assertEquals(provider.getRunningTaskCount(), 0);
long query1Sequence = 0;
long query2Sequence = 0;
long query3Sequence = 0;
long query4Sequence = 0;
provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), query1Sequence++);
assertEquals(provider.getRunningTaskCount(), 0);
provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), query2Sequence++);
assertEquals(provider.getRunningTaskCount(), 11);
provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), query3Sequence++);
assertEquals(provider.getRunningTaskCount(), 11);
provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), query4Sequence++);
assertEquals(provider.getRunningTaskCount(), 11);
}
@Test
public void testResourceGroupStatsExpiry()
throws Exception
{
Map<String, List<ResourceGroupRuntimeInfo>> resourceGroupStates = ImmutableMap.of(
"node1", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user1"))
.addRunningQueries(2)
.addQueuedQueries(3)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build()),
"node2", ImmutableList.of(ResourceGroupRuntimeInfo.builder(new ResourceGroupId("global-user2"))
.addRunningQueries(5)
.addQueuedQueries(100)
.setResourceGroupSpecInfo(new ResourceGroupSpecInfo(20))
.build()));
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, createTestingSessionPropertyManager(), 10, Duration.valueOf("4s"),
Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("10s"), Duration.valueOf("10s"), true, newSingleThreadScheduledExecutor());
resourceGroupStates.entrySet().stream().forEach(entry -> provider.registerResourceGroupRuntimeHeartbeat(entry.getKey(), entry.getValue()));
Thread.sleep(SECONDS.toMillis(1));
assertEquals(provider.getAdjustedQueueSize(), 18);
// Expire existing resourceGroupStates.
Thread.sleep(SECONDS.toMillis(10));
provider.registerResourceGroupRuntimeHeartbeat("node1", resourceGroupStates.get("node1"));
Thread.sleep(SECONDS.toMillis(1));
assertEquals(provider.getAdjustedQueueSize(), 3);
}
void assertWorkerMemoryInfo(ResourceManagerClusterStateProvider provider, int count)
{
Map<String, MemoryInfo> workerMemoryInfo = provider.getWorkerMemoryInfo();
assertNotNull(workerMemoryInfo);
assertEquals(workerMemoryInfo.size(), count);
}
private NodeStatus createNodeStatus(String nodeId, MemoryPoolId memoryPoolId, MemoryPoolInfo memoryPoolInfo)
{
return new NodeStatus(
nodeId,
new NodeVersion("1"),
"environment",
false,
new Duration(1, SECONDS),
"http://externalAddress",
"http://internalAddress",
new MemoryInfo(new DataSize(1, MEGABYTE), ImmutableMap.of(memoryPoolId, memoryPoolInfo)),
1,
1.0,
2.0,
1,
2,
3);
}
private NodeStatus createCoordinatorNodeStatus(String nodeId)
{
return new NodeStatus(
nodeId,
new NodeVersion("1"),
"environment",
true,
new Duration(1, SECONDS),
"http://exernalAddress",
"http://internalAddress",
new MemoryInfo(new DataSize(1, MEGABYTE), ImmutableMap.of(GENERAL_POOL, createMemoryPoolInfo(100, 2, 1))),
1,
1.0,
2.0,
1,
2,
3);
}
private MemoryPoolInfo createMemoryPoolInfo(int maxBytes, int reservedBytes, int reservedRevocableBytes)
{
return new MemoryPoolInfo(
maxBytes,
reservedBytes,
reservedRevocableBytes,
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of());
}
private void assertQueryInfos(List<BasicQueryInfo> queryInfos, int count, int numberDone)
{
assertNotNull(queryInfos);
assertEquals(queryInfos.size(), count);
assertEquals(queryInfos.stream().filter(info -> info.getState().isDone()).count(), numberDone);
}
private void assertResourceGroups(ResourceManagerClusterStateProvider provider, String excludingNode, int count)
throws ResourceManagerInconsistentException
{
List<ResourceGroupRuntimeInfo> resourceGroups = provider.getClusterResourceGroups(excludingNode);
assertNotNull(resourceGroups);
assertEquals(resourceGroups.size(), count);
}
private void assertResourceGroup(ResourceManagerClusterStateProvider provider, String excludingNode, String resourceGroupId, int queuedQueries, int runningQueries, DataSize userMemoryReservation)
throws ResourceManagerInconsistentException
{
ResourceGroupId currResourceGroupId = new ResourceGroupId(Arrays.asList(resourceGroupId.split("\\.")));
List<ResourceGroupRuntimeInfo> list = provider.getClusterResourceGroups(excludingNode);
Optional<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfo = list.stream()
.filter(resourceGroupInfo -> currResourceGroupId.equals(resourceGroupInfo.getResourceGroupId()))
.findFirst();
assertTrue(resourceGroupRuntimeInfo.isPresent(), "Resource group " + resourceGroupId + " not found");
ResourceGroupRuntimeInfo info = resourceGroupRuntimeInfo.get();
ResourceGroupId rg = new ResourceGroupId(Arrays.asList(resourceGroupId.split("\\.")));
assertEquals(info.getQueuedQueries(), queuedQueries, format("Expected %s queued queries, found %s", queuedQueries, info.getQueuedQueries()));
assertEquals(info.getRunningQueries(), runningQueries, format("Expected %s running queries, found %s", runningQueries, info.getRunningQueries()));
assertEquals(info.getResourceGroupId(), rg, format("Expected resource group id %s, found %s", resourceGroupId, info.getResourceGroupId()));
assertEquals(info.getMemoryUsageBytes(), userMemoryReservation.toBytes(), format("Expected %s user memory reservation found %s", userMemoryReservation, succinctBytes(info.getMemoryUsageBytes())));
}
private void assertNonLeafResourceGroup(ResourceManagerClusterStateProvider provider, String excludingNode, String resourceGroupId, int queuedQueries, int runningQueries, int descendantQueuedQueries, int descendantRunningQueries)
throws ResourceManagerInconsistentException
{
Optional<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfo = provider.getClusterResourceGroups(excludingNode).stream()
.filter(resourceGroupInfo -> new ResourceGroupId(resourceGroupId).equals(resourceGroupInfo.getResourceGroupId()))
.findFirst();
assertTrue(resourceGroupRuntimeInfo.isPresent(), "Resource group " + resourceGroupId + " not found");
ResourceGroupRuntimeInfo info = resourceGroupRuntimeInfo.get();
assertEquals(info.getQueuedQueries(), queuedQueries, format("Expected %s queued queries, found %s", queuedQueries, info.getQueuedQueries()));
assertEquals(info.getRunningQueries(), runningQueries, format("Expected %s running queries, found %s", runningQueries, info.getRunningQueries()));
assertEquals(info.getDescendantQueuedQueries(), descendantQueuedQueries, format("Expected %s descendant queued queries, found %s", descendantQueuedQueries, info.getDescendantQueuedQueries()));
assertEquals(info.getDescendantRunningQueries(), descendantRunningQueries, format("Expected %s descendant running queries, found %s", descendantRunningQueries, info.getDescendantRunningQueries()));
assertEquals(info.getResourceGroupId(), new ResourceGroupId(resourceGroupId), format("Expected resource group id %s, found %s", resourceGroupId, info.getResourceGroupId()));
}
private void assertMemoryPoolMap(ResourceManagerClusterStateProvider provider, int memoryPoolSize, MemoryPoolId memoryPoolId, int assignedQueries, int blockedNodes, int maxBytes, int reservedBytes, int reservedRevocableBytes, Optional<String> largestMemoryQuery)
{
Map<MemoryPoolId, ClusterMemoryPoolInfo> memoryPoolMap = provider.getClusterMemoryPoolInfo();
assertNotNull(memoryPoolMap);
assertEquals(memoryPoolMap.size(), memoryPoolSize);
ClusterMemoryPoolInfo clusterMemoryPoolInfo = memoryPoolMap.get(memoryPoolId);
assertNotNull(clusterMemoryPoolInfo);
assertEquals(clusterMemoryPoolInfo.getAssignedQueries(), assignedQueries);
assertEquals(clusterMemoryPoolInfo.getBlockedNodes(), blockedNodes);
assertEquals(clusterMemoryPoolInfo.getMemoryPoolInfo().getMaxBytes(), maxBytes);
assertEquals(clusterMemoryPoolInfo.getMemoryPoolInfo().getReservedBytes(), reservedBytes);
assertEquals(clusterMemoryPoolInfo.getMemoryPoolInfo().getReservedRevocableBytes(), reservedRevocableBytes);
assertEquals(clusterMemoryPoolInfo.getLargestMemoryQuery().map(QueryId::getId), largestMemoryQuery);
}
private static BasicQueryInfo createQueryInfo(String queryId, QueryState state)
{
return createQueryInfo(queryId, state, "global", GENERAL_POOL);
}
private static BasicQueryInfo createQueryInfo(String queryId, QueryState state, String resourceGroupId, MemoryPoolId memoryPool)
{
return createQueryInfo(queryId, state, resourceGroupId, memoryPool, DataSize.valueOf("24GB").toBytes());
}
private static BasicQueryInfo createQueryInfo(String queryId, QueryState state, String resourceGroupIdString, MemoryPoolId memoryPool, long totalMemoryReservation)
{
ResourceGroupId resourceGroupId = new ResourceGroupId(Arrays.asList(resourceGroupIdString.split("\\.")));
return new BasicQueryInfo(
new QueryId(queryId),
TEST_SESSION.toSessionRepresentation(),
Optional.of(resourceGroupId),
state,
memoryPool,
true,
URI.create("1"),
"",
new BasicQueryStats(
new DateTime("1991-09-06T05:00").getMillis(),
new DateTime("1991-09-06T05:01").getMillis(),
Duration.valueOf("6m"),
Duration.valueOf("8m"),
Duration.valueOf("7m"),
Duration.valueOf("34m"),
Duration.valueOf("10m"),
11,
12,
13,
14,
15,
100,
13,
14,
15,
100,
13,
14,
15,
100,
DataSize.valueOf("21GB"),
22,
23,
24,
DataSize.valueOf("1MB"),
succinctBytes(totalMemoryReservation),
DataSize.valueOf("25GB"),
DataSize.valueOf("26GB"),
DataSize.valueOf("27GB"),
DataSize.valueOf("28GB"),
Duration.valueOf("23m"),
Duration.valueOf("24m"),
true,
ImmutableSet.of(WAITING_FOR_MEMORY),
DataSize.valueOf("123MB"),
OptionalDouble.of(20)),
null,
Optional.empty(),
ImmutableList.of(),
Optional.empty());
}
}