// BaseJobSuite.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.util.ParseUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Shared state and assertion helpers for committer job tests.
 *
 * <p>A suite holds the {@link Job} under test, its job id, the {@link FileSystem}, the job output
 * path and the backing {@link ObjectStorage}; all of them are injected through the setters. The
 * assertion helpers inspect the file system and the object storage directly. Subclasses provide
 * the concrete magic paths and the committer-specific assertions.
 */
public abstract class BaseJobSuite {
  private static final Logger LOG = LoggerFactory.getLogger(BaseJobSuite.class);

  public static final int DEFAULT_APP_ATTEMPT_ID = 1;

  protected static final Text KEY_1 = new Text("key1");
  protected static final Text KEY_2 = new Text("key2");
  protected static final Text VAL_1 = new Text("val1");
  protected static final Text VAL_2 = new Text("val2");

  private Job job;
  private String jobId;
  private FileSystem fs;
  private Path outputPath;
  private ObjectStorage storage;

  // Controls whether dumpObjectStorage() logs anything; presumably read from the
  // DUMP_OBJECT_STORAGE environment variable, defaulting to false - confirm in ParseUtils.
  private final boolean dumpObjectStorage = ParseUtils.envAsBoolean("DUMP_OBJECT_STORAGE", false);

  /**
   * @return the path the subclass designates as the magic part file; it is the source of
   *     {@link #magicPartKey()}, {@link #destPartKey()} and {@link #magicPendingPath()}.
   */
  protected abstract Path magicPartPath();

  /**
   * @return the path the subclass designates as the magic pending-set file.
   */
  protected abstract Path magicPendingSetPath();

  /**
   * Subclass-specific assertion about the job's success marker.
   *
   * @throws IOException if the underlying storage access fails.
   */
  protected abstract void assertSuccessMarker() throws IOException;

  /**
   * Subclass-specific assertion about the summary report.
   *
   * @param reportDir the directory handed to the subclass for locating the report.
   * @throws IOException if the underlying storage access fails.
   */
  protected abstract void assertSummaryReport(Path reportDir) throws IOException;

  /**
   * Subclass-specific assertion that no task attempt path remains.
   *
   * @throws IOException if the underlying storage access fails.
   */
  protected abstract void assertNoTaskAttemptPath() throws IOException;

  /**
   * Asserts that the magic path derived from {@code output} exists in the file system.
   *
   * @param output the path whose magic path is checked.
   * @throws IOException if the existence check fails.
   */
  protected void assertMagicPathExist(Path output) throws IOException {
    Path magicPath = CommitUtils.magicPath(output);
    assertTrue(fs.exists(magicPath), String.format("Magic path: %s should exist", magicPath));
  }

  /**
   * Asserts that the magic path derived from {@code output} does not exist in the file system.
   *
   * @param output the path whose magic path is checked.
   * @throws IOException if the existence check fails.
   */
  protected void assertMagicPathNotExist(Path output) throws IOException {
    Path magicPath = CommitUtils.magicPath(output);
    assertFalse(fs.exists(magicPath), String.format("Magic path: %s should not exist", magicPath));
  }

  /**
   * @return whether the subclass wants the tests of this suite to be skipped.
   */
  protected abstract boolean skipTests();

  /**
   * @return the sibling of {@link #magicPartPath()} whose name carries the ".pending" suffix.
   */
  public Path magicPendingPath() {
    Path magicPart = magicPartPath();
    return new Path(magicPart.getParent(), magicPart.getName() + ".pending");
  }

  /**
   * @return the magic path derived from the job output path.
   */
  public Path magicJobPath() {
    return CommitUtils.magicPath(outputPath);
  }

  /**
   * @return the object key corresponding to {@link #magicPartPath()}.
   */
  public String magicPartKey() {
    return ObjectUtils.pathToKey(magicPartPath());
  }

  /**
   * @return the destination key that {@link MagicOutputStream} maps {@link #magicPartPath()} to.
   */
  public String destPartKey() {
    return MagicOutputStream.toDestKey(magicPartPath());
  }

  public Job job() {
    return job;
  }

  public String jobId() {
    return jobId;
  }

  public FileSystem fs() {
    return fs;
  }

  public Path outputPath() {
    return outputPath;
  }

  public ObjectStorage storage() {
    return storage;
  }

  public void setJob(Job value) {
    this.job = value;
  }

  public void setJobId(String value) {
    this.jobId = value;
  }

  public void setFs(FileSystem value) {
    this.fs = value;
  }

  public void setOutputPath(Path value) {
    this.outputPath = value;
  }

  public void setObjectStorage(ObjectStorage value) {
    this.storage = value;
  }

  /**
   * Asserts that at least one object under the magic job path has a key containing both
   * {@code CommitUtils.MAGIC} and the job id.
   */
  public void assertHasMagicKeys() {
    assertTrue(anyJobKeyContains(CommitUtils.MAGIC), "Should have some __magic object keys");
  }

  /**
   * Asserts that at least one object under the magic job path has a key containing both
   * {@code CommitUtils.BASE} and the job id.
   */
  public void assertHasBaseKeys() {
    assertTrue(anyJobKeyContains(CommitUtils.BASE), "Should have some __base object keys");
  }

  /**
   * Asserts that the ".pending" object of the magic part key is absent from the storage.
   */
  public void assertNoMagicPendingFile() {
    // The failure message used to read "should exist", contradicting the assertNull check.
    assertNull(storage.head(magicPendingKey()), "Magic pending key should not exist");
  }

  /**
   * Asserts that the ".pending" object of the magic part key is present in the storage.
   */
  public void assertHasMagicPendingFile() {
    assertNotNull(storage.head(magicPendingKey()), "Magic pending key should exist");
  }

  /**
   * Asserts that no multipart upload listed under the magic job path has a key containing
   * {@code CommitUtils.MAGIC}.
   */
  public void assertNoMagicMultipartUpload() {
    Iterable<MultipartUpload> uploads =
        storage.listUploads(ObjectUtils.pathToKey(magicJobPath(), true));
    boolean anyMagicUploads = Iterables.any(uploads, u -> u.key().contains(CommitUtils.MAGIC));
    assertFalse(anyMagicUploads, "Should have no magic multipart uploads");
  }

  /**
   * Asserts that no object under the magic job path has a key containing both
   * {@code CommitUtils.MAGIC} and the job id.
   */
  public void assertNoMagicObjectKeys() {
    assertFalse(anyJobKeyContains(CommitUtils.MAGIC), "Should not have any magic keys");
  }

  /**
   * Asserts that at least one object under the magic job path has a key containing both
   * {@code CommitUtils.PENDINGSET_SUFFIX} and the job id.
   */
  public void assertHasPendingSet() {
    assertTrue(anyJobKeyContains(CommitUtils.PENDINGSET_SUFFIX),
        "Should have the expected .pendingset file");
  }

  /**
   * Asserts that some pending-set object of this job has a key starting with the magic job
   * attempt path computed for {@link #DEFAULT_APP_ATTEMPT_ID}.
   */
  public void assertPendingSetAtRightLocation() {
    Iterable<ObjectInfo> objects = listMagicJobObjects();
    Path magicJobAttemptPath =
        CommitUtils.magicJobAttemptPath(job().getJobID().toString(), DEFAULT_APP_ATTEMPT_ID,
            outputPath);
    // Drop the leading '/' of the URI path so it can be compared against object keys.
    String inQualifiedPath = magicJobAttemptPath.toUri().getPath().substring(1);
    boolean pendingSetAtRightLocation = Iterables.any(objects,
        o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX)
            && o.key().contains(jobId)
            && o.key().startsWith(inQualifiedPath));
    assertTrue(pendingSetAtRightLocation,
        "The .pendingset file should locate at the job's magic output path.");
  }

  /**
   * Asserts the number of multipart uploads listed under the job output path.
   *
   * @param expectedUploads the expected number of uploads.
   */
  public void assertMultipartUpload(int expectedUploads) {
    // Note: take care in concurrent cases, since every job writing to the same output path is
    // counted by this listing.
    Iterable<MultipartUpload> uploads =
        storage.listUploads(ObjectUtils.pathToKey(outputPath, true));
    long actualUploads = StreamSupport.stream(uploads.spliterator(), false).count();
    assertEquals(expectedUploads, actualUploads);
  }

  /**
   * Asserts that exactly {@code num} part files are visible, both through the file system
   * listing of the output path (magic paths excluded) and through the object storage listing.
   *
   * @param num the expected number of part files.
   * @throws IOException if listing the output path fails.
   */
  public void assertPartFiles(int num) throws IOException {
    FileStatus[] files = listCommittedPartFiles();
    assertEquals(num, files.length);

    List<ObjectInfo> infos = Arrays.stream(Iterables.toArray(listOutputObjects(), ObjectInfo.class))
        .filter(o -> o.key().contains("part-"))
        .collect(Collectors.toList());
    assertEquals(num, infos.size(),
        String.format("Part files number should be %d, but got %d", num, infos.size()));
  }

  /**
   * Asserts that no part file is visible, neither through the file system listing of the output
   * path (magic paths excluded) nor through the object storage listing.
   *
   * @throws IOException if listing the output path fails.
   */
  public void assertNoPartFiles() throws IOException {
    FileStatus[] files = listCommittedPartFiles();
    assertEquals(0, files.length);

    boolean anyPartFile = Iterables.any(listOutputObjects(), o -> o.key().contains("part-"));
    assertFalse(anyPartFile, "Should have no part files");
  }

  /**
   * Logs the object keys and multipart uploads of the storage, but only when the
   * {@code dumpObjectStorage} flag of this suite is set.
   */
  public void dumpObjectStorage() {
    if (!dumpObjectStorage) {
      return;
    }
    LOG.info("===> Dump object storage - Start <===");
    dumpObjectKeys();
    dumpMultipartUploads();
    LOG.info("===> Dump object storage - End <===");
  }

  /**
   * Logs the objects held by the storage, one log line per object.
   */
  public void dumpObjectKeys() {
    String prefix = ObjectUtils.pathToKey(magicJobPath());
    LOG.info("Dump object keys with prefix {}", prefix);
    // NOTE(review): the listing is not restricted to the logged prefix, so every object in the
    // storage is dumped - confirm whether that is intended.
    storage.listAll("", "").forEach(o -> LOG.info("Dump object keys - {}", o));
  }

  /**
   * Logs the multipart uploads held by the storage, one log line per upload.
   */
  public void dumpMultipartUploads() {
    String prefix = ObjectUtils.pathToKey(magicJobPath());
    LOG.info("Dump multi part uploads with prefix {}", prefix);
    // NOTE(review): as in dumpObjectKeys(), the listing ignores the logged prefix.
    storage.listUploads("")
        .forEach(u -> LOG.info("Dump multipart uploads - {}", u));
  }

  /**
   * Reads the object stored under {@link #destPartKey()} and asserts that its UTF-8 content is
   * the two tab-separated records (KEY_1, VAL_1) and (KEY_2, VAL_2), each ending with a newline.
   *
   * @throws IOException if reading the object fails.
   */
  public void verifyPartContent() throws IOException {
    String partKey = destPartKey();
    LOG.info("Part key to verify is: {}", partKey);
    try (InputStream in = storage.get(partKey).stream()) {
      byte[] data = IOUtils.toByteArray(in);
      String expected = String.format("%s\t%s\n%s\t%s\n", KEY_1, VAL_1, KEY_2, VAL_2);
      assertEquals(expected, new String(data, StandardCharsets.UTF_8));
    }
  }

  /**
   * Asserts that the success marker of the output path does not exist in the file system.
   *
   * @throws IOException if the existence check fails.
   */
  public void assertSuccessMarkerNotExist() throws IOException {
    Path succPath = CommitUtils.successMarker(outputPath);
    assertFalse(fs.exists(succPath), String.format("%s should not exists", succPath));
  }

  /** Lists every object whose key is under the magic job path. */
  private Iterable<ObjectInfo> listMagicJobObjects() {
    return storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
  }

  /** Lists every object whose key is under the job output path. */
  private Iterable<ObjectInfo> listOutputObjects() {
    return storage.listAll(ObjectUtils.pathToKey(outputPath, true), "");
  }

  /** Tells whether any object under the magic job path has a key with the token and the job id. */
  private boolean anyJobKeyContains(CharSequence token) {
    return Iterables.any(listMagicJobObjects(),
        o -> o.key().contains(token) && o.key().contains(jobId));
  }

  /** Builds the key of the ".pending" object that belongs to the magic part key. */
  private String magicPendingKey() {
    return String.format("%s.pending", magicPartKey());
  }

  /** Lists the non-magic entries of the output path whose URI contains "part-". */
  private FileStatus[] listCommittedPartFiles() throws IOException {
    return fs.listStatus(outputPath,
        f -> !MagicOutputStream.isMagic(new Path(f.toUri()))
            && f.toUri().toString().contains("part-"));
  }
}