/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.gs;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.thirdparty.com.google.common.base.Ascii;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
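
/**
* An {@link OutputStream} that writes data to Google Cloud Storage and implements Hadoop's
* {@link Syncable} contract by composing a series of temporary "tail" objects onto the
* destination object: each hsync() closes the current tail file, appends it to the destination
* with a GCS compose() call, and opens a new tail file for subsequent writes (hflush() does the
* same unless rate limited). The first component is written directly to the destination path, so
* a stream that is never synced needs no compose() call at all.
*
* <p>A minimal usage sketch through the Hadoop FileSystem API (bucket and paths are
* hypothetical):
*
* <pre>{@code
* FileSystem fs = FileSystem.get(URI.create("gs://bucket/"), new Configuration());
* byte[] data = "example".getBytes(StandardCharsets.UTF_8);
* try (FSDataOutputStream out = fs.create(new Path("/dir/file"))) {
*   out.write(data);
*   out.hsync(); // commits the bytes written so far; readers can now observe them
*   out.write(data);
* } // close() commits the remaining tail and awaits async temp-file deletions
* }</pre>
*/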
class GoogleHadoopOutputStream extends OutputStream
implements StreamCapabilities, Syncable {
private static final Logger LOG = LoggerFactory.getLogger(GoogleHadoopOutputStream.class);
// Prefix used for all temporary files created by this stream.
private static final String TMP_FILE_PREFIX = "_GHFS_SYNC_TMP_FILE_";
// Temporary files don't need to carry the desired attributes of the final destination file,
// since metadata settings get clobbered by the final compose() anyway; additionally, because
// of the way temp file names are picked and because directories for the destination file have
// already been ensured, temp file creation can skip various directory checks.
private static final CreateFileOptions TMP_FILE_CREATE_OPTIONS =
CreateFileOptions.builder().setEnsureNoDirectoryConflict(false).build();
// Deletion of temporary files occurs asynchronously for performance reasons, but in-flight
// deletions are awaited on close(), so as long as all output streams are closed there should
// be no remaining in-flight work inside this threadpool.
private static final ExecutorService TMP_FILE_CLEANUP_THREADPOOL =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("ghfs-output-stream-sync-cleanup-%d")
.setDaemon(true)
.build());
private final GoogleHadoopFileSystem ghfs;
private final CreateObjectOptions composeObjectOptions;
// Path of the file to write to.
private final URI dstGcsPath;
/**
* The last known generationId of the {@link #dstGcsPath} file, or possibly {@link
* StorageResourceId#UNKNOWN_GENERATION_ID} if unknown.
*/
private long dstGenerationId;
// GCS path of the "tail" file, which will be appended to the destination on each
// hflush()/hsync() call.
private URI tmpGcsPath;
/**
* Stores the component index corresponding to {@link #tmpGcsPath}. If close() is called, the
* total number of components in the {@link #dstGcsPath} will be {@code tmpIndex + 1}.
*/
private int tmpIndex;
// OutputStream writing the "tail" file, which will be appended to the destination on each
// hflush()/hsync() call.
private OutputStream tmpOut;
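// Limits how frequently hflush()/hsync() may commit data. Null when the configured minimum
// sync interval is zero or negative, in which case every hflush()/hsync() commits immediately
// (e.g. a 2s minimum interval yields a rate of 0.5 permits per second).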
private final RateLimiter syncRateLimiter;
// List of temporary file-deletion futures accrued during the lifetime of this output stream.
private final List<Future<Void>> tmpDeletionFutures = new ArrayList<>();
// Statistics tracker provided by the parent GoogleHadoopFileSystem for recording
// numbers of bytes written.
private final FileSystem.Statistics statistics;
/**
* Constructs an instance of GoogleHadoopOutputStream object.
*
* @param ghfs Instance of {@link GoogleHadoopFileSystem}.
* @param dstGcsPath Path of the file to write to.
* @param createFileOptions options for file creation.
* @param statistics File system statistics object.
* @throws IOException if an IO error occurs.
*/
GoogleHadoopOutputStream(GoogleHadoopFileSystem ghfs, URI dstGcsPath,
CreateFileOptions createFileOptions, FileSystem.Statistics statistics) throws IOException {
LOG.trace("GoogleHadoopOutputStream(gcsPath: {}, createFileOptions: {})", dstGcsPath,
createFileOptions);
this.ghfs = ghfs;
this.dstGcsPath = dstGcsPath;
this.statistics = statistics;
Duration minSyncInterval = ghfs.getFileSystemConfiguration().getMinSyncInterval();
this.syncRateLimiter =
minSyncInterval.isNegative() || minSyncInterval.isZero()
? null
: RateLimiter.create(/* permitsPerSecond= */ 1_000.0 / minSyncInterval.toMillis());
this.composeObjectOptions =
GoogleCloudStorageFileSystem.objectOptionsFromFileOptions(
createFileOptions.toBuilder()
// Set write mode to OVERWRITE because the compose operation is used to append new
// data to an existing object
.setWriteMode(CreateFileOptions.WriteMode.OVERWRITE)
.build());
if (createFileOptions.getWriteMode() == CreateFileOptions.WriteMode.APPEND) {
// When appending, the first component has to go to a new temporary file.
this.tmpGcsPath = getNextTmpPath();
this.tmpIndex = 1;
} else {
// The first component of the stream will go straight to the destination filename to optimize
// the case where no hsync() or a single hsync() is called during the lifetime of the stream;
// committing the first component thus doesn't require any compose() call under the hood.
this.tmpGcsPath = dstGcsPath;
this.tmpIndex = 0;
}
this.tmpOut =
createOutputStream(
ghfs.getGcsFs(),
tmpGcsPath,
tmpIndex == 0 ? createFileOptions : TMP_FILE_CREATE_OPTIONS);
this.dstGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
}
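
/**
* Opens a writable channel for {@code gcsPath}, translating the NIO
* {@code FileAlreadyExistsException} into the Hadoop one, and wraps the resulting stream in a
* {@link BufferedOutputStream} when a positive output stream buffer size is configured.
*/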
private OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsfs, URI gcsPath,
CreateFileOptions options)
throws IOException {
WritableByteChannel channel;
try {
channel = gcsfs.create(gcsPath, options);
} catch (java.nio.file.FileAlreadyExistsException e) {
throw (FileAlreadyExistsException) new FileAlreadyExistsException(
String.format("'%s' already exists", gcsPath)).initCause(e);
}
OutputStream outputStream = Channels.newOutputStream(channel);
int bufferSize = gcsfs.getConfiguration().getOutStreamBufferSize();
return bufferSize > 0 ? new BufferedOutputStream(outputStream, bufferSize) : outputStream;
}
@Override
public void write(int b) throws IOException {
throwIfNotOpen();
tmpOut.write(b);
statistics.incrementBytesWritten(1);
statistics.incrementWriteOps(1);
}
@Override
public void write(@Nonnull byte[] b, int offset, int len) throws IOException {
throwIfNotOpen();
tmpOut.write(b, offset, len);
statistics.incrementBytesWritten(len);
statistics.incrementWriteOps(1);
}
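
/**
* Closes the current tail file and commits its data to {@link #dstGcsPath}. For the first
* component, which is written directly to the destination, no compose() is needed; for
* subsequent components, the tail object is appended to the destination with a compose() call
* and its asynchronous deletion is scheduled on the cleanup threadpool.
*/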
private void commitTempFile() throws IOException {
// TODO: return early when 0 bytes have been written in the temp files
tmpOut.close();
long tmpGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
LOG.trace(
"tmpOut is an instance of {}; expected generationId {}.",
tmpOut.getClass(), tmpGenerationId);
// For the first component, tmpGcsPath equals dstGcsPath and no compose() call is
// necessary. Otherwise, we compose in place into the destination object and then delete
// the temporary object.
if (dstGcsPath.equals(tmpGcsPath)) {
// First commit was direct to the destination; the generationId of the object we just
// committed will be used as the destination generation id for future compose calls.
dstGenerationId = tmpGenerationId;
} else {
StorageResourceId dstId =
StorageResourceId.fromUriPath(
dstGcsPath, /* allowEmptyObjectName= */ false, dstGenerationId);
StorageResourceId tmpId =
StorageResourceId.fromUriPath(
tmpGcsPath, /* allowEmptyObjectName= */ false, tmpGenerationId);
checkState(
dstId.getBucketName().equals(tmpId.getBucketName()),
"Destination bucket in path '%s' doesn't match temp file bucket in path '%s'",
dstGcsPath,
tmpGcsPath);
GoogleCloudStorageFileSystem gcs = ghfs.getGcsFs();
GoogleCloudStorageItemInfo composedObject =
gcs.composeObjects(ImmutableList.of(dstId, tmpId), dstId, composeObjectOptions);
dstGenerationId = composedObject.getContentGeneration();
tmpDeletionFutures.add(
TMP_FILE_CLEANUP_THREADPOOL.submit(
() -> {
gcs.delete(ImmutableList.of(tmpId));
return null;
}));
}
}
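
/**
* Commits any remaining tail data to the destination, releases the stream, and blocks until
* all in-flight temporary-file deletions complete. Subsequent calls are no-ops.
*/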
@Override
public void close() throws IOException {
LOG.trace(
"close(): temp tail file: {}, final destination: {}", tmpGcsPath, dstGcsPath);
if (tmpOut == null) {
LOG.trace("close(): Ignoring; stream already closed.");
return;
}
commitTempFile();
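// commitTempFile() has already closed tmpOut; per the Closeable contract a second close()
// has no effect, so this acts as a safety net before the reference is cleared.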
try {
tmpOut.close();
} finally {
tmpOut = null;
}
tmpGcsPath = null;
tmpIndex = -1;
LOG.trace("close(): Awaiting {} deletionFutures", tmpDeletionFutures.size());
for (Future<?> deletion : tmpDeletionFutures) {
try {
deletion.get();
} catch (ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new IOException(
String.format(
"Failed to delete temporary files while closing stream: '%s'", dstGcsPath),
e);
}
}
}
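
/** Throws {@link ClosedChannelException} if this stream has already been closed. */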
private void throwIfNotOpen() throws IOException {
if (tmpOut == null) {
throw new ClosedChannelException();
}
}
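
/**
* Declares the HFLUSH and HSYNC capabilities only when a sync rate limiter is configured,
* i.e. when the minimum sync interval is positive; IOSTATISTICS is not yet supported.
*/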
@Override
public boolean hasCapability(String capability) {
checkArgument(!isNullOrEmpty(capability), "capability must not be null or empty string");
switch (Ascii.toLowerCase(capability)) {
case StreamCapabilities.HFLUSH:
case StreamCapabilities.HSYNC:
return syncRateLimiter != null;
case StreamCapabilities.IOSTATISTICS:
return false; // TODO: Add support
default:
return false;
}
}
/**
* There is no way to flush data so that it becomes available to readers without a full-fledged
* hsync(). If the output stream is only syncable, this method is a no-op. If the output stream
* is also flushable, this method simply uses the same implementation as hsync().
*
* <p>If it is rate limited then, unlike hsync(), which tries to acquire the permits and blocks,
* this method does nothing.
*/
@Override
public void hflush() throws IOException {
LOG.trace("hflush(): {}", dstGcsPath);
long startMs = System.currentTimeMillis();
throwIfNotOpen();
// If the rate limit is not set or a permit is acquired, then use hsync()
if (syncRateLimiter == null || syncRateLimiter.tryAcquire()) {
LOG.trace("hflush() uses hsyncInternal() for {}", dstGcsPath);
hsyncInternal(startMs);
return;
}
LOG.trace(
"hflush(): No-op due to rate limit ({}): readers will *not* yet see flushed data for {}",
syncRateLimiter, dstGcsPath);
}
@Override
public void hsync() throws IOException {
LOG.trace("hsync(): {}", dstGcsPath);
long startMs = System.currentTimeMillis();
throwIfNotOpen();
if (syncRateLimiter != null) {
LOG.trace(
"hsync(): Rate limited ({}) with blocking permit acquisition for {}",
syncRateLimiter, dstGcsPath);
syncRateLimiter.acquire();
}
hsyncInternal(startMs);
}
/** Internal implementation of hsync; also reused by hflush(). */
private void hsyncInternal(long startMs) throws IOException {
LOG.trace(
"hsyncInternal(): Committing tail file {} to final destination {}", tmpGcsPath, dstGcsPath);
commitTempFile();
// Use a different temporary path for each temporary component to reduce the possible avenues of
// race conditions in the face of low-level retries, etc.
++tmpIndex;
tmpGcsPath = getNextTmpPath();
LOG.trace(
"hsyncInternal(): Opening next temporary tail file {} at index {}", tmpGcsPath, tmpIndex);
tmpOut = createOutputStream(ghfs.getGcsFs(), tmpGcsPath, TMP_FILE_CREATE_OPTIONS);
long finishMs = System.currentTimeMillis();
LOG.trace("Took {}ms to sync() for {}", finishMs - startMs, dstGcsPath);
}
/** Returns URI to be used for the next temp "tail" file in the series. */
private URI getNextTmpPath() {
Path basePath = ghfs.getHadoopPath(dstGcsPath);
Path tempPath =
new Path(
basePath.getParent(),
String.format(
"%s%s.%d.%s", TMP_FILE_PREFIX, basePath.getName(), tmpIndex, UUID.randomUUID()));
return ghfs.getGcsPath(tempPath);
}
}