TestRouterAsyncErasureCoding.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router.async;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.CallerContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestRouterAsyncErasureCoding {
private static Configuration routerConf;
/** Federated HDFS cluster. */
private static MiniRouterDFSCluster cluster;
private static String ns0;
/** Random Router for this federated cluster. */
private MiniRouterDFSCluster.RouterContext router;
private FileSystem routerFs;
private RouterRpcServer routerRpcServer;
private AsyncErasureCoding asyncErasureCoding;
private final String testfilePath = "/testdir/testAsyncErasureCoding.file";
@BeforeAll
public static void setUpCluster() throws Exception {
cluster = new MiniRouterDFSCluster(true, 1, 2,
DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
cluster.setNumDatanodesPerNameservice(3);
cluster.setRacks(
new String[] {"/rack1", "/rack2", "/rack3"});
cluster.startCluster();
// Making one Namenode active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
// Start routers with only an RPC service
routerConf = new RouterConfigBuilder()
.rpc()
.build();
// Reduce the number of RPC clients threads to overload the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
cluster.addRouterOverrides(routerConf);
// Start routers with only an RPC service
cluster.startRouters();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
cluster.waitActiveNamespaces();
ns0 = cluster.getNameservices().get(0);
}
@AfterAll
public static void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@BeforeEach
public void setUp() throws IOException {
router = cluster.getRandomRouter();
routerFs = router.getFileSystem();
routerRpcServer = router.getRouterRpcServer();
routerRpcServer.initAsyncThreadPools(routerConf);
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
routerRpcServer.getRPCMonitor(),
routerRpcServer.getRouterStateIdContext());
RouterRpcServer spy = Mockito.spy(routerRpcServer);
Mockito.when(spy.getRPCClient()).thenReturn(asyncRpcClient);
asyncErasureCoding = new AsyncErasureCoding(spy);
// Create mock locations
MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
resolver.addLocation("/", ns0, "/");
FsPermission permission = new FsPermission("705");
routerFs.mkdirs(new Path("/testdir"), permission);
FSDataOutputStream fsDataOutputStream = routerFs.create(
new Path(testfilePath), true);
fsDataOutputStream.write(new byte[1024]);
fsDataOutputStream.close();
}
@AfterEach
public void tearDown() throws IOException {
// clear client context
CallerContext.setCurrent(null);
boolean delete = routerFs.delete(new Path("/testdir"));
assertTrue(delete);
if (routerFs != null) {
routerFs.close();
}
}
@Test
public void testRouterAsyncErasureCoding() throws Exception {
String ecPolicyName = StripedFileTestUtil.getDefaultECPolicy().getName();
HdfsFileStatus fileInfo = cluster.getNamenodes().get(0).getClient().getFileInfo(testfilePath);
assertNotNull(fileInfo);
asyncErasureCoding.setErasureCodingPolicy("/testdir", ecPolicyName);
syncReturn(null);
asyncErasureCoding.getErasureCodingPolicy("/testdir");
ErasureCodingPolicy ecPolicy = syncReturn(ErasureCodingPolicy.class);
assertEquals(StripedFileTestUtil.getDefaultECPolicy().getName(), ecPolicy.getName());
asyncErasureCoding.getErasureCodingPolicies();
ErasureCodingPolicyInfo[] erasureCodingPolicies = syncReturn(ErasureCodingPolicyInfo[].class);
int numECPolicies = erasureCodingPolicies.length;
ErasureCodingPolicyInfo[] erasureCodingPoliciesFromNameNode =
cluster.getNamenodes().get(0).getClient().getErasureCodingPolicies();
assertArrayEquals(erasureCodingPoliciesFromNameNode, erasureCodingPolicies);
asyncErasureCoding.getErasureCodingCodecs();
Map<String, String> erasureCodingCodecs = syncReturn(Map.class);
Map<String, String> erasureCodingCodecsFromNameNode =
cluster.getNamenodes().get(0).getClient().getErasureCodingCodecs();
assertEquals(erasureCodingCodecs, erasureCodingCodecsFromNameNode);
// RS-12-4-1024k
final ECSchema schema = new ECSchema("rs", 12, 4);
ErasureCodingPolicy erasureCodingPolicy = new ErasureCodingPolicy(schema, 1024 * 1024);
asyncErasureCoding.addErasureCodingPolicies(new ErasureCodingPolicy[]{erasureCodingPolicy});
AddErasureCodingPolicyResponse[] response = syncReturn(AddErasureCodingPolicyResponse[].class);
assertEquals(response[0].isSucceed(), true);
asyncErasureCoding.getErasureCodingPolicies();
ErasureCodingPolicyInfo[] erasureCodingPolicies2 = syncReturn(ErasureCodingPolicyInfo[].class);
int numNewECPolicies = erasureCodingPolicies2.length;
assertEquals(numECPolicies + 1, numNewECPolicies);
asyncErasureCoding.getECTopologyResultForPolicies(
new String[]{"RS-6-3-1024k", "RS-12-4-1024k"});
ECTopologyVerifierResult ecTResultForPolicies = syncReturn(ECTopologyVerifierResult.class);
assertEquals(false, ecTResultForPolicies.isSupported());
asyncErasureCoding.getECTopologyResultForPolicies(
new String[]{"XOR-2-1-1024k"});
ECTopologyVerifierResult ecTResultForPolicies2 = syncReturn(ECTopologyVerifierResult.class);
assertEquals(true, ecTResultForPolicies2.isSupported());
}
}