EntryFileIO.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.FutureIO;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
* Read or write entry file.
* This can be used to create a simple reader, or to create
* a writer queue where different threads can queue data for
* writing.
* The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
*/
public class EntryFileIO {
private static final Logger LOG = LoggerFactory.getLogger(
EntryFileIO.class);
/**
* How long should the writer shutdown take?
*/
public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;
/**
* How long should trying to queue a write block before giving up
* with an error?
* This is a safety feature to ensure that if something has gone wrong
* in the queue code the job fails with an error rather than just hangs
*/
public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;
/** Configuration used to load filesystems. */
private final Configuration conf;
/**
* Constructor.
* @param conf Configuration used to load filesystems
*/
public EntryFileIO(final Configuration conf) {
this.conf = conf;
}
/**
* Create a writer to a local file.
* @param file file
* @return the writer
* @throws IOException failure to create the file
*/
public SequenceFile.Writer createWriter(File file) throws IOException {
return createWriter(toPath(file));
}
/**
* Create a writer to a file on any FS.
* @param path path to write to.
* @return the writer
* @throws IOException failure to create the file
*/
public SequenceFile.Writer createWriter(Path path) throws IOException {
return SequenceFile.createWriter(conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(NullWritable.class),
SequenceFile.Writer.valueClass(FileEntry.class));
}
/**
* Reader is created with sequential reads.
* @param file file
* @return the reader
* @throws IOException failure to open
*/
public SequenceFile.Reader createReader(File file) throws IOException {
return createReader(toPath(file));
}
/**
* Reader is created with sequential reads.
* @param path path
* @return the reader
* @throws IOException failure to open
*/
public SequenceFile.Reader createReader(Path path) throws IOException {
return new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path));
}
/**
* Iterator to retrieve file entries from the sequence file.
* Closeable: cast and invoke to close the reader.
* @param reader reader;
* @return iterator
*/
public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
return new EntryIterator(reader);
}
/**
* Create and start an entry writer.
* @param writer writer
* @param capacity queue capacity
* @return the writer.
*/
public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
final EntryWriter ew = new EntryWriter(writer, capacity);
ew.start();
return ew;
}
/**
* Write a sequence of entries to the writer.
* @param writer writer
* @param entries entries
* @param close close the stream afterwards
* @return number of entries written
* @throws IOException write failure.
*/
public static int write(SequenceFile.Writer writer,
Collection<FileEntry> entries,
boolean close)
throws IOException {
try {
for (FileEntry entry : entries) {
writer.append(NullWritable.get(), entry);
}
writer.flush();
} finally {
if (close) {
writer.close();
}
}
return entries.size();
}
/**
* Given a file, create a Path.
* @param file file
* @return path to the file
*/
public static Path toPath(final File file) {
return new Path(file.toURI());
}
/**
* Actions in the queue.
*/
private enum Actions {
/** Write the supplied list of entries. */
write,
/** Stop the processor thread. */
stop
}
/**
* What gets queued: an action and a list of entries.
*/
private static final class QueueEntry {
private final Actions action;
private final List<FileEntry> entries;
private QueueEntry(final Actions action, List<FileEntry> entries) {
this.action = action;
this.entries = entries;
}
private QueueEntry(final Actions action) {
this(action, null);
}
}
/**
* A Writer thread takes reads from a queue containing
* list of entries to save; these are serialized via the writer to
* the output stream.
* Other threads can queue the file entry lists from loaded manifests
* for them to be written.
* These threads will be blocked when the queue capacity is reached.
* This is quite a complex process, with the main troublespots in the code
* being:
* - managing the shutdown
* - failing safely on write failures, restarting all blocked writers in the process
*/
public static final class EntryWriter implements Closeable {
/**
* The destination of the output.
*/
private final SequenceFile.Writer writer;
/**
* Blocking queue of actions.
*/
private final BlockingQueue<QueueEntry> queue;
/**
* stop flag.
*/
private final AtomicBoolean stop = new AtomicBoolean(false);
/**
* Is the processor thread active.
*/
private final AtomicBoolean active = new AtomicBoolean(false);
private final int capacity;
/**
* Executor of writes.
*/
private ExecutorService executor;
/**
* Future invoked.
*/
private Future<Integer> future;
/**
* count of file entries saved; only updated in one thread
* so volatile.
*/
private final AtomicInteger count = new AtomicInteger();
/**
* Any failure caught on the writer thread; this should be
* raised within the task/job thread as it implies that the
* entire write has failed.
*/
private final AtomicReference<IOException> failure = new AtomicReference<>();
/**
* Create.
* @param writer writer
* @param capacity capacity.
*/
private EntryWriter(SequenceFile.Writer writer, int capacity) {
checkState(capacity > 0, "invalid queue capacity %s", capacity);
this.writer = requireNonNull(writer);
this.capacity = capacity;
this.queue = new ArrayBlockingQueue<>(capacity);
}
/**
* Is the writer active?
* @return true if the processor thread is live
*/
public boolean isActive() {
return active.get();
}
/**
* Get count of files processed.
* @return the count
*/
public int getCount() {
return count.get();
}
/**
* Any failure.
* @return any IOException caught when writing the output
*/
public IOException getFailure() {
return failure.get();
}
/**
* Start the thread.
*/
private void start() {
checkState(executor == null, "already started");
active.set(true);
executor = HadoopExecutors.newSingleThreadExecutor();
future = executor.submit(this::processor);
LOG.debug("Started entry writer {}", this);
}
/**
* Add a list of entries to the queue.
* @param entries entries.
* @return whether the queue worked.
*/
public boolean enqueue(List<FileEntry> entries) {
if (entries.isEmpty()) {
LOG.debug("ignoring enqueue of empty list");
// exit fast, but return true.
return true;
}
if (active.get()) {
try {
LOG.debug("Queueing {} entries", entries.size());
final boolean enqueued = queue.offer(new QueueEntry(Actions.write, entries),
WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!enqueued) {
LOG.warn("Timeout submitting entries to {}", this);
}
return enqueued;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
} else {
LOG.warn("EntryFile write queue inactive; discarding {} entries submitted to {}",
entries.size(), this);
return false;
}
}
/**
* Queue and process entries until done.
* @return count of entries written.
* @throws UncheckedIOException on write failure
*/
private int processor() {
Thread.currentThread().setName("EntryIOWriter");
try {
while (!stop.get()) {
final QueueEntry queueEntry = queue.take();
switch (queueEntry.action) {
case stop: // stop the operation
LOG.debug("Stop processing");
stop.set(true);
break;
case write: // write data
default: // here to shut compiler up
// write
final List<FileEntry> entries = queueEntry.entries;
LOG.debug("Adding block of {} entries", entries.size());
for (FileEntry entry : entries) {
append(entry);
}
break;
}
}
} catch (IOException e) {
LOG.debug("Write failure", e);
failure.set(e);
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
// being stopped implicitly
LOG.debug("interrupted", e);
} finally {
stop.set(true);
active.set(false);
// clear the queue, so wake up on any failure mode.
queue.clear();
}
return count.get();
}
/**
* write one entry.
* @param entry entry to write
* @throws IOException on write failure
*/
private void append(FileEntry entry) throws IOException {
writer.append(NullWritable.get(), entry);
final int c = count.incrementAndGet();
LOG.trace("Added entry #{}: {}", c, entry);
}
/**
* Close: stop accepting new writes, wait for queued writes to complete.
* @throws IOException failure closing that writer, or somehow the future
* raises an IOE which isn't caught for later.
*/
@Override
public void close() throws IOException {
// declare as inactive.
// this stops queueing more data, but leaves
// the worker thread still polling and writing.
if (!active.getAndSet(false)) {
// already stopped
return;
}
LOG.debug("Shutting down writer; entry lists in queue: {}",
capacity - queue.remainingCapacity());
// signal queue closure by queuing a stop option.
// this is added at the end of the list of queued blocks,
// of which are written.
try {
queue.put(new QueueEntry(Actions.stop));
} catch (InterruptedException e) {
Thread.interrupted();
}
try {
// wait for the op to finish.
int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.debug("Processed {} files", total);
executor.shutdown();
} catch (TimeoutException e) {
LOG.warn("Timeout waiting for write thread to finish");
// trouble. force close
executor.shutdownNow();
// close the stream
} finally {
writer.close();
}
}
/**
* Raise any IOException caught during execution of the writer thread.
* @throws IOException if one was caught and saved.
*/
public void maybeRaiseWriteException() throws IOException {
final IOException f = failure.get();
if (f != null) {
throw f;
}
}
@Override
public String toString() {
return "EntryWriter{" +
"stop=" + stop.get() +
", active=" + active.get() +
", count=" + count.get() +
", queue depth=" + queue.size() +
", failure=" + failure +
'}';
}
}
/**
* Iterator to retrieve file entries from the sequence file.
* Closeable; it will close automatically when the last element is read.
* No thread safety.
*/
@VisibleForTesting
static final class EntryIterator implements RemoteIterator<FileEntry>, Closeable {
private final SequenceFile.Reader reader;
private FileEntry fetched;
private boolean closed;
private int count;
/**
* Create an iterator.
* @param reader the file to read from.
*/
private EntryIterator(final SequenceFile.Reader reader) {
this.reader = requireNonNull(reader);
}
@Override
public void close() throws IOException {
if (!closed) {
closed = true;
reader.close();
}
}
@Override
public String toString() {
return "EntryIterator{" +
"closed=" + closed +
", count=" + count +
", fetched=" + fetched +
'}';
}
@Override
public boolean hasNext() throws IOException {
return fetched != null || fetchNext();
}
/**
* Fetch the next entry.
* If there is none, then the reader is closed before `false`
* is returned.
* @return true if a record was retrieved.
* @throws IOException IO failure.
*/
private boolean fetchNext() throws IOException {
FileEntry readBack = new FileEntry();
if (reader.next(NullWritable.get(), readBack)) {
fetched = readBack;
count++;
return true;
} else {
fetched = null;
close();
return false;
}
}
@Override
public FileEntry next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
final FileEntry r = fetched;
fetched = null;
return r;
}
/**
* Is the stream closed.
* @return true if closed.
*/
public boolean isClosed() {
return closed;
}
int getCount() {
return count;
}
}
}