ConnectionContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Context to track a connection in a {@link ConnectionPool}. When a client uses
 * a connection, it increments a counter to mark it as active. Once the client
 * is done with the connection, it decreases the counter. It also takes care of
 * closing the connection once it is no longer active.
 *
 * The protocols currently used are:
 * <ul>
 * <li>{@link org.apache.hadoop.hdfs.protocol.ClientProtocol}
 * <li>{@link org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol}
 * </ul>
 */
public class ConnectionContext {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionContext.class);

  /** The connection's active status would expire after this window. */
  private static final long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);

  /** Client for the connection. */
  private final ProxyAndInfo<?> client;
  /** The maximum number of requests that this connection can handle concurrently. */
  private final int maxConcurrencyPerConn;

  // The mutable fields below are only read and written while holding the
  // monitor of this object (every method touching them is synchronized).

  /** How many threads are using this connection. */
  private int numThreads = 0;
  /** If the connection is closed. */
  private boolean closed = false;
  /** Last timestamp the connection was active; 0 until the first use. */
  private long lastActiveTs = 0;

  /**
   * Create a context wrapping an already established connection.
   *
   * @param connection Client proxy (and its metadata) tracked by this context.
   * @param conf Configuration used to read the maximum number of concurrent
   *             users allowed for a single connection.
   */
  public ConnectionContext(ProxyAndInfo<?> connection, Configuration conf) {
    this.client = connection;
    this.maxConcurrencyPerConn = conf.getInt(
        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_KEY,
        RBFConfigKeys.DFS_ROUTER_MAX_CONCURRENCY_PER_CONNECTION_DEFAULT);
  }

  /**
   * Check if the connection is active, i.e., at least one thread is
   * currently using it.
   *
   * @return True if the connection is active.
   */
  public synchronized boolean isActive() {
    return this.numThreads > 0;
  }

  /**
   * Check if the connection is/was active recently, i.e., the last call to
   * {@link #getClient()} happened within the active window.
   *
   * @return True if the connection is active or
   * was active in the past period of time.
   */
  public synchronized boolean isActiveRecently() {
    // A connection that was never handed out still has lastActiveTs == 0, so
    // the raw monotonic clock value is compared against the window here.
    return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
  }

  /**
   * Check if the connection is closed.
   *
   * @return If the connection is closed.
   */
  public synchronized boolean isClosed() {
    return this.closed;
  }

  /**
   * Check if the connection can be used. A connection is usable when it is
   * not closed and the number of threads using it is still below the
   * configured maximum concurrency per connection.
   *
   * @return True if the connection can be used.
   */
  public synchronized boolean isUsable() {
    return hasAvailableConcurrency() && !isClosed();
  }

  /**
   * Check if this connection context still has available concurrency.
   *
   * @return True if fewer threads are using the connection than the
   * configured maximum, false otherwise.
   */
  private synchronized boolean hasAvailableConcurrency() {
    return this.numThreads < maxConcurrencyPerConn;
  }

  /**
   * Check if the connection is idle. It checks that the connection is not
   * used by any thread and that it has not been closed.
   *
   * @return True if the connection is open and not used by any thread.
   */
  public synchronized boolean isIdle() {
    return !isActive() && !isClosed();
  }

  /**
   * Get the connection client. This marks the connection as in use: the
   * usage counter is incremented and the last active timestamp is refreshed.
   * The caller is expected to invoke {@link #release()} once it is done.
   *
   * @return Connection client.
   */
  public synchronized ProxyAndInfo<?> getClient() {
    this.numThreads++;
    this.lastActiveTs = Time.monotonicNow();
    return this.client;
  }

  /**
   * Release this connection. Decrements the usage counter; the counter never
   * drops below zero, so a surplus call is a no-op.
   */
  public synchronized void release() {
    if (this.numThreads > 0) {
      this.numThreads--;
    }
  }

  /**
   * Close a connection. The connection is always marked as closed and its RPC
   * proxy is stopped immediately, even if threads are still using it. Closing
   * a connection that is still in use is logged as an error unless
   * {@code force} is set.
   *
   * @param force True if closing an active connection is intended, which
   *              suppresses the error log for that case.
   */
  public synchronized void close(boolean force) {
    if (!force && this.numThreads > 0) {
      // this is an erroneous case, but we have to close the connection
      // anyway since there will be connection leak if we don't do so
      // the connection has been moved out of the pool
      LOG.error("Active connection with {} handlers will be closed, ConnectionContext is {}",
          this.numThreads, this);
    }
    this.closed = true;
    Object proxy = this.client.getProxy();
    // Nobody should be using this anymore, so it should close right away
    RPC.stopProxy(proxy);
  }

  /**
   * Close the connection without forcing, i.e., an error is logged if the
   * connection is still in use. Equivalent to {@code close(false)}.
   */
  public synchronized void close() {
    close(false);
  }

  /**
   * Describe this context as
   * {@code hashcode:<hash> <ProxyClass>@<address>x<numThreads>}, followed by
   * {@code [CLOSED]} once the connection has been closed.
   *
   * Synchronized so that the usage counter and the closed flag are read
   * under the same monitor that guards their updates; the monitor is
   * reentrant, so this is safe to call from {@link #close(boolean)}.
   *
   * @return String representation of this connection context.
   */
  @Override
  public synchronized String toString() {
    InetSocketAddress addr = this.client.getAddress();
    Object proxy = this.client.getProxy();
    Class<?> clazz = proxy.getClass();

    StringBuilder sb = new StringBuilder();
    sb.append("hashcode:")
        .append(hashCode())
        .append(" ")
        .append(clazz.getSimpleName())
        .append("@")
        .append(addr)
        .append("x")
        .append(numThreads);
    if (closed) {
      sb.append("[CLOSED]");
    }
    return sb.toString();
  }
}