// DataSetLockManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;

import java.util.HashMap;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class for maintain a set of lock for fsDataSetImpl.
 */
/**
 * Maintains the hierarchical set of locks used by {@code FsDatasetImpl}.
 *
 * <p>Locks are organised in three levels (block pool -&gt; volume -&gt; subdir).
 * Acquiring a lower-level lock first acquires its ancestors: a VOLUME lock
 * takes the block-pool read lock, and a DIR lock takes the block-pool and
 * volume read locks, so writers at a higher level exclude activity below.
 *
 * <p>When lock tracing is enabled, every acquire/release is recorded per
 * thread so that {@link #lockLeakCheck()} can report un-released locks.
 */
public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
  public static final Logger LOG = LoggerFactory.getLogger(DataSetLockManager.class);
  // Outstanding acquisitions per thread (for leak detection); guarded by "this".
  private final HashMap<String, TrackLog> threadCountMap = new HashMap<>();
  private final LockMap lockMap = new LockMap();
  // Whether the underlying ReentrantReadWriteLocks use the fair ordering policy.
  private boolean isFair = true;
  // When true, acquisition stack traces are recorded for leak diagnosis.
  private final boolean openLockTrace;
  // Last leak detected by lockLeakCheck(); exposed for tests via getLastException().
  private Exception lastException;
  // Owning datanode, used only for acquire-latency metrics; null in tests.
  private DataNode datanode;

  /**
   * Thread-safe registry mapping a lock name to its read/write halves.
   * Both halves share one underlying {@link ReentrantReadWriteLock}.
   */
  private class LockMap {
    private final HashMap<String, AutoCloseDataSetLock> readLockMap = new HashMap<>();
    private final HashMap<String, AutoCloseDataSetLock> writeLockMap = new HashMap<>();

    /**
     * Register the read/write halves of {@code lock} under {@code name}.
     * A name that is already registered keeps its original locks.
     */
    public synchronized void addLock(String name, ReentrantReadWriteLock lock) {
      AutoCloseDataSetLock readLock = new AutoCloseDataSetLock(lock.readLock());
      AutoCloseDataSetLock writeLock = new AutoCloseDataSetLock(lock.writeLock());
      if (openLockTrace) {
        readLock.setDataNodeLockManager(DataSetLockManager.this);
        writeLock.setDataNodeLockManager(DataSetLockManager.this);
      }
      // putIfAbsent keeps the first registration if the same name is added twice.
      readLockMap.putIfAbsent(name, readLock);
      writeLockMap.putIfAbsent(name, writeLock);
    }

    /**
     * Drop the locks registered under {@code name}. A missing name is logged
     * but not fatal (can happen around datanode restart).
     */
    public synchronized void removeLock(String name) {
      if (!readLockMap.containsKey(name) || !writeLockMap.containsKey(name)) {
        LOG.error("The lock {} is not in LockMap", name);
      }
      readLockMap.remove(name);
      writeLockMap.remove(name);
    }

    /** @return the registered read lock for {@code name}, or null. */
    public synchronized AutoCloseDataSetLock getReadLock(String name) {
      return readLockMap.get(name);
    }

    /** @return the registered write lock for {@code name}, or null. */
    public synchronized AutoCloseDataSetLock getWriteLock(String name) {
      return writeLockMap.get(name);
    }
  }

  /**
   * Generate lock order string concatenated with the lock names.
   * @param level which level of lock to acquire.
   * @param resources lock names in lock order: block pool, then volume,
   *                  then subdir, as required by {@code level}.
   * @return lock order string concatenated with the lock names.
   * @throws IllegalArgumentException if any required name is null or the
   *         number of names does not match {@code level}.
   */
  private String generateLockName(LockLevel level, String... resources) {
    if (resources.length == 1 && level == LockLevel.BLOCK_POOl) {
      if (resources[0] == null) {
        throw new IllegalArgumentException("acquire a null block pool lock");
      }
      return resources[0];
    } else if (resources.length == 2 && level == LockLevel.VOLUME) {
      if (resources[0] == null || resources[1] == null) {
        throw new IllegalArgumentException("acquire a null bp lock : "
            + resources[0] + ",volume lock :" + resources[1]);
      }
      return resources[0] + resources[1];
    } else if (resources.length == 3 && level == LockLevel.DIR) {
      if (resources[0] == null || resources[1] == null || resources[2] == null) {
        throw new IllegalArgumentException("acquire a null dataset lock : "
            + resources[0] + ",volume lock :" + resources[1]
            + ",subdir lock :" + resources[2]);
      }
      return resources[0] + resources[1] + resources[2];
    } else {
      throw new IllegalArgumentException("lock level do not match resource");
    }
  }

  /**
   * Records the acquisition stack traces and the outstanding lock count
   * of a single thread, for leak reporting.
   */
  private static class TrackLog {
    private final Stack<Exception> logStack = new Stack<>();
    private int lockCount = 0;
    private final String threadName;

    TrackLog(String threadName) {
      this.threadName = threadName;
      // Construction represents the first acquisition.
      incrLockCount();
    }

    /** Record one acquisition together with its stack trace. */
    public void incrLockCount() {
      logStack.push(new Exception("lock stack trace"));
      lockCount += 1;
    }

    /** Record one release, discarding the most recent stack trace. */
    public void decrLockCount() {
      logStack.pop();
      lockCount -= 1;
    }

    /** Log the thread name, outstanding count and all recorded stacks. */
    public void showLockMessage() {
      LOG.error("hold lock thread name is:{} hold count is:{}",
          threadName, lockCount);
      while (!logStack.isEmpty()) {
        Exception e = logStack.pop();
        LOG.error("lock stack ", e);
      }
    }

    /** @return true when exactly one acquisition remains outstanding. */
    public boolean shouldClear() {
      return lockCount == 1;
    }
  }

  /** Test-oriented constructor: fair locks, tracing always on, no metrics. */
  public DataSetLockManager() {
    this.openLockTrace = true;
  }

  /**
   * @param conf source of the fairness and trace settings.
   * @param dn datanode used to report lock-acquire latency metrics.
   */
  public DataSetLockManager(Configuration conf, DataNode dn) {
    this.isFair = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
        DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT);
    this.openLockTrace = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE,
        DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT);
    this.datanode = dn;
  }

  @Override
  public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
    if (level == LockLevel.BLOCK_POOl) {
      return getReadLock(level, resources[0]);
    } else if (level == LockLevel.VOLUME) {
      // Take the ancestor (block pool) read lock before the volume lock.
      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
      AutoCloseDataSetLock volLock = getReadLock(level, resources);
      volLock.setParentLock(bpLock);
      if (openLockTrace) {
        LOG.debug("Sub lock {}{} parent lock {}",
            resources[0], resources[1], resources[0]);
      }
      return volLock;
    } else {
      // DIR level: acquire block pool, then volume, then subdir, in order.
      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
      volLock.setParentLock(bpLock);
      AutoCloseDataSetLock dirLock = getReadLock(level, resources);
      dirLock.setParentLock(volLock);
      if (openLockTrace) {
        LOG.debug("Sub lock {}{}{} parent lock {}{}",
            resources[0], resources[1], resources[2], resources[0], resources[1]);
      }
      return dirLock;
    }
  }

  @Override
  public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
    if (level == LockLevel.BLOCK_POOl) {
      return getWriteLock(level, resources[0]);
    } else if (level == LockLevel.VOLUME) {
      // Ancestors are taken as READ locks; only the target level is a write lock.
      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
      AutoCloseDataSetLock volLock = getWriteLock(level, resources);
      volLock.setParentLock(bpLock);
      if (openLockTrace) {
        LOG.debug("Sub lock {}{} parent lock {}",
            resources[0], resources[1], resources[0]);
      }
      return volLock;
    } else {
      AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
      AutoCloseDataSetLock volLock = getReadLock(LockLevel.VOLUME, resources[0], resources[1]);
      volLock.setParentLock(bpLock);
      AutoCloseDataSetLock dirLock = getWriteLock(level, resources);
      dirLock.setParentLock(volLock);
      if (openLockTrace) {
        LOG.debug("Sub lock {}{}{} parent lock {}{}",
            resources[0], resources[1], resources[2], resources[0], resources[1]);
      }
      return dirLock;
    }
  }

  /**
   * Acquire and return a non-null read lock, creating the lock lazily if it
   * was not registered (can happen during datanode restart).
   */
  private AutoCloseDataSetLock getReadLock(LockLevel level, String... resources) {
    long startTimeNanos = Time.monotonicNowNanos();
    String lockName = generateLockName(level, resources);
    AutoCloseDataSetLock lock = lockMap.getReadLock(lockName);
    if (lock == null) {
      LOG.warn("Ignore this error during dn restart: Not existing readLock {}",
          lockName);
      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
      lock = lockMap.getReadLock(lockName);
    }
    lock.lock();
    if (openLockTrace) {
      putThreadName(getThreadName());
    }
    if (datanode != null) {
      datanode.metrics.addAcquireDataSetReadLock(Time.monotonicNowNanos() - startTimeNanos);
    }
    return lock;
  }

  /**
   * Acquire and return a non-null write lock, creating the lock lazily if it
   * was not registered (can happen during datanode restart).
   */
  private AutoCloseDataSetLock getWriteLock(LockLevel level, String... resources) {
    long startTimeNanos = Time.monotonicNowNanos();
    String lockName = generateLockName(level, resources);
    AutoCloseDataSetLock lock = lockMap.getWriteLock(lockName);
    if (lock == null) {
      LOG.warn("Ignore this error during dn restart: Not existing writeLock {}",
          lockName);
      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
      lock = lockMap.getWriteLock(lockName);
    }
    lock.lock();
    if (openLockTrace) {
      putThreadName(getThreadName());
    }
    if (datanode != null) {
      datanode.metrics.addAcquireDataSetWriteLock(Time.monotonicNowNanos() - startTimeNanos);
    }
    return lock;
  }

  @Override
  public void addLock(LockLevel level, String... resources) {
    String lockName = generateLockName(level, resources);
    if (level == LockLevel.BLOCK_POOl) {
      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
    } else if (level == LockLevel.VOLUME) {
      // Register the ancestor lock too so hierarchical acquisition never misses.
      lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
    } else {
      lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
      lockMap.addLock(generateLockName(LockLevel.VOLUME, resources[0], resources[1]),
          new ReentrantReadWriteLock(isFair));
      lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
    }
  }

  @Override
  public void removeLock(LockLevel level, String... resources) {
    String lockName = generateLockName(level, resources);
    // Hold the write lock while unregistering so no reader is mid-acquire;
    // the held reference is still valid for the auto-close unlock.
    try (AutoCloseDataSetLock lock = writeLock(level, resources)) {
      lockMap.removeLock(lockName);
    }
  }

  @Override
  public void hook() {
    if (openLockTrace) {
      removeThreadName(getThreadName());
    }
  }

  /**
   * Record one lock acquisition for the given thread key.
   */
  private synchronized void putThreadName(String thread) {
    TrackLog trackLog = threadCountMap.get(thread);
    if (trackLog != null) {
      trackLog.incrLockCount();
    } else {
      // TrackLog's constructor records the first acquisition itself.
      threadCountMap.put(thread, new TrackLog(thread));
    }
  }

  /**
   * Report any locks that were acquired but never released, with the stack
   * traces captured at acquisition time. Only meaningful when tracing is on.
   */
  public synchronized void lockLeakCheck() {
    if (!openLockTrace) {
      LOG.warn("not open lock leak check func");
      return;
    }
    if (threadCountMap.isEmpty()) {
      LOG.warn("all lock has release");
      return;
    }
    setLastException(new Exception("lock Leak"));
    threadCountMap.forEach((name, trackLog) -> trackLog.showLockMessage());
  }

  /**
   * Record one lock release for the given thread key, dropping the whole
   * record once its last outstanding acquisition is released.
   */
  private synchronized void removeThreadName(String thread) {
    TrackLog trackLog = threadCountMap.get(thread);
    if (trackLog != null) {
      if (trackLog.shouldClear()) {
        threadCountMap.remove(thread);
        return;
      }
      trackLog.decrLockCount();
    }
  }

  private void setLastException(Exception e) {
    this.lastException = e;
  }

  /** @return the exception recorded by the most recent detected leak, or null. */
  public Exception getLastException() {
    return lastException;
  }

  /** Thread key: the id suffix disambiguates threads with reused names. */
  private String getThreadName() {
    return Thread.currentThread().getName() + Thread.currentThread().getId();
  }
}