// RouterNamenodeProtocolServerSideTranslatorPB.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.EndCheckpointResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
/**
 * Server-side protobuf translator for {@link NamenodeProtocol} calls that are
 * served by a {@link RouterRpcServer}.
 *
 * <p>Every RPC method follows the same two-path pattern:
 * <ul>
 *   <li>When the wrapped server does not report async mode
 *   ({@link RouterRpcServer#isAsync()} was false at construction time), the
 *   call is delegated unchanged to
 *   {@link NamenodeProtocolServerSideTranslatorPB}.</li>
 *   <li>Otherwise the server invocation and a converter that turns its result
 *   into the protobuf response are handed to
 *   {@code AsyncRpcProtocolPBUtil.asyncRouterServer}, and the method itself
 *   returns {@code null}.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code null} return in async mode presumably means the
 * real response is delivered later by the async framework; that code is not
 * part of this file, so confirm against {@code AsyncRpcProtocolPBUtil}.
 */
public class RouterNamenodeProtocolServerSideTranslatorPB
    extends NamenodeProtocolServerSideTranslatorPB {

  /** The router RPC server every async call is issued against. */
  private final RouterRpcServer server;

  /** Snapshot of {@code server.isAsync()} taken once in the constructor. */
  private final boolean isAsyncRpc;

  /**
   * Creates the translator.
   *
   * @param impl protocol implementation; it is cast to
   *             {@link RouterRpcServer}, so any other implementation fails
   *             with a {@link ClassCastException}.
   */
  public RouterNamenodeProtocolServerSideTranslatorPB(NamenodeProtocol impl) {
    super(impl);
    this.server = (RouterRpcServer) impl;
    this.isAsyncRpc = server.isAsync();
  }

  /**
   * Handles {@code getBlocks}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the datanode, size, minimum block size,
   *                time interval and an optional storage type.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetBlocksResponseProto getBlocks(RpcController unused,
      GetBlocksRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.getBlocks(unused, request);
    }
    asyncRouterServer(() -> {
      DatanodeInfo dnInfo = new DatanodeInfo.DatanodeInfoBuilder()
          .setNodeID(PBHelperClient.convert(request.getDatanode()))
          .build();
      // The storage type is optional in the request; null is passed to the
      // server when it is absent.
      return server.getBlocks(dnInfo, request.getSize(),
          request.getMinBlockSize(), request.getTimeInterval(),
          request.hasStorageType()
              ? PBHelperClient.convertStorageType(request.getStorageType())
              : null);
    }, blocks ->
        GetBlocksResponseProto.newBuilder()
            .setBlocks(PBHelper.convert(blocks)).build());
    return null;
  }

  /**
   * Handles {@code getBlockKeys}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetBlockKeysResponseProto getBlockKeys(RpcController unused,
      GetBlockKeysRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.getBlockKeys(unused, request);
    }
    asyncRouterServer(server::getBlockKeys, keys -> {
      GetBlockKeysResponseProto.Builder builder =
          GetBlockKeysResponseProto.newBuilder();
      // A null result leaves the keys field unset instead of failing.
      if (keys != null) {
        builder.setKeys(PBHelper.convert(keys));
      }
      return builder.build();
    });
    return null;
  }

  /**
   * Handles {@code getTransactionId}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetTransactionIdResponseProto getTransactionId(RpcController unused,
      GetTransactionIdRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.getTransactionId(unused, request);
    }
    asyncRouterServer(server::getTransactionID,
        txid -> GetTransactionIdResponseProto
            .newBuilder().setTxId(txid).build());
    return null;
  }

  /**
   * Handles {@code getMostRecentCheckpointTxId}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
      RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.getMostRecentCheckpointTxId(unused, request);
    }
    asyncRouterServer(server::getMostRecentCheckpointTxId,
        txid -> GetMostRecentCheckpointTxIdResponseProto
            .newBuilder().setTxId(txid).build());
    return null;
  }

  /**
   * Handles {@code getMostRecentNameNodeFileTxId}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request whose name-node-file string is resolved with
   *                {@code NNStorage.NameNodeFile.valueOf}.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId(
      RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.getMostRecentNameNodeFileTxId(unused, request);
    }
    asyncRouterServer(() -> server.getMostRecentNameNodeFileTxId(
        NNStorage.NameNodeFile.valueOf(request.getNameNodeFile())),
        txid -> GetMostRecentNameNodeFileTxIdResponseProto
            .newBuilder().setTxId(txid).build());
    return null;
  }

  /**
   * Handles {@code rollEditLog}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public RollEditLogResponseProto rollEditLog(RpcController unused,
      RollEditLogRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.rollEditLog(unused, request);
    }
    asyncRouterServer(server::rollEditLog,
        signature -> RollEditLogResponseProto.newBuilder()
            .setSignature(PBHelper.convert(signature)).build());
    return null;
  }

  /**
   * Handles {@code errorReport}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the registration, error code and message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public ErrorReportResponseProto errorReport(RpcController unused,
      ErrorReportRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.errorReport(unused, request);
    }
    asyncRouterServer(() -> {
      server.errorReport(PBHelper.convert(request.getRegistration()),
          request.getErrorCode(), request.getMsg());
      // The server call has no result; null stands in for it and the
      // converter answers with the shared void response.
      return null;
    }, result -> VOID_ERROR_REPORT_RESPONSE);
    return null;
  }

  /**
   * Handles {@code registerSubordinateNamenode}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the registration to submit.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public RegisterResponseProto registerSubordinateNamenode(
      RpcController unused, RegisterRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.registerSubordinateNamenode(unused, request);
    }
    asyncRouterServer(() -> server.registerSubordinateNamenode(
        PBHelper.convert(request.getRegistration())),
        reg -> RegisterResponseProto.newBuilder()
            .setRegistration(PBHelper.convert(reg)).build());
    return null;
  }

  /**
   * Handles {@code startCheckpoint}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the registration of the caller.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public StartCheckpointResponseProto startCheckpoint(RpcController unused,
      StartCheckpointRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.startCheckpoint(unused, request);
    }
    asyncRouterServer(() ->
        server.startCheckpoint(PBHelper.convert(request.getRegistration())),
        cmd -> StartCheckpointResponseProto.newBuilder()
            .setCommand(PBHelper.convert(cmd)).build());
    return null;
  }

  /**
   * Handles {@code endCheckpoint}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the registration and checkpoint signature.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public EndCheckpointResponseProto endCheckpoint(RpcController unused,
      EndCheckpointRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.endCheckpoint(unused, request);
    }
    asyncRouterServer(() -> {
      server.endCheckpoint(PBHelper.convert(request.getRegistration()),
          PBHelper.convert(request.getSignature()));
      // The server call has no result; null stands in for it and the
      // converter answers with the shared void response.
      return null;
    }, result -> VOID_END_CHECKPOINT_RESPONSE);
    return null;
  }

  /**
   * Handles {@code getEditLogManifest}.
   *
   * @param unused RPC controller, only forwarded to the synchronous path.
   * @param request request carrying the starting transaction id.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetEditLogManifestResponseProto getEditLogManifest(
      RpcController unused, GetEditLogManifestRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.getEditLogManifest(unused, request);
    }
    asyncRouterServer(() -> server.getEditLogManifest(request.getSinceTxId()),
        manifest -> GetEditLogManifestResponseProto.newBuilder()
            .setManifest(PBHelper.convert(manifest)).build());
    return null;
  }

  /**
   * Handles {@code versionRequest}.
   *
   * @param controller RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public VersionResponseProto versionRequest(
      RpcController controller,
      VersionRequestProto request) throws ServiceException {
    if (!isAsyncRpc) {
      return super.versionRequest(controller, request);
    }
    asyncRouterServer(server::versionRequest,
        info -> VersionResponseProto.newBuilder()
            .setInfo(PBHelper.convert(info)).build());
    return null;
  }

  /**
   * Handles {@code isUpgradeFinalized}.
   *
   * @param controller RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public IsUpgradeFinalizedResponseProto isUpgradeFinalized(
      RpcController controller, IsUpgradeFinalizedRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.isUpgradeFinalized(controller, request);
    }
    asyncRouterServer(server::isUpgradeFinalized,
        isUpgradeFinalized -> IsUpgradeFinalizedResponseProto.newBuilder()
            .setIsUpgradeFinalized(isUpgradeFinalized).build());
    return null;
  }

  /**
   * Handles {@code isRollingUpgrade}.
   *
   * @param controller RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public IsRollingUpgradeResponseProto isRollingUpgrade(
      RpcController controller, IsRollingUpgradeRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.isRollingUpgrade(controller, request);
    }
    asyncRouterServer(server::isRollingUpgrade,
        isRollingUpgrade -> IsRollingUpgradeResponseProto.newBuilder()
            .setIsRollingUpgrade(isRollingUpgrade).build());
    return null;
  }

  /**
   * Handles {@code getNextSPSPath}.
   *
   * @param controller RPC controller, only forwarded to the synchronous path.
   * @param request the (otherwise unread) request message.
   * @return the superclass response in sync mode, {@code null} in async mode.
   * @throws ServiceException declared by the protocol; propagated from the
   *                          synchronous path.
   */
  @Override
  public GetNextSPSPathResponseProto getNextSPSPath(
      RpcController controller, GetNextSPSPathRequestProto request)
      throws ServiceException {
    if (!isAsyncRpc) {
      return super.getNextSPSPath(controller, request);
    }
    asyncRouterServer(server::getNextSPSPath, nextSPSPath -> {
      GetNextSPSPathResponseProto.Builder builder =
          GetNextSPSPathResponseProto.newBuilder();
      // The result reaches this converter boxed. NOTE(review): the protocol
      // presumably returns null when no SPS path is available - confirm
      // against NamenodeProtocol#getNextSPSPath. Passing a null box to the
      // setter would fail on unboxing, so the optional field is left unset
      // instead, the same way getBlockKeys treats a null result.
      if (nextSPSPath != null) {
        builder.setSpsPath(nextSPSPath);
      }
      return builder.build();
    });
    return null;
  }
}