// TestRouterAsyncRpcFairnessPolicyController.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.fairness;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test functionality of {@link RouterAsyncRpcFairnessPolicyController).
*/
public class TestRouterAsyncRpcFairnessPolicyController {
private static String nameServices =
"ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
private static int perNsPermits = 30;
@Test
public void testHandlerAllocationEqualAssignment() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
= getFairnessPolicyController(perNsPermits);
verifyHandlerAllocation(routerRpcFairnessPolicyController);
}
@Test
public void testAcquireTimeout() {
Configuration conf = createConf(perNsPermits);
conf.setTimeDuration(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 100, TimeUnit.MILLISECONDS);
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
// Ns1 should have number of perNsPermits permits allocated.
for (int i = 0; i < perNsPermits; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
}
long acquireBeginTimeMs = Time.monotonicNow();
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
long acquireTimeMs = Time.monotonicNow() - acquireBeginTimeMs;
// There are some other operations, so acquireTimeMs >= 100ms.
assertTrue(acquireTimeMs >= 100);
}
@Test
public void testAllocationSuccessfullyWithZeroHandlers() {
Configuration conf = createConf(0);
verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
}
@Test
public void testAllocationSuccessfullyWithNegativePermits() {
Configuration conf = createConf(-1);
verifyInstantiationStatus(conf, DFS_ROUTER_ASYNC_RPC_MAX_ASYNC_CALL_PERMIT_DEFAULT);
}
@Test
public void testGetAvailableHandlerOnPerNs() {
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
= getFairnessPolicyController(perNsPermits);
assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":30}",
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
routerRpcFairnessPolicyController.acquirePermit("ns1");
assertEquals("{\"concurrent\":30,\"ns2\":30,\"ns1\":29}",
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
}
@Test
public void testGetAvailableHandlerOnPerNsForNoFairness() {
Configuration conf = new Configuration();
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
FederationUtil.newFairnessPolicyController(conf);
assertEquals("N/A",
routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
}
private void verifyInstantiationStatus(Configuration conf, int permits) {
GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
.captureLogs(LoggerFactory.getLogger(
RouterAsyncRpcFairnessPolicyController.class));
try {
FederationUtil.newFairnessPolicyController(conf);
} catch (IllegalArgumentException e) {
// Ignore the exception as it is expected here.
}
String infoMsg = String.format(
RouterAsyncRpcFairnessPolicyController.INIT_MSG, permits);
assertTrue(logs.getOutput().contains(infoMsg), "Should contain info message: " +
infoMsg);
}
private RouterRpcFairnessPolicyController getFairnessPolicyController(
int asyncCallPermits) {
return FederationUtil.newFairnessPolicyController(createConf(asyncCallPermits));
}
private void verifyHandlerAllocation(
RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) {
for (int i = 0; i < perNsPermits; i++) {
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
// CONCURRENT_NS doesn't acquire permits.
assertTrue(
routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
routerRpcFairnessPolicyController.releasePermit("ns1");
routerRpcFairnessPolicyController.releasePermit("ns2");
routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS);
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}
private Configuration createConf(int asyncCallPermits) {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, asyncCallPermits);
conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
conf.setClass(
RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
RouterAsyncRpcFairnessPolicyController.class,
RouterRpcFairnessPolicyController.class);
return conf;
}
}