TestRPCWaitForProxy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* tests that the proxy can be interrupted
*/
public class TestRPCWaitForProxy extends TestRpcBase {
private static final Logger
LOG = LoggerFactory.getLogger(TestRPCWaitForProxy.class);
private static final Configuration conf = new Configuration();
@BeforeEach
public void setupProtocolEngine() {
RPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine2.class);
}
/**
* This tests that the time-bounded wait for a proxy operation works, and
* times out.
*
* @throws Throwable any exception other than that which was expected
*/
@Test
@Timeout(value = 50)
public void testWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(0);
worker.start();
worker.join();
Throwable caught = worker.getCaught();
Throwable cause = caught.getCause();
assertNotNull(cause, "No exception was raised");
if (!(cause instanceof ConnectException)) {
throw caught;
}
}
/**
* This test sets off a blocking thread and then interrupts it, before
* checking that the thread was interrupted
*
* @throws Throwable any exception other than that which was expected
*/
@Test
@Timeout(value = 10)
public void testInterruptedWaitForProxy() throws Throwable {
RpcThread worker = new RpcThread(100);
worker.start();
Thread.sleep(1000);
assertTrue(worker.waitStarted, "worker hasn't started");
worker.interrupt();
worker.join();
Throwable caught = worker.getCaught();
assertNotNull(caught, "No exception was raised");
// looking for the root cause here, which can be wrapped
// as part of the NetUtils work. Having this test look
// a the type of exception there would be brittle to improvements
// in exception diagnostics.
Throwable cause = caught.getCause();
if (cause == null) {
// no inner cause, use outer exception as root cause.
cause = caught;
} else if (cause.getCause() != null) {
cause = cause.getCause();
}
if (!(cause instanceof InterruptedIOException)
&& !(cause instanceof ClosedByInterruptException)) {
throw caught;
}
}
/**
* This thread waits for a proxy for the specified timeout, and retains any
* throwable that was raised in the process
*/
private class RpcThread extends Thread {
private Throwable caught;
private int connectRetries;
private volatile boolean waitStarted = false;
private RpcThread(int connectRetries) {
this.connectRetries = connectRetries;
}
@Override
public void run() {
try {
Configuration config = new Configuration(conf);
config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
connectRetries);
config.setInt(
IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
connectRetries);
waitStarted = true;
short invalidPort = 20;
InetSocketAddress invalidAddress = new InetSocketAddress(ADDRESS,
invalidPort);
TestRpcBase.TestRpcService proxy = RPC.getProxy(
TestRpcBase.TestRpcService.class,
1L, invalidAddress, conf);
// Test echo method
proxy.echo(null, newEchoRequest("hello"));
} catch (Throwable throwable) {
caught = throwable;
}
}
public Throwable getCaught() {
return caught;
}
}
}