// TestEntryFileIO.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.rangeExcludingIterator;
/**
 * Test {@link EntryFileIO}.
 * Each test case works against a temporary local file created in
 * {@link #setup()} and deleted in {@link #teardown()}.
 */
public class TestEntryFileIO extends AbstractManifestCommitterTest {

  private static final Logger LOG = LoggerFactory.getLogger(
      TestEntryFileIO.class);

  /**
   * Entry to save.
   */
  public static final FileEntry ENTRY = new FileEntry("source", "dest", 100, "etag");

  /**
   * Entry file instance.
   */
  private EntryFileIO entryFileIO;

  /**
   * Path to a test entry file.
   */
  private File entryFile;

  /**
   * Create the {@link EntryFileIO} instance and an entry file during setup.
   * @throws Exception on any failure
   */
  @BeforeEach
  public void setup() throws Exception {
    entryFileIO = new EntryFileIO(new Configuration());
    createEntryFile();
  }

  /**
   * Teardown deletes any entry file.
   * @throws Exception on any failure
   */
  @AfterEach
  public void teardown() throws Exception {
    Thread.currentThread().setName("teardown");
    if (getEntryFile() != null) {
      // best-effort cleanup: the outcome of the delete is not checked.
      getEntryFile().delete();
    }
  }

  /**
   * Create a temp entry file and set the entryFile field to it.
   * @throws IOException creation failure
   */
  private void createEntryFile() throws IOException {
    setEntryFile(File.createTempFile("entry", ".seq"));
  }

  /**
   * Reference to any temp file created.
   * @return the entry file; null if none has been set.
   */
  private File getEntryFile() {
    return entryFile;
  }

  /**
   * Set the entry file field.
   * @param entryFile new value
   */
  private void setEntryFile(File entryFile) {
    this.entryFile = entryFile;
  }

  /**
   * Create a file with one entry, then read it back
   * via all the mechanisms available.
   */
  @Test
  public void testCreateWriteReadFileOneEntry() throws Throwable {
    final FileEntry source = ENTRY;

    // do an explicit close to help isolate any failure.
    SequenceFile.Writer writer = createWriter();
    writer.append(NullWritable.get(), source);
    writer.flush();
    writer.close();

    // read the single record back through the sequence file reader,
    // failing fast if the reader reports that there is no record at all.
    FileEntry readBack = new FileEntry();
    try (SequenceFile.Reader reader = readEntryFile()) {
      Assertions.assertThat(reader.next(NullWritable.get(), readBack))
          .describedAs("reader.next() result on a file with one entry")
          .isTrue();
    }
    Assertions.assertThat(readBack)
        .describedAs("entry read back from sequence file")
        .isEqualTo(source);

    // now use the iterator to access it.
    final RemoteIterator<FileEntry> it =
        iterateOverEntryFile();
    List<FileEntry> files = new ArrayList<>();
    foreach(it, files::add);
    Assertions.assertThat(files)
        .describedAs("iteration over the entry file")
        .hasSize(1)
        .element(0)
        .isEqualTo(source);

    // once the iteration has completed, the iterator must report itself
    // as closed and as having returned exactly one entry.
    final EntryFileIO.EntryIterator et = (EntryFileIO.EntryIterator) it;
    Assertions.assertThat(et)
        .describedAs("entry iterator %s", et)
        .matches(p -> p.isClosed())
        .extracting(p -> p.getCount())
        .isEqualTo(1);
  }

  /**
   * Create a writer to the entry file.
   * @return a writer
   * @throws IOException failure to create the file.
   */
  private SequenceFile.Writer createWriter() throws IOException {
    return entryFileIO.createWriter(getEntryFile());
  }

  /**
   * Create an iterator over the records in the (non empty) entry file.
   * @return an iterator over entries.
   * @throws IOException failure to open the file
   */
  private RemoteIterator<FileEntry> iterateOverEntryFile() throws IOException {
    return entryFileIO.iterateOver(readEntryFile());
  }

  /**
   * Create a reader for the (non empty) entry file.
   * Asserts that the file has a length greater than zero before opening it.
   * @return a reader.
   * @throws IOException failure to open the file
   */
  private SequenceFile.Reader readEntryFile() throws IOException {
    assertEntryFileNonEmpty();
    return entryFileIO.createReader(getEntryFile());
  }

  /**
   * Create a file with no entries, then verify that iterating
   * over it returns nothing.
   */
  @Test
  public void testCreateEmptyFile() throws Throwable {
    final File file = getEntryFile();

    // open and immediately close the writer: no entries are appended.
    entryFileIO.createWriter(file).close();

    // now use the iterator to access it.
    List<FileEntry> files = new ArrayList<>();
    Assertions.assertThat(foreach(iterateOverEntryFile(), files::add))
        .describedAs("Count of iterations over entries in an entry file with no entries")
        .isEqualTo(0);
    Assertions.assertThat(files)
        .describedAs("entries collected from an entry file with no entries")
        .isEmpty();
  }

  /**
   * Assert that the entry file has a length greater than zero.
   */
  private void assertEntryFileNonEmpty() {
    Assertions.assertThat(getEntryFile().length())
        .describedAs("Length of file %s", getEntryFile())
        .isGreaterThan(0);
  }

  /**
   * Launching an entry writer with a null sequence file writer
   * is expected to raise a {@code NullPointerException}.
   */
  @Test
  public void testCreateInvalidWriter() throws Throwable {
    intercept(NullPointerException.class, () ->
        entryFileIO.launchEntryWriter(null, 1));
  }

  /**
   * Launching an entry writer with a capacity of zero
   * is expected to raise an {@code IllegalStateException}.
   */
  @Test
  public void testCreateInvalidWriterCapacity() throws Throwable {
    intercept(IllegalStateException.class, () ->
        entryFileIO.launchEntryWriter(null, 0));
  }

  /**
   * Generate lots of data and write it.
   */
  @Test
  public void testLargeStreamingWrite() throws Throwable {

    // list of 100 entries at a time
    int listSize = 100;
    // and the number of block writes
    int writes = 100;
    List<FileEntry> list = buildEntryList(listSize);

    int total = listSize * writes;

    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(createWriter(), 2)) {
      Assertions.assertThat(out.isActive())
          .describedAs("out.isActive in (%s)", out)
          .isTrue();
      for (int i = 0; i < writes; i++) {
        Assertions.assertThat(out.enqueue(list))
            .describedAs("enqueue of list")
            .isTrue();
      }
      // explicit close so that the final state can be asserted on;
      // the try-with-resources clause will invoke close() again.
      out.close();
      out.maybeRaiseWriteException();
      Assertions.assertThat(out.isActive())
          .describedAs("out.isActive in (%s)", out)
          .isFalse();

      Assertions.assertThat(out.getCount())
          .describedAs("total elements written")
          .isEqualTo(total);
    }

    // now read it back: the same list was written repeatedly, so
    // element n of the file must match list entry (n % listSize).
    AtomicInteger count = new AtomicInteger();
    foreach(iterateOverEntryFile(), e -> {
      final int elt = count.getAndIncrement();
      final int index = elt % listSize;
      Assertions.assertThat(e)
          .describedAs("element %d in file mapping to index %d", elt, index)
          .isEqualTo(list.get(index));
    });
    Assertions.assertThat(count.get())
        .describedAs("total elements read")
        .isEqualTo(total);
  }

  /**
   * Build an entry list.
   * Entry {@code i} has source, destination and etag strings suffixed
   * with {@code i}, and {@code i} as its numeric argument.
   * @param listSize size of the list
   * @return a list of entries
   */
  private static List<FileEntry> buildEntryList(final int listSize) {
    List<FileEntry> list = new ArrayList<>(listSize);
    for (int i = 0; i < listSize; i++) {
      list.add(new FileEntry("source" + i, "dest" + i, i, "etag-" + i));
    }
    // just for debugging/regression testing
    Assertions.assertThat(list).hasSize(listSize);
    return list;
  }

  /**
   * Write lists to the output, but the stream is going to fail after a
   * configured number of records have been written.
   * Verify that the (blocked) submitter is woken up
   * and that the exception was preserved for rethrowing.
   */
  @Test
  public void testFailurePropagation() throws Throwable {

    // number of append() calls permitted before the spied writer fails.
    final int count = 4;
    final SequenceFile.Writer writer = spyWithFailingAppend(
        entryFileIO.createWriter(getEntryFile()), count);

    // a list of a single entry: every enqueued list maps to one append() call.
    List<FileEntry> list = buildEntryList(1);

    // small queue ensures the posting thread is blocked
    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(writer, 2)) {
      // attempt to enqueue twice as many lists as the writer will accept,
      // stopping as soon as an enqueue is rejected.
      boolean valid = true;
      for (int i = 0; valid && i < count * 2; i++) {
        valid = out.enqueue(list);
      }
      LOG.info("queue to {} finished valid={}", out, valid);
      out.close();

      // verify the exception is as expected
      intercept(IOException.class, "mocked", () ->
          out.maybeRaiseWriteException());

      // and verify the count of invocations.
      Assertions.assertThat(out.getCount())
          .describedAs("process count of %s", count)
          .isEqualTo(count);
    }
  }

  /**
   * Spy on a writer with the append operation to fail after the given count of calls
   * is reached.
   * Permitted calls are forwarded to the {@code append()} method of the
   * writer passed in; every later call raises an IOException with the
   * text "mocked".
   * @param writer writer to spy on.
   * @param count number of allowed append calls.
   * @return spied writer.
   * @throws IOException from the signature of the append() call mocked.
   */
  private static SequenceFile.Writer spyWithFailingAppend(final SequenceFile.Writer writer,
      final int count)
      throws IOException {
    // remaining number of append() calls which are allowed to succeed.
    AtomicLong limit = new AtomicLong(count);

    final SequenceFile.Writer spied = Mockito.spy(writer);
    Mockito.doAnswer((InvocationOnMock invocation) -> {
      final Writable k = invocation.getArgument(0);
      final Writable v = invocation.getArgument(1);
      if (limit.getAndDecrement() > 0) {
        writer.append(k, v);
      } else {
        throw new IOException("mocked");
      }
      return null;
    }).when(spied).append(Mockito.any(Writable.class), Mockito.any(Writable.class));
    return spied;
  }

  /**
   * Multithreaded writing.
   */
  @Test
  public void testParallelWrite() throws Throwable {

    // list of 100 entries at a time
    int listSize = 100;
    // and the number of block writes
    int attempts = 100;
    List<FileEntry> list = buildEntryList(listSize);

    int total = listSize * attempts;

    // count of enqueue() calls which returned false.
    final AtomicInteger rejected = new AtomicInteger();

    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(createWriter(), 20)) {

      TaskPool.foreach(rangeExcludingIterator(0, attempts))
          .executeWith(getSubmitter())
          .stopOnFailure()
          .run(l -> {
            if (!out.enqueue(list)) {
              rejected.incrementAndGet();
            }
          });
      out.close();
      // raise any write failure first, as that is the most useful diagnostic.
      out.maybeRaiseWriteException();

      Assertions.assertThat(rejected.get())
          .describedAs("number of enqueue() calls rejected by %s", out)
          .isEqualTo(0);

      Assertions.assertThat(out.getCount())
          .describedAs("total elements written")
          .isEqualTo(total);
    }

    // now read it back
    Assertions.assertThat(foreach(iterateOverEntryFile(), e -> { }))
        .describedAs("total elements read")
        .isEqualTo(total);
  }

}