PrefetchingInputStreamFactory.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.prefetch;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.impl.streams.AbstractObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_COUNT_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
import static org.apache.hadoop.fs.s3a.S3AUtils.longBytesOption;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
 * Factory for prefetching streams.
 * <p>
 * During service init this reads and validates the prefetch block size and block
 * count options, then builds a single {@link PrefetchOptions} instance which is
 * passed to every {@link S3APrefetchingInputStream} the factory creates.
 */
public class PrefetchingInputStreamFactory extends AbstractObjectInputStreamFactory {

  /** Size in bytes of a single prefetch block; set in {@link #serviceInit(Configuration)}. */
  private int prefetchBlockSize;

  /** Size of prefetch queue (in number of blocks); set in {@link #serviceInit(Configuration)}. */
  private int prefetchBlockCount;

  /**
   * Shared prefetch options, built from the block size and count during service init
   * and handed to every stream created by {@link #readObject(ObjectReadParameters)}.
   */
  private PrefetchOptions prefetchOptions;

  /**
   * Create the factory with the service name {@code "PrefetchingInputStreamFactory"}.
   */
  public PrefetchingInputStreamFactory() {
    super("PrefetchingInputStreamFactory");
  }

  /**
   * Get the type of stream this factory creates.
   * @return {@link InputStreamType#Prefetch}
   */
  @Override
  public InputStreamType streamType() {
    return InputStreamType.Prefetch;
  }

  /**
   * Read and validate the prefetch options, then build the shared {@link PrefetchOptions}.
   * <p>
   * The block size is read as a long with a minimum of 1 and must fit in an int;
   * the block count is read as an int with a minimum of 1.
   * @param conf configuration to read the options from
   * @throws IllegalStateException if the configured block size is larger than
   * {@link Integer#MAX_VALUE} (raised via {@code checkState})
   * @throws Exception on any failure from superclass initialization or option parsing
   */
  @Override
  protected void serviceInit(final Configuration conf) throws Exception {
    super.serviceInit(conf);
    final long prefetchBlockSizeLong =
        longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
    // The value is narrowed to int below; Integer.MAX_VALUE itself still fits,
    // so only strictly larger values are rejected.
    checkState(prefetchBlockSizeLong <= Integer.MAX_VALUE,
        "S3A prefetch block size exceeds int limit: " + prefetchBlockSizeLong);
    prefetchBlockSize = (int) prefetchBlockSizeLong;
    prefetchBlockCount =
        intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
    prefetchOptions = new PrefetchOptions(prefetchBlockSize, prefetchBlockCount);
  }

  /**
   * Create a prefetching input stream for an object.
   * @param parameters parameters describing the object read
   * @return a new {@link S3APrefetchingInputStream} using this factory's configuration
   * and shared prefetch options
   * @throws IOException if the stream cannot be created
   */
  @Override
  public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
    return new S3APrefetchingInputStream(parameters, getConfig(), prefetchOptions);
  }

  /**
   * Declare the resource requirements of this factory.
   * <p>
   * The first count passed is the prefetch block count, which is presumably the number
   * of threads to reserve (confirm against {@link StreamFactoryRequirements}). The second
   * count is 0. The vectored IO context is populated from the configuration with the
   * minimum seek for vectored reads forced to 0. The factory also declares that it
   * requires a future pool.
   * @return the factory requirements
   */
  @Override
  public StreamFactoryRequirements factoryRequirements() {
    // Fill in the vector context, then disable range merging.
    // This ensures that no reads are made for data which is then discarded,
    // so the prefetch and block read code never performs wasteful fetches.
    final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig())
        .setMinSeekForVectoredReads(0);
    return new StreamFactoryRequirements(prefetchBlockCount,
        0,
        vectorContext,
        StreamFactoryRequirements.Requirements.RequiresFuturePool);
  }
}