TestTaskManifestFileIO.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output.committer.manifest;

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.util.JsonSerialization;

import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertDirEntryMatch;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertFileEntryMatch;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.marshallPath;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.unmarshallPath;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Test Reading/writing manifest files.
 */
public class TestTaskManifestFileIO extends AbstractManifestCommitterTest {

  private TaskManifest source;

  private ManifestCommitterTestSupport.JobAndTaskIDsForTests taskIDs;

  private String taskAttempt00;

  private Path testPath;

  private Path taPath;

  @Override
  public void setup() throws Exception {
    super.setup();
    taskIDs = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
    source = new TaskManifest();
    taskAttempt00 = taskIDs.getTaskAttempt(0, 0);
    source.setTaskAttemptID(taskAttempt00);
    testPath = methodPath();
    taPath = new Path(testPath, "  " + taskAttempt00);
    source.setTaskAttemptDir(marshallPath(taPath));
  }

  /**
   * Test marshalling, paying attention to paths with spaces in them
   * as they've been a source of trouble in the S3A committers.
   */
  @Test
  public void testJsonRoundTrip() throws Throwable {
    describe("Save manifest file to string and back");
    Path subdirS = new Path(taPath, "subdir");
    Path subdirD = new Path(testPath, "subdir");
    source.addDirectory(DirEntry.dirEntry(subdirD, 0, 0));

    // a file
    Path subfileS = new Path(subdirS, "file");
    Path subfileD = new Path(subdirD, "file");
    long len = 256L;
    FileEntry subFileEntry = new FileEntry(subfileS,
        subfileD, len, "etag");
    source.addFileToCommit(subFileEntry);


    JsonSerialization<TaskManifest> serializer
        = TaskManifest.serializer();

    String json = serializer.toJson(source);
    LOG.info("serialized form\n{}", json);
    TaskManifest deser = serializer.fromJson(json);
    deser.validate();

    Assertions.assertThat(deser.getTaskAttemptID())
        .describedAs("Task attempt ID")
        .isEqualTo(taskAttempt00);

    Assertions.assertThat(unmarshallPath(deser.getTaskAttemptDir()))
        .describedAs("Task attempt Dir %s",
            deser.getTaskAttemptDir())
        .isEqualTo(taPath);

    Assertions.assertThat(deser.getDestDirectories())
        .hasSize(1)
        .allSatisfy(d -> assertDirEntryMatch(d, subdirD, 0));
    Assertions.assertThat(deser.getFilesToCommit())
        .hasSize(1)
        .allSatisfy(d -> assertFileEntryMatch(d, subfileS, subfileD, len));
    final FileEntry entry = deser.getFilesToCommit().get(0);
    assertFileEntryMatch(entry, subfileS, subfileD, len);
    Assertions.assertThat(entry.getEtag())
        .describedAs("etag of %s", entry)
        .isEqualTo("etag");

  }

  /**
   * The manifest validation logic has a safety check that only one
   * file can rename to the same destination, and that the entries
   * are valid.
   */
  @Test
  public void testValidateRejectsTwoCommitsToSameDest() throws Throwable {

    Path subdirS = new Path(taPath, "subdir");
    Path subdirD = new Path(testPath, "subdir");
    source.addDirectory(DirEntry.dirEntry(subdirD, 0, 0));

    // a file
    Path subfileS = new Path(subdirS, "file");
    Path subfileS2 = new Path(subdirS, "file2");
    Path subfileD = new Path(subdirD, "file");
    long len = 256L;
    source.addFileToCommit(
        new FileEntry(subfileS, subfileD, len, "tag1"));
    source.addFileToCommit(
        new FileEntry(subfileS2, subfileD, len, "tag2"));
    assertValidationFailureOnRoundTrip(source);
  }

  /**
   * The manifest validation logic has a safety check that only one
   * file can rename to the same destination, and that the entries
   * are valid.
   */
  @Test
  public void testValidateRejectsIncompleteFileEntry() throws Throwable {
    source.addFileToCommit(
        new FileEntry(taPath, null, 0, null));
    assertValidationFailureOnRoundTrip(source);
  }

  /**
   * negative lengths are not allowed.
   */
  @Test
  public void testValidateRejectsInvalidFileLength() throws Throwable {
    source.addFileToCommit(
        new FileEntry(taPath, testPath, -1, null));
    assertValidationFailureOnRoundTrip(source);
  }

  @Test
  public void testRejectIncompatibleVersion() throws Throwable {
    source.setVersion(5);
    assertValidationFailureOnRoundTrip(source);
  }

  @Test
  public void testRejectIncompatibleType() throws Throwable {
    source.setType("Incompatible type");
    assertValidationFailureOnRoundTrip(source);
  }

  private void assertValidationFailureOnRoundTrip(
      final TaskManifest manifest) throws Exception {
    JsonSerialization<TaskManifest> serializer
        = TaskManifest.serializer();
    String json = serializer.toJson(manifest);
    LOG.info("serialized form\n{}", json);
    TaskManifest deser = serializer.fromJson(json);
    intercept(IOException.class, deser::validate);
  }


}