AbfsInputStreamContext.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
/**
* Class to hold extra input stream configs.
*/
public class AbfsInputStreamContext extends AbfsStreamContext {
// Retaining logger of AbfsInputStream
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
private int readBufferSize;
private int readAheadQueueDepth;
private boolean tolerateOobAppends;
private boolean isReadAheadEnabled = true;
private boolean isReadAheadV2Enabled;
private boolean alwaysReadBufferSize;
private int readAheadBlockSize;
private int readAheadRange;
private AbfsInputStreamStatistics streamStatistics;
private boolean readSmallFilesCompletely;
private boolean optimizeFooterRead;
private int footerReadBufferSize;
private boolean bufferedPreadDisabled;
/** A BackReference to the FS instance that created this OutputStream. */
private BackReference fsBackRef;
private ContextEncryptionAdapter contextEncryptionAdapter = null;
/**
* Constructs a new {@link AbfsInputStreamContext}.
*
* @param sasTokenRenewPeriodForStreamsInSeconds SAS token renewal interval in seconds.
*/
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
/**
* Sets the read buffer size.
*
* @param readBufferSize buffer size in bytes.
* @return this instance.
*/
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
/**
* Sets the read-ahead queue depth.
* Defaults to the number of available processors if negative.
*
* @param readAheadQueueDepth queue depth.
* @return this instance.
*/
public AbfsInputStreamContext withReadAheadQueueDepth(
final int readAheadQueueDepth) {
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
? readAheadQueueDepth
: Runtime.getRuntime().availableProcessors();
return this;
}
/**
* Enables or disables tolerance for out-of-band appends.
*
* @param tolerateOobAppends whether OOB appends should be tolerated.
* @return this instance.
*/
public AbfsInputStreamContext withTolerateOobAppends(
final boolean tolerateOobAppends) {
this.tolerateOobAppends = tolerateOobAppends;
return this;
}
/**
* Enables or disables read-ahead feature.
*
* @param isReadAheadEnabled whether read-ahead is enabled.
* @return this instance.
*/
public AbfsInputStreamContext isReadAheadEnabled(
final boolean isReadAheadEnabled) {
this.isReadAheadEnabled = isReadAheadEnabled;
return this;
}
/**
* Enables or disables read-ahead version 2.
*
* @param isReadAheadV2Enabled whether read-ahead V2 is enabled.
* @return this instance.
*/
public AbfsInputStreamContext isReadAheadV2Enabled(
final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
return this;
}
/**
* Sets the read-ahead range.
*
* @param readAheadRange range in bytes.
* @return this instance.
*/
public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
return this;
}
/**
* Sets stream statistics collector.
*
* @param streamStatistics statistics instance.
* @return this instance.
*/
public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}
/**
* Enables or disables complete read of small files in a single operation.
*
* @param readSmallFilesCompletely whether small files should be fully read.
* @return this instance.
*/
public AbfsInputStreamContext withReadSmallFilesCompletely(
final boolean readSmallFilesCompletely) {
this.readSmallFilesCompletely = readSmallFilesCompletely;
return this;
}
/**
* Enables or disables footer read optimization.
*
* @param optimizeFooterRead whether footer read optimization is enabled.
* @return this instance.
*/
public AbfsInputStreamContext withOptimizeFooterRead(
final boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
return this;
}
/**
* Sets the footer read buffer size.
*
* @param footerReadBufferSize size in bytes.
* @return this instance.
*/
public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}
/**
* Forces use of the configured read buffer size always.
*
* @param alwaysReadBufferSize whether to always use configured buffer size.
* @return this instance.
*/
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
return this;
}
/**
* Sets the read-ahead block size.
*
* @param readAheadBlockSize block size in bytes.
* @return this instance.
*/
public AbfsInputStreamContext withReadAheadBlockSize(
final int readAheadBlockSize) {
this.readAheadBlockSize = readAheadBlockSize;
return this;
}
/**
* Enables or disables buffered positional reads.
*
* @param bufferedPreadDisabled whether buffered pread is disabled.
* @return this instance.
*/
public AbfsInputStreamContext withBufferedPreadDisabled(
final boolean bufferedPreadDisabled) {
this.bufferedPreadDisabled = bufferedPreadDisabled;
return this;
}
/**
* Sets a back reference to the filesystem that created this stream.
*
* @param fsBackRef filesystem back reference.
* @return this instance.
*/
public AbfsInputStreamContext withAbfsBackRef(
final BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}
/**
* Sets the context encryption adapter.
*
* @param contextEncryptionAdapter encryption adapter.
* @return this instance.
*/
public AbfsInputStreamContext withEncryptionAdapter(
ContextEncryptionAdapter contextEncryptionAdapter){
this.contextEncryptionAdapter = contextEncryptionAdapter;
return this;
}
/**
* Finalizes and validates the context configuration.
* <p>
* Ensures read-ahead range is valid and aligns read-ahead block size with
* read request size if necessary.
*
* @return this instance.
*/
public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
"fs.azure.read.request.size[={}] is configured for higher size than "
+ "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+ "readAhead block size to be same as readRequestSize.",
readBufferSize, readAheadBlockSize);
readAheadBlockSize = readBufferSize;
}
// Validation of parameters to be done here.
Preconditions.checkArgument(readAheadRange > 0,
"Read ahead range should be greater than 0");
return this;
}
/** @return configured read buffer size. */
public int getReadBufferSize() {
return readBufferSize;
}
/** @return read-ahead queue depth. */
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
/** @return whether out-of-band appends are tolerated. */
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
/** @return whether read-ahead is enabled. */
public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}
/** @return whether read-ahead V2 is enabled. */
public boolean isReadAheadV2Enabled() {
return isReadAheadV2Enabled;
}
/** @return read-ahead range. */
public int getReadAheadRange() {
return readAheadRange;
}
/** @return stream statistics collector. */
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
/** @return whether small files should be read completely. */
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
/** @return whether footer read optimization is enabled. */
public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}
/** @return footer read buffer size. */
public int getFooterReadBufferSize() {
return footerReadBufferSize;
}
/** @return whether the configured buffer size is always used. */
public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
/** @return read-ahead block size. */
public int getReadAheadBlockSize() {
return readAheadBlockSize;
}
/** @return whether buffered pread is disabled. */
public boolean isBufferedPreadDisabled() {
return bufferedPreadDisabled;
}
/** @return filesystem back reference. */
public BackReference getFsBackRef() {
return fsBackRef;
}
/** @return context encryption adapter. */
public ContextEncryptionAdapter getEncryptionAdapter() {
return contextEncryptionAdapter;
}
}