TestCapacitySchedulerQueueACLs.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
import org.junit.jupiter.api.Test;
public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase {
@Override
protected Configuration createConfiguration() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(ROOT, new String[] {
QUEUEA, QUEUEB });
setQueueCapacity(csConf, 50, A_QUEUE_PATH);
setQueueCapacity(csConf, 50, B_QUEUE_PATH);
Map<QueueACL, AccessControlList> aclsOnQueueA =
new HashMap<QueueACL, AccessControlList>();
AccessControlList submitACLonQueueA = new AccessControlList(QUEUE_A_USER);
submitACLonQueueA.addUser(COMMON_USER);
AccessControlList adminACLonQueueA = new AccessControlList(QUEUE_A_ADMIN);
aclsOnQueueA.put(QueueACL.SUBMIT_APPLICATIONS, submitACLonQueueA);
aclsOnQueueA.put(QueueACL.ADMINISTER_QUEUE, adminACLonQueueA);
csConf.setAcls(A_QUEUE_PATH, aclsOnQueueA);
Map<QueueACL, AccessControlList> aclsOnQueueB =
new HashMap<QueueACL, AccessControlList>();
AccessControlList submitACLonQueueB = new AccessControlList(QUEUE_B_USER);
submitACLonQueueB.addUser(COMMON_USER);
AccessControlList adminACLonQueueB = new AccessControlList(QUEUE_B_ADMIN);
aclsOnQueueB.put(QueueACL.SUBMIT_APPLICATIONS, submitACLonQueueB);
aclsOnQueueB.put(QueueACL.ADMINISTER_QUEUE, adminACLonQueueB);
csConf.setAcls(B_QUEUE_PATH, aclsOnQueueB);
Map<QueueACL, AccessControlList> aclsOnRootQueue =
new HashMap<QueueACL, AccessControlList>();
AccessControlList submitACLonRoot = new AccessControlList("");
AccessControlList adminACLonRoot = new AccessControlList(ROOT_ADMIN);
aclsOnRootQueue.put(QueueACL.SUBMIT_APPLICATIONS, submitACLonRoot);
aclsOnRootQueue.put(QueueACL.ADMINISTER_QUEUE, adminACLonRoot);
csConf.setAcls(ROOT, aclsOnRootQueue);
csConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
csConf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
return csConf;
}
@Override
public String getQueueD() {
return QUEUED;
}
@Override
public String getQueueD1() {
return QUEUED1;
}
/**
* Updates the configuration with the following queue hierarchy:
* root
* |
* D
* |
* D1.
* @param rootAcl administer queue and submit application ACL for root queue
* @param queueDAcl administer queue and submit application ACL for D queue
* @param queueD1Acl administer queue and submit application ACL for D1 queue
* @throws IOException
*/
@Override
public void updateConfigWithDAndD1Queues(String rootAcl, String queueDAcl,
String queueD1Acl) throws IOException {
CapacitySchedulerConfiguration csConf =
(CapacitySchedulerConfiguration) getConf();
csConf.clear();
csConf.setQueues(ROOT,
new String[] {QUEUED, QUEUEA, QUEUEB});
String dPath = CapacitySchedulerConfiguration.ROOT + "." + QUEUED;
String d1Path = dPath + "." + QUEUED1;
QueuePath dQueuePath = new QueuePath(dPath);
QueuePath d1QueuePath = new QueuePath(d1Path);
csConf.setQueues(dQueuePath, new String[] {QUEUED1});
setQueueCapacity(csConf, 100, d1QueuePath);
setQueueCapacity(csConf, 30, A_QUEUE_PATH);
setQueueCapacity(csConf, 50, B_QUEUE_PATH);
setQueueCapacity(csConf, 20, dQueuePath);
if (rootAcl != null) {
setAdminAndSubmitACL(csConf, rootAcl, ROOT);
}
if (queueDAcl != null) {
setAdminAndSubmitACL(csConf, queueDAcl, dQueuePath);
}
if (queueD1Acl != null) {
setAdminAndSubmitACL(csConf, queueD1Acl, d1QueuePath);
}
resourceManager.getResourceScheduler()
.reinitialize(csConf, resourceManager.getRMContext());
}
private void setQueueCapacity(CapacitySchedulerConfiguration csConf,
float capacity, QueuePath queuePath) {
csConf.setCapacity(queuePath, capacity);
}
private void setAdminAndSubmitACL(CapacitySchedulerConfiguration csConf,
String queueAcl, QueuePath queuePath) {
csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl);
csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl);
}
@Test
public void testCheckAccessForUserWithOnlyLeafNameProvided() {
testCheckAccess(false, "dynamicQueue");
}
@Test
public void testCheckAccessForUserWithFullPathProvided() {
testCheckAccess(true, "root.users.dynamicQueue");
}
@Test
public void testCheckAccessForRootQueue() {
testCheckAccess(false, "root");
}
private void testCheckAccess(boolean expectedResult, String queueName) {
CapacitySchedulerQueueManager qm =
mock(CapacitySchedulerQueueManager.class);
CSQueue root = mock(ParentQueue.class);
CSQueue users = mock(ManagedParentQueue.class);
when(qm.getQueue("root")).thenReturn(root);
when(qm.getQueue("root.users")).thenReturn(users);
when(users.hasAccess(any(QueueACL.class),
any(UserGroupInformation.class))).thenReturn(true);
UserGroupInformation mockUGI = mock(UserGroupInformation.class);
CapacityScheduler cs =
(CapacityScheduler) resourceManager.getResourceScheduler();
cs.setQueueManager(qm);
assertEquals(expectedResult,
cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName), "checkAccess() failed");
}
}