ConnectionManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements a pool of connections for the {@link Router} to be able to open
 * many connections to many Namenodes.
 */
public class ConnectionManager {

  private static final Logger LOG =
      LoggerFactory.getLogger(ConnectionManager.class);

  /** Configuration for the connection manager, pool and sockets. */
  private final Configuration conf;

  /** Min number of connections per user + nn. */
  private final int minSize = 1;
  /** Max number of connections per user + nn. */
  private final int maxSize;
  /** Min ratio of active connections per user + nn. */
  private final float minActiveRatio;

  /** How often we close a pool for a particular user + nn. */
  private final long poolCleanupPeriodMs;
  /** How often we close a connection in a pool. */
  private final long connectionCleanupPeriodMs;

  /**
   * Map of connection pools, one pool per user + NN. All accesses go through
   * {@link #readLock} / {@link #writeLock}.
   */
  private final Map<ConnectionPoolId, ConnectionPool> pools;
  /** Lock for accessing pools. */
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  /** Queue of pools that need a new connection created asynchronously. */
  private final BlockingQueue<ConnectionPool> creatorQueue;
  /**
   * Global federated namespace context for router.
   */
  private final RouterStateIdContext routerStateIdContext;
  /** Max size of queue for creating new connections. */
  private final int creatorQueueMaxSize;

  /** Create new connections asynchronously. */
  private final ConnectionCreator creator;
  /** Periodic executor to remove stale connection pools. */
  private final ScheduledThreadPoolExecutor cleaner =
      new ScheduledThreadPoolExecutor(1);

  /**
   * If the connection manager is running. Written by {@link #start()} and
   * {@link #close()} and read by every {@link #getConnection} caller, which run
   * on different threads, so it must be volatile for updates to be visible.
   */
  private volatile boolean running = false;

  /**
   * Creates a proxy client connection pool manager with a fresh
   * {@link RouterStateIdContext} built from the configuration.
   *
   * @param config Configuration for the connections.
   */
  public ConnectionManager(Configuration config) {
    this(config, new RouterStateIdContext(config));
  }

  /**
   * Creates a proxy client connection pool manager.
   *
   * @param config Configuration for the connections.
   * @param routerStateIdContext Federated namespace context for router.
   */
  public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) {
    this.conf = config;
    this.routerStateIdContext = routerStateIdContext;
    // Configure minimum, maximum and active connection pools
    this.maxSize = this.conf.getInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
    this.minActiveRatio = this.conf.getFloat(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT);

    // Map with the connections indexed by UGI and Namenode
    this.pools = new HashMap<>();

    // Create connections in a thread asynchronously
    this.creatorQueueMaxSize = this.conf.getInt(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT
        );
    this.creatorQueue = new ArrayBlockingQueue<>(this.creatorQueueMaxSize);
    this.creator = new ConnectionCreator(this.creatorQueue);
    this.creator.setDaemon(true);

    // Cleanup periods
    this.poolCleanupPeriodMs = this.conf.getLong(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT);
    LOG.info("Cleaning connection pools every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.poolCleanupPeriodMs));
    this.connectionCleanupPeriodMs = this.conf.getLong(
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS,
        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
    LOG.info("Cleaning connections every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));
  }

  /**
   * Start the connection manager: launch the asynchronous connection creator
   * and schedule the periodic cleanup task.
   */
  public void start() {
    // Start the thread that creates connections asynchronously
    this.creator.start();

    // Schedule a task to remove stale connection pools and sockets. It runs
    // at the shorter of the two cleanup periods so both are honored.
    long recycleTimeMs = Math.min(
        poolCleanupPeriodMs, connectionCleanupPeriodMs);
    LOG.info("Cleaning every {} seconds",
        TimeUnit.MILLISECONDS.toSeconds(recycleTimeMs));
    this.cleaner.scheduleAtFixedRate(
        new CleanupTask(), 0, recycleTimeMs, TimeUnit.MILLISECONDS);

    // Mark the manager as running
    this.running = true;
  }

  /**
   * Stop the connection manager by closing all the pools.
   */
  public void close() {
    // Clear the flag first so getConnection() calls that check it from now on
    // are rejected while the workers and pools are being torn down.
    this.running = false;
    this.creator.shutdown();
    this.cleaner.shutdown();

    writeLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        pool.close();
      }
      this.pools.clear();
    } finally {
      writeLock.unlock();
    }
  }

  /** Stop only the asynchronous connection creator thread. */
  @VisibleForTesting
  public void closeConnectionCreator(){
    this.creator.shutdown();
  }

  /**
   * Fetches the next available proxy client in the pool. Each client connection
   * is reserved for a single user and cannot be reused until free.
   *
   * <p>If the pool returns no connection, or one that is not usable, the pool
   * is queued so the creator thread can add a connection asynchronously.
   *
   * @param ugi User group information.
   * @param nnAddress Namenode address for the connection.
   * @param protocol Protocol for the connection.
   * @param nsId Nameservice identity.
   * @return Proxy client to connect to nnId as UGI, or null if the manager is
   *         not running, the pool had no connection, or the pool returned an
   *         already closed connection.
   * @throws IOException If the connection cannot be obtained.
   */
  public ConnectionContext getConnection(UserGroupInformation ugi,
      String nnAddress, Class<?> protocol, String nsId) throws IOException {
    // Check if the manager is shutdown
    if (!this.running) {
      LOG.error(
          "Cannot get a connection to {} because the manager isn't running",
          nnAddress);
      return null;
    }

    // Try to get the pool if created
    ConnectionPoolId connectionId =
        new ConnectionPoolId(ugi, nnAddress, protocol);
    ConnectionPool pool;
    readLock.lock();
    try {
      pool = this.pools.get(connectionId);
    } finally {
      readLock.unlock();
    }

    // Create the pool if not created before; re-check under the write lock
    // because another thread may have created it in the meantime.
    if (pool == null) {
      writeLock.lock();
      try {
        pool = this.pools.get(connectionId);
        if (pool == null) {
          pool = new ConnectionPool(
              this.conf, nnAddress, ugi, this.minSize, this.maxSize,
              this.minActiveRatio, protocol,
              new PoolAlignmentContext(this.routerStateIdContext, nsId));
          this.pools.put(connectionId, pool);
        }
      } finally {
        writeLock.unlock();
      }
    }

    long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId);
    pool.getPoolAlignmentContext().advanceClientStateId(clientStateId);

    ConnectionContext conn = pool.getConnection();

    // Add a new connection to the pool if it wasn't usable
    if (conn == null || !conn.isUsable()) {
      if (!this.creatorQueue.contains(pool) && !this.creatorQueue.offer(pool)) {
        LOG.error("Cannot add more than {} connections at the same time",
            this.creatorQueueMaxSize);
      }
    }

    if (conn != null && conn.isClosed()) {
      LOG.error("We got a closed connection from {}", pool);
      conn = null;
    }

    return conn;
  }

  /**
   * Get the number of connection pools.
   *
   * @return Number of connection pools.
   */
  public int getNumConnectionPools() {
    readLock.lock();
    try {
      return pools.size();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get number of open connections.
   *
   * @return Number of open connections.
   */
  public int getNumConnections() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumConnections();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get number of active connections.
   *
   * @return Number of active connections.
   */
  public int getNumActiveConnections() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumActiveConnections();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get number of idle connections.
   *
   * @return Number of idle connections.
   */
  public int getNumIdleConnections() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumIdleConnections();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get number of recently active connections.
   *
   * @return Number of recently active connections.
   */
  public int getNumActiveConnectionsRecently() {
    int total = 0;
    readLock.lock();
    try {
      for (ConnectionPool pool : this.pools.values()) {
        total += pool.getNumActiveConnectionsRecently();
      }
    } finally {
      readLock.unlock();
    }
    return total;
  }

  /**
   * Get the number of connections to be created.
   *
   * @return Number of connections to be created (pools waiting in the
   *         creator queue).
   */
  public int getNumCreatingConnections() {
    return this.creatorQueue.size();
  }

  /**
   * Get a JSON representation of the connection pool.
   *
   * @return JSON representation of all the connection pools, keyed by the
   *         pool identifier's string form (sorted).
   */
  public String getJSON() {
    final Map<String, String> info = new TreeMap<>();
    readLock.lock();
    try {
      for (Entry<ConnectionPoolId, ConnectionPool> entry :
          this.pools.entrySet()) {
        ConnectionPoolId connectionPoolId = entry.getKey();
        ConnectionPool pool = entry.getValue();
        info.put(connectionPoolId.toString(), pool.getJSON());
      }
    } finally {
      readLock.unlock();
    }
    return JSON.toString(info);
  }

  /**
   * Expose the live pool map for tests. Callers bypass the manager's locks.
   *
   * @return The internal (mutable) pool map.
   */
  @VisibleForTesting
  Map<ConnectionPoolId, ConnectionPool> getPools() {
    return this.pools;
  }

  /**
   * Clean the unused connections for this pool.
   *
   * <p>Only pools above their minimum size are touched. Connections are removed
   * when the pool has been idle longer than the connection cleanup period or
   * when fewer than {@code minActiveRatio * total} connections were recently
   * active.
   *
   * @param pool Connection pool to cleanup.
   */
  @VisibleForTesting
  void cleanup(ConnectionPool pool) {
    if (pool.getNumConnections() > pool.getMinSize()) {
      // Check if the pool hasn't been active in a while or not 50% are used
      long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
      int total = pool.getNumConnections();
      // Active is a transient status in many cases for a connection since
      // the handler thread uses the connection very quickly. Thus, the number
      // of connections with handlers using at the call time is constantly low.
      // Recently active is more lasting status, and it shows how many
      // connections have been used with a recent time period. (i.e. 30 seconds)
      int active = pool.getNumActiveConnectionsRecently();
      float poolMinActiveRatio = pool.getMinActiveRatio();
      if (timeSinceLastActive > connectionCleanupPeriodMs ||
          active < poolMinActiveRatio * total) {
        // Be greedy here to close as many connections as possible in one shot
        // The number should at least be 1
        int targetConnectionsCount = Math.max(1,
            (int)(poolMinActiveRatio * total) - active);
        List<ConnectionContext> connections =
            pool.removeConnections(targetConnectionsCount);
        for (ConnectionContext conn : connections) {
          conn.close();
        }
        LOG.debug("Removed connection {} used {} seconds ago. " +
                "Pool has {}/{} connections", pool.getConnectionPoolId(),
            TimeUnit.MILLISECONDS.toSeconds(timeSinceLastActive),
            pool.getNumConnections(), pool.getMaxSize());
      }
    }
  }

  /**
   * Removes stale connections not accessed recently from the pool. This is
   * invoked periodically.
   *
   * <p>Stale pools are first collected under the read lock. They are then
   * removed from {@link #pools} under the write lock and closed only after
   * removal. This way a pool that is still reachable through the map is never
   * closed, and a pool used again in between is kept.
   */
  private class CleanupTask implements Runnable {

    @Override
    public void run() {
      long currentTime = Time.now();
      Map<ConnectionPoolId, ConnectionPool> staleCandidates = new HashMap<>();

      // Look for stale pools; shrink the ones we keep
      readLock.lock();
      try {
        for (Entry<ConnectionPoolId, ConnectionPool> entry : pools.entrySet()) {
          ConnectionPool pool = entry.getValue();
          if (isStale(pool, currentTime)) {
            staleCandidates.put(entry.getKey(), pool);
          } else {
            // Keep this pool but clean connections inside
            LOG.debug("Cleaning up {}", pool);
            cleanup(pool);
          }
        }
      } finally {
        readLock.unlock();
      }

      if (staleCandidates.isEmpty()) {
        return;
      }

      // Unpublish stale pools; only pools we actually removed get closed
      List<ConnectionPool> toClose = new LinkedList<>();
      writeLock.lock();
      try {
        for (Entry<ConnectionPoolId, ConnectionPool> entry :
            staleCandidates.entrySet()) {
          ConnectionPool pool = entry.getValue();
          // Re-check: the pool may have been used since the scan, or the
          // mapping may already be gone (e.g. cleared by close()).
          if (isStale(pool, currentTime)
              && pools.remove(entry.getKey(), pool)) {
            toClose.add(pool);
          }
        }
      } finally {
        writeLock.unlock();
      }

      // Close outside the lock so request threads are not blocked on it
      for (ConnectionPool pool : toClose) {
        LOG.debug("Closing and removing stale pool {}", pool);
        pool.close();
      }
    }

    /**
     * A pool is stale when it has been active at least once and its last
     * activity is older than the pool cleanup period.
     */
    private boolean isStale(ConnectionPool pool, long now) {
      long lastTimeActive = pool.getLastActiveTime();
      return lastTimeActive > 0 && now > (lastTimeActive + poolCleanupPeriodMs);
    }
  }

  /**
   * Thread that creates connections asynchronously for the pools pushed to its
   * queue.
   */
  static class ConnectionCreator extends Thread {
    /**
     * If the creator is running. Set to false from {@link #shutdown()} on
     * another thread, so it must be volatile for the loop to observe it.
     */
    private volatile boolean running = true;
    /** Queue to push work to. */
    private final BlockingQueue<ConnectionPool> queue;

    ConnectionCreator(BlockingQueue<ConnectionPool> blockingQueue) {
      super("Connection creator");
      this.queue = blockingQueue;
    }

    @Override
    public void run() {
      while (this.running) {
        try {
          ConnectionPool pool = this.queue.take();
          try {
            int total = pool.getNumConnections();
            int active = pool.getNumActiveConnectionsRecently();
            float poolMinActiveRatio = pool.getMinActiveRatio();
            // Grow only below the max size and only if enough of the existing
            // connections are actually being used.
            if (total < pool.getMaxSize() &&
                active >= poolMinActiveRatio * total) {
              ConnectionContext conn = pool.newConnection();
              pool.addConnection(conn);
            } else {
              LOG.debug("Cannot add more than {} connections to {}",
                  pool.getMaxSize(), pool);
            }
          } catch (IOException e) {
            // The exception is the trailing argument without a placeholder so
            // SLF4J logs its full stack trace rather than just toString().
            LOG.error("Cannot create a new connection for {}", pool, e);
          }
        } catch (InterruptedException e) {
          LOG.error("The connection creator was interrupted");
          this.running = false;
          // Preserve the interrupt status for the thread's owner
          Thread.currentThread().interrupt();
        } catch (Throwable e) {
          LOG.error("Fatal error caught by connection creator ", e);
        }
      }
    }

    /**
     * Stop this connection creator.
     */
    public void shutdown() {
      this.running = false;
      this.interrupt();
    }
  }
}