TestSchedulingRequestContainerAllocation.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentMap;

import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Test Container Allocation with SchedulingRequest.
 */
public class TestSchedulingRequestContainerAllocation {
  // Multiplier used when expressing memory sizes in the tests (e.g. 10 * GB).
  private static final int GB = 1024;
  // Configuration rebuilt by setUp() for each parameterized invocation.
  private YarnConfiguration conf;
  // Placement-constraint handler value supplied by the parameter source.
  private String placementConstraintHandler;
  // Node label manager created in setUp(); returned by the MockRM subclasses
  // that override createNodeLabelManager().
  RMNodeLabelsManager mgr;

  /**
   * Parameter source referenced by the {@code @MethodSource} annotations in
   * this class.
   *
   * @return one single-element argument array per placement-constraint
   *         handler value: the processor handler, then the scheduler handler.
   */
  public static Collection<Object[]> placementConstarintHandlers() {
    Object[] processorHandler =
        {YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER};
    Object[] schedulerHandler =
        {YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER};
    return Arrays.asList(processorHandler, schedulerHandler);
  }

  /**
   * Stores the handler chosen for the current parameterized run and then
   * rebuilds the shared fixtures through {@link #setUp()}.
   *
   * @param pPlacementConstraintHandler handler value to record before setUp().
   * @throws Exception if {@link #setUp()} fails.
   */
  private void initTestSchedulingRequestContainerAllocation(
      String pPlacementConstraintHandler) throws Exception {
    placementConstraintHandler = pPlacementConstraintHandler;
    this.setUp();
  }

  /**
   * Builds a fresh configuration that selects the CapacityScheduler and the
   * currently recorded placement-constraint handler, then creates and
   * initializes a {@link NullRMNodeLabelsManager} with it.
   *
   * @throws Exception if the node label manager cannot be initialized.
   */
  public void setUp() throws Exception {
    final YarnConfiguration yarnConf = new YarnConfiguration();
    conf = yarnConf;
    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class, ResourceScheduler.class);
    yarnConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        placementConstraintHandler);

    final RMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
    mgr = labelsManager;
    labelsManager.init(yarnConf);
  }

  /**
   * Submits an application whose AM asks for a single container of the given
   * memory size (zero vcores) on any host, with no AM label and with the
   * supplied application tags.
   *
   * @param rm the mock resource manager to submit to.
   * @param memory memory value passed to {@code Resource.newInstance(memory, 0)}.
   * @param appTags application tags attached to the submission.
   * @return the submitted application.
   * @throws Exception if the submission fails.
   */
  private RMApp submitApp(MockRM rm, int memory, Set<String> appTags)
      throws Exception {
    final Resource amResource = Resource.newInstance(memory, 0);
    final List<ResourceRequest> amRequests = Collections.singletonList(
        ResourceRequest.newInstance(Priority.newInstance(0),
            ResourceRequest.ANY, amResource, 1));
    return MockRMAppSubmitter.submit(rm,
        MockRMAppSubmissionData.Builder.createWithResource(amResource, rm)
            .withAmLabel(null)
            .withAmResourceRequests(amRequests)
            .withApplicationTags(appTags)
            .build());
  }

  /**
   * Verifies intra-application anti-affinity on a four-node cluster: a request
   * for ten mutually anti-affine containers yields exactly four containers on
   * four distinct nodes, for two different tags and priorities. A further
   * request that is anti-affine to "mapper" must not be satisfied, because
   * every node already hosts a mapper.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testIntraAppAntiAffinity(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // closed even when an assertion or an allocation wait fails.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
      }

      // app1 -> c
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      // app1 asks for 10 anti-affinity containers for the same app. It should
      // only get 4 containers allocated because we only have 4 nodes.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(10, Resource.newInstance(1024, 1)),
          Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");

      List<Container> allocated = waitForAllocation(4, 3000, am1, nms);
      assertEquals(4, allocated.size());
      assertEquals(4, getContainerNodesNum(allocated));

      // Similarly, app1 asks for 10 anti-affinity containers at a different
      // priority and with a different tag. Again only 4 containers can be
      // allocated because we only have 4 nodes.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
          Priority.newInstance(2), 1L, ImmutableSet.of("reducer"), "reducer");

      allocated = waitForAllocation(4, 3000, am1, nms);
      assertEquals(4, allocated.size());
      assertEquals(4, getContainerNodesNum(allocated));

      // Request containers tagged "reducer2" that are anti-affine to
      // "mapper". All 4 nodes already host a mapper, so no container can be
      // allocated and the wait is expected to fail.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(10, Resource.newInstance(2048, 1)),
          Priority.newInstance(3), 1L, ImmutableSet.of("reducer2"), "mapper");

      boolean caughtException = false;
      try {
        waitForAllocation(1, 3000, am1, nms);
      } catch (Exception expected) {
        caughtException = true;
      }
      assertTrue(caughtException);
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies intra-application anti-affinity when requests carry several
   * allocation tags and target several tags at once. Three successive
   * requests are expected to spread four containers over four distinct nodes.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testIntraAppAntiAffinityWithMultipleTags(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // closed even when an assertion or an allocation wait fails.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
      }

      // app1 -> c
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      // app1 asks for 2 anti-affinity containers for the same app.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
          Priority.newInstance(1), 1L, ImmutableSet.of("tag_1_1", "tag_1_2"),
          "tag_1_1", "tag_1_2");

      List<Container> allocated = waitForAllocation(2, 3000, am1, nms);
      assertEquals(2, allocated.size());
      assertEquals(2, getContainerNodesNum(allocated));

      // app1 asks for 1 anti-affinity container for the same app,
      // anti-affine to tag_1_1/tag_1_2, with allocation tags tag_2_1/tag_2_2.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
          Priority.newInstance(2), 1L, ImmutableSet.of("tag_2_1", "tag_2_2"),
          "tag_1_1", "tag_1_2");

      List<Container> allocated1 = waitForAllocation(1, 3000, am1, nms);
      assertEquals(1, allocated1.size());
      allocated.addAll(allocated1);
      assertEquals(3, getContainerNodesNum(allocated));

      // app1 asks for 1 anti-affinity container for the same app,
      // anti-affine to tag_1_1/tag_1_2/tag_2_1/tag_2_2, with allocation tag
      // tag_3.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
          Priority.newInstance(3), 1L, ImmutableSet.of("tag_3"),
          "tag_1_1", "tag_1_2", "tag_2_1", "tag_2_2");

      allocated1 = waitForAllocation(1, 3000, am1, nms);
      assertEquals(1, allocated1.size());
      allocated.addAll(allocated1);
      assertEquals(4, getContainerNodesNum(allocated));
    } finally {
      rm1.close();
    }
  }

  /**
   * This UT covers some basic end-to-end inter-app anti-affinity
   * constraint tests. For comprehensive tests over different namespace
   * types, see more in TestPlacementConstraintsUtil.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testInterAppAntiAffinity(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // closed even when an assertion or an allocation wait fails.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      RMNode[] rmNodes = new RMNode[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
        rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
      }

      // app1 -> c
      MockRMAppSubmissionData data2 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data2);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      // app1 asks for 3 anti-affinity containers for the same app. It should
      // get 3 containers allocated to 3 different nodes.
      am1.allocateIntraAppAntiAffinity(
          ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
          Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");

      List<Container> allocated = waitForAllocation(3, 3000, am1, nms);
      assertEquals(3, allocated.size());
      assertEquals(3, getContainerNodesNum(allocated));

      System.out.println("Mappers on HOST0: "
          + rmNodes[0].getAllocationTagsWithCount().get("mapper"));
      System.out.println("Mappers on HOST1: "
          + rmNodes[1].getAllocationTagsWithCount().get("mapper"));
      System.out.println("Mappers on HOST2: "
          + rmNodes[2].getAllocationTagsWithCount().get("mapper"));

      // app2 -> c
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);

      // App2 asks for 3 containers that are anti-affine to any mapper.
      // Since 3 out of 4 nodes already have mapper containers, all 3
      // containers will be allocated on the remaining node.
      TargetApplicationsNamespace.All allNs =
          new TargetApplicationsNamespace.All();
      am2.allocateAppAntiAffinity(
          ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
          Priority.newInstance(1), 1L, allNs.toString(),
          ImmutableSet.of("foo"), "mapper");

      List<Container> allocated1 = waitForAllocation(3, 3000, am2, nms);
      assertEquals(3, allocated1.size());
      assertEquals(1, getContainerNodesNum(allocated1));
      allocated.addAll(allocated1);
      assertEquals(4, getContainerNodesNum(allocated));

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
          am2.getApplicationAttemptId());

      // Containers of app2 must not carry the mapper tag. Containers on the
      // node the AM was launched against (nms[0]) are exempt. The exemption
      // compares NodeId with NodeId; comparing a NodeId with an RMNode would
      // never match.
      final NodeId amNodeId = nms[0].getNodeId();
      assertTrue(schedulerApp2.getLiveContainers()
          .stream().allMatch(rmContainer ->
              amNodeId.equals(rmContainer.getContainer().getNodeId())
                  || !rmContainer.getAllocationTags().contains("mapper")));

      // app3 -> c
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app3 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nms[0]);

      // App3 asks for 3 containers that are anti-affine to any mapper.
      // Unlike the former case, app3's own allocation tag is also mapper,
      // so it is anti-affine to itself too. Only 1 container is expected
      // to be allocated.
      am3.allocateAppAntiAffinity(
          ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
          Priority.newInstance(1), 1L, allNs.toString(),
          ImmutableSet.of("mapper"), "mapper");

      allocated1 = waitForAllocation(1, 3000, am3, nms);
      assertEquals(1, allocated1.size());
      allocated.addAll(allocated1);
      assertEquals(4, getContainerNodesNum(allocated));
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that sending a scheduling request fails when the RM is started
   * from a plain {@link Configuration}, i.e. one on which this test does not
   * set a placement-constraint handler.
   *
   * @param pPlacementConstraintHandler handler value from the parameter
   *        source; it is recorded by the init call but the RM below is built
   *        from a new Configuration rather than from {@code conf}.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testSchedulingRequestDisabledByDefault(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
        new Configuration());

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // closed even when an assertion fails.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
      }

      // app1 -> c
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      // app1 asks for 2 anti-affinity containers for the same app.
      boolean caughtException = false;
      try {
        // Since feature is disabled by default, we should expect exception.
        am1.allocateIntraAppAntiAffinity(
            ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
            Priority.newInstance(1), 1L,
            ImmutableSet.of("tag_1_1", "tag_1_2"),
            "tag_1_1", "tag_1_2");
      } catch (Exception expected) {
        caughtException = true;
      }
      assertTrue(caughtException);
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that a scheduling request with a {@code null} placement
   * constraint is handled after a request that does carry a constraint:
   * the first request yields one container and the second yields two.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testSchedulingRequestWithNullConstraint(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);

    // inject node label manager
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // closed even when an assertion or an allocation wait fails.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
      }

      // app1 -> c
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      PlacementConstraint constraint =
          targetNotIn("node", allocationTag("t1")).build();
      SchedulingRequest sc = SchedulingRequest
          .newInstance(0, Priority.newInstance(1),
              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
              ImmutableSet.of("t1"),
              ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
              constraint);
      AllocateRequest request = AllocateRequest.newBuilder()
          .schedulingRequests(ImmutableList.of(sc)).build();
      am1.allocate(request);

      List<Container> allocated = waitForAllocation(1, 3000, am1, nms);
      assertEquals(1, allocated.size());

      // Send another request with null placement constraint,
      // ensure there is no NPE while handling this request.
      sc = SchedulingRequest
          .newInstance(1, Priority.newInstance(1),
              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
              ImmutableSet.of("t2"),
              ResourceSizing.newInstance(2, Resource.newInstance(1024, 1)),
              null);
      AllocateRequest request1 = AllocateRequest.newBuilder()
          .schedulingRequests(ImmutableList.of(sc)).build();
      am1.allocate(request1);

      allocated = waitForAllocation(2, 3000, am1, nms);
      assertEquals(2, allocated.size());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that a scheduling request whose constraint uses an invalid
   * allocation tag namespace is reported back to the AM as rejected.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testInvalidSchedulingRequest(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
    MockRM rm1 = new MockRM(csConf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    // Everything after construction runs inside try/finally so the RM is
    // stopped even when the rejection never arrives.
    try {
      rm1.getRMContext().setNodeLabelManager(mgr);
      rm1.start();

      // 4 NMs.
      MockNM[] nms = new MockNM[4];
      for (int i = 0; i < 4; i++) {
        nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
      }

      MockRMAppSubmissionData submissionData =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, submissionData);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);

      // Constraint with Invalid Allocation Tag Namespace
      PlacementConstraint constraint = targetNotIn("node",
          allocationTagWithNamespace("invalid", "t1")).build();
      SchedulingRequest sc = SchedulingRequest
          .newInstance(1, Priority.newInstance(1),
              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
              ImmutableSet.of("t1"),
              ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
              constraint);
      AllocateRequest request = AllocateRequest.newBuilder()
          .schedulingRequests(ImmutableList.of(sc)).build();
      am1.allocate(request);

      try {
        GenericTestUtils.waitFor(() -> {
          try {
            doNodeHeartbeat(nms);
            AllocateResponse response = am1.schedule();
            return response.getRejectedSchedulingRequests().size() == 1;
          } catch (Exception e) {
            // Treat a failed poll as "not yet"; waitFor retries until the
            // overall timeout expires.
            return false;
          }
        }, 500, 20000);
      } catch (Exception e) {
        // Keep the cause so the failure report shows why the wait ended.
        fail("Failed to reject invalid scheduling request", e);
      }
    } finally {
      rm1.stop();
    }
  }

  /**
   * Sends one {@code nodeHeartbeat(true)} from each given node manager, in
   * argument order.
   *
   * @param nms node managers to heartbeat.
   * @throws Exception if any heartbeat fails.
   */
  private static void doNodeHeartbeat(MockNM... nms) throws Exception {
    for (int i = 0; i < nms.length; i++) {
      nms[i].nodeHeartbeat(true);
    }
  }

  /**
   * Polls the AM every 500 ms, accumulating newly allocated containers, until
   * at least {@code allocNum} containers have been collected. Between polls
   * it heartbeats every given node manager.
   *
   * <p>The wait completes as soon as the accumulated count reaches or exceeds
   * {@code allocNum}. A response that overshoots the expected count is
   * therefore returned to the caller, where an exact-size assertion can
   * report it, instead of being masked as a timeout.
   *
   * @param allocNum number of containers to wait for.
   * @param timeout value passed to {@code GenericTestUtils.waitFor} as the
   *        overall wait limit.
   * @param am application master used to poll for allocations.
   * @param nms node managers to heartbeat between polls.
   * @return all containers collected while waiting.
   * @throws Exception if {@code GenericTestUtils.waitFor} gives up before
   *         enough containers were collected.
   */
  public static List<Container> waitForAllocation(int allocNum, int timeout,
      MockAM am, MockNM... nms) throws Exception {
    final List<Container> result = new ArrayList<>();
    GenericTestUtils.waitFor(() -> {
      try {
        AllocateResponse response = am.schedule();
        List<Container> allocated = response.getAllocatedContainers();
        System.out.println("Expecting allocation: " + allocNum
            + ", actual allocation: " + allocated.size());
        for (Container c : allocated) {
          System.out.println("Container " + c.getId().toString()
              + " is allocated on node: " + c.getNodeId().toString()
              + ", allocation tags: "
              + String.join(",", c.getAllocationTags()));
        }
        result.addAll(allocated);
        // Use >= rather than ==: if a single response pushes the total past
        // allocNum, an equality check would never succeed.
        if (result.size() >= allocNum) {
          return true;
        }
        doNodeHeartbeat(nms);
      } catch (Exception e) {
        // Best effort: report the failed poll and let waitFor retry.
        e.printStackTrace();
      }
      return false;
    }, 500, timeout);
    return result;
  }

  /**
   * Convenience overload that builds a GUARANTEED scheduling request at
   * priority 1 by delegating to the full overload.
   *
   * @param requestId allocation request id.
   * @param containers number of containers to request.
   * @param cores vcores per container.
   * @param mem memory per container.
   * @param constraint placement constraint to attach.
   * @param tags allocation tags for the requested containers.
   * @return the scheduling request.
   */
  private static SchedulingRequest schedulingRequest(int requestId,
      int containers, int cores, int mem, PlacementConstraint constraint,
      String... tags) {
    final int defaultPriority = 1;
    final ExecutionType defaultExecType = ExecutionType.GUARANTEED;
    return schedulingRequest(defaultPriority, requestId, containers, cores,
        mem, defaultExecType, constraint, tags);
  }

  /**
   * Builds a scheduling request from its individual parts.
   *
   * @param priority request priority.
   * @param allocReqId allocation request id.
   * @param containers number of containers to request.
   * @param cores vcores per container.
   * @param mem memory per container.
   * @param execType execution type; the execution type request is created
   *        with its second argument set to {@code true}.
   * @param constraint placement constraint to attach.
   * @param tags allocation tags, collected into a set.
   * @return the scheduling request.
   */
  private static SchedulingRequest schedulingRequest(
      int priority, long allocReqId, int containers, int cores, int mem,
      ExecutionType execType, PlacementConstraint constraint, String... tags) {
    final Set<String> tagSet = new HashSet<>(Arrays.asList(tags));
    final ExecutionTypeRequest execRequest =
        ExecutionTypeRequest.newInstance(execType, true);
    final Resource perContainer = Resource.newInstance(mem, cores);
    final ResourceSizing sizing =
        ResourceSizing.newInstance(containers, perContainer);

    return SchedulingRequest.newBuilder()
        .priority(Priority.newInstance(priority))
        .allocationRequestId(allocReqId)
        .allocationTags(tagSet)
        .executionType(execRequest)
        .resourceSizing(sizing)
        .placementConstraintExpression(constraint)
        .build();
  }

  /**
   * Counts the distinct node ids among the given containers.
   *
   * @param containers containers to inspect; may be {@code null}.
   * @return the number of distinct node ids, or 0 for a {@code null} list.
   */
  public static int getContainerNodesNum(List<Container> containers) {
    if (containers == null) {
      return 0;
    }
    final Set<NodeId> distinctNodes = new HashSet<>();
    for (Container container : containers) {
      distinctNodes.add(container.getNodeId());
    }
    return distinctNodes.size();
  }

  /**
   * Exercises intra- and inter-application constraints together: simple
   * affinity, anti-affinity, cardinality, and AND-composed constraints,
   * across three applications on a five-node cluster.
   *
   * @param pPlacementConstraintHandler placement-constraint handler under test.
   * @throws Exception if the mock cluster cannot be set up or driven.
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testInterAppCompositeConstraints(String pPlacementConstraintHandler)
      throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    // This tests both intra- and inter-app constraints,
    // including simple affinity, anti-affinity, cardinality constraints,
    // and simple AND composite constraints.

    MockRM rm = new MockRM(conf);
    try {
      rm.start();

      MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100);
      MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
      MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
      MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
      MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);

      RMApp app1 = submitApp(rm, 1*GB, ImmutableSet.of("hbase"));
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // App1 (hbase): 2 hbase-master containers, anti-affine to each other.
      // Only nm1 and nm2 heartbeat while waiting. Illustrative layout:
      // h1: hbase-master(1)
      // h2: hbase-master(1)
      // h3:
      // h4:
      // h5:
      PlacementConstraint pc = targetNotIn("node",
          allocationTag("hbase-master")).build();
      am1.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 2, 1, 2048, pc, "hbase-master")));
      List<Container> allocated = waitForAllocation(2, 3000, am1, nm1, nm2);

      // 2 containers allocated
      assertEquals(2, allocated.size());
      // containers should be distributed on 2 different nodes
      assertEquals(2, getContainerNodesNum(allocated));

      // App1 (hbase): 4 hbase-rs containers, anti-affine to each other, so
      // they land on 4 distinct nodes. Illustrative layout:
      // h1: hbase-rs(1), hbase-master(1)
      // h2: hbase-rs(1), hbase-master(1)
      // h3: hbase-rs(1)
      // h4: hbase-rs(1)
      // h5:
      pc = targetNotIn("node", allocationTag("hbase-rs")).build();
      am1.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(2, 4, 1, 1024, pc, "hbase-rs")));
      allocated = waitForAllocation(4, 3000, am1, nm1, nm2, nm3, nm4, nm5);

      assertEquals(4, allocated.size());
      assertEquals(4, getContainerNodesNum(allocated));

      // App2 (web-server)
      // The web server has 2 instances. Each must be co-located with an
      // hbase-master (from any application), and no two ws-inst containers
      // may share a node.
      RMApp app2 = submitApp(rm, 1*GB, ImmutableSet.of("web-server"));
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

      // App2 (web-server). Illustrative layout:
      // h1: hbase-rs(1), hbase-master(1), ws-inst(1)
      // h2: hbase-rs(1), hbase-master(1), ws-inst(1)
      // h3: hbase-rs(1)
      // h4: hbase-rs(1)
      // h5:
      pc = and(
          targetIn("node", allocationTagWithNamespace(
              new TargetApplicationsNamespace.All().toString(),
              "hbase-master")),
          targetNotIn("node", allocationTag("ws-inst"))).build();
      am2.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 2, 1, 2048, pc, "ws-inst")));
      allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
      assertEquals(2, allocated.size());
      assertEquals(2, getContainerNodesNum(allocated));

      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
      for (Container c : allocated) {
        RMNode rmNode = rmNodes.get(c.getNodeId());
        assertNotNull(rmNode);
        // assertEquals on the boxed count reports the actual value and does
        // not throw a NullPointerException when the tag is absent.
        Map<String, Long> nodeTags = rmNode.getAllocationTagsWithCount();
        assertEquals(Long.valueOf(1L), nodeTags.get("ws-inst"),
            "If ws-inst is allocated to a node,"
            + " this node should have inherited the ws-inst tag ");
        assertEquals(Long.valueOf(1L), nodeTags.get("hbase-master"),
            "ws-inst should be co-allocated to "
            + "hbase-master nodes");
      }

      // App3 (ws-servant)
      // App3 has multiple instances that must be co-allocated
      // with app2 server instance, and each node cannot have more than
      // 3 instances.
      RMApp app3 = submitApp(rm, 1*GB, ImmutableSet.of("ws-servants"));
      MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm3);

      // App3 (ws-servant). Illustrative layout:
      // h1: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
      // h2: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
      // h3: hbase-rs(1)
      // h4: hbase-rs(1)
      // h5:
      pc = and(
          targetIn("node", allocationTagWithNamespace(
              new TargetApplicationsNamespace.AppTag("web-server").toString(),
              "ws-inst")),
          cardinality("node", 0, 2, "ws-servant")).build();
      am3.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 10, 1, 512, pc, "ws-servant")));
      // Only 6 of the 10 requested containers can be allocated due to the
      // cardinality constraint: 3 on each of the 2 ws-inst nodes.
      allocated = waitForAllocation(6, 10000, am3, nm1, nm2, nm3, nm4, nm5);
      assertEquals(6, allocated.size());
      assertEquals(2, getContainerNodesNum(allocated));

      for (Container c : allocated) {
        RMNode rmNode = rmNodes.get(c.getNodeId());
        assertNotNull(rmNode);
        Map<String, Long> nodeTags = rmNode.getAllocationTagsWithCount();
        assertEquals(Long.valueOf(3L), nodeTags.get("ws-servant"),
            "Node has ws-servant allocated must have 3 instances");
        assertEquals(Long.valueOf(1L), nodeTags.get("ws-inst"),
            "Every ws-servant container should be co-allocated"
            + " with ws-inst");
      }
    } finally {
      rm.stop();
    }
  }

  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testMultiAllocationTagsConstraints(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    // This test simulates using placement constraints to avoid port conflicts.

    MockRM rm = new MockRM(conf);
    try {
      rm.start();

      MockNM nm1 = rm.registerNode("192.168.0.1:1234", 10*GB, 10);
      MockNM nm2 = rm.registerNode("192.168.0.2:1234", 10*GB, 10);
      MockNM nm3 = rm.registerNode("192.168.0.3:1234", 10*GB, 10);
      MockNM nm4 = rm.registerNode("192.168.0.4:1234", 10*GB, 10);
      MockNM nm5 = rm.registerNode("192.168.0.5:1234", 10*GB, 10);

      RMApp app1 = submitApp(rm, 1*GB, ImmutableSet.of("server1"));
      // Allocate AM container on nm1
      doNodeHeartbeat(nm1);
      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
      am1.registerAppAttempt();

      // App1 uses ports: 7000, 8000 and 9000
      String[] server1Ports =
          new String[] {"port_6000", "port_7000", "port_8000"};
      PlacementConstraint pc = targetNotIn("node",
          allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
              server1Ports))
          .build();
      am1.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 2, 1, 1024, pc, server1Ports)));
      List<Container> allocated = waitForAllocation(2, 3000,
          am1, nm1, nm2, nm3, nm4, nm5);

      // 2 containers allocated
      assertEquals(2, allocated.size());
      // containers should be distributed on 2 different nodes
      assertEquals(2, getContainerNodesNum(allocated));

      // App1 uses ports: 6000
      String[] server2Ports = new String[] {"port_6000"};
      RMApp app2 = submitApp(rm, 1*GB, ImmutableSet.of("server2"));
      // Allocate AM container on nm1
      doNodeHeartbeat(nm2);
      RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
      MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
      am2.registerAppAttempt();

      pc = targetNotIn("node",
          allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
              server2Ports))
          .build();
      am2.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 3, 1, 1024, pc, server2Ports)));
      allocated = waitForAllocation(3, 3000, am2, nm1, nm2, nm3, nm4, nm5);
      assertEquals(3, allocated.size());
      assertEquals(3, getContainerNodesNum(allocated));

      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
      for (Container c : allocated) {
        RMNode rmNode = rmNodes.get(c.getNodeId());
        assertNotNull(rmNode);
        assertTrue(rmNode.getAllocationTagsWithCount().get("port_6000") == 1,
            "server2 should not co-allocate to server1 as"
            + " they both need to use port 6000");
        assertFalse(rmNode.getAllocationTagsWithCount()
            .containsKey("port_7000"));
        assertFalse(rmNode.getAllocationTagsWithCount()
            .containsKey("port_8000"));
      }
    } finally {
      rm.stop();
    }
  }

  /**
   * Verifies inter-application placement constraints that use the
   * app-id, app-tag and not-self allocation tag namespaces.
   * <p>
   * Ten applications are submitted first; each asks for three containers
   * tagged "foo" with an anti-affinity constraint on "foo". Three further
   * applications then place "foo" containers relative to those, one per
   * namespace type.
   *
   * @param pPlacementConstraintHandler value forwarded to
   *     initTestSchedulingRequestContainerAllocation before the RM is built
   * @throws Exception if any step of the scenario fails
   */
  @Timeout(30)
  @ParameterizedTest
  @MethodSource("placementConstarintHandlers")
  public void testInterAppConstraintsWithNamespaces(
      String pPlacementConstraintHandler) throws Exception {
    initTestSchedulingRequestContainerAllocation(pPlacementConstraintHandler);
    // This test verifies inter-app constraints with namespaces
    // not-self/app-id/app-tag
    MockRM rm = new MockRM(conf);
    try {
      rm.start();

      MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100);
      MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
      MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
      MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
      MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);

      ApplicationId app5Id = null;
      // Containers granted to each of the ten apps, keyed by application id.
      Map<ApplicationId, List<Container>> allocMap = new HashMap<>();
      // 10 apps and all containers are attached with foo tag
      for (int i = 0; i<10; i++) {
        // The first five apps (i = 0..4) are tagged "former5",
        // the last five (i = 5..9) are tagged "latter5".
        String applicationTag = i<5 ? "former5" : "latter5";
        RMApp app = submitApp(rm, 1*GB, ImmutableSet.of(applicationTag));
        // Heartbeat every node before launching the AM
        doNodeHeartbeat(nm1, nm2, nm3, nm4, nm5);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();

        // Three "foo" containers, none on a node matched by the "foo" tag
        PlacementConstraint pc = targetNotIn("node", allocationTag("foo"))
            .build();
        am.addSchedulingRequest(
            ImmutableList.of(
                schedulingRequest(1, 3, 1, 1024, pc, "foo")));
        List<Container> allocated = waitForAllocation(3, 3000,
            am, nm1, nm2, nm3, nm4, nm5);
        // Remember the application used as the app-id target below.
        // NOTE(review): the index is zero-based, so i == 5 is the sixth app
        // submitted (tagged "latter5") even though the variable is named
        // app5Id.
        if (i == 5) {
          app5Id = am.getApplicationAttemptId().getApplicationId();
        }
        allocMap.put(am.getApplicationAttemptId().getApplicationId(),
            allocated);
      }

      assertNotNull(app5Id);
      // The remembered app's three containers sit on three different nodes
      assertEquals(3, getContainerNodesNum(allocMap.get(app5Id)));

      // *** app-id
      // Submit another app, use app-id constraint against app5
      RMApp app1 = submitApp(rm, 1*GB, ImmutableSet.of("xyz"));
      // Heartbeat nm1 before launching the AM
      doNodeHeartbeat(nm1);
      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
      am1.registerAppAttempt();

      // Affinity: only nodes holding a "foo" container of the app app5Id
      PlacementConstraint pc = targetIn("node",
          allocationTagWithNamespace(
              new TargetApplicationsNamespace.AppID(app5Id).toString(),
              "foo"))
          .build();
      am1.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 3, 1, 1024, pc, "foo")));
      List<Container> allocated = waitForAllocation(3, 3000,
          am1, nm1, nm2, nm3, nm4, nm5);

      ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
      List<Container> app5Alloc = allocMap.get(app5Id);
      for (Container c : allocated) {
        RMNode rmNode = rmNodes.get(c.getNodeId());
        assertNotNull(rmNode);
        // Compare node ids by value; two containers on the same node are
        // not guaranteed to share one NodeId instance.
        assertTrue(app5Alloc.stream()
                .anyMatch(c5 -> c.getNodeId().equals(c5.getNodeId())),
            "This app is affinity with app-id/app5/foo containers");
      }

      // *** app-tag
      RMApp app2 = MockRMAppSubmitter.submitWithMemory(1 * GB, rm);
      // Heartbeat nm2 before launching the AM
      doNodeHeartbeat(nm2);
      RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
      MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
      am2.registerAppAttempt();

      // Anti-affinity: avoid nodes holding a "foo" container that belongs
      // to an application tagged "xyz" (the app submitted just above)
      pc = targetNotIn("node",
          allocationTagWithNamespace(
              new TargetApplicationsNamespace.AppTag("xyz").toString(),
              "foo"))
          .build();
      am2.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 2, 1, 1024, pc, "foo")));
      allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
      assertEquals(2, allocated.size());

      // The allocation must avoid nodes that have app5 foo containers.
      // NOTE(review): only the first container of 'allocated' is compared
      // against the app5 nodes here; the second one is not checked.
      for (Container c : app5Alloc) {
        assertNotEquals(c.getNodeId(),
            allocated.iterator().next().getNodeId());
      }

      // *** not-self
      RMApp app3 = MockRMAppSubmitter.submitWithMemory(1 * GB, rm);
      // Heartbeat nm3 before launching the AM
      doNodeHeartbeat(nm3);
      RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt();
      MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId());
      am3.registerAppAttempt();

      // Cardinality of exactly 1 (min 1, max 1) for "foo" in the not-self
      // namespace on the chosen node
      pc = cardinality("node",
          new TargetApplicationsNamespace.NotSelf().toString(),
          1, 1, "foo").build();
      am3.addSchedulingRequest(
          ImmutableList.of(
              schedulingRequest(1, 1, 1, 1024, pc, "foo")));
      allocated = waitForAllocation(1, 3000, am3, nm1, nm2, nm3, nm4, nm5);
      assertEquals(1, allocated.size());
      // The node hosting the new container must report a "foo" count of 2
      assertTrue(rmNodes.get(allocated.iterator().next().getNodeId())
          .getAllocationTagsWithCount().get("foo") == 2);
    } finally {
      rm.stop();
    }
  }
}