TestPlacementConstraintsUtil.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.GenericDiagnosticsCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.mockito.Mockito;
/**
* Test the PlacementConstraint Utility class functionality.
*/
public class TestPlacementConstraintsUtil {
private List<RMNode> rmNodes;
private RMContext rmContext;
private static final int GB = 1024;
private ApplicationId appId1;
private PlacementConstraint c1, c2, c3, c4, c5, c6, c7;
private Set<String> sourceTag1, sourceTag2;
private Map<Set<String>, PlacementConstraint> constraintMap1,
constraintMap2, constraintMap3, constraintMap4;
private AtomicLong requestID = new AtomicLong(0);
@Before
public void setup() {
MockRM rm = new MockRM();
rm.start();
MockNodes.resetHostIds();
rmNodes = MockNodes.newNodes(2, 2, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) {
rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
}
rmContext = rm.getRMContext();
// Build appIDs, constraints, source tags, and constraint map.
long ts = System.currentTimeMillis();
appId1 = BuilderUtils.newApplicationId(ts, 123);
c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
c3 = PlacementConstraints
.build(targetNotIn(NODE, allocationTag("hbase-m")));
c4 = PlacementConstraints
.build(targetNotIn(RACK, allocationTag("hbase-rs")));
c5 = PlacementConstraints
.build(and(targetNotIn(NODE, allocationTag("hbase-m")),
maxCardinality(NODE, 3, "spark")));
c6 = PlacementConstraints
.build(or(targetIn(NODE, allocationTag("hbase-m")),
targetIn(NODE, allocationTag("hbase-rs"))));
c7 = PlacementConstraints
.build(or(targetIn(NODE, allocationTag("hbase-m")),
and(targetIn(NODE, allocationTag("hbase-rs")),
targetIn(NODE, allocationTag("spark")))));
sourceTag1 = new HashSet<>(Arrays.asList("spark"));
sourceTag2 = new HashSet<>(Arrays.asList("zk"));
constraintMap1 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c1),
new AbstractMap.SimpleEntry<>(sourceTag2, c2))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap2 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c3),
new AbstractMap.SimpleEntry<>(sourceTag2, c4))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap3 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c5))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
constraintMap4 = Stream
.of(new AbstractMap.SimpleEntry<>(sourceTag1, c6),
new AbstractMap.SimpleEntry<>(sourceTag2, c7))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey,
AbstractMap.SimpleEntry::getValue));
}
private SchedulingRequest createSchedulingRequest(Set<String> allocationTags,
PlacementConstraint constraint) {
return SchedulingRequest
.newInstance(requestID.incrementAndGet(),
Priority.newInstance(0),
ExecutionTypeRequest.newInstance(),
allocationTags,
ResourceSizing.newInstance(Resource.newInstance(1024, 3)),
constraint);
}
private SchedulingRequest createSchedulingRequest(Set<String>
allocationTags) {
return createSchedulingRequest(allocationTags, null);
}
private ContainerId newContainerId(ApplicationId appId) {
return ContainerId.newContainerId(
ApplicationAttemptId.newInstance(appId, 0), 0);
}
private ContainerId newContainerId(ApplicationId appId, int containerId) {
return ContainerId.newContainerId(
ApplicationAttemptId.newInstance(appId, 0), containerId);
}
private SchedulerNode newSchedulerNode(String hostname, String rackName,
NodeId nodeId) {
SchedulerNode node = mock(SchedulerNode.class);
when(node.getNodeName()).thenReturn(hostname);
when(node.getRackName()).thenReturn(rackName);
when(node.getNodeID()).thenReturn(nodeId);
return node;
}
@Test
public void testNodeAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with affinity constraint map
pcm.registerApplication(appId1, constraintMap1);
// No containers are running so all 'zk' and 'spark' allocations should fail
// on every cluster NODE
Iterator<RMNode> nodeIterator = rmNodes.iterator();
while (nodeIterator.hasNext()) {
RMNode currentNode = nodeIterator.next();
SchedulerNode schedulerNode =newSchedulerNode(currentNode.getHostName(),
currentNode.getRackName(), currentNode.getNodeID());
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode, pcm, tm));
}
/**
* Now place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-m)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
// FAIL on the rest of the nodes
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
// Test diagnostics collector
DiagnosticsCollector collector =
new GenericDiagnosticsCollector();
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm,
Optional.of(collector)));
Assert.assertNotNull(collector.getDiagnostics());
Assert.assertTrue(collector.getDiagnostics().contains("ALLOCATION_TAG"));
}
@Test
public void testMultiTagsPlacementConstraints()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
HashSet<String> st1 = new HashSet<>(Arrays.asList("X"));
HashSet<String> st2 = new HashSet<>(Arrays.asList("Y"));
// X anti-affinity with A and B
PlacementConstraint pc1 = PlacementConstraints.build(
targetNotIn(NODE, allocationTag("A", "B")));
// Y affinity with A and B
PlacementConstraint pc2 = PlacementConstraints.build(
targetIn(NODE, allocationTag("A", "B")));
Map<Set<String>, PlacementConstraint> constraintMap =
ImmutableMap.of(st1, pc1, st2, pc2);
// Register App1 with affinity constraint map
pcm.registerApplication(appId1, constraintMap);
/**
* Now place container:
* n0: A(1)
* n1: B(1)
* n2:
* n3:
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
ContainerId ca = newContainerId(appId1, 0);
tm.addContainer(n0_r1.getNodeID(), ca, ImmutableSet.of("A"));
ContainerId cb = newContainerId(appId1, 1);
tm.addContainer(n1_r1.getNodeID(), cb, ImmutableSet.of("B"));
// n0 and n1 has A/B so they cannot satisfy the PC
// n2 and n3 doesn't have A or B, so they can satisfy the PC
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st1), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st1), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st1), schedulerNode3, pcm, tm));
/**
* Now place container:
* n0: A(1)
* n1: B(1)
* n2: A(1), B(1)
* n3:
*/
ContainerId ca1 = newContainerId(appId1, 2);
tm.addContainer(n2_r2.getNodeID(), ca1, ImmutableSet.of("A"));
ContainerId cb1 = newContainerId(appId1, 3);
tm.addContainer(n2_r2.getNodeID(), cb1, ImmutableSet.of("B"));
// Only n2 has both A and B so only it can satisfy the PC
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st2), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st2), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st2), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(st2), schedulerNode3, pcm, tm));
}
@Test
public void testRackAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with affinity constraint map
pcm.registerApplication(appId1, constraintMap1);
/**
* Now place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-rs)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
// 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 'zk' placement on Rack1 should now SUCCEED
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
// FAIL on the rest of the RACKs
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
@Test
public void testNodeAntiAffinityAssignment()
throws InvalidAllocationTagsQueryException {
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
// Register App1 with anti-affinity constraint map
pcm.registerApplication(appId1, constraintMap2);
/**
* place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-m)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 1 Containers on node 0 with allocationTag 'hbase-m'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-m"));
// 'spark' placement on Node0 should now FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
// SUCCEED on the rest of the nodes
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
}
@Test
public void testRackAntiAffinityAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map
pcm.registerApplication(appId1, constraintMap2);
/**
* Place container:
* Node0:123 (Rack1):
* container_app1_1 (hbase-rs)
*/
RMNode n0_r1 = rmNodes.get(0);
RMNode n1_r1 = rmNodes.get(1);
RMNode n2_r2 = rmNodes.get(2);
RMNode n3_r2 = rmNodes.get(3);
// 1 Containers on Node0-Rack1 with allocationTag 'hbase-rs'
ContainerId hbase_m = ContainerId
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs"));
SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(),
n0_r1.getRackName(), n0_r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(),
n1_r1.getRackName(), n1_r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(),
n2_r2.getRackName(), n2_r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
n3_r2.getRackName(), n3_r2.getNodeID());
// 'zk' placement on Rack1 should FAIL
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
// SUCCEED on the rest of the RACKs
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
@Test
public void testORConstraintAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map.
pcm.registerApplication(appId1, constraintMap4);
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(appId1, 1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1, 2), ImmutableSet.of("hbase-rs"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-rs").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
// n0 and n2 should be qualified for allocation as
// they either have hbase-m or hbase-rs tag
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1)
* n3: hbase-rs(1)
*/
tm.addContainer(n3r2.getNodeID(),
newContainerId(appId1, 2), ImmutableSet.of("hbase-rs"));
// n3 is qualified now because it is allocated with hbase-rs tag
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-rs(1), spark(1)
* n3: hbase-rs(1)
*/
// Place
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1, 3), ImmutableSet.of("spark"));
// According to constraint, "zk" is allowed to be placed on a node
// has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm));
}
@Test
public void testANDConstraintAssignment()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
// Register App1 with anti-affinity constraint map.
pcm.registerApplication(appId1, constraintMap3);
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: hbase-m(1)
* n1: ""
* n2: hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(appId1, 0), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(appId1, 1), ImmutableSet.of("hbase-m"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-m").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
// Anti-affinity with hbase-m so it should not be able to be placed
// onto n0 and n2 as they already have hbase-m allocated.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
/**
* Place container:
* n0: hbase-m(1)
* n1: spark(3)
* n2: hbase-m(1)
* n3: ""
*/
for (int i=0; i<4; i++) {
tm.addContainer(n1r1.getNodeID(),
newContainerId(appId1, i+2), ImmutableSet.of("spark"));
}
Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
.get("spark").longValue());
// Violate cardinality constraint
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
}
@Test
public void testGlobalAppConstraints()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/A(1), app2/A(1)
* n1: app3/A(3)
* n2: app1/A(2)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1, 0), ImmutableSet.of("A"));
tm.addContainer(n0r1.getNodeID(),
newContainerId(application2, 1), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3, 2), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3, 3), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3, 4), ImmutableSet.of("A"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1, 5), ImmutableSet.of("A"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1, 6), ImmutableSet.of("A"));
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
TargetApplicationsNamespace namespaceAll =
new TargetApplicationsNamespace.All();
//***************************
// 1) all, anti-affinity
//***************************
// Anti-affinity with "A" from any application including itself.
PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
NODE, allocationTagWithNamespace(namespaceAll.toString(), "A"))
.build();
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
Set<String> srcTags1 = ImmutableSet.of("A");
constraintMap.put(srcTags1, constraint1);
pcm.registerApplication(application1, constraintMap);
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application1);
//***************************
// 2) all, max cardinality
//***************************
PlacementConstraint constraint2 = PlacementConstraints
.maxCardinality(NODE, namespaceAll.toString(), 2, "A")
.build();
constraintMap.clear();
Set<String> srcTags2 = ImmutableSet.of("foo");
constraintMap.put(srcTags2, constraint2);
pcm.registerApplication(application2, constraintMap);
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application2);
//***************************
// 3) all, min cardinality
//***************************
PlacementConstraint constraint3 = PlacementConstraints
.minCardinality(NODE, namespaceAll.toString(), 3, "A")
.build();
constraintMap.clear();
Set<String> srcTags3 = ImmutableSet.of("foo");
constraintMap.put(srcTags3, constraint3);
pcm.registerApplication(application3, constraintMap);
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application3, createSchedulingRequest(srcTags3),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application3, createSchedulingRequest(srcTags3),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application3, createSchedulingRequest(srcTags3),
schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application3, createSchedulingRequest(srcTags3),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application3);
}
@Test
public void testNotSelfAppConstraints()
throws InvalidAllocationTagsQueryException {
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>();
allApps.put(application1, new MockRMApp(123, 1000,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(application2, new MockRMApp(124, 1001,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(application3, new MockRMApp(125, 1002,
RMAppState.NEW, "userA", ImmutableSet.of("")));
RMContext mockedContext = Mockito.spy(rmContext);
when(mockedContext.getRMApps()).thenReturn(allApps);
AllocationTagsManager tm = new AllocationTagsManager(mockedContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
mockedContext.setAllocationTagsManager(tm);
mockedContext.setPlacementConstraintManager(pcm);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/A(1), app2/A(1)
* n1: app3/A(3)
* n2: app1/A(2)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1), ImmutableSet.of("A"));
tm.addContainer(n0r1.getNodeID(),
newContainerId(application2), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3), ImmutableSet.of("A"));
tm.addContainer(n1r1.getNodeID(),
newContainerId(application3), ImmutableSet.of("A"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1), ImmutableSet.of("A"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1), ImmutableSet.of("A"));
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
TargetApplicationsNamespace notSelf =
new TargetApplicationsNamespace.NotSelf();
//***************************
// 1) not-self, app1
//***************************
// Anti-affinity with "A" from app2 and app3,
// n0 and n1 both have tag "A" from either app2 or app3, so they are
// not qualified for the placement.
PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
.build();
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
Set<String> srcTags1 = ImmutableSet.of("A");
constraintMap.put(srcTags1, constraint1);
pcm.registerApplication(application1, constraintMap);
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode0, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags1),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application1);
//***************************
// 2) not-self, app1
//***************************
// Affinity with "A" from app2 and app3,
// N0 and n1 are qualified for the placement.
PlacementConstraint constraint2 = PlacementConstraints.targetIn(
NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
.build();
Map<Set<String>, PlacementConstraint> cm2 = new HashMap<>();
Set<String> srcTags2 = ImmutableSet.of("A");
cm2.put(srcTags2, constraint2);
pcm.registerApplication(application1, cm2);
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application1, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application1);
}
@Test
public void testInterAppConstraintsByAppID()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124);
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 125);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/hbase-m(1)
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1, 0), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1, 1), ImmutableSet.of("hbase-m"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-m").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.AppID(application1);
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
PlacementConstraint constraint2 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
"hbase-m"))
.build();
Set<String> srcTags2 = new HashSet<>();
srcTags2.add("app2");
constraintMap.put(srcTags2, constraint2);
pcm.registerApplication(application2, constraintMap);
// Anti-affinity with app1/hbase-m so it should not be able to be placed
// onto n0 and n2 as they already have hbase-m allocated.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
// Intra-app constraint
// Test with default and empty namespace
TargetApplicationsNamespace self = new TargetApplicationsNamespace.Self();
PlacementConstraint constraint3 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(self.toString(),
"hbase-m"))
.build();
Set<String> srcTags3 = new HashSet<>();
srcTags3.add("app3");
constraintMap.put(srcTags3, constraint3);
pcm.registerApplication(application3, constraintMap);
/**
* Place container:
* n0: app1/hbase-m(1), app3/hbase-m
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application3, 0), ImmutableSet.of("hbase-m"));
// Anti-affinity to self/hbase-m
Assert.assertFalse(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application3);
}
@Test
public void testInterAppConstriantsByAppTag()
throws InvalidAllocationTagsQueryException {
ApplicationId application1 = BuilderUtils.newApplicationId(1000, 123);
ApplicationId application2 = BuilderUtils.newApplicationId(1001, 124);
// app1: test-tag
// app2: N/A
RMContext mockedContext = Mockito.spy(rmContext);
ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>();
allApps.put(application1, new MockRMApp(123, 1000,
RMAppState.NEW, "userA", ImmutableSet.of("test-tag")));
allApps.put(application2, new MockRMApp(124, 1001,
RMAppState.NEW, "userA", ImmutableSet.of("")));
when(mockedContext.getRMApps()).thenReturn(allApps);
AllocationTagsManager tm = new AllocationTagsManager(mockedContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
mockedContext.setAllocationTagsManager(tm);
mockedContext.setPlacementConstraintManager(pcm);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/hbase-m(1)
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.AppTag("test-tag");
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
PlacementConstraint constraint2 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
"hbase-m"))
.build();
Set<String> srcTags2 = ImmutableSet.of("app2");
constraintMap.put(srcTags2, constraint2);
pcm.registerApplication(application2, constraintMap);
// Anti-affinity with app-tag/test-tag/hbase-m,
// app1 has tag "test-tag" so the constraint is equally to work on app1
// onto n1 and n3 as they don't have "hbase-m" from app1.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application1);
pcm.unregisterApplication(application2);
}
@Test
public void testInvalidAllocationTagNamespace() {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
RMNode n0r1 = rmNodes.get(0);
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
PlacementConstraint constraint1 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace("unknown_namespace",
"hbase-m"))
.build();
Set<String> srcTags1 = new HashSet<>();
srcTags1.add("app1");
try {
PlacementConstraintsUtil.canSatisfyConstraints(application1,
createSchedulingRequest(srcTags1, constraint1), schedulerNode0,
pcm, tm);
Assert.fail("This should fail because we gave an invalid namespace");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
Assert.assertTrue(e.getMessage()
.contains("Invalid namespace prefix: unknown_namespace"));
}
}
}