// TestWebHdfsTimeouts.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.web;

import static org.junit.jupiter.api.Assertions.fail;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Timeout;
import org.opentest4j.TestAbortedException;

/**
 * This test suite checks that WebHdfsFileSystem sets connection timeouts and
 * read timeouts on its sockets, thus preventing threads from hanging
 * indefinitely on an undefined/infinite timeout.  The tests work by starting a
 * bogus server on the namenode HTTP port, which is rigged to not accept new
 * connections or to accept connections but not send responses.
 *
 * Each test is parameterized over {@link TimeoutSource} and calls
 * {@link #setUp(TimeoutSource)} itself as its first statement, because the
 * fixture depends on the test argument.
 */
public class TestWebHdfsTimeouts {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestWebHdfsTimeouts.class);

  /** Number of client connections opened in order to saturate the backlog. */
  private static final int CLIENTS_TO_CONSUME_BACKLOG = 129;
  /** Backlog requested when the bogus server socket is created. */
  private static final int CONNECTION_BACKLOG = 1;
  /** Connect and read timeout, in milliseconds, applied to the client. */
  private static final int SHORT_SOCKET_TIMEOUT = 200;

  private List<SocketChannel> clients;
  private WebHdfsFileSystem fs;
  private InetSocketAddress nnHttpAddress;
  private ServerSocket serverSocket;
  private Thread serverThread;
  /** Connection factory that forces the short timeouts on every connection. */
  private final URLConnectionFactory connectionFactory =
      new URLConnectionFactory(new ConnectionConfigurator() {
        @Override
        public HttpURLConnection configure(HttpURLConnection conn)
            throws IOException {
          conn.setReadTimeout(SHORT_SOCKET_TIMEOUT);
          conn.setConnectTimeout(SHORT_SOCKET_TIMEOUT);
          return conn;
        }
      });
  /** Set when the backlog could not be filled; may be written by either thread. */
  private volatile boolean failedToConsumeBacklog;

  /** Where the short timeouts come from in a given test run. */
  public enum TimeoutSource { ConnectionFactory, Configuration }

  /**
   * Run all tests twice: once with the timeouts set by the
   * connection factory, and again with the timeouts set by
   * configuration options.
   *
   * @return one single-element argument array per {@link TimeoutSource}
   */
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {
      { TimeoutSource.ConnectionFactory },
      { TimeoutSource.Configuration }
    });
  }

  /**
   * Creates the bogus server socket and a WebHdfsFileSystem pointed at it.
   * Must be called at the start of every test.
   *
   * @param timeoutSource whether the short timeouts are injected through the
   *   connection factory or through configuration keys
   * @throws Exception if the socket or the file system cannot be created
   */
  public void setUp(TimeoutSource timeoutSource) throws Exception {
    // Initialize per-test state first so tearDown can run safely even if one
    // of the steps below throws.
    clients = new ArrayList<>();
    serverThread = null;
    failedToConsumeBacklog = false;

    Configuration conf = WebHdfsTestUtil.createConf();
    serverSocket = new ServerSocket(0, CONNECTION_BACKLOG);
    nnHttpAddress =
        new InetSocketAddress("localhost", serverSocket.getLocalPort());
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
        "localhost:" + serverSocket.getLocalPort());
    if (timeoutSource == TimeoutSource.Configuration) {
      String v = Integer.toString(SHORT_SOCKET_TIMEOUT) + "ms";
      conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_CONNECT_TIMEOUT_KEY, v);
      conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_SOCKET_READ_TIMEOUT_KEY, v);
    }

    fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
        WebHdfsConstants.WEBHDFS_SCHEME);
    if (timeoutSource == TimeoutSource.ConnectionFactory) {
      fs.connectionFactory = connectionFactory;
    }
  }

  /**
   * Releases everything created by {@link #setUp(TimeoutSource)} and by the
   * helpers: the file system, the server socket, the server thread and the
   * client channels used to fill the backlog.
   *
   * @throws Exception if interrupted while joining the server thread
   */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      IOUtils.cleanupWithLogger(LOG, fs);
      if (serverSocket != null) {
        try {
          serverSocket.close();
        } catch (IOException e) {
          LOG.debug("Exception in closing {}", serverSocket, e);
        }
      }
      if (serverThread != null) {
        serverThread.join();
      }
    } finally {
      // Close the backlog channels only after the server thread has been
      // joined, since that thread may add channels to the list.  The null
      // check covers a setUp that never ran.
      if (clients != null) {
        IOUtils.cleanupWithLogger(
            LOG, clients.toArray(new SocketChannel[0]));
      }
    }
  }

  /**
   * Expect connect timeout, because the connection backlog is consumed.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testConnectTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    consumeConnectionBacklog();
    try {
      fs.listFiles(new Path("/"), false);
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": connect timed out", e);
    }
  }

  /**
   * Expect read timeout, because the bogus server never sends a reply.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testReadTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    try {
      fs.listFiles(new Path("/"), false);
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": Read timed out", e);
    }
  }

  /**
   * Expect connect timeout on a URL that requires auth, because the connection
   * backlog is consumed.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testAuthUrlConnectTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    consumeConnectionBacklog();
    try {
      fs.getDelegationToken("renewer");
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": connect timed out", e);
    }
  }

  /**
   * Expect read timeout on a URL that requires auth, because the bogus server
   * never sends a reply.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testAuthUrlReadTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    try {
      fs.getDelegationToken("renewer");
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": Read timed out", e);
    }
  }

  /**
   * After a redirect, expect connect timeout accessing the redirect location,
   * because the connection backlog is consumed.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testRedirectConnectTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    startSingleTemporaryRedirectResponseThread(true);
    try {
      fs.getFileChecksum(new Path("/file"));
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      // Abort rather than fail if the server thread could not fill the backlog.
      assumeBacklogConsumed();
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": connect timed out", e);
    }
  }

  /**
   * After a redirect, expect read timeout accessing the redirect location,
   * because the bogus server never sends a reply.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testRedirectReadTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    startSingleTemporaryRedirectResponseThread(false);
    try {
      fs.getFileChecksum(new Path("/file"));
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": Read timed out", e);
    }
  }

  /**
   * On the second step of two-step write, expect connect timeout accessing the
   * redirect location, because the connection backlog is consumed.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testTwoStepWriteConnectTimeout(TimeoutSource src)
      throws Exception {
    setUp(src);
    startSingleTemporaryRedirectResponseThread(true);
    OutputStream os = null;
    try {
      os = fs.create(new Path("/file"));
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      // Abort rather than fail if the server thread could not fill the backlog.
      assumeBacklogConsumed();
      GenericTestUtils.assertExceptionContains(
          fs.getUri().getAuthority() + ": connect timed out", e);
    } finally {
      IOUtils.cleanupWithLogger(LOG, os);
    }
  }

  /**
   * On the second step of two-step write, expect read timeout accessing the
   * redirect location, because the bogus server never sends a reply.
   */
  @ParameterizedTest
  @MethodSource("data")
  @Timeout(value = 100)
  public void testTwoStepWriteReadTimeout(TimeoutSource src) throws Exception {
    setUp(src);
    startSingleTemporaryRedirectResponseThread(false);
    OutputStream os = null;
    try {
      os = fs.create(new Path("/file"));
      os.close(); // must close stream to force reading the HTTP response
      os = null;
      fail("expected timeout");
    } catch (SocketTimeoutException e) {
      GenericTestUtils.assertExceptionContains("Read timed out", e);
    } finally {
      IOUtils.cleanupWithLogger(LOG, os);
    }
  }

  /**
   * Starts a background thread that accepts one and only one client connection
   * on the server socket, sends an HTTP 307 Temporary Redirect response, and
   * then exits.  This is useful for testing timeouts on the second step of
   * methods that issue 2 HTTP requests (request 1, redirect, request 2).
   *
   * For handling the first request, this method switches the file system to
   * URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY.  Once the first
   * connection is accepted, the server thread switches it to the factory with
   * the very short timeouts, so that the second request uses them.
   *
   * Optionally, the thread may consume the connection backlog immediately after
   * receiving its one and only client connection.  This is useful for forcing a
   * connection timeout on the second request.
   *
   * On tearDown, open client connections are closed, and the thread is joined.
   *
   * @param consumeConnectionBacklog boolean whether or not to consume connection
   *   backlog and thus force a connection timeout on the second request
   */
  private void startSingleTemporaryRedirectResponseThread(
      final boolean consumeConnectionBacklog) {
    fs.connectionFactory =
        URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
    serverThread = new Thread() {
      @Override
      public void run() {
        Socket clientSocket = null;
        OutputStream out = null;
        InputStream in = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        try {
          // Accept one and only one client connection.
          clientSocket = serverSocket.accept();

          // Immediately setup conditions for subsequent connections.
          fs.connectionFactory = connectionFactory;
          if (consumeConnectionBacklog) {
            consumeConnectionBacklog();
          }

          // Consume client's HTTP request by reading until EOF or empty line.
          // Decode with an explicit charset, matching the one used to encode
          // the response below, instead of the platform default.
          in = clientSocket.getInputStream();
          isr = new InputStreamReader(in, StandardCharsets.UTF_8);
          br = new BufferedReader(isr);
          for (;;) {
            String line = br.readLine();
            if (line == null || line.isEmpty()) {
              break;
            }
          }

          // Write response.
          out = clientSocket.getOutputStream();
          out.write(temporaryRedirect().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
          // NOTE(review): this assertion is raised on the server thread, not
          // on the test thread, so it may not be reported as a test failure;
          // the error log below is the reliable signal -- confirm.
          LOG.error("unexpected IOException in server thread", e);
          fail("unexpected IOException in server thread: " + e);
        } finally {
          // Clean it all up.
          IOUtils.cleanupWithLogger(LOG, br, isr, in, out);
          IOUtils.closeSocket(clientSocket);
        }
      }
    };
    serverThread.start();
  }

  /**
   * Consumes the test server's connection backlog by spamming non-blocking
   * SocketChannel client connections.  We never do anything with these sockets
   * beyond just initiating the connections.  The method saves a reference to
   * each new SocketChannel so that it can be closed during tearDown.  We define
   * a very small connection backlog, but the OS may silently enforce a larger
   * minimum backlog than requested.  To work around this, we create far more
   * client connections than our defined backlog.
   *
   * After opening the connections, the method polls with a blocking probe
   * connection until that probe times out.  If that never happens within the
   * wait limit, or the wait is interrupted, the test is aborted through
   * {@link #assumeBacklogConsumed()} rather than failed.
   *
   * @throws IOException thrown for any I/O error
   */
  private void consumeConnectionBacklog() throws IOException {
    for (int i = 0; i < CLIENTS_TO_CONSUME_BACKLOG; ++i) {
      SocketChannel client = SocketChannel.open();
      client.configureBlocking(false);
      client.connect(nnHttpAddress);
      clients.add(client);
    }
    try {
      GenericTestUtils.waitFor(() -> {
        try (SocketChannel c = SocketChannel.open()) {
          c.socket().connect(nnHttpAddress, 100);
        } catch (SocketTimeoutException e) {
          // A connect timeout means the backlog is full.
          return true;
        } catch (IOException e) {
          LOG.debug("unexpected exception: {}", e.toString());
        }
        return false;
      }, 100, 10000);
    } catch (TimeoutException e) {
      failedToConsumeBacklog = true;
      assumeBacklogConsumed();
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers further up can observe it.
      Thread.currentThread().interrupt();
      failedToConsumeBacklog = true;
      assumeBacklogConsumed();
    }
  }

  /**
   * Aborts the current test if an earlier attempt to fill the connection
   * backlog was recorded as failed.
   *
   * @throws TestAbortedException if the backlog could not be consumed
   */
  private void assumeBacklogConsumed() {
    if (failedToConsumeBacklog) {
      throw new TestAbortedException(
          "failed to fill up connection backlog.");
    }
  }

  /**
   * Creates an HTTP 307 response with the redirect location set back to the
   * test server's address.  HTTP is supposed to terminate newlines with CRLF, so
   * we hard-code that instead of using the line separator property.
   *
   * @return String HTTP 307 response
   */
  private String temporaryRedirect() {
    return "HTTP/1.1 307 Temporary Redirect\r\n" +
      "Location: http://" + NetUtils.getHostPortString(nnHttpAddress) + "\r\n" +
      "\r\n";
  }
}