FSDirConcatOp.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.List;

import static org.apache.hadoop.util.Time.now;

/**
 * Restrictions for a concat operation:
 * <pre>
 * 1. the source files and the target file must be in the same directory
 * 2. none of the source files may be in a snapshot
 * 3. a source file cannot be the same as the target file
 * 4. source files cannot be under construction or empty
 * 5. a source file's preferred block size cannot be greater than the
 *    target file's
 * 6. source files must have the same erasure coding policy as the target file
 * 7. the target file must not be under construction or in an encryption zone
 * </pre>
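 *
 * A typical client-side call that reaches this class (a minimal sketch; the
 * configuration and paths below are examples only):
 * <pre>
 *   FileSystem fs = FileSystem.get(conf);   // conf must point at an HDFS cluster
 *   Path target = new Path("/dir/target");
 *   Path[] srcs = { new Path("/dir/src1"), new Path("/dir/src2") };
 *   // the blocks of src1 and src2 are appended to target and the
 *   // source files are deleted
 *   fs.concat(target, srcs);
 * </pre>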
 */
class FSDirConcatOp {

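  /**
   * Validate the request, check permissions on the target and the sources,
   * then perform the concat under the FSDirectory write lock and record the
   * operation in the edit log.
   *
   * @param fsd the FSDirectory of the namesystem
   * @param pc permission checker for the calling user
   * @param target path of the target file that receives the blocks
   * @param srcs paths of the source files to concatenate and remove
   * @param logRetryCache whether to record the RPC IDs in the edit log for
   *          retry-cache rebuilding
   * @return the FileStatus of the target file, used for audit logging
   * @throws IOException if a path is reserved or cannot be resolved, if
   *           access is denied, or if a source file is in a snapshot
   */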
  static FileStatus concat(FSDirectory fsd, FSPermissionChecker pc,
      String target, String[] srcs, boolean logRetryCache) throws IOException {
    validatePath(target, srcs);
    assert srcs != null;
    NameNode.stateChangeLog.debug("DIR* NameSystem.concat: {} to {}",
        Arrays.toString(srcs), target);

    final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
    // write permission for the target
    if (fsd.isPermissionEnabled()) {
      fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
    }

    // check the target
    verifyTargetFile(fsd, target, targetIIP);
    // check the srcs
    INodeFile[] srcFiles = verifySrcFiles(fsd, srcs, targetIIP, pc);

    long timestamp = now();
    fsd.writeLock();
    try {
      unprotectedConcat(fsd, targetIIP, srcFiles, timestamp);
    } finally {
      fsd.writeUnlock();
    }
    fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
    return fsd.getAuditFileInfo(targetIIP);
  }

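  /**
   * Reject an empty target path, an empty source list, and any path under
   * the reserved /.reserved prefix (raw or inode paths), which concat does
   * not support.
   */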
  private static void validatePath(String target, String[] srcs)
      throws IOException {
    Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
    Preconditions.checkArgument(srcs != null && srcs.length > 0,
        "No sources given");
    if (FSDirectory.isReservedRawName(target)
        || FSDirectory.isReservedInodesName(target)) {
      throw new IOException("Concat operation doesn't support "
          + FSDirectory.DOT_RESERVED_STRING + " relative path : " + target);
    }
    for (String srcPath : srcs) {
      if (FSDirectory.isReservedRawName(srcPath)
          || FSDirectory.isReservedInodesName(srcPath)) {
        throw new IOException("Concat operation doesn't support "
            + FSDirectory.DOT_RESERVED_STRING + " relative path : " + srcPath);
      }
    }
  }

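  /**
   * Verify that the target resolves to an existing regular file that is not
   * in an encryption zone and not under construction.
   */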
  private static void verifyTargetFile(FSDirectory fsd, final String target,
      final INodesInPath targetIIP) throws IOException {
    // check the target
    if (FSDirEncryptionZoneOp.getEZForPath(fsd, targetIIP) != null) {
      throw new HadoopIllegalArgumentException(
          "concat can not be called for files in an encryption zone.");
    }
    final INodeFile targetINode = INodeFile.valueOf(targetIIP.getLastINode(),
        target);
    if (targetINode.isUnderConstruction()) {
      throw new HadoopIllegalArgumentException("concat: target file "
          + target + " is under construction");
    }
  }

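  /**
   * Resolve every source path and enforce the restrictions listed in the
   * class javadoc: read access on the source and write access on its parent
   * (for the later delete), same parent directory as the target, not in a
   * snapshot or referenced from one, not the target itself, not under
   * construction or empty, a preferred block size no larger than the
   * target's, and the same erasure coding policy. Duplicate sources are
   * rejected.
   *
   * @return the source INodeFiles in the order they were given
   */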
  private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
      INodesInPath targetIIP, FSPermissionChecker pc) throws IOException {
    // to make sure no two files are the same
    Set<INodeFile> si = new LinkedHashSet<>();
    final INodeFile targetINode = targetIIP.getLastINode().asFile();
    final INodeDirectory targetParent = targetINode.getParent();
    // now check the srcs
    for(String src : srcs) {
      final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
      // permission check for srcs
      if (pc != null && fsd.isPermissionEnabled()) {
        fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
        fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
      }
      final INode srcINode = iip.getLastINode();
      final INodeFile srcINodeFile = INodeFile.valueOf(srcINode, src);
      // make sure the src file and the target file are in the same dir
      if (srcINodeFile.getParent() != targetParent) {
        throw new HadoopIllegalArgumentException("Source file " + src
            + " is not in the same directory as the target "
            + targetIIP.getPath());
      }
      // make sure all the source files are not in snapshot
      if (srcINode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
        throw new SnapshotException("Concat: the source file " + src
            + " is in a snapshot");
      }
      // check if the file has other references.
      if (srcINode.isReference() && ((INodeReference.WithCount)
          srcINode.asReference().getReferredINode()).getReferenceCount() > 1) {
        throw new SnapshotException("Concat: the source file " + src
            + " is referred to by another reference in some snapshot.");
      }
      // source file cannot be the same with the target file
      if (srcINode.equals(targetINode)) {
        throw new HadoopIllegalArgumentException("concat: the source file "
            + src + " is the same as the target file " + targetIIP.getPath());
      }
      // source file cannot be under construction or empty
      if (srcINodeFile.isUnderConstruction() || srcINodeFile.numBlocks() == 0) {
        throw new HadoopIllegalArgumentException("concat: source file " + src
            + " is empty or under construction");
      }

      // source file's preferred block size cannot be greater than the target
      // file
      if (srcINodeFile.getPreferredBlockSize() >
          targetINode.getPreferredBlockSize()) {
        throw new HadoopIllegalArgumentException("concat: source file " + src
            + " has preferred block size " + srcINodeFile.getPreferredBlockSize()
            + " which is greater than the target file's preferred block size "
            + targetINode.getPreferredBlockSize());
      }
      if (srcINodeFile.getErasureCodingPolicyID() !=
          targetINode.getErasureCodingPolicyID()) {
        throw new HadoopIllegalArgumentException("Source file " + src
            + " and target file " + targetIIP.getPath()
            + " have different erasure coding policies");
      }
      si.add(srcINodeFile);
    }

    // make sure no two files are the same
    if (si.size() < srcs.length) {
      // it means at least two files are the same
      throw new HadoopIllegalArgumentException(
          "concat: at least two of the source files are the same");
    }
    return si.toArray(new INodeFile[si.size()]);
  }

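  /**
   * Compute the quota change caused by the concat: the namespace count drops
   * by the number of removed source files, and whenever a source's
   * replication differs from the target's the same bytes are re-charged at
   * the target replication, including the per-storage-type usage derived
   * from the source's storage policy.
   */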
  private static QuotaCounts computeQuotaDeltas(FSDirectory fsd,
      INodeFile target, INodeFile[] srcList) {
    QuotaCounts deltas = new QuotaCounts.Builder().build();
    final short targetRepl = target.getPreferredBlockReplication();
    for (INodeFile src : srcList) {
      short srcRepl = src.getFileReplication();
      long fileSize = src.computeFileSize();
      if (targetRepl != srcRepl) {
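        // raw storage space: the same bytes re-charged at the target's
        // replication instead of the source's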
        deltas.addStorageSpace(fileSize * (targetRepl - srcRepl));
        BlockStoragePolicy bsp =
            fsd.getBlockStoragePolicySuite().getPolicy(src.getStoragePolicyID());
        if (bsp != null) {
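          // per-storage-type usage: remove the charge for the types chosen
          // at the source replication and add it for the types chosen at the
          // target replication (both derived from the source's storage policy)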
          List<StorageType> srcTypeChosen = bsp.chooseStorageTypes(srcRepl);
          for (StorageType t : srcTypeChosen) {
            if (t.supportTypeQuota()) {
              deltas.addTypeSpace(t, -fileSize);
            }
          }
          List<StorageType> targetTypeChosen = bsp.chooseStorageTypes(targetRepl);
          for (StorageType t : targetTypeChosen) {
            if (t.supportTypeQuota()) {
              deltas.addTypeSpace(t, fileSize);
            }
          }
        }
      }
    }
    deltas.addNameSpace(-srcList.length);
    return deltas;
  }

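  /**
   * Check that the computed deltas fit within the quotas of the target's
   * ancestor directories; skipped while the image is still loading or when
   * quota checks are disabled.
   */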
  private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
      QuotaCounts deltas) throws QuotaExceededException {
    if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
      // Do not check quota if editlog is still being processed
      return;
    }
    FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, deltas, null);
  }

  /**
   * Concatenate all the blocks of the source files onto the target file and
   * delete the source inodes; the caller must hold the FSDirectory write
   * lock.
   *
   * @param fsd the FSDirectory of the namesystem
   * @param targetIIP resolved path of the target file
   * @param srcList the already-verified source files
   * @param timestamp modification time to set on the target and its parent
   */
  static void unprotectedConcat(FSDirectory fsd, INodesInPath targetIIP,
      INodeFile[] srcList, long timestamp) throws IOException {
    assert fsd.hasWriteLock();
    NameNode.stateChangeLog.debug("DIR* NameSystem.concat to {}",
        targetIIP.getPath());

    final INodeFile trgInode = targetIIP.getLastINode().asFile();
    QuotaCounts deltas = computeQuotaDeltas(fsd, trgInode, srcList);
    verifyQuota(fsd, targetIIP, deltas);

    // the target file can be included in a snapshot
    trgInode.recordModification(targetIIP.getLatestSnapshotId());
    INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
    trgInode.concatBlocks(srcList, fsd.getBlockManager());

    // since we are in the same dir - we can use same parent to remove files
    for (INodeFile nodeToRemove : srcList) {
      if (nodeToRemove != null) {
        nodeToRemove.clearBlocks();
        // Ensure the nodeToRemove is cleared from snapshot diff list
        nodeToRemove.getParent().removeChild(nodeToRemove,
            targetIIP.getLatestSnapshotId());
        fsd.getINodeMap().remove(nodeToRemove);
      }
    }

    trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
    trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
    // update quota on the parent directory with deltas
    FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1, deltas);
  }
}