RouterNamenodeProtocolTranslatorPB.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.Client;
import java.io.IOException;
import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncIpcClient;
/**
 * Translator for the namenode protocol that adds an asynchronous code path on
 * top of {@link NamenodeProtocolTranslatorPB}.
 *
 * <p>Every overridden method first checks {@link Client#isAsynchronousMode()}.
 * When asynchronous mode is off, the call is delegated unchanged to the
 * superclass. When it is on, the method builds the protobuf request itself and
 * passes the RPC invocation, a response converter and the expected result
 * class to {@code AsyncRpcProtocolPBUtil.asyncIpcClient}.
 *
 * <p>NOTE(review): in asynchronous mode the value returned directly by each
 * method is whatever {@code asyncIpcClient} returns. Presumably that is a
 * placeholder and the converted response is delivered asynchronously --
 * confirm against {@code AsyncRpcProtocolPBUtil}.
 */
public class RouterNamenodeProtocolTranslatorPB extends NamenodeProtocolTranslatorPB {
  /*
   * Protobuf requests with no parameters are instantiated only once and shared
   * by every call.
   */
  private static final GetBlockKeysRequestProto VOID_GET_BLOCKKEYS_REQUEST =
      GetBlockKeysRequestProto.newBuilder().build();
  private static final GetTransactionIdRequestProto VOID_GET_TRANSACTIONID_REQUEST =
      GetTransactionIdRequestProto.newBuilder().build();
  private static final RollEditLogRequestProto VOID_ROLL_EDITLOG_REQUEST =
      RollEditLogRequestProto.newBuilder().build();
  private static final VersionRequestProto VOID_VERSION_REQUEST =
      VersionRequestProto.newBuilder().build();
  private static final NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto
      VOID_IS_UPGRADE_FINALIZED_REQUEST =
      NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto.newBuilder().build();
  private static final NamenodeProtocolProtos.IsRollingUpgradeRequestProto
      VOID_IS_ROLLING_UPGRADE_REQUEST =
      NamenodeProtocolProtos.IsRollingUpgradeRequestProto.newBuilder().build();
  private static final NamenodeProtocolProtos.GetNextSPSPathRequestProto
      VOID_GET_NEXT_SPS_PATH_REQUEST =
      NamenodeProtocolProtos.GetNextSPSPathRequestProto.newBuilder().build();

  /** Proxy used to issue the RPCs on the asynchronous path. */
  private final NamenodeProtocolPB rpcProxy;

  /**
   * Creates a translator around the given proxy.
   *
   * @param rpcProxy proxy passed to the superclass and also used directly by
   *                 the asynchronous code path of this class.
   */
  public RouterNamenodeProtocolTranslatorPB(NamenodeProtocolPB rpcProxy) {
    super(rpcProxy);
    this.rpcProxy = rpcProxy;
  }

  /**
   * Issues the getBlocks RPC.
   *
   * @param datanode datanode placed in the request (converted as a
   *                 {@link DatanodeID}).
   * @param size value placed in the request's size field.
   * @param minBlockSize value placed in the request's minBlockSize field.
   * @param timeInterval value placed in the request's timeInterval field.
   * @param storageType storage type placed in the request; left unset when
   *                    {@code null}.
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
      minBlockSize, long timeInterval, StorageType storageType)
      throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
    }
    NamenodeProtocolProtos.GetBlocksRequestProto.Builder builder =
        NamenodeProtocolProtos.GetBlocksRequestProto.newBuilder()
            .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
            .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval);
    // The storage type is optional: only set it when the caller supplied one.
    if (storageType != null) {
      builder.setStorageType(PBHelperClient.convertStorageType(storageType));
    }
    NamenodeProtocolProtos.GetBlocksRequestProto req = builder.build();

    return asyncIpcClient(() -> rpcProxy.getBlocks(null, req),
        res -> PBHelper.convert(res.getBlocks()),
        BlocksWithLocations.class);
  }

  /**
   * Issues the getBlockKeys RPC. On the asynchronous path the response
   * converter yields {@code null} when the response carries no keys.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public ExportedBlockKeys getBlockKeys() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getBlockKeys();
    }

    return asyncIpcClient(() -> rpcProxy.getBlockKeys(null,
        VOID_GET_BLOCKKEYS_REQUEST),
        res -> res.hasKeys() ? PBHelper.convert(res.getKeys()) : null,
        ExportedBlockKeys.class);
  }

  /**
   * Issues the getTransactionId RPC; the converter extracts the response's
   * txId.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public long getTransactionID() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getTransactionID();
    }

    return asyncIpcClient(() -> rpcProxy.getTransactionId(null,
        VOID_GET_TRANSACTIONID_REQUEST),
        res -> res.getTxId(), Long.class);
  }

  /**
   * Issues the getMostRecentCheckpointTxId RPC; the converter extracts the
   * response's txId.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public long getMostRecentCheckpointTxId() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getMostRecentCheckpointTxId();
    }

    return asyncIpcClient(() -> rpcProxy.getMostRecentCheckpointTxId(null,
        NamenodeProtocolProtos
            .GetMostRecentCheckpointTxIdRequestProto
            .getDefaultInstance()),
        res -> res.getTxId(), Long.class);
  }

  /**
   * Issues the getMostRecentNameNodeFileTxId RPC; the converter extracts the
   * response's txId.
   *
   * @param nnf name node file whose {@code toString()} value is placed in the
   *            request.
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getMostRecentNameNodeFileTxId(nnf);
    }

    return asyncIpcClient(() -> rpcProxy.getMostRecentNameNodeFileTxId(null,
        NamenodeProtocolProtos
            .GetMostRecentNameNodeFileTxIdRequestProto
            .newBuilder()
            .setNameNodeFile(nnf.toString())
            .build()),
        res -> res.getTxId(), Long.class);
  }

  /**
   * Issues the rollEditLog RPC; the converter translates the response's
   * signature.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public CheckpointSignature rollEditLog() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.rollEditLog();
    }

    return asyncIpcClient(() -> rpcProxy.rollEditLog(null,
        VOID_ROLL_EDITLOG_REQUEST),
        res -> PBHelper.convert(res.getSignature()), CheckpointSignature.class);
  }

  /**
   * Issues the versionRequest RPC; the converter translates the response's
   * info field.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public NamespaceInfo versionRequest() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.versionRequest();
    }
    return asyncIpcClient(() -> rpcProxy.versionRequest(null,
        VOID_VERSION_REQUEST),
        res -> PBHelper.convert(res.getInfo()),
        NamespaceInfo.class);
  }

  /**
   * Issues the errorReport RPC. The response carries nothing the caller
   * needs, so the asynchronous converter maps it to {@code null}.
   *
   * @param registration registration placed in the request.
   * @param errorCode error code placed in the request.
   * @param msg message placed in the request.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public void errorReport(NamenodeRegistration registration, int errorCode,
      String msg) throws IOException {
    if (!Client.isAsynchronousMode()) {
      super.errorReport(registration, errorCode, msg);
      return;
    }
    NamenodeProtocolProtos.ErrorReportRequestProto req =
        NamenodeProtocolProtos.ErrorReportRequestProto.newBuilder()
            .setErrorCode(errorCode).setMsg(msg)
            .setRegistration(PBHelper.convert(registration)).build();

    asyncIpcClient(() -> rpcProxy.errorReport(null, req),
        res -> null, Void.class);
  }

  /**
   * Issues the registerSubordinateNamenode RPC; the converter translates the
   * registration contained in the response.
   *
   * @param registration registration placed in the request.
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public NamenodeRegistration registerSubordinateNamenode(
      NamenodeRegistration registration) throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.registerSubordinateNamenode(registration);
    }
    NamenodeProtocolProtos.RegisterRequestProto req =
        NamenodeProtocolProtos.RegisterRequestProto.newBuilder()
            .setRegistration(PBHelper.convert(registration)).build();

    return asyncIpcClient(() -> rpcProxy.registerSubordinateNamenode(null, req),
        res -> PBHelper.convert(res.getRegistration()),
        NamenodeRegistration.class);
  }

  /**
   * Issues the startCheckpoint RPC; the converter translates the command
   * contained in the response.
   *
   * @param registration registration placed in the request.
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
      throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.startCheckpoint(registration);
    }
    NamenodeProtocolProtos.StartCheckpointRequestProto req =
        NamenodeProtocolProtos.StartCheckpointRequestProto.newBuilder()
            .setRegistration(PBHelper.convert(registration)).build();

    return asyncIpcClient(() -> rpcProxy.startCheckpoint(null, req),
        res -> {
          HdfsServerProtos.NamenodeCommandProto cmd = res.getCommand();
          return PBHelper.convert(cmd);
        }, NamenodeCommand.class);
  }

  /**
   * Issues the endCheckpoint RPC. The response carries nothing the caller
   * needs, so the asynchronous converter maps it to {@code null}.
   *
   * @param registration registration placed in the request.
   * @param sig checkpoint signature placed in the request.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public void endCheckpoint(NamenodeRegistration registration,
      CheckpointSignature sig) throws IOException {
    if (!Client.isAsynchronousMode()) {
      super.endCheckpoint(registration, sig);
      return;
    }
    NamenodeProtocolProtos.EndCheckpointRequestProto req =
        NamenodeProtocolProtos.EndCheckpointRequestProto.newBuilder()
            .setRegistration(PBHelper.convert(registration))
            .setSignature(PBHelper.convert(sig)).build();

    asyncIpcClient(() -> rpcProxy.endCheckpoint(null, req),
        res -> null, Void.class);
  }

  /**
   * Issues the getEditLogManifest RPC; the converter translates the manifest
   * contained in the response.
   *
   * @param sinceTxId value placed in the request's sinceTxId field.
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
      throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getEditLogManifest(sinceTxId);
    }
    NamenodeProtocolProtos.GetEditLogManifestRequestProto req =
        NamenodeProtocolProtos.GetEditLogManifestRequestProto
            .newBuilder().setSinceTxId(sinceTxId).build();

    return asyncIpcClient(() -> rpcProxy.getEditLogManifest(null, req),
        res -> PBHelper.convert(res.getManifest()), RemoteEditLogManifest.class);
  }

  /**
   * Issues the isUpgradeFinalized RPC; the converter extracts the response's
   * isUpgradeFinalized flag.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public boolean isUpgradeFinalized() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.isUpgradeFinalized();
    }

    return asyncIpcClient(() -> rpcProxy.isUpgradeFinalized(null,
        VOID_IS_UPGRADE_FINALIZED_REQUEST),
        res -> res.getIsUpgradeFinalized(), Boolean.class);
  }

  /**
   * Issues the isRollingUpgrade RPC; the converter extracts the response's
   * isRollingUpgrade flag.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public boolean isRollingUpgrade() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.isRollingUpgrade();
    }

    return asyncIpcClient(() -> rpcProxy.isRollingUpgrade(null,
        VOID_IS_ROLLING_UPGRADE_REQUEST),
        res -> res.getIsRollingUpgrade(), Boolean.class);
  }

  /**
   * Issues the getNextSPSPath RPC. On the asynchronous path the response
   * converter yields {@code null} when the response carries no SPS path.
   *
   * @return the superclass result in synchronous mode, otherwise the value
   *         returned by {@code asyncIpcClient}.
   * @throws IOException declared by the superclass call and by
   *                     {@code asyncIpcClient}.
   */
  @Override
  public Long getNextSPSPath() throws IOException {
    if (!Client.isAsynchronousMode()) {
      return super.getNextSPSPath();
    }

    return asyncIpcClient(() -> rpcProxy.getNextSPSPath(null,
        VOID_GET_NEXT_SPS_PATH_REQUEST),
        res -> res.hasSpsPath() ? res.getSpsPath() : null, Long.class);
  }
}