// TestRouterRMAdminService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.router.rmadmin;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService.RequestInterceptorChainWrapper;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test class to validate the RMAdmin Service inside the Router.
 *
 * <p>The tests check the shape of the interceptor pipeline, exercise every
 * admin call through the helpers this class calls unqualified (not defined
 * in this file), verify the expected eviction behaviour of the per-user
 * pipeline cache, and check that two threads asking for the same user's
 * pipeline end up with the same root interceptor.
 */
public class TestRouterRMAdminService extends BaseRouterRMAdminTest {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestRouterRMAdminService.class);

  /**
   * Tests if the pipeline is created properly.
   *
   * <p>The expected pipeline is three
   * {@code PassThroughRMAdminRequestInterceptor} instances followed by one
   * {@code MockRMAdminRequestInterceptor}.
   *
   * @throws Exception if the interceptor chain cannot be created
   */
  @Test
  public void testRequestInterceptorChainCreation() throws Exception {
    // Expected class names, ordered from the root interceptor to the last.
    final String[] expectedChain = {
        PassThroughRMAdminRequestInterceptor.class.getName(),
        PassThroughRMAdminRequestInterceptor.class.getName(),
        PassThroughRMAdminRequestInterceptor.class.getName(),
        MockRMAdminRequestInterceptor.class.getName()};

    RMAdminRequestInterceptor current =
        super.getRouterRMAdminService().createRequestInterceptorChain();
    int index = 0;
    while (current != null) {
      if (index >= expectedChain.length) {
        // The chain is longer than expected: report what was found.
        Assert.fail("Unexpected interceptor at index " + index + ": "
            + current.getClass().getName());
      }
      Assert.assertEquals(expectedChain[index],
          current.getClass().getName());
      current = current.getNextInterceptor();
      index++;
    }
    // Catches a chain that is shorter than expected.
    Assert.assertEquals("The number of interceptors in chain does not match",
        expectedChain.length, index);
  }

  /**
   * Test if the RouterRMAdmin forwards all the requests to the MockRM and get
   * back the responses.
   *
   * <p>Every admin call is issued for the same user and only the presence of
   * a response is checked.
   *
   * @throws Exception if any of the admin calls fails
   */
  @Test
  public void testRouterRMAdminServiceE2E() throws Exception {

    String user = "test1";

    LOG.info("testRouterRMAdminServiceE2E - Refresh Queues");

    RefreshQueuesResponse responseRefreshQueues = refreshQueues(user);
    Assert.assertNotNull(responseRefreshQueues);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes");

    RefreshNodesResponse responseRefreshNodes = refreshNodes(user);
    Assert.assertNotNull(responseRefreshNodes);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Super User");

    RefreshSuperUserGroupsConfigurationResponse responseRefreshSuperUser =
        refreshSuperUserGroupsConfiguration(user);
    Assert.assertNotNull(responseRefreshSuperUser);

    LOG.info("testRouterRMAdminServiceE2E - Refresh User to Group");

    RefreshUserToGroupsMappingsResponse responseRefreshUserToGroup =
        refreshUserToGroupsMappings(user);
    Assert.assertNotNull(responseRefreshUserToGroup);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Admin Acls");

    RefreshAdminAclsResponse responseRefreshAdminAcls = refreshAdminAcls(user);
    Assert.assertNotNull(responseRefreshAdminAcls);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Service Acls");

    RefreshServiceAclsResponse responseRefreshServiceAcls =
        refreshServiceAcls(user);
    Assert.assertNotNull(responseRefreshServiceAcls);

    LOG.info("testRouterRMAdminServiceE2E - Update Node Resource");

    UpdateNodeResourceResponse responseUpdateNodeResource =
        updateNodeResource(user);
    Assert.assertNotNull(responseUpdateNodeResource);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Nodes Resource");

    RefreshNodesResourcesResponse responseRefreshNodesResources =
        refreshNodesResources(user);
    Assert.assertNotNull(responseRefreshNodesResources);

    LOG.info("testRouterRMAdminServiceE2E - Add To Cluster NodeLabels");

    AddToClusterNodeLabelsResponse responseAddToClusterNodeLabels =
        addToClusterNodeLabels(user);
    Assert.assertNotNull(responseAddToClusterNodeLabels);

    LOG.info("testRouterRMAdminServiceE2E - Remove From Cluster NodeLabels");

    RemoveFromClusterNodeLabelsResponse responseRemoveFromClusterNodeLabels =
        removeFromClusterNodeLabels(user);
    Assert.assertNotNull(responseRemoveFromClusterNodeLabels);

    LOG.info("testRouterRMAdminServiceE2E - Replace Labels On Node");

    ReplaceLabelsOnNodeResponse responseReplaceLabelsOnNode =
        replaceLabelsOnNode(user);
    Assert.assertNotNull(responseReplaceLabelsOnNode);

    LOG.info("testRouterRMAdminServiceE2E - Check For Decommissioning Nodes");

    CheckForDecommissioningNodesResponse responseCheckForDecom =
        checkForDecommissioningNodes(user);
    Assert.assertNotNull(responseCheckForDecom);

    LOG.info("testRouterRMAdminServiceE2E - Refresh Cluster Max Priority");

    RefreshClusterMaxPriorityResponse responseRefreshClusterMaxPriority =
        refreshClusterMaxPriority(user);
    Assert.assertNotNull(responseRefreshClusterMaxPriority);

    LOG.info("testRouterRMAdminServiceE2E - Get Groups For User");

    String[] responseGetGroupsForUser = getGroupsForUser(user);
    Assert.assertNotNull(responseGetGroupsForUser);

  }

  /**
   * Test if the different chains for users are generated, and LRU cache is
   * working as expected.
   *
   * <p>Eleven distinct users issue a request while the map is expected to
   * hold at most ten pipelines; "test1" is used again before the eleventh
   * user arrives, so "test2" is the entry expected to be evicted.
   *
   * @throws IOException if a refresh request fails
   * @throws InterruptedException if a refresh request is interrupted
   */
  @Test
  public void testUsersChainMapWithLRUCache()
      throws IOException, InterruptedException {

    // One request per user: each new user gets its own pipeline.
    for (int i = 1; i <= 8; i++) {
      refreshQueues("test" + i);
    }

    Map<String, RequestInterceptorChainWrapper> pipelines =
        super.getRouterRMAdminService().getPipelines();
    Assert.assertEquals(8, pipelines.size());

    refreshQueues("test9");
    refreshQueues("test10");
    // Touch test1 again so that it is no longer the least recently used.
    refreshQueues("test1");
    refreshQueues("test11");

    // The map is expected to be capped at 10 entries. The original comment
    // pointed at BaseRouterClientRMTest.TEST_MAX_CACHE_SIZE, a class this
    // test does not extend.
    // NOTE(review): presumably the constant lives in BaseRouterRMAdminTest;
    // confirm.
    Assert.assertEquals(10, pipelines.size());

    RequestInterceptorChainWrapper recentlyUsed = pipelines.get("test1");
    Assert.assertNotNull("test1 should not be evicted", recentlyUsed);

    RequestInterceptorChainWrapper leastRecentlyUsed = pipelines.get("test2");
    Assert.assertNull("test2 should have been evicted", leastRecentlyUsed);
  }

  /**
   * This test validates if the RMAdminRequestInterceptor chain for the user
   * can build and init correctly when a multi-client process begins to
   * request RouterRMAdminService for the same user simultaneously.
   *
   * @throws InterruptedException if the test thread is interrupted while
   *           waiting for the client threads
   */
  @Test
  public void testRMAdminPipelineConcurrent() throws InterruptedException {
    final String user = "test1";

    /*
     * ClientTestThread is a thread to simulate a client request to get a
     * RMAdminRequestInterceptor for the user.
     */
    class ClientTestThread extends Thread {
      // Written by run() and read by the test thread only after join().
      private RMAdminRequestInterceptor interceptor;

      @Override
      public void run() {
        try {
          interceptor = pipeline();
        } catch (IOException e) {
          // Leave interceptor null: the assertions after join() will fail.
          LOG.error("Failed to get the interceptor chain for user {}",
              user, e);
        } catch (InterruptedException e) {
          // Preserve the interrupt status for whoever inspects this thread.
          Thread.currentThread().interrupt();
          LOG.error("Interrupted while getting the interceptor chain for"
              + " user {}", user, e);
        }
      }

      /**
       * Fetches the root interceptor of the chain while running as the
       * remote user.
       */
      private RMAdminRequestInterceptor pipeline()
          throws IOException, InterruptedException {
        return UserGroupInformation.createRemoteUser(user).doAs(
            new PrivilegedExceptionAction<RMAdminRequestInterceptor>() {
              @Override
              public RMAdminRequestInterceptor run() throws Exception {
                RequestInterceptorChainWrapper wrapper =
                    getRouterRMAdminService().getInterceptorChain();
                RMAdminRequestInterceptor rootInterceptor =
                    wrapper.getRootInterceptor();
                Assert.assertNotNull(rootInterceptor);
                LOG.info("init rm admin interceptor success for user {}",
                    user);
                return rootInterceptor;
              }
            });
      }
    }

    /*
     * Both threads are started back to back so that their requests for the
     * chainWrapper may overlap; nothing here forces the overlap. In the end,
     * we validate that the 2 threads get the same root interceptor without
     * going into error.
     */
    ClientTestThread client1 = new ClientTestThread();
    ClientTestThread client2 = new ClientTestThread();
    client1.start();
    client2.start();
    client1.join();
    client2.join();

    Assert.assertNotNull(client1.interceptor);
    Assert.assertNotNull(client2.interceptor);
    Assert.assertSame("Both clients should share the same root interceptor",
        client1.interceptor, client2.interceptor);
  }

}