LeaseRenewer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client.impl;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing
 * file-being-written leases on the namenode.
 * When a file is opened for write (create or append),
 * namenode stores a file lease for recording the identity of the writer.
 * The writer (i.e. the DFSClient) is required to renew the lease periodically.
 * When the lease is not renewed before it expires,
 * the namenode considers the writer to have failed and may then either let
 * another writer obtain the lease or close the file.
 * </p>
 * <p>
 * This class also provides the following functionality:
 * <ul>
 * <li>
 * It maintains a map from (namenode, user) pairs to lease renewers.
 * The same {@link LeaseRenewer} instance is used for renewing lease
 * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and
 * the same user.
 * </li>
 * <li>
 * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
 * Periodically the leases for all the clients are renewed.
 * A client is removed from the list when the client is closed.
 * </li>
 * <li>
 * A thread per namenode per user is used by the {@link LeaseRenewer}
 * to renew the leases.
 * </li>
 * </ul>
 */
@InterfaceAudience.Private
public class LeaseRenewer {
  /** Logger for this class. */
  public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);

  /**
   * Grace period, in milliseconds, applied to each renewer when it is
   * constructed. Not final: it can be replaced through
   * {@link #setLeaseRenewerGraceDefault(long)}.
   */
  private static long leaseRenewerGraceDefault = 60*1000L;
  /** Upper bound, in milliseconds, on the per-iteration sleep period. */
  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;

  /**
   * Set to true by {@link #put(DFSClient)} when it starts a daemon and never
   * reset in this class; once set, {@code put} does not start another daemon
   * for this renewer and returns false instead.
   */
  private AtomicBoolean isLSRunning = new AtomicBoolean(false);

  /**
   * Looks up (or creates) the shared {@link LeaseRenewer} for the given
   * authority and user, and registers the client with it.
   *
   * @param authority namenode authority the client talks to
   * @param ugi user the client acts as
   * @param dfsc client to register with the renewer
   * @return the renewer shared by all clients of that authority and user
   */
  public static LeaseRenewer getInstance(final String authority,
      final UserGroupInformation ugi, final DFSClient dfsc) {
    final LeaseRenewer renewer = Factory.INSTANCE.get(authority, ugi);
    renewer.addClient(dfsc);
    return renewer;
  }

  /**
   * Drops the given renewer from the Factory while holding the renewer's
   * monitor. A later {@link #getInstance} call for the same key receives a
   * new {@link LeaseRenewer} instance.
   *
   * @param renewer Instance to be cleared from Factory
   */
  public static void remove(LeaseRenewer renewer) {
    final Factory registry = Factory.INSTANCE;
    synchronized (renewer) {
      registry.remove(renewer);
    }
  }

  /**
   * Registry that shares {@link LeaseRenewer} objects among
   * {@link DFSClient} instances, keeping at most one renewer per
   * (authority, user) pair.
   */
  private static class Factory {
    private static final Factory INSTANCE = new Factory();

    /** Map key pairing a namenode authority with a user. */
    private static class Key {
      /** Namenode info */
      final String authority;
      /** User info */
      final UserGroupInformation ugi;

      /**
       * @throws HadoopIllegalArgumentException if either argument is null
       */
      private Key(final String authority, final UserGroupInformation ugi) {
        if (authority == null) {
          throw new HadoopIllegalArgumentException("authority == null");
        }
        if (ugi == null) {
          throw new HadoopIllegalArgumentException("ugi == null");
        }

        this.authority = authority;
        this.ugi = ugi;
      }

      @Override
      public int hashCode() {
        final int authorityHash = authority.hashCode();
        final int ugiHash = ugi.hashCode();
        return authorityHash ^ ugiHash;
      }

      @Override
      public boolean equals(Object obj) {
        if (this == obj) {
          return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(obj instanceof Key)) {
          return false;
        }
        final Key other = (Key) obj;
        if (!authority.equals(other.authority)) {
          return false;
        }
        return ugi.equals(other.ugi);
      }

      @Override
      public String toString() {
        return ugi.getShortUserName() + "@" + authority;
      }
    }

    /** A map for per user per namenode renewers. */
    private final Map<Key, LeaseRenewer> renewers = new HashMap<>();

    /**
     * Returns the renewer stored for the given authority and user, creating
     * and storing a new one when none is present.
     */
    private synchronized LeaseRenewer get(final String authority,
        final UserGroupInformation ugi) {
      final Key key = new Key(authority, ugi);
      final LeaseRenewer existing = renewers.get(key);
      if (existing != null) {
        return existing;
      }
      final LeaseRenewer created = new LeaseRenewer(key);
      renewers.put(key, created);
      return created;
    }

    /**
     * Removes the given renewer, but only if it is the very instance
     * currently stored under its key.
     */
    private synchronized void remove(final LeaseRenewer r) {
      // Since a renewer may expire, the stored renewer can be different;
      // in that case leave the stored one alone.
      if (renewers.get(r.factorykey) != r) {
        return;
      }
      // Expire LeaseRenewer daemon thread as soon as possible.
      r.clearClients();
      r.setEmptyTime(0);
      renewers.remove(r.factorykey);
    }
  }

  /**
   * The {@link Time#monotonicNow()} reading, in milliseconds, recorded when
   * the client list was found empty. {@code Long.MAX_VALUE} means no such
   * time is recorded; the Factory sets it to 0 when it removes this renewer.
   */
  private long emptyTime = Long.MAX_VALUE;
  /**
   * The lease renewal period in milliseconds. Starts at half of the lease
   * soft limit and is adjusted from the clients' HDFS timeouts when clients
   * are added and closed.
   */
  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2;

  /** A daemon for renewing lease; null until {@code put} starts one. */
  private Daemon daemon = null;
  /**
   * Only the daemon with currentId should run. A daemon whose id differs
   * from this value returns from its run loop.
   */
  private int currentId = 0;

  /**
   * A period in milliseconds that the lease renewer thread should run
   * after the client list became empty.
   * In other words,
   * if the list is empty for a time period longer than the grace period,
   * the renewer should terminate.
   */
  private long gracePeriod;
  /**
   * The time period in milliseconds
   * that the renewer sleeps for each iteration.
   * Derived from the grace period whenever the grace period is set.
   */
  private long sleepPeriod;

  /** The key under which this renewer is stored in the Factory. */
  private final Factory.Key factorykey;

  /**
   * A list of clients corresponding to this renewer.
   * Other members of this class access it while holding this renewer's
   * monitor.
   */
  private final List<DFSClient> dfsclients = new ArrayList<>();

  /**
   * A stringified stack trace of the call stack when the Lease Renewer
   * was instantiated. This is only generated if trace-level logging is
   * enabled on this class; otherwise it is null.
   */
  private final String instantiationTrace;

  /**
   * Creates a renewer for the given factory key, applies the default grace
   * period, and captures the instantiation stack trace when trace logging
   * is enabled.
   *
   * @param factorykey the key this renewer is registered under
   */
  private LeaseRenewer(Factory.Key factorykey) {
    this.factorykey = factorykey;
    unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);

    this.instantiationTrace = LOG.isTraceEnabled()
        ? StringUtils.stringifyException(new Throwable("TRACE"))
        : null;
  }

  /**
   * Reads the current renewal period while holding this renewer's monitor.
   *
   * @return the renewal time in milliseconds.
   */
  private synchronized long getRenewalTime() {
    return renewal;
  }

  /**
   * Overrides the renewal period. Used for testing only.
   *
   * @param renewal the new renewal time in milliseconds
   */
  @VisibleForTesting
  public synchronized void setRenewalTime(final long renewal) {
    this.renewal = renewal;
  }

  /**
   * Registers a client unless the very same instance is already registered,
   * then shortens the renewal period to half of the client's HDFS timeout
   * when that timeout is positive and the half is below the current period.
   *
   * @param dfsc the client to register
   */
  private synchronized void addClient(final DFSClient dfsc) {
    // Identity comparison: the same instance is never listed twice.
    for (final DFSClient registered : dfsclients) {
      if (registered == dfsc) {
        return;
      }
    }
    dfsclients.add(dfsc);

    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
    if (hdfsTimeout <= 0) {
      // No positive timeout configured: leave the renewal period alone.
      return;
    }
    // The renewal period can only shrink here, never grow.
    this.renewal = Math.min(this.renewal, hdfsTimeout / 2);
  }

  /** Removes every client from this renewer's client list. */
  private synchronized void clearClients() {
    dfsclients.clear();
  }

  /**
   * Drops every client that reports it is no longer running and tells
   * whether any client is left afterwards.
   *
   * @return true if at least one client remains registered
   */
  private synchronized boolean clientsRunning() {
    final Iterator<DFSClient> it = dfsclients.iterator();
    while (it.hasNext()) {
      final DFSClient client = it.next();
      if (client.isClientRunning()) {
        continue;
      }
      it.remove();
    }
    return !dfsclients.isEmpty();
  }

  /**
   * Reads the sleep period while holding this renewer's monitor.
   *
   * @return the time in milliseconds the renewer sleeps for each iteration
   */
  private synchronized long getSleepPeriod() {
    return sleepPeriod;
  }

  /**
   * Set the grace period and adjust the sleep period accordingly, while
   * holding this renewer's monitor.
   *
   * @param gracePeriod the new grace period in milliseconds
   * @throws HadoopIllegalArgumentException if the grace period is below 100
   */
  synchronized void setGraceSleepPeriod(final long gracePeriod) {
    unsyncSetGraceSleepPeriod(gracePeriod);
  }

  /**
   * Validates and stores the grace period, then sets the sleep period to
   * the smaller of half the grace period and
   * {@link #LEASE_RENEWER_SLEEP_DEFAULT}. Takes no lock itself; the
   * constructor calls it directly and {@link #setGraceSleepPeriod(long)}
   * calls it under the monitor.
   *
   * @param gracePeriod the new grace period in milliseconds
   * @throws HadoopIllegalArgumentException if the grace period is below 100
   */
  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
    final boolean tooSmall = gracePeriod < 100L;
    if (tooSmall) {
      throw new HadoopIllegalArgumentException(gracePeriod
          + " = gracePeriod < 100ms is too small.");
    }
    this.gracePeriod = gracePeriod;
    this.sleepPeriod = Math.min(gracePeriod / 2, LEASE_RENEWER_SLEEP_DEFAULT);
  }

  /**
   * Is the daemon running?
   *
   * @return true if a daemon has been created and its thread is alive
   */
  @VisibleForTesting
  public synchronized boolean isRunning() {
    if (daemon == null) {
      return false;
    }
    return daemon.isAlive();
  }

  /**
   * Does this renewer have nothing to renew?
   * <p>
   * The client list is a plain {@link ArrayList} that the other members of
   * this class read and modify while holding this renewer's monitor, so the
   * read here takes the same monitor to observe a consistent, up-to-date
   * list.
   *
   * @return true if no client is registered with this renewer
   */
  public synchronized boolean isEmpty() {
    return dfsclients.isEmpty();
  }

  /**
   * Returns the name of the current daemon. Used only by tests.
   * <p>
   * NOTE(review): {@code daemon} is dereferenced without a null check, so
   * this throws NullPointerException if called before {@code put} has
   * started a daemon.
   *
   * @return the daemon's name
   */
  synchronized String getDaemonName() {
    return daemon.getName();
  }

  /**
   * Is the empty period longer than the grace period?
   *
   * @return false while no empty time is recorded; otherwise whether the
   *         time elapsed since the recorded empty time exceeds the grace
   *         period
   */
  private synchronized boolean isRenewerExpired() {
    if (emptyTime == Long.MAX_VALUE) {
      // No empty time recorded, so there is nothing to expire.
      return false;
    }
    final long emptyFor = Time.monotonicNow() - emptyTime;
    return emptyFor > gracePeriod;
  }

  /**
   * Makes sure a renewal daemon is running on behalf of the given client
   * and clears any recorded empty time.
   * <p>
   * Nothing is started for a client that is not running. When no daemon is
   * alive, or the renewer has expired, a new daemon is started at most once
   * per renewer; a later attempt to start another one is refused.
   *
   * @param dfsc the client that needs its leases renewed
   * @return false if a new daemon was needed but this renewer has already
   *         started one before (the caller is expected to obtain a new
   *         renewer); true otherwise
   */
  public synchronized boolean put(final DFSClient dfsc) {
    if (!dfsc.isClientRunning()) {
      return true;
    }

    final boolean daemonNeeded = !isRunning() || isRenewerExpired();
    if (daemonNeeded) {
      // Start a new daemon with a new id. The id is advanced before the
      // check below, so a daemon that is still alive with an older id
      // stops at its next check in run().
      final int id = ++currentId;
      if (isLSRunning.get()) {
        // Not allowed to add multiple daemons into LeaseRenewer, let client
        // create new LR and continue to acquire lease.
        return false;
      }
      isLSRunning.set(true);

      final Runnable renewTask = new Runnable() {
        @Override
        public void run() {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for {} with renew id {} started",
                  clientsString(), id);
            }
            LeaseRenewer.this.run(id);
          } catch(InterruptedException e) {
            LOG.debug("LeaseRenewer is interrupted.", e);
          } finally {
            // Whatever ended the loop, unregister this renewer.
            synchronized(LeaseRenewer.this) {
              Factory.INSTANCE.remove(LeaseRenewer.this);
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Lease renewer daemon for {} with renew id {} exited",
                  clientsString(), id);
            }
          }
        }

        @Override
        public String toString() {
          return String.valueOf(LeaseRenewer.this);
        }
      };
      daemon = new Daemon(renewTask);
      daemon.start();
    }
    emptyTime = Long.MAX_VALUE;
    return true;
  }

  /**
   * Overwrites the recorded empty time. Besides tests, the Factory calls
   * this with 0 when it removes this renewer.
   *
   * @param time the new empty time; {@code Long.MAX_VALUE} means none
   */
  @VisibleForTesting
  synchronized void setEmptyTime(long time) {
    emptyTime = time;
  }

  /**
   * Close the given client: unregister it and, if it was the last one,
   * either remove this renewer from the Factory right away (no live daemon,
   * or already expired) or record the time the list became empty. If the
   * renewal period equals half of the closed client's HDFS timeout, it is
   * recomputed from the remaining clients.
   *
   * @param dfsc the client being closed
   */
  public synchronized void closeClient(final DFSClient dfsc) {
    dfsclients.remove(dfsc);
    if (dfsclients.isEmpty()) {
      final boolean daemonGone = !isRunning() || isRenewerExpired();
      if (daemonGone) {
        Factory.INSTANCE.remove(LeaseRenewer.this);
        return;
      }
      if (emptyTime == Long.MAX_VALUE) {
        // First time the client list is seen empty.
        emptyTime = Time.monotonicNow();
      }
    }

    // Only recompute when the current period matches this client's value.
    if (renewal != dfsc.getConf().getHdfsTimeout() / 2) {
      return;
    }
    long shortest = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
    for (final DFSClient remaining : dfsclients) {
      final int timeout = remaining.getConf().getHdfsTimeout();
      if (timeout > 0) {
        shortest = Math.min(shortest, timeout);
      }
    }
    renewal = shortest / 2;
  }

  /**
   * Interrupts the daemon if it is running and waits for it to terminate.
   * The interrupt is issued under this renewer's monitor; the wait happens
   * after the monitor is released.
   *
   * @throws InterruptedException if the calling thread is interrupted
   *         while waiting for the daemon
   */
  public void interruptAndJoin() throws InterruptedException {
    final Daemon running;
    synchronized (this) {
      running = isRunning() ? daemon : null;
      if (running != null) {
        running.interrupt();
      }
    }

    if (running == null) {
      return;
    }
    LOG.debug("Wait for lease checker to terminate");
    running.join();
  }

  /**
   * Renews the lease once per distinct client name. Works on a snapshot of
   * the client list taken under the monitor, sorted by client name so that
   * clients sharing a name are adjacent.
   *
   * @throws IOException if a client's lease renewal fails with one
   */
  private void renew() throws IOException {
    final List<DFSClient> snapshot;
    synchronized(this) {
      snapshot = new ArrayList<>(dfsclients);
    }

    // Sort by client name so that repeated names end up next to each other.
    final Comparator<DFSClient> byClientName = new Comparator<DFSClient>() {
      @Override
      public int compare(final DFSClient left, final DFSClient right) {
        return left.getClientName().compareTo(right.getClientName());
      }
    };
    Collections.sort(snapshot, byClientName);

    String previousName = "";
    for (final DFSClient client : snapshot) {
      if (client.getClientName().equals(previousName)) {
        // A client with this name has already renewed in this pass.
        continue;
      }
      if (!client.renewLease()) {
        // previousName is left unchanged, so the next client with the
        // same name still gets a chance to renew.
        LOG.debug("Did not renew lease for client {}", client);
        continue;
      }
      previousName = client.getClientName();
      LOG.debug("Lease renewed for client {}", previousName);
    }
  }

  /**
   * Periodically check in with the namenode and renew all the leases
   * once the renewal period has elapsed since the last successful renewal.
   * <p>
   * The loop ends when the thread's interrupt status is set, when a renewal
   * fails with a {@link SocketTimeoutException}, when a daemon with a newer
   * id has been started, or when the renewer has expired.
   *
   * @param id the id this daemon was started with
   * @throws InterruptedException if the thread is interrupted while
   *         sleeping between iterations
   */
  private void run(final int id) throws InterruptedException {
    // The update clause sleeps for the current sleep period between
    // iterations; Thread.interrupted() also clears the interrupt status.
    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
        Thread.sleep(getSleepPeriod())) {
      final long elapsed = Time.monotonicNow() - lastRenewed;
      if (elapsed >= getRenewalTime()) {
        try {
          renew();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Lease renewer daemon for {} with renew id {} executed",
                clientsString(), id);
          }
          // Only advanced on success, so a failed renewal is attempted
          // again on the next iteration.
          lastRenewed = Time.monotonicNow();
        } catch (SocketTimeoutException ie) {
          LOG.warn("Failed to renew lease for {} for {} seconds.  Aborting ...",
              clientsString(), (elapsed/1000), ie);
          List<DFSClient> dfsclientsCopy;
          synchronized (this) {
            DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
            // Snapshot first: removing this renewer from the Factory
            // clears the client list.
            dfsclientsCopy = new ArrayList<>(dfsclients);
            Factory.INSTANCE.remove(LeaseRenewer.this);
          }
          // Done outside the monitor.
          // NOTE(review): the `true` argument presumably requests an
          // abort rather than a normal close; confirm against DFSClient.
          for (DFSClient dfsClient : dfsclientsCopy) {
            dfsClient.closeAllFilesBeingWritten(true);
          }
          break;
        } catch (IOException ie) {
          LOG.warn("Failed to renew lease for {} for {} seconds.  Will retry shortly ...",
              clientsString(), (elapsed/1000), ie);
        }
      }

      synchronized(this) {
        // put() bumps currentId when it decides a new daemon is needed, so
        // a mismatch means this daemon has been superseded.
        if (id != currentId || isRenewerExpired()) {
          if (LOG.isDebugEnabled()) {
            if (id != currentId) {
              LOG.debug("Lease renewer daemon for {} with renew id {} is not current",
                  clientsString(), id);
            } else {
              LOG.debug("Lease renewer daemon for {} with renew id {} expired",
                  clientsString(), id);
            }
          }
          //no longer the current daemon or expired
          return;
        }

        // if no clients are in running state or there is no more clients
        // registered with this renewer, stop the daemon after the grace
        // period.
        // clientsRunning() also drops clients that are no longer running;
        // the empty time is recorded only if none is recorded yet.
        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
          emptyTime = Time.monotonicNow();
        }
      }
    }
  }

  /**
   * Describes this renewer as its simple class name and factory key; when
   * trace logging is enabled, the client names and the instantiation stack
   * trace are appended.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder(getClass().getSimpleName())
        .append(":").append(factorykey);
    if (LOG.isTraceEnabled()) {
      text.append(", clients=").append(clientsString())
          .append(", created at ").append(instantiationTrace);
    }
    return text.toString();
  }

  /**
   * Get the names of all clients.
   *
   * @return the client names in list order, comma-separated and wrapped in
   *         square brackets; "[]" when no client is registered
   */
  private synchronized String clientsString() {
    final StringBuilder names = new StringBuilder("[");
    String separator = "";
    for (final DFSClient client : dfsclients) {
      names.append(separator).append(client.getClientName());
      separator = ", ";
    }
    return names.append("]").toString();
  }

  /**
   * Replaces the default grace period that renewers constructed afterwards
   * start with. Intended for tests.
   *
   * @param leaseRenewerGraceDefault the new default grace period in
   *        milliseconds
   */
  @VisibleForTesting
  public static void setLeaseRenewerGraceDefault(
      long leaseRenewerGraceDefault) {
    LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
  }
}