S3SelectRecordCursorProvider.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.s3select;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveFileSplit;
import com.facebook.presto.hive.HiveRecordCursorProvider;
import com.facebook.presto.hive.IonSqlQueryBuilder;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTimeZone;
import javax.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.serde.serdeConstants.COLUMN_NAME_DELIMITER;
import static org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS;
public class S3SelectRecordCursorProvider
implements HiveRecordCursorProvider
{
private final HdfsEnvironment hdfsEnvironment;
private final HiveClientConfig clientConfig;
private final PrestoS3ClientFactory s3ClientFactory;
@Inject
public S3SelectRecordCursorProvider(
HdfsEnvironment hdfsEnvironment,
HiveClientConfig clientConfig,
PrestoS3ClientFactory s3ClientFactory)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.clientConfig = requireNonNull(clientConfig, "clientConfig is null");
this.s3ClientFactory = requireNonNull(s3ClientFactory, "s3ClientFactory is null");
}
@Override
public Optional<RecordCursor> createRecordCursor(
Configuration configuration,
ConnectorSession session,
HiveFileSplit fileSplit,
Properties schema,
List<HiveColumnHandle> columns,
TupleDomain<HiveColumnHandle> effectivePredicate,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
boolean s3SelectPushdownEnabled)
{
if (!s3SelectPushdownEnabled) {
return Optional.empty();
}
Path path = new Path(fileSplit.getPath());
try {
this.hdfsEnvironment.getFileSystem(session.getUser(), path, configuration);
}
catch (IOException e) {
throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
}
// Query is not going to filter any data, no need to use S3 Select.
if (!hasFilters(schema, effectivePredicate, columns)) {
return Optional.empty();
}
String serdeName = getDeserializerClassName(schema);
Optional<S3SelectDataType> s3SelectDataTypeOptional = S3SelectSerDeDataTypeMapper.getDataType(serdeName);
if (s3SelectDataTypeOptional.isPresent()) {
S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType);
String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate);
Optional<S3SelectLineRecordReader> recordReader = S3SelectLineRecordReaderProvider.get(configuration, clientConfig, path, fileSplit.getStart(), fileSplit.getLength(), fileSplit.getFileSize(), schema, ionSqlQuery, s3ClientFactory, s3SelectDataType);
// If S3 Select data type is not mapped to a S3SelectLineRecordReader it will return Optional.empty()
return recordReader.map(s3SelectLineRecordReader -> new S3SelectRecordCursor<>(configuration, path, s3SelectLineRecordReader, fileSplit.getLength(), schema, columns, hiveStorageTimeZone, typeManager));
}
// unsupported serdes
return Optional.empty();
}
private static boolean hasFilters(
Properties schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> projectedColumns)
{
// When there are no effective predicates and the projected columns are identical to the schema, it means that
// we get all the data out of S3. We can use S3 GetObject instead of S3 SelectObjectContent in these cases.
if (effectivePredicate.isAll()) {
return !areColumnsEquivalent(projectedColumns, schema);
}
return true;
}
private static boolean areColumnsEquivalent(List<HiveColumnHandle> projectedColumns, Properties schema)
{
Set<String> projectedColumnNames = projectedColumns.stream().map(HiveColumnHandle::getName).collect(toImmutableSet());
Set<String> schemaColumnNames;
String columnNameProperty = schema.getProperty(LIST_COLUMNS);
if (columnNameProperty.isEmpty()) {
schemaColumnNames = ImmutableSet.of();
}
else {
String columnNameDelimiter = (String) schema.getOrDefault(COLUMN_NAME_DELIMITER, ",");
schemaColumnNames = ImmutableSet.copyOf(columnNameProperty.split(columnNameDelimiter));
}
return projectedColumnNames.equals(schemaColumnNames);
}
}