StateStoreFileBaseImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.Time.now;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * {@link StateStoreDriver} implementation based on files. In this approach, we
 * use temporary files for the writes and renaming "atomically" to the final
 * value. Instead of writing to the final location, it will go to a temporary
 * one and then rename to the final destination.
 */
public abstract class StateStoreFileBaseImpl
    extends StateStoreSerializableImpl {

  /** Logger shared by all the file based driver implementations. */
  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreFileBaseImpl.class);

  /** File extension for temporary files. */
  private static final String TMP_MARK = ".tmp";
  /** We remove temporary files older than 10 seconds. */
  private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
  /**
   * File pattern for temporary records: file.XYZ.tmp. The captured group XYZ
   * is the numeric suffix that the write path sets to the wall-clock time (in
   * milliseconds) at which the temporary file was created.
   */
  private static final Pattern OLD_TMP_RECORD_PATTERN =
      Pattern.compile(".+\\.(\\d+)\\.tmp");

  /** If it is initialized. */
  private boolean initialized = false;

  /**
   * Pool used to read and write the record files concurrently. It is only
   * created by {@link #initDriver()} when more than one access thread is
   * configured; when it is null the files are accessed serially. It is shut
   * down and reset to null in {@link #close()}.
   */
  private ExecutorService concurrentStoreAccessPool;

  /**
   * Get the reader of a record for the file system. The reader is closed by
   * this class once the record has been read.
   *
   * @param path Path of the record to read.
   * @param <T> Type of the state store record.
   * @return Reader for the record.
   */
  protected abstract <T extends BaseRecord> BufferedReader getReader(
      String path);

  /**
   * Get the writer of a record for the file system. The writer is closed by
   * this class once the record has been written. This class always asks for a
   * writer on a temporary path which is renamed to the final one afterwards.
   *
   * @param path Path of the record to write.
   * @param <T> Type of the state store record.
   * @return Writer for the record.
   */
  @VisibleForTesting
  public abstract <T extends BaseRecord> BufferedWriter getWriter(
      String path);

  /**
   * Check if a path exists.
   *
   * @param path Path to check.
   * @return If the path exists.
   */
  protected abstract boolean exists(String path);

  /**
   * Make a directory. Used to create the root directory and the per record
   * data directories when they do not exist yet.
   *
   * @param path Path of the directory to create.
   * @return If the directory was created.
   */
  protected abstract boolean mkdir(String path);

  /**
   * Rename a file. This should be atomic. It is the commit step that moves a
   * fully written temporary file to the final record location.
   *
   * @param src Source name.
   * @param dst Destination name.
   * @return If the rename was successful.
   */
  protected abstract boolean rename(String src, String dst);

  /**
   * Remove a file. Used both for removing records and for cleaning up old
   * temporary files.
   *
   * @param path Path for the file to remove
   * @return If the file was removed.
   */
  protected abstract boolean remove(String path);

  /**
   * Get the children for a path. Each returned entry is appended to the parent
   * path (separated by "/") to build the full location of a record file.
   *
   * @param path Path to check.
   * @return List of children.
   */
  protected abstract List<String> getChildren(String path);

  /**
   * Get root directory. A null value makes {@link #initDriver()} fail.
   *
   * @return Root directory.
   */
  protected abstract String getRootDir();

  /**
   * Get the number of threads used to access the record files concurrently.
   * {@link #initDriver()} only creates a thread pool when this value is
   * greater than 1; otherwise the files are read and written serially by the
   * calling thread.
   *
   * @return Number of threads for concurrent file access.
   */
  protected abstract int getConcurrentFilesAccessNumThreads();

  /**
   * Mark the driver as initialized or not. The value stored here is the one
   * reported by {@link #isDriverReady()}.
   *
   * @param ini New initialization state of the driver.
   */
  public void setInitialized(boolean ini) {
    initialized = ini;
  }

  /**
   * Initialize the driver: make sure the root directory exists (creating it if
   * needed), flag the driver as initialized and, when more than one access
   * thread is configured, create the pool used for concurrent file access.
   *
   * @return True if the driver was initialized, false if the root directory is
   *         invalid or cannot be created.
   */
  @Override
  public boolean initDriver() {
    final String rootDir = getRootDir();
    try {
      if (rootDir == null) {
        LOG.error("Invalid root directory, unable to initialize driver.");
        return false;
      }

      // The root path must be there; try to create it when it is missing
      if (!exists(rootDir) && !mkdir(rootDir)) {
        LOG.error("Cannot create State Store root directory {}", rootDir);
        return false;
      }
    } catch (Exception ex) {
      LOG.error(
          "Cannot initialize filesystem using root directory {}", rootDir, ex);
      return false;
    }
    setInitialized(true);

    final int numThreads = getConcurrentFilesAccessNumThreads();
    if (numThreads <= 1) {
      // No pool: every file operation runs in the calling thread
      LOG.info("File based state store will be accessed serially");
      return true;
    }

    // Fixed size pool of daemon threads so it never blocks the JVM shutdown
    this.concurrentStoreAccessPool =
        new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactoryBuilder()
                .setNameFormat("state-store-file-based-concurrent-%d")
                .setDaemon(true).build());
    LOG.info("File based state store will be accessed concurrently with {} max threads",
        numThreads);
    return true;
  }

  /**
   * Close the driver and release the concurrent access pool, if any. The pool
   * is shut down even when closing the parent fails, so its threads are not
   * leaked. Already submitted tasks get up to 5 seconds to finish.
   *
   * @throws Exception If the parent cannot be closed or the thread is
   *                   interrupted while waiting for the pool to terminate.
   */
  @Override
  public void close() throws Exception {
    try {
      super.close();
    } finally {
      final ExecutorService pool = this.concurrentStoreAccessPool;
      if (pool != null) {
        // Drop the reference first: a pool that has been shut down rejects
        // new tasks, so it must not be picked up again by readers/writers.
        this.concurrentStoreAccessPool = null;
        pool.shutdown();
        boolean isTerminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        LOG.info("Concurrent store access pool is terminated: {}", isTerminated);
      }
    }
  }

  /**
   * Make sure the data directory for a record type exists under the root
   * directory, creating it when it is missing.
   *
   * @param className Name of the record; used as the directory name.
   * @param recordClass Class of the record (not used by this implementation).
   * @param <T> Type of the state store record.
   * @return True if the directory exists or was created, false otherwise.
   */
  @Override
  public <T extends BaseRecord> boolean initRecordStorage(
      String className, Class<T> recordClass) {

    final String dataDirPath = getRootDir() + "/" + className;
    try {
      if (exists(dataDirPath)) {
        // Nothing to do, the directory is already there
        return true;
      }
      LOG.info("{} data directory doesn't exist, creating it", dataDirPath);
      if (mkdir(dataDirPath)) {
        return true;
      }
      LOG.error("Cannot create data directory {}", dataDirPath);
      return false;
    } catch (Exception ex) {
      LOG.error("Cannot create data directory {}", dataDirPath, ex);
      return false;
    }
  }

  /**
   * Read all the records of a class from its data directory. Every child file
   * is read (concurrently when the access pool is available, serially
   * otherwise); temporary files are skipped and removed if they are old. The
   * order of the returned records is not defined when reading concurrently.
   *
   * @param clazz Class of the records to fetch.
   * @param <T> Type of the state store record.
   * @return Records read and the time of the query.
   * @throws IOException If the driver is not ready or any record cannot be
   *                     read. If the calling thread was interrupted, its
   *                     interrupt status is restored before throwing.
   */
  @Override
  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
      throws IOException {
    verifyDriverReady();
    long start = monotonicNow();
    StateStoreMetrics metrics = getMetrics();
    // Synchronized as the pool threads add their records concurrently
    List<T> result = Collections.synchronizedList(new ArrayList<>());
    try {
      String path = getPathForClass(clazz);
      List<String> children = getChildren(path);
      List<Callable<Void>> callables = new ArrayList<>();
      children.forEach(child -> callables.add(
          () -> getRecordsFromFileAndRemoveOldTmpRecords(clazz, result, path, child)));
      // Read the field once so a concurrent close() cannot null it between
      // the check and the use.
      final ExecutorService pool = this.concurrentStoreAccessPool;
      if (pool != null) {
        // Read records concurrently
        List<Future<Void>> futures = pool.invokeAll(callables);
        for (Future<Void> future : futures) {
          // Surface the failure of any of the reads
          future.get();
        }
      } else {
        // Read records serially
        for (Callable<Void> callable : callables) {
          try {
            callable.call();
          } catch (Exception ex) {
            LOG.error("Failed to retrieve record using file operations.", ex);
            throw new RuntimeException(ex);
          }
        }
      }
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Do not swallow the interruption: let the callers up the stack see it
        Thread.currentThread().interrupt();
      }
      if (metrics != null) {
        metrics.addFailure(monotonicNow() - start);
      }
      String msg = "Cannot fetch records for " + clazz.getSimpleName();
      LOG.error(msg, e);
      throw new IOException(msg, e);
    }

    if (metrics != null) {
      metrics.addRead(monotonicNow() - start);
    }
    return new QueryResult<>(result, getTime());
  }

  /**
   * Process one child of a record directory. A regular file is parsed into a
   * record which is appended to the result list. A temporary file is never
   * added to the results; it is removed when it is old enough.
   *
   * @param clazz Class of the record.
   * @param result List collecting the records that were read.
   * @param path Directory holding the records of the class.
   * @param child Name of the file under the directory; path and child together
   *              are the location of the file.
   * @param <T> Record class of the records.
   * @return Always null; the return type only exists to fit a Callable.
   * @throws IOException If the file read operation fails.
   */
  private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,
      List<T> result, String path, String child) throws IOException {
    final String childPath = path + "/" + child;

    // Regular record: read it and we are done
    if (!child.endsWith(TMP_MARK)) {
      result.add(getRecord(childPath, clazz));
      return null;
    }

    // Temporary record: only clean it up once it is old
    LOG.debug("There is a temporary file {} in {}", child, path);
    if (isOldTempRecord(child)) {
      LOG.warn("Removing {} as it's an old temporary record", child);
      remove(childPath);
    }
    return null;
  }

  /**
   * Check if a record is temporary and old. A record is temporary if its name
   * ends with the temporary extension, and old if the creation time embedded
   * in its name (name.TIME.tmp, TIME in milliseconds) is more than
   * {@link #OLD_TMP_RECORD_MS} in the past.
   *
   * @param pathRecord Path for the record to check.
   * @return If the record is temporary and old. Names without a creation time,
   *         or with one that does not fit in a long, are never considered old.
   */
  @VisibleForTesting
  public static boolean isOldTempRecord(final String pathRecord) {
    if (!pathRecord.endsWith(TMP_MARK)) {
      return false;
    }
    // Extract temporary record creation time
    Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
    if (!m.find()) {
      return false;
    }
    final long time;
    try {
      time = Long.parseLong(m.group(1));
    } catch (NumberFormatException e) {
      // The pattern accepts any run of digits, including ones too long for a
      // long. Such a file was not created with a valid timestamp; leave it
      // alone instead of failing the whole read of the directory.
      LOG.debug("Cannot parse the creation time of temporary record {}",
          pathRecord, e);
      return false;
    }
    return now() - time > OLD_TMP_RECORD_MS;
  }

  /**
   * Read a record from a file. The record is built from the first line that
   * is neither empty nor a comment (starting with '#') and that can be parsed;
   * lines that fail to parse are logged and skipped.
   *
   * @param path Path to the file containing the record.
   * @param clazz Class of the record.
   * @param <T> Type of the state store record.
   * @return Record read from the file.
   * @throws IOException If the file cannot be opened or read, or if it does
   *                     not contain any parseable record.
   */
  private <T extends BaseRecord> T getRecord(
      final String path, final Class<T> clazz) throws IOException {
    try (BufferedReader reader = getReader(path)) {
      // NOTE(review): the contract of getReader() does not say whether it can
      // return null when the file cannot be opened; guard against it so the
      // caller gets a meaningful IOException rather than a bare NPE.
      if (reader == null) {
        throw new IOException("Cannot open " + path + " for record " +
            clazz.getSimpleName());
      }
      String line;
      while ((line = reader.readLine()) != null) {
        if (!line.startsWith("#") && !line.isEmpty()) {
          try {
            return newRecord(line, clazz, false);
          } catch (Exception ex) {
            // Keep going: a later line may still hold a valid record
            LOG.error("Cannot parse line {} in file {}", line, path, ex);
          }
        }
      }
    }
    throw new IOException("Cannot read " + path + " for record " +
        clazz.getSimpleName());
  }

  /**
   * Build the path of the directory that stores the records of a class: the
   * root directory followed by the record name, with exactly one '/' added
   * between them when the root directory does not already end with one.
   *
   * @param clazz Class of the record.
   * @param <T> Type of the state store record.
   * @return Path for this record class.
   */
  private <T extends BaseRecord> String getPathForClass(final Class<T> clazz) {
    final String recordName = StateStoreUtils.getRecordName(clazz);
    final String root = String.valueOf(getRootDir());
    final boolean endsWithSeparator = root.charAt(root.length() - 1) == '/';
    return endsWithSeparator ? root + recordName : root + "/" + recordName;
  }

  /**
   * Check if the driver is ready to be used.
   *
   * @return The initialization flag set through {@link #setInitialized}.
   */
  @Override
  public boolean isDriverReady() {
    return initialized;
  }

  /**
   * Write a list of records, each one into its own file. Records whose file
   * already exists are overwritten if updates are allowed, reported as failed
   * if errorIfExists is set, and silently skipped otherwise. The selected
   * records are written concurrently when the access pool is available and
   * serially otherwise. A failure on one record does not stop the others.
   *
   * @param records Records to write.
   * @param allowUpdate If existing records can be overwritten.
   * @param errorIfExists If an existing record is an error when updates are
   *                      not allowed.
   * @param <T> Type of the state store record.
   * @return Result with the keys of the records that failed and the overall
   *         success flag.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> StateStoreOperationResult putAll(
      List<T> records, boolean allowUpdate, boolean errorIfExists)
          throws StateStoreUnavailableException {
    verifyDriverReady();
    if (records.isEmpty()) {
      return StateStoreOperationResult.getDefaultSuccessResult();
    }

    long start = monotonicNow();
    StateStoreMetrics metrics = getMetrics();

    // Check if any record exists
    Map<String, T> toWrite = new HashMap<>();
    // Both are shared with the writer tasks, hence the thread-safe types
    final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
    final AtomicBoolean success = new AtomicBoolean(true);

    for (T record : records) {
      Class<? extends BaseRecord> recordClass = record.getClass();
      String path = getPathForClass(recordClass);
      String primaryKey = getPrimaryKey(record);
      String recordPath = path + "/" + primaryKey;

      if (exists(recordPath)) {
        if (allowUpdate) {
          // Update the mod time stamp. Many backends will use their
          // own timestamp for the mod time.
          record.setDateModified(this.getTime());
          toWrite.put(recordPath, record);
        } else if (errorIfExists) {
          LOG.error("Attempt to insert record {} that already exists", recordPath);
          failedRecordsKeys.add(getOriginalPrimaryKey(primaryKey));
          success.set(false);
        } else {
          LOG.debug("Not updating {}", record);
        }
      } else {
        toWrite.put(recordPath, record);
      }
    }

    // Write the records
    final List<Callable<Void>> callables = new ArrayList<>();
    toWrite.entrySet().forEach(
        entry -> callables.add(() -> writeRecordToFile(success, entry, failedRecordsKeys)));
    // Read the field once so a concurrent close() cannot null it between the
    // check and the use.
    final ExecutorService pool = this.concurrentStoreAccessPool;
    if (pool != null) {
      // Write records concurrently
      List<Future<Void>> futures = null;
      try {
        futures = pool.invokeAll(callables);
      } catch (InterruptedException e) {
        // Keep the interrupt status visible to the callers up the stack
        Thread.currentThread().interrupt();
        success.set(false);
        LOG.error("Failed to put record concurrently.", e);
      }
      if (futures != null) {
        for (Future<Void> future : futures) {
          try {
            future.get();
          } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
              Thread.currentThread().interrupt();
            }
            success.set(false);
            LOG.error("Failed to retrieve results from concurrent record put runs.", e);
          }
        }
      }
    } else {
      // Write records serially
      for (Callable<Void> callable : callables) {
        try {
          callable.call();
        } catch (Exception e) {
          success.set(false);
          LOG.error("Failed to put record.", e);
        }
      }
    }

    long end = monotonicNow();
    if (metrics != null) {
      if (success.get()) {
        metrics.addWrite(end - start);
      } else {
        metrics.addFailure(end - start);
      }
    }
    return new StateStoreOperationResult(failedRecordsKeys, success.get());
  }

  /**
   * Writes the state store record to the file. At first, the record is written to a temp location
   * (record path + "." + current time + ".tmp") and then renamed to the final location that is
   * passed with the entry key. If the write or the rename fails, the key of the record is added
   * to the failed list and the success flag is cleared.
   *
   * @param <T> Record class of the records.
   * @param success The atomic boolean that gets updated to false if the file write operation fails.
   * @param entry The entry of the record path and the state store record to be written to the file
   * by first writing to a temp location and then renaming it to the record path.
   * @param failedRecordsList The list of keys of the failed records.
   * @return Always null; the return type only exists to fit a Callable.
   */
  private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
      Entry<String, T> entry, List<String> failedRecordsList) {
    final String recordPath = entry.getKey();
    final T record = entry.getValue();
    final String primaryKey = getPrimaryKey(record);
    final String recordPathTemp = recordPath + "." + now() + TMP_MARK;
    boolean recordWrittenSuccessfully = true;
    try (BufferedWriter writer = getWriter(recordPathTemp)) {
      // NOTE(review): the contract of getWriter() does not say whether it can
      // return null when the file cannot be created; treat it as a write
      // failure so the record is reported as failed rather than raising a NPE
      // that bypasses the bookkeeping below.
      if (writer == null) {
        throw new IOException("Cannot open writer for " + recordPathTemp);
      }
      String line = serializeString(record);
      writer.write(line);
    } catch (IOException e) {
      LOG.error("Cannot write {}", recordPathTemp, e);
      recordWrittenSuccessfully = false;
      failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
      success.set(false);
    }
    // Commit: only a completely written (and closed) temp file is renamed
    if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
      LOG.error("Failed committing record into {}", recordPath);
      failedRecordsList.add(getOriginalPrimaryKey(primaryKey));
      success.set(false);
    }
    return null;
  }

  /**
   * Remove the records of a class that match a query. All the records are
   * read, filtered with the query and the file of each match is deleted.
   *
   * @param clazz Class of the records to remove.
   * @param query Query selecting the records to remove; nothing is removed if
   *              it is null.
   * @param <T> Type of the state store record.
   * @return Number of records that were removed.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query)
      throws StateStoreUnavailableException {
    verifyDriverReady();

    if (query == null) {
      return 0;
    }

    final long startTime = monotonicNow();
    final StateStoreMetrics metrics = getMetrics();
    int removedCount = 0;
    try {
      // Load what is stored and keep only the records matching the query
      final List<T> storedRecords = get(clazz).getRecords();
      final List<T> matches = filterMultiple(query, storedRecords);
      boolean allRemoved = true;
      for (T match : matches) {
        final String matchPath =
            getPathForClass(clazz) + "/" + getPrimaryKey(match);
        if (!remove(matchPath)) {
          LOG.error("Cannot remove record {}", matchPath);
          allRemoved = false;
          continue;
        }
        removedCount++;
      }
      if (!allRemoved) {
        LOG.error("Cannot remove records {} query {}", clazz, query);
        if (metrics != null) {
          metrics.addFailure(monotonicNow() - startTime);
        }
      }
    } catch (IOException e) {
      LOG.error("Cannot remove records {} query {}", clazz, query, e);
      if (metrics != null) {
        metrics.addFailure(monotonicNow() - startTime);
      }
    }

    // A removal is accounted as soon as at least one record was deleted
    if (metrics != null && removedCount > 0) {
      metrics.addRemove(monotonicNow() - startTime);
    }
    return removedCount;
  }

  /**
   * Remove every file (records and temporary files alike) stored in the
   * directory of a record class. All the children are attempted even if some
   * of them cannot be removed.
   *
   * @param clazz Class of the records to remove.
   * @param <T> Type of the state store record.
   * @return True if every child was removed, false if any removal failed.
   * @throws StateStoreUnavailableException If the driver is not ready.
   */
  @Override
  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
      throws StateStoreUnavailableException {
    verifyDriverReady();
    final long startTime = monotonicNow();
    final StateStoreMetrics metrics = getMetrics();

    final String classPath = getPathForClass(clazz);
    boolean allRemoved = true;
    for (String child : getChildren(classPath)) {
      // Non short-circuit on purpose: keep removing after a failure
      allRemoved &= remove(classPath + "/" + child);
    }

    if (metrics == null) {
      return allRemoved;
    }
    final long elapsed = monotonicNow() - startTime;
    if (allRemoved) {
      metrics.addRemove(elapsed);
    } else {
      metrics.addFailure(elapsed);
    }
    return allRemoved;
  }
}