AnalyticsStreamFactory.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.fs.s3a.impl.streams;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.Statistic.ANALYTICS_STREAM_FACTORY_CLOSED;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
/**
 * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of
 * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
 * <p>
 * Lifecycle as implemented in this class:
 * <ol>
 *   <li>{@link #serviceInit(Configuration)} builds the
 *   {@link S3SeekableInputStreamConfiguration} from the options under
 *   {@code ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX}.</li>
 *   <li>{@link #bind(FactoryBindingParameters)} wraps the construction of the
 *   library's {@link S3SeekableInputStreamFactory} in a
 *   {@link LazyAutoCloseableReference}.</li>
 *   <li>{@link #readObject(ObjectReadParameters)} evaluates that reference and
 *   hands the result to each new {@link AnalyticsStream}.</li>
 *   <li>{@link #serviceStop()} closes the reference.</li>
 * </ol>
 */
public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {

  private static final Logger LOG =
      LoggerFactory.getLogger(AnalyticsStreamFactory.class);

  /** Library stream configuration; assigned in {@link #serviceInit(Configuration)}. */
  private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;

  /**
   * Reference to the library stream factory; assigned in
   * {@link #bind(FactoryBindingParameters)} and therefore {@code null}
   * until the factory has been bound.
   */
  private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;

  /** Flag passed to {@code getOrCreateAsyncClient()} when the async client is requested. */
  private boolean requireCrt;

  /**
   * Construct the factory with the service name "AnalyticsStreamFactory".
   */
  public AnalyticsStreamFactory() {
    super("AnalyticsStreamFactory");
  }

  /**
   * Initialize the service: build the stream configuration from the
   * analytics accelerator options in the supplied configuration.
   * @param conf configuration to read the options from.
   * @throws Exception failure raised by the superclass initialization.
   */
  @Override
  protected void serviceInit(final Configuration conf) throws Exception {
    super.serviceInit(conf);
    ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
        ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
    this.seekableInputStreamConfiguration =
        S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
    this.requireCrt = false;
  }

  /**
   * Bind to the factory parameters and set up the reference to the library
   * stream factory. The factory itself is not constructed here; only the
   * callable which constructs it is registered.
   * @param factoryBindingParameters parameters for the factory binding.
   * @throws IOException failure raised by the superclass binding.
   */
  @Override
  public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
    super.bind(factoryBindingParameters);
    this.s3SeekableInputStreamFactory =
        new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
  }

  /**
   * Create a new {@link AnalyticsStream} for the given read parameters.
   * @param parameters parameters of the object read.
   * @return the new stream.
   * @throws IOException failure to obtain the library stream factory or to create the stream.
   */
  @Override
  public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
    return new AnalyticsStream(
        parameters,
        getOrCreateS3SeekableInputStreamFactory());
  }

  /**
   * Get the type of stream this factory creates.
   * @return {@link InputStreamType#Analytics}.
   */
  @Override
  public InputStreamType streamType() {
    return InputStreamType.Analytics;
  }

  /**
   * Build the requirements this factory declares to the store.
   * <p>
   * The vectored IO context is populated from the service configuration,
   * then its minimum seek for vectored reads is set to zero.
   * The requirements also carry the
   * {@code ExpectUnauditedGetRequests} flag.
   * @return the stream factory requirements.
   */
  @Override
  public StreamFactoryRequirements factoryRequirements() {
    // fill in the vector context
    final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig());
    // and then disable range merging.
    // this ensures that no reads are made for data which is then discarded...
    // so the prefetch and block read code doesn't ever do wasteful fetches.
    vectorContext.setMinSeekForVectoredReads(0);
    // NOTE(review): the two leading zeros are presumably the thread counts
    // requested from the store -- confirm against StreamFactoryRequirements.
    return new StreamFactoryRequirements(0,
        0, vectorContext,
        StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
  }

  /**
   * Stop the service, closing the library stream factory reference.
   * <p>
   * This is best-effort: any exception raised while closing or while
   * updating the statistic is logged at debug and not rethrown.
   * The reference is only closed if {@link #bind(FactoryBindingParameters)}
   * created it, so stopping a factory which was never bound is harmless.
   * The {@code ANALYTICS_STREAM_FACTORY_CLOSED} statistic is only
   * incremented after the close call returned normally.
   * The superclass is always stopped, whatever happened before.
   * @throws Exception failure raised by the superclass stop.
   */
  @Override
  protected void serviceStop() throws Exception {
    try {
      if (s3SeekableInputStreamFactory != null) {
        s3SeekableInputStreamFactory.close();
        // only reached when close() did not throw
        callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED);
      }
    } catch (Exception ignored) {
      // deliberate: a failure to clean up must not fail service stop
      LOG.debug("Ignored exception while closing stream factory", ignored);
    } finally {
      super.serviceStop();
    }
  }

  /**
   * Evaluate the reference to the library stream factory.
   * @return the library stream factory.
   * @throws IOException failure raised while evaluating the reference.
   */
  private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
      throws IOException {
    return s3SeekableInputStreamFactory.eval();
  }

  /**
   * Build the callable which constructs the library stream factory.
   * When invoked, the callable asks the callbacks for the async client
   * (passing {@link #requireCrt}), wraps it in an {@link S3SdkObjectClient}
   * and combines it with the stream configuration.
   * @return a callable creating the library stream factory.
   */
  private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
    return () -> new S3SeekableInputStreamFactory(
        new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
        seekableInputStreamConfiguration);
  }
}