AbstractTestHiveClient.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.GroupByHashPageIndexerFactory;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.NamedTypeSignature;
import com.facebook.presto.common.type.RowFieldName;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.SqlDate;
import com.facebook.presto.common.type.SqlTimestamp;
import com.facebook.presto.common.type.SqlVarbinary;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.LocationService.WriteInfo;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.metastore.AbstractCachingHiveMetastore.MetastoreCacheScope;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePartitionMutator;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege;
import com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.SortingColumn;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.thrift.BridgingHiveMetastore;
import com.facebook.presto.hive.metastore.thrift.HiveCluster;
import com.facebook.presto.hive.metastore.thrift.TestingHiveCluster;
import com.facebook.presto.hive.metastore.thrift.ThriftHiveMetastore;
import com.facebook.presto.hive.metastore.thrift.ThriftHiveMetastoreConfig;
import com.facebook.presto.hive.orc.OrcBatchPageSource;
import com.facebook.presto.hive.orc.OrcSelectivePageSource;
import com.facebook.presto.hive.pagefile.PageFilePageSource;
import com.facebook.presto.hive.parquet.ParquetPageSource;
import com.facebook.presto.hive.rcfile.RcFilePageSource;
import com.facebook.presto.hive.rule.BaseSubfieldExtractionRewriter.ConnectorPushdownFilterResult;
import com.facebook.presto.hive.rule.HiveFilterPushdown;
import com.facebook.presto.hive.rule.HiveFilterPushdown.SubfieldExtractionRewriter;
import com.facebook.presto.hive.statistics.QuickStatsProvider;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.ConnectorViewDefinition;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.DiscretePredicates;
import com.facebook.presto.spi.PageSinkContext;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.ViewNotFoundException;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorPartitioningMetadata;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingContext;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.constraints.PrimaryKeyConstraint;
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.constraints.UniqueConstraint;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.ConnectorIdentity;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.TestingRowExpressionTranslator;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.TestingConnectorSession;
import com.facebook.presto.testing.TestingNodeManager;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.testing.Assertions.assertEqualsIgnoreOrder;
import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.airlift.testing.Assertions.assertGreaterThanOrEqual;
import static com.facebook.airlift.testing.Assertions.assertInstanceOf;
import static com.facebook.airlift.testing.Assertions.assertLessThanOrEqual;
import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.CharType.createCharType;
import static com.facebook.presto.common.type.Chars.isCharType;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DecimalType.createDecimalType;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.HyperLogLogType.HYPER_LOG_LOG;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.common.type.VarcharType.createVarcharType;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.COMMIT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_AFTER_APPEND_PAGE;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_AFTER_BEGIN_INSERT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_AFTER_DELETE;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_AFTER_FINISH_INSERT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_AFTER_SINK_FINISH;
import static com.facebook.presto.hive.AbstractTestHiveClient.TransactionDeleteInsertTestTag.ROLLBACK_RIGHT_AWAY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.CacheQuotaScope.TABLE;
import static com.facebook.presto.hive.HiveBasicStatistics.createEmptyStatistics;
import static com.facebook.presto.hive.HiveBasicStatistics.createZeroStatistics;
import static com.facebook.presto.hive.HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER;
import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.bucketColumnHandle;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static com.facebook.presto.hive.HiveMetadata.convertToPredicate;
import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.hive.HiveSessionProperties.OFFLINE_DATA_DEBUG_MODE_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.SORTED_WRITE_TO_TEMP_PATH_ENABLED;
import static com.facebook.presto.hive.HiveStorageFormat.ALPHA;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
import static com.facebook.presto.hive.HiveStorageFormat.CSV;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.JSON;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveStorageFormat.RCBINARY;
import static com.facebook.presto.hive.HiveStorageFormat.RCTEXT;
import static com.facebook.presto.hive.HiveStorageFormat.SEQUENCEFILE;
import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
import static com.facebook.presto.hive.HiveTableProperties.BUCKETED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.BUCKET_COUNT_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.PARTITIONED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.SORTED_BY_PROPERTY;
import static com.facebook.presto.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY;
import static com.facebook.presto.hive.HiveTestUtils.DO_NOTHING_DIRECTORY_LISTER;
import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static com.facebook.presto.hive.HiveTestUtils.METADATA;
import static com.facebook.presto.hive.HiveTestUtils.PAGE_SORTER;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.arrayType;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveAggregatedPageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveBatchPageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveFileWriterFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveRecordCursorProvider;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultHiveSelectivePageSourceFactories;
import static com.facebook.presto.hive.HiveTestUtils.getDefaultOrcFileWriterFactory;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.hive.HiveTestUtils.mapType;
import static com.facebook.presto.hive.HiveTestUtils.rowType;
import static com.facebook.presto.hive.HiveType.HIVE_BOOLEAN;
import static com.facebook.presto.hive.HiveType.HIVE_BYTE;
import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE;
import static com.facebook.presto.hive.HiveType.HIVE_FLOAT;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_LONG;
import static com.facebook.presto.hive.HiveType.HIVE_SHORT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.HiveType.toHiveType;
import static com.facebook.presto.hive.HiveUtil.columnExtraInfo;
import static com.facebook.presto.hive.LocationHandle.WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createBinaryColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createBooleanColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDateColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDecimalColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDoubleColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createStringColumnStatistics;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_QUERY_ID_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_VERSION_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createDirectory;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionNames;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.NoopMetastoreCacheStats.NOOP_METASTORE_CACHE_STATS;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.SplitContext.NON_CACHEABLE;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.facebook.presto.sql.planner.VariablesExtractor.extractUnique;
import static com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.expression;
import static com.facebook.presto.testing.DateTimeTestingUtils.sqlTimestampOf;
import static com.facebook.presto.testing.MaterializedResult.materializeSourceDataStream;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Maps.uniqueIndex;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.Sets.difference;
import static com.google.common.hash.Hashing.sha256;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.lang.Thread.sleep;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyList;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.joda.time.DateTimeZone.UTC;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
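
/**
 * Shared fixtures and helpers for the Hive connector integration tests: canonical
 * column layouts, canned data and statistics, partition and layout expectations, and
 * the connector services (metadata, split manager, page source/sink providers) that
 * concrete subclasses wire up against a real metastore and HDFS environment.
 * Subclasses typically call {@link #setupHive} from their own setup to point the
 * well-known table names at the pre-created test schema.
 */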
public abstract class AbstractTestHiveClient
{
protected static final String TEMPORARY_TABLE_PREFIX = "tmp_presto_test_";
protected static final String INVALID_DATABASE = "totally_invalid_database_name";
protected static final String INVALID_TABLE = "totally_invalid_table_name";
protected static final String INVALID_COLUMN = "totally_invalid_column_name";
protected static final String TEST_SERVER_VERSION = "test_version";
protected static final Executor EXECUTOR = Executors.newFixedThreadPool(5);
protected static final PageSinkContext TEST_HIVE_PAGE_SINK_CONTEXT = PageSinkContext.builder().setCommitRequired(false).setConnectorMetadataUpdater(new HiveMetadataUpdater(EXECUTOR)).build();
private static final Type ARRAY_TYPE = arrayType(createUnboundedVarcharType());
private static final Type MAP_TYPE = mapType(createUnboundedVarcharType(), BIGINT);
private static final Type ROW_TYPE = rowType(ImmutableList.of(
new NamedTypeSignature(Optional.of(new RowFieldName("f_string", false)), createUnboundedVarcharType().getTypeSignature()),
new NamedTypeSignature(Optional.of(new RowFieldName("f_bigint", false)), BIGINT.getTypeSignature()),
new NamedTypeSignature(Optional.of(new RowFieldName("f_boolean", false)), BOOLEAN.getTypeSignature())));
private static final List<ColumnMetadata> CREATE_TABLE_COLUMNS = ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("id").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("t_string").setType(createUnboundedVarcharType()).build())
.add(ColumnMetadata.builder().setName("t_tinyint").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("t_smallint").setType(SMALLINT).build())
.add(ColumnMetadata.builder().setName("t_integer").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("t_bigint").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("t_float").setType(REAL).build())
.add(ColumnMetadata.builder().setName("t_double").setType(DOUBLE).build())
.add(ColumnMetadata.builder().setName("t_boolean").setType(BOOLEAN).build())
.add(ColumnMetadata.builder().setName("t_array").setType(ARRAY_TYPE).build())
.add(ColumnMetadata.builder().setName("t_map").setType(MAP_TYPE).build())
.add(ColumnMetadata.builder().setName("t_row").setType(ROW_TYPE).build())
.build();
private static final MaterializedResult CREATE_TABLE_DATA =
MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, BOOLEAN, ARRAY_TYPE, MAP_TYPE, ROW_TYPE)
.row(1L, "hello", (byte) 45, (short) 345, 234, 123L, -754.1985f, 43.5, true, ImmutableList.of("apple", "banana"), ImmutableMap.of("one", 1L, "two", 2L), ImmutableList.of("true", 1L, true))
.row(2L, null, null, null, null, null, null, null, null, null, null, null)
.row(3L, "bye", (byte) 46, (short) 346, 345, 456L, 754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false))
.build();
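// Partitioned variant: same columns plus a "ds" partition key whose value is derived from the id column.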
protected static final List<ColumnMetadata> CREATE_TABLE_COLUMNS_PARTITIONED = ImmutableList.<ColumnMetadata>builder()
.addAll(CREATE_TABLE_COLUMNS)
.add(ColumnMetadata.builder().setName("ds").setType(createUnboundedVarcharType()).build())
.build();
protected static final Predicate<String> PARTITION_COLUMN_FILTER = columnName -> columnName.equals("ds") || columnName.startsWith("part_");
protected static final MaterializedResult CREATE_TABLE_PARTITIONED_DATA = new MaterializedResult(
CREATE_TABLE_DATA.getMaterializedRows().stream()
.map(row -> new MaterializedRow(row.getPrecision(), newArrayList(concat(row.getFields(), ImmutableList.of("2015-07-0" + row.getField(0))))))
.collect(toList()),
ImmutableList.<Type>builder()
.addAll(CREATE_TABLE_DATA.getTypes())
.add(createUnboundedVarcharType())
.build());
private static final String CREATE_TABLE_PARTITIONED_DATA_2ND_PARTITION_VALUE = "2015-07-04";
private static final MaterializedResult CREATE_TABLE_PARTITIONED_DATA_2ND =
MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), TINYINT, SMALLINT, INTEGER, BIGINT, REAL, DOUBLE, BOOLEAN, ARRAY_TYPE, MAP_TYPE, ROW_TYPE, createUnboundedVarcharType())
.row(4L, "hello", (byte) 45, (short) 345, 234, 123L, 754.1985f, 43.5, true, ImmutableList.of("apple", "banana"), ImmutableMap.of("one", 1L, "two", 2L), ImmutableList.of("true", 1L, true), CREATE_TABLE_PARTITIONED_DATA_2ND_PARTITION_VALUE)
.row(5L, null, null, null, null, null, null, null, null, null, null, null, CREATE_TABLE_PARTITIONED_DATA_2ND_PARTITION_VALUE)
.row(6L, "bye", (byte) 46, (short) 346, 345, 456L, -754.2008f, 98.1, false, ImmutableList.of("ape", "bear"), ImmutableMap.of("three", 3L, "four", 4L), ImmutableList.of("false", 0L, false), CREATE_TABLE_PARTITIONED_DATA_2ND_PARTITION_VALUE)
.build();
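// Schema-evolution ("mismatch schema") fixtures: the *_BEFORE definitions describe the table as
// originally written, the *_AFTER definitions describe it after type widening (e.g. tinyint to
// smallint), varchar/integer coercion, and struct field append/drop.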
private static final List<ColumnMetadata> MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE = ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("tinyint_to_smallint").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("tinyint_to_integer").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("tinyint_to_bigint").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("smallint_to_integer").setType(SMALLINT).build())
.add(ColumnMetadata.builder().setName("smallint_to_bigint").setType(SMALLINT).build())
.add(ColumnMetadata.builder().setName("integer_to_bigint").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("integer_to_varchar").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("varchar_to_integer").setType(createUnboundedVarcharType()).build())
.add(ColumnMetadata.builder().setName("float_to_double").setType(REAL).build())
.add(ColumnMetadata.builder().setName("varchar_to_drop_in_row").setType(createUnboundedVarcharType()).build())
.build();
private static final List<ColumnMetadata> MISMATCH_SCHEMA_TABLE_BEFORE = ImmutableList.<ColumnMetadata>builder()
.addAll(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE)
.add(ColumnMetadata.builder().setName("struct_to_struct").setType(toRowType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE)).build())
.add(ColumnMetadata.builder().setName("list_to_list").setType(arrayType(toRowType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE))).build())
.add(ColumnMetadata.builder().setName("map_to_map").setType(mapType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE.get(1).getType(), toRowType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_BEFORE))).build())
.add(ColumnMetadata.builder().setName("ds").setType(createUnboundedVarcharType()).build())
.build();
private static final DataSize DEFAULT_QUOTA_SIZE = DataSize.succinctDataSize(2, GIGABYTE);
private static final CacheQuotaScope CACHE_SCOPE = TABLE;
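// Wraps the given columns in a row type whose field names are prefixed with "f_", so struct subfields mirror the top-level columns.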
private static RowType toRowType(List<ColumnMetadata> columns)
{
return rowType(columns.stream()
.map(col -> new NamedTypeSignature(Optional.of(new RowFieldName(format("f_%s", col.getName()), false)), col.getType().getTypeSignature()))
.collect(toList()));
}
private static final MaterializedResult MISMATCH_SCHEMA_PRIMITIVE_FIELDS_DATA_BEFORE =
MaterializedResult.resultBuilder(SESSION, TINYINT, TINYINT, TINYINT, SMALLINT, SMALLINT, INTEGER, INTEGER, createUnboundedVarcharType(), REAL, createUnboundedVarcharType())
.row((byte) -11, (byte) 12, (byte) -13, (short) 14, (short) 15, -16, 17, "2147483647", 18.0f, "2016-08-01")
.row((byte) 21, (byte) -22, (byte) 23, (short) -24, (short) 25, 26, -27, "asdf", -28.0f, "2016-08-02")
.row((byte) -31, (byte) -32, (byte) 33, (short) 34, (short) -35, 36, 37, "-923", 39.5f, "2016-08-03")
.row(null, (byte) 42, (byte) 43, (short) 44, (short) -45, 46, 47, "2147483648", 49.5f, "2016-08-03")
.build();
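// Each primitive row is extended with struct/list/map values built from the same fields, plus the trailing "ds" value.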
private static final MaterializedResult MISMATCH_SCHEMA_TABLE_DATA_BEFORE =
MaterializedResult.resultBuilder(SESSION, MISMATCH_SCHEMA_TABLE_BEFORE.stream().map(ColumnMetadata::getType).collect(toList()))
.rows(MISMATCH_SCHEMA_PRIMITIVE_FIELDS_DATA_BEFORE.getMaterializedRows()
.stream()
.map(materializedRow -> {
List<Object> result = materializedRow.getFields();
List<Object> rowResult = materializedRow.getFields();
result.add(rowResult);
result.add(Arrays.asList(rowResult, null, rowResult));
result.add(ImmutableMap.of(rowResult.get(1), rowResult));
result.add(rowResult.get(9));
return new MaterializedRow(materializedRow.getPrecision(), result);
}).collect(toList()))
.build();
private static final List<ColumnMetadata> MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER = ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("tinyint_to_smallint").setType(SMALLINT).build())
.add(ColumnMetadata.builder().setName("tinyint_to_integer").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("tinyint_to_bigint").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("smallint_to_integer").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("smallint_to_bigint").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("integer_to_bigint").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("integer_to_varchar").setType(createUnboundedVarcharType()).build())
.add(ColumnMetadata.builder().setName("varchar_to_integer").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("float_to_double").setType(DOUBLE).build())
.add(ColumnMetadata.builder().setName("varchar_to_drop_in_row").setType(createUnboundedVarcharType()).build())
.build();
private static final Type MISMATCH_SCHEMA_ROW_TYPE_APPEND = toRowType(ImmutableList.<ColumnMetadata>builder()
.addAll(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER)
.add(ColumnMetadata.builder()
.setName(format("%s_append", MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER.get(0).getName()))
.setType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER.get(0).getType())
.build())
.build());
private static final Type MISMATCH_SCHEMA_ROW_TYPE_DROP = toRowType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER.subList(0, MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER.size() - 1));
private static final List<ColumnMetadata> MISMATCH_SCHEMA_TABLE_AFTER = ImmutableList.<ColumnMetadata>builder()
.addAll(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER)
.add(ColumnMetadata.builder().setName("struct_to_struct").setType(MISMATCH_SCHEMA_ROW_TYPE_APPEND).build())
.add(ColumnMetadata.builder().setName("list_to_list").setType(arrayType(MISMATCH_SCHEMA_ROW_TYPE_APPEND)).build())
.add(ColumnMetadata.builder().setName("map_to_map").setType(mapType(MISMATCH_SCHEMA_PRIMITIVE_COLUMN_AFTER.get(1).getType(), MISMATCH_SCHEMA_ROW_TYPE_DROP)).build())
.add(ColumnMetadata.builder().setName("tinyint_append").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("ds").setType(createUnboundedVarcharType()).build())
.build();
private static final MaterializedResult MISMATCH_SCHEMA_PRIMITIVE_FIELDS_DATA_AFTER =
MaterializedResult.resultBuilder(SESSION, SMALLINT, INTEGER, BIGINT, INTEGER, BIGINT, BIGINT, createUnboundedVarcharType(), INTEGER, DOUBLE, createUnboundedVarcharType())
.row((short) -11, 12, -13L, 14, 15L, -16L, "17", 2147483647, 18.0, "2016-08-01")
.row((short) 21, -22, 23L, -24, 25L, 26L, "-27", null, -28.0, "2016-08-02")
.row((short) -31, -32, 33L, 34, -35L, 36L, "37", -923, 39.5, "2016-08-03")
.row(null, 42, 43L, 44, -45L, 46L, "47", null, 49.5, "2016-08-03")
.build();
private static final MaterializedResult MISMATCH_SCHEMA_TABLE_DATA_AFTER =
MaterializedResult.resultBuilder(SESSION, MISMATCH_SCHEMA_TABLE_AFTER.stream().map(ColumnMetadata::getType).collect(toList()))
.rows(MISMATCH_SCHEMA_PRIMITIVE_FIELDS_DATA_AFTER.getMaterializedRows()
.stream()
.map(materializedRow -> {
List<Object> result = materializedRow.getFields();
List<Object> appendFieldRowResult = materializedRow.getFields();
appendFieldRowResult.add(null);
List<Object> dropFieldRowResult = materializedRow.getFields().subList(0, materializedRow.getFields().size() - 1);
result.add(appendFieldRowResult);
result.add(Arrays.asList(appendFieldRowResult, null, appendFieldRowResult));
result.add(ImmutableMap.of(result.get(1), dropFieldRowResult));
result.add(null);
result.add(result.get(9));
return new MaterializedRow(materializedRow.getPrecision(), result);
}).collect(toList()))
.build();
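// Expression-translation helpers and the pushdown filters exercised against the evolved table.
// MISMATCH_SCHEMA_TABLE_AFTER_RESULT_PREDICATES below mirrors the filter list entry for entry.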
private static final SubfieldExtractor SUBFIELD_EXTRACTOR = new SubfieldExtractor(FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE.getExpressionOptimizer(SESSION), SESSION);
private static final TypeProvider TYPE_PROVIDER_AFTER = TypeProvider.copyOf(MISMATCH_SCHEMA_TABLE_AFTER.stream()
.collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getType)));
private static final TestingRowExpressionTranslator ROW_EXPRESSION_TRANSLATOR = new TestingRowExpressionTranslator(METADATA);
private static final List<RowExpression> MISMATCH_SCHEMA_TABLE_AFTER_FILTERS = ImmutableList.of(
// integer_to_varchar
toRowExpression("integer_to_varchar", VARCHAR, Domain.singleValue(VARCHAR, Slices.utf8Slice("17"))),
toRowExpression("integer_to_varchar", VARCHAR, Domain.notNull(VARCHAR)),
toRowExpression("integer_to_varchar", VARCHAR, Domain.onlyNull(VARCHAR)),
// varchar_to_integer
toRowExpression("varchar_to_integer", INTEGER, Domain.singleValue(INTEGER, -923L)),
toRowExpression("varchar_to_integer", INTEGER, Domain.notNull(INTEGER)),
toRowExpression("varchar_to_integer", INTEGER, Domain.onlyNull(INTEGER)),
// tinyint_append
toRowExpression("tinyint_append", TINYINT, Domain.singleValue(TINYINT, 1L)),
toRowExpression("tinyint_append", TINYINT, Domain.onlyNull(TINYINT)),
toRowExpression("tinyint_append", TINYINT, Domain.notNull(TINYINT)),
// struct_to_struct
toRowExpression("struct_to_struct.f_integer_to_varchar", MISMATCH_SCHEMA_ROW_TYPE_APPEND, Domain.singleValue(VARCHAR, Slices.utf8Slice("-27"))),
toRowExpression("struct_to_struct.f_varchar_to_integer", MISMATCH_SCHEMA_ROW_TYPE_APPEND, Domain.singleValue(INTEGER, 2147483647L)),
toRowExpression("struct_to_struct.f_tinyint_to_smallint_append", MISMATCH_SCHEMA_ROW_TYPE_APPEND, Domain.singleValue(TINYINT, 1L)),
toRowExpression("struct_to_struct.f_tinyint_to_smallint_append", MISMATCH_SCHEMA_ROW_TYPE_APPEND, Domain.onlyNull(TINYINT)),
toRowExpression("struct_to_struct.f_tinyint_to_smallint_append", MISMATCH_SCHEMA_ROW_TYPE_APPEND, Domain.notNull(TINYINT)),
// filter functions
toRowExpression("tinyint_to_smallint + 1 > 0"),
toRowExpression("tinyint_to_smallint * 2 < 0"));
private static RowExpression toRowExpression(String name, Type type, Domain domain)
{
RowExpression expression = SUBFIELD_EXTRACTOR.toRowExpression(new Subfield(name), type);
return ROW_EXPRESSION_SERVICE.getDomainTranslator().toPredicate(TupleDomain.withColumnDomains(ImmutableMap.of(expression, domain)));
}
private static RowExpression toRowExpression(String sql)
{
return ROW_EXPRESSION_TRANSLATOR.translate(expression(sql), TYPE_PROVIDER_AFTER);
}
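// One expected-result predicate per filter above, in the same order.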
private static final List<Predicate<MaterializedRow>> MISMATCH_SCHEMA_TABLE_AFTER_RESULT_PREDICATES = ImmutableList.of(
// integer_to_varchar
row -> Objects.equals(row.getField(6), "17"),
row -> row.getField(6) != null,
row -> row.getField(6) == null,
// varchar_to_integer
row -> Objects.equals(row.getField(7), -923),
row -> row.getField(7) != null,
row -> row.getField(7) == null,
// tinyint_append
row -> false,
row -> true,
row -> false,
// struct_to_struct
row -> Objects.equals(row.getField(6), "-27"),
row -> Objects.equals(row.getField(7), 2147483647),
row -> false,
row -> true,
row -> false,
// filter functions
row -> row.getField(0) != null && (short) row.getField(0) + 1 > 0,
row -> row.getField(0) != null && (short) row.getField(0) * 2 < 0);
protected Set<HiveStorageFormat> createTableFormats = getSupportedCreateTableHiveStorageFormats();
private static final JoinCompiler JOIN_COMPILER = new JoinCompiler(MetadataManager.createTestMetadataManager());
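// Column layout and canned statistics for the table/partition statistics tests.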
private static final List<ColumnMetadata> STATISTICS_TABLE_COLUMNS = ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("t_boolean").setType(BOOLEAN).build())
.add(ColumnMetadata.builder().setName("t_bigint").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("t_integer").setType(INTEGER).build())
.add(ColumnMetadata.builder().setName("t_smallint").setType(SMALLINT).build())
.add(ColumnMetadata.builder().setName("t_tinyint").setType(TINYINT).build())
.add(ColumnMetadata.builder().setName("t_double").setType(DOUBLE).build())
.add(ColumnMetadata.builder().setName("t_float").setType(REAL).build())
.add(ColumnMetadata.builder().setName("t_string").setType(createUnboundedVarcharType()).build())
.add(ColumnMetadata.builder().setName("t_varchar").setType(createVarcharType(100)).build())
.add(ColumnMetadata.builder().setName("t_char").setType(createCharType(5)).build())
.add(ColumnMetadata.builder().setName("t_varbinary").setType(VARBINARY).build())
.add(ColumnMetadata.builder().setName("t_date").setType(DATE).build())
.add(ColumnMetadata.builder().setName("t_timestamp").setType(TIMESTAMP).build())
.add(ColumnMetadata.builder().setName("t_short_decimal").setType(createDecimalType(5, 2)).build())
.add(ColumnMetadata.builder().setName("t_long_decimal").setType(createDecimalType(20, 3)).build())
.build();
protected static final List<ColumnMetadata> STATISTICS_PARTITIONED_TABLE_COLUMNS = ImmutableList.<ColumnMetadata>builder()
.addAll(STATISTICS_TABLE_COLUMNS)
.add(ColumnMetadata.builder().setName("ds").setType(VARCHAR).build())
.build();
protected static final PartitionStatistics EMPTY_TABLE_STATISTICS = new PartitionStatistics(createZeroStatistics(), ImmutableMap.of());
protected static final PartitionStatistics BASIC_STATISTICS_1 = new PartitionStatistics(new HiveBasicStatistics(0, 20, 3, 0), ImmutableMap.of());
protected static final PartitionStatistics BASIC_STATISTICS_2 = new PartitionStatistics(new HiveBasicStatistics(0, 30, 2, 0), ImmutableMap.of());
private static final PartitionStatistics STATISTICS_1 =
new PartitionStatistics(
BASIC_STATISTICS_1.getBasicStatistics(),
ImmutableMap.<String, HiveColumnStatistics>builder()
.put("t_boolean", createBooleanColumnStatistics(OptionalLong.of(5), OptionalLong.of(6), OptionalLong.of(3)))
.put("t_bigint", createIntegerColumnStatistics(OptionalLong.of(1234L), OptionalLong.of(5678L), OptionalLong.of(2), OptionalLong.of(5)))
.put("t_integer", createIntegerColumnStatistics(OptionalLong.of(123L), OptionalLong.of(567L), OptionalLong.of(3), OptionalLong.of(4)))
.put("t_smallint", createIntegerColumnStatistics(OptionalLong.of(12L), OptionalLong.of(56L), OptionalLong.of(2), OptionalLong.of(6)))
.put("t_tinyint", createIntegerColumnStatistics(OptionalLong.of(1L), OptionalLong.of(2L), OptionalLong.of(1), OptionalLong.of(3)))
.put("t_double", createDoubleColumnStatistics(OptionalDouble.of(1234.25), OptionalDouble.of(5678.58), OptionalLong.of(7), OptionalLong.of(8)))
.put("t_float", createDoubleColumnStatistics(OptionalDouble.of(123.25), OptionalDouble.of(567.58), OptionalLong.of(9), OptionalLong.of(10)))
.put("t_string", createStringColumnStatistics(OptionalLong.of(10), OptionalLong.of(50), OptionalLong.of(3), OptionalLong.of(7)))
.put("t_varchar", createStringColumnStatistics(OptionalLong.of(100), OptionalLong.of(230), OptionalLong.of(5), OptionalLong.of(3)))
.put("t_char", createStringColumnStatistics(OptionalLong.of(5), OptionalLong.of(500), OptionalLong.of(1), OptionalLong.of(4)))
.put("t_varbinary", createBinaryColumnStatistics(OptionalLong.of(4), OptionalLong.of(300), OptionalLong.of(1)))
.put("t_date", createDateColumnStatistics(Optional.of(java.time.LocalDate.ofEpochDay(1)), Optional.of(java.time.LocalDate.ofEpochDay(2)), OptionalLong.of(7), OptionalLong.of(6)))
.put("t_timestamp", createIntegerColumnStatistics(OptionalLong.of(1234567L), OptionalLong.of(71234567L), OptionalLong.of(7), OptionalLong.of(5)))
.put("t_short_decimal", createDecimalColumnStatistics(Optional.of(new BigDecimal(10)), Optional.of(new BigDecimal(12)), OptionalLong.of(3), OptionalLong.of(5)))
.put("t_long_decimal", createDecimalColumnStatistics(Optional.of(new BigDecimal("12345678901234567.123")), Optional.of(new BigDecimal("81234567890123456.123")), OptionalLong.of(2), OptionalLong.of(1)))
.build());
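// Partial views of STATISTICS_1, split by column-name hash parity.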
private static final PartitionStatistics STATISTICS_1_1 =
new PartitionStatistics(
new HiveBasicStatistics(OptionalLong.of(0), OptionalLong.of(15), OptionalLong.empty(), OptionalLong.of(0)),
STATISTICS_1.getColumnStatistics().entrySet()
.stream()
.filter(entry -> entry.getKey().hashCode() % 2 == 0)
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)));
private static final PartitionStatistics STATISTICS_1_2 =
new PartitionStatistics(
new HiveBasicStatistics(OptionalLong.of(0), OptionalLong.of(15), OptionalLong.of(3), OptionalLong.of(0)),
STATISTICS_1.getColumnStatistics().entrySet()
.stream()
.filter(entry -> entry.getKey().hashCode() % 2 == 1)
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)));
private static final PartitionStatistics STATISTICS_2 =
new PartitionStatistics(
BASIC_STATISTICS_2.getBasicStatistics(),
ImmutableMap.<String, HiveColumnStatistics>builder()
.put("t_boolean", createBooleanColumnStatistics(OptionalLong.of(4), OptionalLong.of(3), OptionalLong.of(2)))
.put("t_bigint", createIntegerColumnStatistics(OptionalLong.of(2345L), OptionalLong.of(6789L), OptionalLong.of(4), OptionalLong.of(7)))
.put("t_integer", createIntegerColumnStatistics(OptionalLong.of(234L), OptionalLong.of(678L), OptionalLong.of(5), OptionalLong.of(6)))
.put("t_smallint", createIntegerColumnStatistics(OptionalLong.of(23L), OptionalLong.of(65L), OptionalLong.of(7), OptionalLong.of(5)))
.put("t_tinyint", createIntegerColumnStatistics(OptionalLong.of(12), OptionalLong.of(3L), OptionalLong.of(2), OptionalLong.of(3)))
.put("t_double", createDoubleColumnStatistics(OptionalDouble.of(2345.25), OptionalDouble.of(6785.58), OptionalLong.of(6), OptionalLong.of(3)))
.put("t_float", createDoubleColumnStatistics(OptionalDouble.of(235.25), OptionalDouble.of(676.58), OptionalLong.of(7), OptionalLong.of(11)))
.put("t_string", createStringColumnStatistics(OptionalLong.of(11), OptionalLong.of(600), OptionalLong.of(2), OptionalLong.of(6)))
.put("t_varchar", createStringColumnStatistics(OptionalLong.of(99), OptionalLong.of(223), OptionalLong.of(7), OptionalLong.of(1)))
.put("t_char", createStringColumnStatistics(OptionalLong.of(6), OptionalLong.of(60), OptionalLong.of(0), OptionalLong.of(3)))
.put("t_varbinary", createBinaryColumnStatistics(OptionalLong.of(2), OptionalLong.of(10), OptionalLong.of(2)))
.put("t_date", createDateColumnStatistics(Optional.of(java.time.LocalDate.ofEpochDay(2)), Optional.of(java.time.LocalDate.ofEpochDay(3)), OptionalLong.of(8), OptionalLong.of(7)))
.put("t_timestamp", createIntegerColumnStatistics(OptionalLong.of(2345671L), OptionalLong.of(12345677L), OptionalLong.of(9), OptionalLong.of(1)))
.put("t_short_decimal", createDecimalColumnStatistics(Optional.of(new BigDecimal(11)), Optional.of(new BigDecimal(14)), OptionalLong.of(5), OptionalLong.of(7)))
.put("t_long_decimal", createDecimalColumnStatistics(Optional.of(new BigDecimal("71234567890123456.123")), Optional.of(new BigDecimal("78123456789012345.123")), OptionalLong.of(2), OptionalLong.of(1)))
.build());
private static final PartitionStatistics STATISTICS_EMPTY_OPTIONAL_FIELDS =
new PartitionStatistics(
new HiveBasicStatistics(OptionalLong.of(0), OptionalLong.of(20), OptionalLong.empty(), OptionalLong.of(0)),
ImmutableMap.<String, HiveColumnStatistics>builder()
.put("t_boolean", createBooleanColumnStatistics(OptionalLong.of(4), OptionalLong.of(3), OptionalLong.of(2)))
.put("t_bigint", createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.of(4), OptionalLong.of(7)))
.put("t_integer", createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.of(5), OptionalLong.of(6)))
.put("t_smallint", createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.of(7), OptionalLong.of(5)))
.put("t_tinyint", createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.of(2), OptionalLong.of(3)))
.put("t_double", createDoubleColumnStatistics(OptionalDouble.empty(), OptionalDouble.empty(), OptionalLong.of(6), OptionalLong.of(3)))
.put("t_float", createDoubleColumnStatistics(OptionalDouble.empty(), OptionalDouble.empty(), OptionalLong.of(7), OptionalLong.of(11)))
.put("t_string", createStringColumnStatistics(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(2), OptionalLong.of(6)))
.put("t_varchar", createStringColumnStatistics(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(7), OptionalLong.of(1)))
.put("t_char", createStringColumnStatistics(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(3)))
.put("t_varbinary", createBinaryColumnStatistics(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(2)))
// https://issues.apache.org/jira/browse/HIVE-20098
// .put("t_date", createDateColumnStatistics(Optional.empty(), Optional.empty(), OptionalLong.of(8), OptionalLong.of(7)))
.put("t_timestamp", createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.of(9), OptionalLong.of(1)))
.put("t_short_decimal", createDecimalColumnStatistics(Optional.empty(), Optional.empty(), OptionalLong.of(5), OptionalLong.of(7)))
.put("t_long_decimal", createDecimalColumnStatistics(Optional.empty(), Optional.empty(), OptionalLong.of(2), OptionalLong.of(1)))
.build());
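// Fixtures for the temporary-table tests: two varchar columns, bucketed by "id" into four buckets.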
private static final List<ColumnMetadata> TEMPORARY_TABLE_COLUMNS = ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("id").setType(VARCHAR).build())
.add(ColumnMetadata.builder().setName("value").setType(VARCHAR).build())
.build();
private static final int TEMPORARY_TABLE_BUCKET_COUNT = 4;
private static final List<String> TEMPORARY_TABLE_BUCKET_COLUMNS = ImmutableList.of("id");
public static final MaterializedResult TEMPORARY_TABLE_DATA = MaterializedResult.resultBuilder(SESSION, VARCHAR, VARCHAR)
.row("1", "value1")
.row("2", "value2")
.row("3", "value3")
.row("1", "value4")
.row("2", "value5")
.row("3", "value6")
.build();
public static final SplitSchedulingContext SPLIT_SCHEDULING_CONTEXT = new SplitSchedulingContext(UNGROUPED_SCHEDULING, false, WarningCollector.NOOP);
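// Well-known schema/table names and other per-connector state; assigned in setupHive().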
protected String clientId;
protected String database;
protected SchemaTableName tablePartitionFormat;
protected SchemaTableName tableUnpartitioned;
protected SchemaTableName tableOffline;
protected SchemaTableName tableOfflinePartition;
protected SchemaTableName tableNotReadable;
protected SchemaTableName view;
protected SchemaTableName invalidTable;
protected SchemaTableName tableBucketedStringInt;
protected SchemaTableName tableBucketedBigintBoolean;
protected SchemaTableName tableBucketedDoubleFloat;
protected SchemaTableName tablePartitionSchemaChange;
protected SchemaTableName tablePartitionSchemaChangeNonCanonical;
protected SchemaTableName tableBucketEvolution;
protected SchemaTableName tableConstraintsSingleKeyRely;
protected SchemaTableName tableConstraintsMultiKeyRely;
protected SchemaTableName tableConstraintsSingleKeyNoRely;
protected SchemaTableName tableConstraintsMultiKeyNoRely;
protected List<SchemaTableName> constraintsTableList;
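// Primary key / unique constraint definitions (RELY and NORELY variants) paired with the test_constraints* tables via tableConstraintsMap.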
private static final List<TableConstraint<String>> constraintsSingleKeyRely = ImmutableList.of(new PrimaryKeyConstraint<>(Optional.empty(), new LinkedHashSet<>(ImmutableList.of("c1")), false, true, false),
new UniqueConstraint<>(Optional.of("uk1"), new LinkedHashSet<>(ImmutableList.of("c2")), false, true, false));
private static final List<TableConstraint<String>> constraintsMultiKeyRely = ImmutableList.of(new PrimaryKeyConstraint<>(Optional.empty(), new LinkedHashSet<>(ImmutableList.of("c1", "c2")), false, true, false),
new UniqueConstraint<>(Optional.of("uk2"), new LinkedHashSet<>(ImmutableList.of("c3", "c4")), false, true, false));
private static final List<TableConstraint<String>> constraintsSingleKeyNoRely = ImmutableList.of(new PrimaryKeyConstraint<>(Optional.empty(), new LinkedHashSet<>(ImmutableList.of("c1")), false, false, false),
new UniqueConstraint<>(Optional.of("uk3"), new LinkedHashSet<>(ImmutableList.of("c2")), false, false, false));
private static final List<TableConstraint<String>> constraintsMultiKeyNoRely = ImmutableList.of(new PrimaryKeyConstraint<>(Optional.empty(), new LinkedHashSet<>(ImmutableList.of("c1", "c2")), false, false, false),
new UniqueConstraint<>(Optional.of("uk4"), new LinkedHashSet<>(ImmutableList.of("c3", "c4")), false, false, false));
Map<SchemaTableName, List<TableConstraint<String>>> tableConstraintsMap;
protected String invalidClientId;
protected HiveTableHandle invalidTableHandle;
protected HiveColumnHandle dsColumn;
protected HiveColumnHandle fileFormatColumn;
protected HiveColumnHandle dummyColumn;
protected HiveColumnHandle intColumn;
protected HiveColumnHandle invalidColumnHandle;
protected int partitionCount;
protected TupleDomain<ColumnHandle> tupleDomain;
protected ConnectorTableLayout tableLayout;
protected ConnectorTableLayout unpartitionedTableLayout;
protected ConnectorTableLayoutHandle invalidTableLayoutHandle;
protected DateTimeZone timeZone;
protected HdfsEnvironment hdfsEnvironment;
protected LocationService locationService;
protected HiveMetadataFactory metadataFactory;
protected HiveTransactionManager transactionManager;
protected HivePartitionManager hivePartitionManager;
protected ExtendedHiveMetastore metastoreClient;
protected HiveEncryptionInformationProvider encryptionInformationProvider;
protected ConnectorSplitManager splitManager;
protected ConnectorPageSourceProvider pageSourceProvider;
protected ConnectorPageSinkProvider pageSinkProvider;
protected ExecutorService executor;
@BeforeClass
public void setupClass()
{
executor = newCachedThreadPool(daemonThreadsNamed("hive-%s"));
}

@AfterClass(alwaysRun = true)
public void tearDown()
{
if (executor != null) {
executor.shutdownNow();
executor = null;
}
}

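// Assigns the well-known test table names, constraint fixtures, partition column handles,
// partitions, and the expected table layout for presto_test_partition_format.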
protected void setupHive(String connectorId, String databaseName, String timeZoneId)
{
clientId = connectorId;
database = databaseName;
tablePartitionFormat = new SchemaTableName(database, "presto_test_partition_format");
tableUnpartitioned = new SchemaTableName(database, "presto_test_unpartitioned");
tableOffline = new SchemaTableName(database, "presto_test_offline");
tableOfflinePartition = new SchemaTableName(database, "presto_test_offline_partition");
tableNotReadable = new SchemaTableName(database, "presto_test_not_readable");
view = new SchemaTableName(database, "presto_test_view");
invalidTable = new SchemaTableName(database, INVALID_TABLE);
tableBucketedStringInt = new SchemaTableName(database, "presto_test_bucketed_by_string_int");
tableBucketedBigintBoolean = new SchemaTableName(database, "presto_test_bucketed_by_bigint_boolean");
tableBucketedDoubleFloat = new SchemaTableName(database, "presto_test_bucketed_by_double_float");
tablePartitionSchemaChange = new SchemaTableName(database, "presto_test_partition_schema_change");
tablePartitionSchemaChangeNonCanonical = new SchemaTableName(database, "presto_test_partition_schema_change_non_canonical");
tableBucketEvolution = new SchemaTableName(database, "presto_test_bucket_evolution");
tableConstraintsSingleKeyRely = new SchemaTableName(database, "test_constraints1");
tableConstraintsMultiKeyRely = new SchemaTableName(database, "test_constraints2");
tableConstraintsSingleKeyNoRely = new SchemaTableName(database, "test_constraints3");
tableConstraintsMultiKeyNoRely = new SchemaTableName(database, "test_constraints4");
constraintsTableList = ImmutableList.of(tableConstraintsSingleKeyRely,
tableConstraintsMultiKeyRely,
tableConstraintsSingleKeyNoRely,
tableConstraintsMultiKeyNoRely,
tablePartitionFormat);
tableConstraintsMap = ImmutableMap.of(tableConstraintsSingleKeyRely, constraintsSingleKeyRely,
tableConstraintsMultiKeyRely, constraintsMultiKeyRely,
tableConstraintsSingleKeyNoRely, constraintsSingleKeyNoRely,
tableConstraintsMultiKeyNoRely, constraintsMultiKeyNoRely,
tablePartitionFormat, ImmutableList.of());
invalidClientId = "hive";
invalidTableHandle = new HiveTableHandle(database, INVALID_TABLE);
invalidTableLayoutHandle = new HiveTableLayoutHandle.Builder()
.setSchemaTableName(invalidTable)
.setTablePath("path")
.setPartitionColumns(ImmutableList.of())
.setDataColumns(ImmutableList.of())
.setTableParameters(ImmutableMap.of())
.setDomainPredicate(TupleDomain.all())
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(ImmutableMap.of())
.setPartitionColumnPredicate(TupleDomain.all())
.setPartitions(ImmutableList.of(new HivePartition(invalidTable, new PartitionNameWithVersion("unknown", Optional.empty()), ImmutableMap.of())))
.setBucketHandle(Optional.empty())
.setBucketFilter(Optional.empty())
.setPushdownFilterEnabled(false)
.setLayoutString("layout")
.setRequestedColumns(Optional.empty())
.setPartialAggregationsPushedDown(false)
.setAppendRowNumberEnabled(false)
.setHiveTableHandle(invalidTableHandle)
.build();
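// Partition key columns are assigned decreasing synthetic indexes, starting from MAX_PARTITION_KEY_COLUMN_INDEX.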
int partitionColumnIndex = MAX_PARTITION_KEY_COLUMN_INDEX;
dsColumn = new HiveColumnHandle("ds", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), partitionColumnIndex--, PARTITION_KEY, Optional.empty(), Optional.empty());
fileFormatColumn = new HiveColumnHandle("file_format", HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), partitionColumnIndex--, PARTITION_KEY, Optional.empty(), Optional.empty());
dummyColumn = new HiveColumnHandle("dummy", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), partitionColumnIndex--, PARTITION_KEY, Optional.empty(), Optional.empty());
intColumn = new HiveColumnHandle("t_int", HIVE_INT, parseTypeSignature(StandardTypes.INTEGER), partitionColumnIndex--, PARTITION_KEY, Optional.empty(), Optional.empty());
invalidColumnHandle = new HiveColumnHandle(INVALID_COLUMN, HIVE_STRING, parseTypeSignature(StandardTypes.VARCHAR), 0, REGULAR, Optional.empty(), Optional.empty());
List<HiveColumnHandle> partitionColumns = ImmutableList.of(dsColumn, fileFormatColumn, dummyColumn);
List<HivePartition> partitions = ImmutableList.<HivePartition>builder()
.add(new HivePartition(tablePartitionFormat,
new PartitionNameWithVersion("ds=2012-12-29/file_format=textfile/dummy=1", Optional.empty()),
ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(dsColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2012-12-29")))
.put(fileFormatColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("textfile")))
.put(dummyColumn, NullableValue.of(INTEGER, 1L))
.build()))
.add(new HivePartition(tablePartitionFormat,
new PartitionNameWithVersion("ds=2012-12-29/file_format=sequencefile/dummy=2", Optional.empty()),
ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(dsColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2012-12-29")))
.put(fileFormatColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("sequencefile")))
.put(dummyColumn, NullableValue.of(INTEGER, 2L))
.build()))
.add(new HivePartition(tablePartitionFormat,
new PartitionNameWithVersion("ds=2012-12-29/file_format=rctext/dummy=3", Optional.empty()),
ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(dsColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2012-12-29")))
.put(fileFormatColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("rctext")))
.put(dummyColumn, NullableValue.of(INTEGER, 3L))
.build()))
.add(new HivePartition(tablePartitionFormat,
new PartitionNameWithVersion("ds=2012-12-29/file_format=rcbinary/dummy=4", Optional.empty()),
ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(dsColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2012-12-29")))
.put(fileFormatColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("rcbinary")))
.put(dummyColumn, NullableValue.of(INTEGER, 4L))
.build()))
.build();
partitionCount = partitions.size();
tupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(dsColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2012-12-29"))));
TupleDomain<Subfield> domainPredicate = tupleDomain.transform(HiveColumnHandle.class::cast)
.transform(column -> new Subfield(column.getName(), ImmutableList.of()));
List<Column> dataColumns = ImmutableList.of(
new Column("t_string", HIVE_STRING, Optional.empty(), Optional.empty()),
new Column("t_tinyint", HIVE_BYTE, Optional.empty(), Optional.empty()),
new Column("t_smallint", HIVE_SHORT, Optional.empty(), Optional.empty()),
new Column("t_int", HIVE_INT, Optional.empty(), Optional.empty()),
new Column("t_bigint", HIVE_LONG, Optional.empty(), Optional.empty()),
new Column("t_float", HIVE_FLOAT, Optional.empty(), Optional.empty()),
new Column("t_double", HIVE_DOUBLE, Optional.empty(), Optional.empty()),
new Column("t_boolean", HIVE_BOOLEAN, Optional.empty(), Optional.empty()));
tableLayout = new ConnectorTableLayout(
new HiveTableLayoutHandle.Builder()
.setSchemaTableName(tablePartitionFormat)
.setTablePath("path")
.setPartitionColumns(ImmutableList.copyOf(partitionColumns))
.setDataColumns(dataColumns)
.setTableParameters(ImmutableMap.of())
.setDomainPredicate(domainPredicate)
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(ImmutableMap.of(dsColumn.getName(), dsColumn))
.setPartitionColumnPredicate(tupleDomain)
.setPartitions(partitions)
.setBucketHandle(Optional.empty())
.setBucketFilter(Optional.empty())
.setPushdownFilterEnabled(false)
.setLayoutString("layout")
.setRequestedColumns(Optional.empty())
.setPartialAggregationsPushedDown(false)
.setAppendRowNumberEnabled(false)
.build(),
Optional.empty(),
withColumnDomains(ImmutableMap.of(
dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false),
fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("textfile")), Range.equal(createUnboundedVarcharType(), utf8Slice("sequencefile")), Range.equal(createUnboundedVarcharType(), utf8Slice("rctext")), Range.equal(createUnboundedVarcharType(), utf8Slice("rcbinary"))), false),
dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 1L), Range.equal(INTEGER, 2L), Range.equal(INTEGER, 3L), Range.equal(INTEGER, 4L)), false))),
Optional.empty(),
Optional.empty(),
Optional.of(new DiscretePredicates(ImmutableList.copyOf(partitionColumns), ImmutableList.of(
withColumnDomains(ImmutableMap.of(
dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false),
fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("textfile"))), false),
dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 1L)), false))),
withColumnDomains(ImmutableMap.of(
dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false),
fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("sequencefile"))), false),
dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 2L)), false))),
withColumnDomains(ImmutableMap.of(
dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false),
fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("rctext"))), false),
dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 3L)), false))),
withColumnDomains(ImmutableMap.of(
dsColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("2012-12-29"))), false),
fileFormatColumn, Domain.create(ValueSet.ofRanges(Range.equal(createUnboundedVarcharType(), utf8Slice("rcbinary"))), false),
dummyColumn, Domain.create(ValueSet.ofRanges(Range.equal(INTEGER, 4L)), false)))))),
ImmutableList.of());
List<HivePartition> unpartitionedPartitions = ImmutableList.of(new HivePartition(tableUnpartitioned));
unpartitionedTableLayout = new ConnectorTableLayout(
new HiveTableLayoutHandle.Builder()
.setSchemaTableName(tableUnpartitioned)
.setTablePath("path")
.setPartitionColumns(ImmutableList.of())
.setDataColumns(ImmutableList.of(
new Column("t_string", HIVE_STRING, Optional.empty(), Optional.empty()),
new Column("t_tinyint", HIVE_BYTE, Optional.empty(), Optional.empty())))
.setTableParameters(ImmutableMap.of())
.setDomainPredicate(TupleDomain.all())
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(ImmutableMap.of())
.setPartitionColumnPredicate(TupleDomain.all())
.setPartitions(unpartitionedPartitions)
.setBucketHandle(Optional.empty())
.setBucketFilter(Optional.empty())
.setPushdownFilterEnabled(false)
.setLayoutString("layout")
.setRequestedColumns(Optional.empty())
.setPartialAggregationsPushedDown(false)
.setAppendRowNumberEnabled(false)
.build());
timeZone = DateTimeZone.forTimeZone(TimeZone.getTimeZone(ZoneId.of(timeZoneId)));
}
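// Connects to a live Thrift metastore at the given host and port, wraps it in an
// in-memory caching metastore, and delegates the remaining wiring to the config-based
// setup overload below.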
protected final void setup(String host, int port, String databaseName, String timeZone)
{
HiveClientConfig hiveClientConfig = getHiveClientConfig();
CacheConfig cacheConfig = getCacheConfig();
MetastoreClientConfig metastoreClientConfig = getMetastoreClientConfig();
ThriftHiveMetastoreConfig thriftHiveMetastoreConfig = getThriftHiveMetastoreConfig();
hiveClientConfig.setTimeZone(timeZone);
String proxy = System.getProperty("hive.metastore.thrift.client.socks-proxy");
if (proxy != null) {
metastoreClientConfig.setMetastoreSocksProxy(HostAndPort.fromString(proxy));
}
HiveCluster hiveCluster = new TestingHiveCluster(metastoreClientConfig, thriftHiveMetastoreConfig, host, port, new HiveCommonClientConfig());
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
ExtendedHiveMetastore metastore = new InMemoryCachingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster, metastoreClientConfig, hdfsEnvironment), new HivePartitionMutator()),
executor,
false,
Duration.valueOf("1m"),
Duration.valueOf("15s"),
10000,
false,
MetastoreCacheScope.ALL,
0.0,
metastoreClientConfig.getPartitionCacheColumnCountLimit(),
NOOP_METASTORE_CACHE_STATS);
setup(databaseName, hiveClientConfig, cacheConfig, metastoreClientConfig, metastore);
}
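// Wires up the connector services exercised by these tests (partition manager, location
// service, metadata factory, transaction manager, split manager, page sink and page source
// providers) against the supplied metastore and configuration.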
protected final void setup(String databaseName, HiveClientConfig hiveClientConfig, CacheConfig cacheConfig, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore hiveMetastore)
{
HiveConnectorId connectorId = new HiveConnectorId("hive-test");
setupHive(connectorId.toString(), databaseName, hiveClientConfig.getTimeZone());
hivePartitionManager = new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig);
metastoreClient = hiveMetastore;
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
locationService = new HiveLocationService(hdfsEnvironment);
metadataFactory = new HiveMetadataFactory(
metastoreClient,
hdfsEnvironment,
hivePartitionManager,
timeZone,
true,
false,
false,
true,
true,
getHiveClientConfig().getMaxPartitionBatchSize(),
getHiveClientConfig().getMaxPartitionsPerScan(),
false,
10_000,
FUNCTION_AND_TYPE_MANAGER,
locationService,
FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE,
FILTER_STATS_CALCULATOR_SERVICE,
new TableParameterCodec(),
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
listeningDecorator(executor),
new HiveTypeTranslator(),
new HiveStagingFileCommitter(hdfsEnvironment, listeningDecorator(executor)),
new HiveZeroRowFileCreator(hdfsEnvironment, new OutputStreamDataSinkFactory(), listeningDecorator(executor)),
TEST_SERVER_VERSION,
new HivePartitionObjectBuilder(),
new HiveEncryptionInformationProvider(ImmutableList.of()),
new HivePartitionStats(),
new HiveFileRenamer(),
DEFAULT_COLUMN_CONVERTER_PROVIDER,
new QuickStatsProvider(metastoreClient, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
new HiveTableWritabilityChecker(false));
transactionManager = new HiveTransactionManager();
encryptionInformationProvider = new HiveEncryptionInformationProvider(ImmutableList.of());
splitManager = new HiveSplitManager(
transactionManager,
new NamenodeStats(),
hdfsEnvironment,
new CachingDirectoryLister(new HadoopDirectoryLister(), new HiveClientConfig()),
directExecutor(),
new HiveCoercionPolicy(FUNCTION_AND_TYPE_MANAGER),
new CounterStat(),
100,
hiveClientConfig.getMaxOutstandingSplitsSize(),
hiveClientConfig.getMinPartitionBatchSize(),
hiveClientConfig.getMaxPartitionBatchSize(),
hiveClientConfig.getSplitLoaderConcurrency(),
false,
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
encryptionInformationProvider,
new HivePartitionSkippabilityChecker());
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(hiveClientConfig, metastoreClientConfig),
hdfsEnvironment,
PAGE_SORTER,
metastoreClient,
new GroupByHashPageIndexerFactory(JOIN_COMPILER),
FUNCTION_AND_TYPE_MANAGER,
getHiveClientConfig(),
getMetastoreClientConfig(),
getSortingFileWriterConfig(),
locationService,
HiveTestUtils.PARTITION_UPDATE_CODEC,
HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
new TestingNodeManager("fake-environment"),
new HiveEventClient(),
new HiveSessionProperties(hiveClientConfig, new OrcFileWriterConfig(), new ParquetFileWriterConfig(), new CacheConfig()),
new HiveWriterStats(),
getDefaultOrcFileWriterFactory(hiveClientConfig, metastoreClientConfig),
DEFAULT_COLUMN_CONVERTER_PROVIDER);
pageSourceProvider = new HivePageSourceProvider(
hiveClientConfig,
hdfsEnvironment,
getDefaultHiveRecordCursorProvider(hiveClientConfig, metastoreClientConfig),
getDefaultHiveBatchPageSourceFactories(hiveClientConfig, metastoreClientConfig),
getDefaultHiveSelectivePageSourceFactories(hiveClientConfig, metastoreClientConfig),
getDefaultHiveAggregatedPageSourceFactories(hiveClientConfig, metastoreClientConfig),
FUNCTION_AND_TYPE_MANAGER,
ROW_EXPRESSION_SERVICE);
}
/**
* Allows a subclass to change the default configuration.
*/
protected HiveClientConfig getHiveClientConfig()
{
return new HiveClientConfig()
.setTemporaryTableSchema(database)
.setCreateEmptyBucketFilesForTemporaryTable(false);
}
protected SortingFileWriterConfig getSortingFileWriterConfig()
{
return new SortingFileWriterConfig()
.setMaxOpenSortFiles(10)
.setWriterSortBufferSize(new DataSize(100, KILOBYTE));
}
protected HiveCommonClientConfig getHiveCommonClientConfig()
{
return new HiveCommonClientConfig();
}
protected CacheConfig getCacheConfig()
{
return new CacheConfig().setCacheQuotaScope(CACHE_SCOPE).setDefaultCacheQuota(DEFAULT_QUOTA_SIZE);
}
protected MetastoreClientConfig getMetastoreClientConfig()
{
return new MetastoreClientConfig();
}
protected ThriftHiveMetastoreConfig getThriftHiveMetastoreConfig()
{
return new ThriftHiveMetastoreConfig();
}
protected ConnectorSession newSession()
{
return newSession(getHiveClientConfig(), getHiveCommonClientConfig());
}
protected ConnectorSession newSession(HiveClientConfig hiveClientConfig, HiveCommonClientConfig hiveCommonClientConfig)
{
return new TestingConnectorSession(getAllSessionProperties(hiveClientConfig, hiveCommonClientConfig));
}
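// Returns a session that delegates to a default test session but resolves the supplied
// extra properties first, so individual tests can override session properties.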
protected ConnectorSession newSession(Map<String, Object> extraProperties)
{
ConnectorSession session = newSession();
return new ConnectorSession()
{
@Override
public String getQueryId()
{
return session.getQueryId();
}
@Override
public Optional<String> getSource()
{
return session.getSource();
}
@Override
public ConnectorIdentity getIdentity()
{
return session.getIdentity();
}
@Override
public TimeZoneKey getTimeZoneKey()
{
return session.getTimeZoneKey();
}
@Override
public Locale getLocale()
{
return session.getLocale();
}
@Override
public Optional<String> getTraceToken()
{
return session.getTraceToken();
}
@Override
public Optional<String> getClientInfo()
{
return session.getClientInfo();
}
@Override
public Set<String> getClientTags()
{
return session.getClientTags();
}
@Override
public long getStartTime()
{
return session.getStartTime();
}
@Override
public SqlFunctionProperties getSqlFunctionProperties()
{
return session.getSqlFunctionProperties();
}
@Override
public Map<SqlFunctionId, SqlInvokedFunction> getSessionFunctions()
{
return session.getSessionFunctions();
}
@Override
public <T> T getProperty(String name, Class<T> type)
{
Object value = extraProperties.get(name);
if (value != null) {
return type.cast(value);
}
return session.getProperty(name, type);
}
@Override
public Optional<String> getSchema()
{
return Optional.empty();
}
@Override
public WarningCollector getWarningCollector()
{
return WarningCollector.NOOP;
}
@Override
public RuntimeStats getRuntimeStats()
{
return session.getRuntimeStats();
}
@Override
public ConnectorSession forConnectorId(ConnectorId connectorId)
{
return this;
}
};
}
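// Opens a new test transaction backed by a fresh HiveMetadata instance from the metadata factory.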
protected Transaction newTransaction()
{
return new HiveTransaction(transactionManager, metadataFactory.get());
}
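// Test-scoped transaction abstraction: exposes the metadata and metastore bound to a single
// connector transaction and expects an explicit commit or rollback before close.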
protected interface Transaction
extends AutoCloseable
{
ConnectorMetadata getMetadata();
SemiTransactionalHiveMetastore getMetastore();
ConnectorTransactionHandle getTransactionHandle();
void commit();
void rollback();
@Override
void close();
}
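// Default Transaction implementation: registers a HiveMetadata instance with the transaction
// manager and, if neither commit nor rollback was called, verifies on close that the
// transaction was read-only before rolling it back.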
static class HiveTransaction
implements Transaction
{
private final HiveTransactionManager transactionManager;
private final ConnectorTransactionHandle transactionHandle;
private boolean closed;
public HiveTransaction(HiveTransactionManager transactionManager, HiveMetadata hiveMetadata)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.transactionHandle = new HiveTransactionHandle();
transactionManager.put(transactionHandle, hiveMetadata);
getMetastore().testOnlyThrowOnCleanupFailures();
}
@Override
public ConnectorMetadata getMetadata()
{
return transactionManager.get(transactionHandle);
}
@Override
public SemiTransactionalHiveMetastore getMetastore()
{
return transactionManager.get(transactionHandle).getMetastore();
}
@Override
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}
@Override
public void commit()
{
checkState(!closed);
closed = true;
TransactionalMetadata metadata = transactionManager.remove(transactionHandle);
checkArgument(metadata != null, "no such transaction: %s", transactionHandle);
metadata.commit();
}
@Override
public void rollback()
{
checkState(!closed);
closed = true;
TransactionalMetadata metadata = transactionManager.remove(transactionHandle);
checkArgument(metadata != null, "no such transaction: %s", transactionHandle);
metadata.rollback();
}
@Override
public void close()
{
if (!closed) {
try {
getMetastore().testOnlyCheckIsReadOnly(); // transactions in this test that perform writes must explicitly commit or roll back
}
finally {
rollback();
}
}
}
}
@Test
public void testGetDatabaseNames()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
List<String> databases = metadata.listSchemaNames(newSession());
assertTrue(databases.contains(database));
}
}
@Test
public void testGetTableNames()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
List<SchemaTableName> tables = metadata.listTables(newSession(), database);
assertTrue(tables.contains(tablePartitionFormat));
assertTrue(tables.contains(tableUnpartitioned));
}
}
@Test
public void testGetAllTableNames()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
List<SchemaTableName> tables = metadata.listTables(newSession(), Optional.empty());
assertTrue(tables.contains(tablePartitionFormat));
assertTrue(tables.contains(tableUnpartitioned));
}
}
@Test
public void testGetAllTableColumns()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Map<SchemaTableName, List<ColumnMetadata>> allColumns = metadata.listTableColumns(newSession(), new SchemaTablePrefix());
assertTrue(allColumns.containsKey(tablePartitionFormat));
assertTrue(allColumns.containsKey(tableUnpartitioned));
}
}
@Test
public void testGetAllTableColumnsInSchema()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Map<SchemaTableName, List<ColumnMetadata>> allColumns = metadata.listTableColumns(newSession(), new SchemaTablePrefix(database));
assertTrue(allColumns.containsKey(tablePartitionFormat));
assertTrue(allColumns.containsKey(tableUnpartitioned));
}
}
@Test
public void testListUnknownSchema()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
assertNull(metadata.getTableHandle(session, new SchemaTableName(INVALID_DATABASE, INVALID_TABLE)));
assertEquals(metadata.listTables(session, INVALID_DATABASE), ImmutableList.of());
assertEquals(metadata.listTableColumns(session, new SchemaTablePrefix(INVALID_DATABASE, INVALID_TABLE)), ImmutableMap.of());
assertEquals(metadata.listViews(session, INVALID_DATABASE), ImmutableList.of());
assertEquals(metadata.getViews(session, new SchemaTablePrefix(INVALID_DATABASE, INVALID_TABLE)), ImmutableMap.of());
}
}
@Test
public void testGetPartitions()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionFormat);
assertExpectedTableLayout(getTableLayout(newSession(), metadata, tableHandle, Constraint.alwaysTrue(), transaction), tableLayout);
}
}
@Test
public void testGetPartitionsWithBindings()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionFormat);
ConnectorTableLayout actualTableLayout = getTableLayout(newSession(), metadata, tableHandle, new Constraint<>(withColumnDomains(ImmutableMap.of(intColumn, Domain.singleValue(BIGINT, 5L)))), transaction);
assertExpectedTableLayout(actualTableLayout, tableLayout);
}
}
@Test(expectedExceptions = TableNotFoundException.class)
public void testGetPartitionsException()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
getTableLayout(newSession(), metadata, invalidTableHandle, Constraint.alwaysTrue(), transaction);
}
}
@Test
public void testGetPartitionNames()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionFormat);
assertExpectedTableLayout(getTableLayout(newSession(), metadata, tableHandle, Constraint.alwaysTrue(), transaction), tableLayout);
}
}
@Test
public void testMismatchSchemaTable()
throws Exception
{
boolean pushdownFilterEnabled = getHiveClientConfig().isPushdownFilterEnabled();
for (HiveStorageFormat storageFormat : createTableFormats) {
// TODO: fix coercion for JSON or PAGEFILE
if (storageFormat == JSON || storageFormat == PAGEFILE) {
continue;
}
SchemaTableName temporaryMismatchSchemaTable = temporaryTable("mismatch_schema");
try {
doTestMismatchSchemaTable(
temporaryMismatchSchemaTable,
storageFormat,
MISMATCH_SCHEMA_TABLE_BEFORE,
MISMATCH_SCHEMA_TABLE_DATA_BEFORE,
MISMATCH_SCHEMA_TABLE_AFTER,
MISMATCH_SCHEMA_TABLE_DATA_AFTER,
pushdownFilterEnabled ? MISMATCH_SCHEMA_TABLE_AFTER_FILTERS : ImmutableList.of(),
pushdownFilterEnabled ? MISMATCH_SCHEMA_TABLE_AFTER_RESULT_PREDICATES : ImmutableList.of());
}
finally {
dropTable(temporaryMismatchSchemaTable);
}
}
}
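// Exercises schema evolution: creates a table with the "before" schema, inserts data, rewrites
// the table schema through the metastore, verifies the data is still readable (optionally
// through pushed-down filters), and finally checks that inserting the "after" data fails with
// HIVE_PARTITION_SCHEMA_MISMATCH.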
protected void doTestMismatchSchemaTable(
SchemaTableName schemaTableName,
HiveStorageFormat storageFormat,
List<ColumnMetadata> tableBefore,
MaterializedResult dataBefore,
List<ColumnMetadata> tableAfter,
MaterializedResult dataAfter,
List<RowExpression> afterFilters,
List<Predicate<MaterializedRow>> afterResultPredicates)
throws Exception
{
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
doCreateEmptyTable(schemaTableName, storageFormat, tableBefore);
// insert the data
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, schemaTableName);
ConnectorInsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
sink.appendPage(dataBefore.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
transaction.commit();
}
// load the table and verify the data
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, schemaTableName);
List<ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle).values().stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toList());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), dataBefore.getMaterializedRows());
transaction.commit();
}
// alter the table schema
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(
session.getIdentity(),
session.getQueryId(),
session.getClientInfo(),
session.getClientTags(),
session.getSource(),
getMetastoreHeaders(session),
false,
DEFAULT_COLUMN_CONVERTER_PROVIDER,
session.getWarningCollector(),
session.getRuntimeStats());
PrincipalPrivileges principalPrivileges = testingPrincipalPrivilege(session);
Table oldTable = transaction.getMetastore().getTable(metastoreContext, schemaName, tableName).get();
HiveTypeTranslator hiveTypeTranslator = new HiveTypeTranslator();
List<Column> dataColumns = tableAfter.stream()
.filter(columnMetadata -> !columnMetadata.getName().equals("ds"))
.map(columnMetadata -> new Column(columnMetadata.getName(), toHiveType(hiveTypeTranslator, columnMetadata.getType()), Optional.empty(), Optional.empty()))
.collect(toList());
Table.Builder newTable = Table.builder(oldTable)
.setDataColumns(dataColumns);
transaction.getMetastore().replaceView(metastoreContext, schemaName, tableName, newTable.build(), principalPrivileges);
transaction.commit();
}
// load the altered table and verify the data
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, schemaTableName);
List<ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle).values().stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toList());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), dataAfter.getMaterializedRows());
int filterCount = afterFilters.size();
for (int i = 0; i < filterCount; i++) {
RowExpression predicate = afterFilters.get(i);
ConnectorTableLayoutHandle layoutHandle = pushdownFilter(
session,
metadata,
transaction.getMetastore(),
ROW_EXPRESSION_SERVICE,
FUNCTION_RESOLUTION,
hivePartitionManager,
METADATA.getFunctionAndTypeManager(),
tableHandle,
predicate,
Optional.empty()).getLayout().getHandle();
// Read all columns with a filter
MaterializedResult filteredResult = readTable(transaction, tableHandle, layoutHandle, columnHandles, session, OptionalInt.empty(), Optional.empty());
Predicate<MaterializedRow> rowPredicate = afterResultPredicates.get(i);
List<MaterializedRow> expectedRows = dataAfter.getMaterializedRows().stream().filter(rowPredicate::apply).collect(toList());
assertEqualsIgnoreOrder(filteredResult.getMaterializedRows(), expectedRows);
// Read all columns except the ones used in the filter
Set<String> filterColumnNames = extractUnique(predicate).stream().map(VariableReferenceExpression::getName).collect(toImmutableSet());
List<ColumnHandle> nonFilterColumns = columnHandles.stream()
.filter(column -> !filterColumnNames.contains(((HiveColumnHandle) column).getName()))
.collect(toList());
int resultCount = readTable(transaction, tableHandle, layoutHandle, nonFilterColumns, session, OptionalInt.empty(), Optional.empty()).getRowCount();
assertEquals(resultCount, expectedRows.size());
}
transaction.commit();
}
// inserts into partitions with mismatched column types should fail
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, schemaTableName);
ConnectorInsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
sink.appendPage(dataAfter.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
transaction.commit();
fail("expected exception");
}
catch (PrestoException e) {
// expected
assertEquals(e.getErrorCode(), HIVE_PARTITION_SCHEMA_MISMATCH.toErrorCode());
}
}
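// Compares the parts of a table layout that these tests care about: layout handle partitions,
// predicate, discrete predicates, stream partitioning columns, and local properties.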
protected void assertExpectedTableLayout(ConnectorTableLayout actualTableLayout, ConnectorTableLayout expectedTableLayout)
{
assertExpectedTableLayoutHandle(actualTableLayout.getHandle(), expectedTableLayout.getHandle());
assertEquals(actualTableLayout.getPredicate(), expectedTableLayout.getPredicate());
assertEquals(actualTableLayout.getDiscretePredicates().isPresent(), expectedTableLayout.getDiscretePredicates().isPresent());
actualTableLayout.getDiscretePredicates().ifPresent(actual -> {
DiscretePredicates expected = expectedTableLayout.getDiscretePredicates().get();
assertEquals(actual.getColumns(), expected.getColumns());
assertEqualsIgnoreOrder(actual.getPredicates(), expected.getPredicates());
});
assertEquals(actualTableLayout.getStreamPartitioningColumns(), expectedTableLayout.getStreamPartitioningColumns());
assertEquals(actualTableLayout.getLocalProperties(), expectedTableLayout.getLocalProperties());
}
protected void assertExpectedTableLayoutHandle(ConnectorTableLayoutHandle actualTableLayoutHandle, ConnectorTableLayoutHandle expectedTableLayoutHandle)
{
assertInstanceOf(actualTableLayoutHandle, HiveTableLayoutHandle.class);
assertInstanceOf(expectedTableLayoutHandle, HiveTableLayoutHandle.class);
HiveTableLayoutHandle actual = (HiveTableLayoutHandle) actualTableLayoutHandle;
HiveTableLayoutHandle expected = (HiveTableLayoutHandle) expectedTableLayoutHandle;
assertExpectedPartitions(actual.getPartitions().get(), expected.getPartitions().get());
}
protected void assertExpectedPartitions(List<HivePartition> actualPartitions, Iterable<HivePartition> expectedPartitions)
{
Map<PartitionNameWithVersion, ?> actualById = uniqueIndex(actualPartitions, HivePartition::getPartitionId);
for (Object expected : expectedPartitions) {
assertInstanceOf(expected, HivePartition.class);
HivePartition expectedPartition = (HivePartition) expected;
Object actual = actualById.get(expectedPartition.getPartitionId());
assertNotNull(actual, "partition not found: " + expectedPartition.getPartitionId());
assertInstanceOf(actual, HivePartition.class);
HivePartition actualPartition = (HivePartition) actual;
assertEquals(actualPartition, expectedPartition);
assertEquals(actualPartition.getPartitionId(), expectedPartition.getPartitionId());
assertEquals(actualPartition.getKeys(), expectedPartition.getKeys());
assertEquals(actualPartition.getTableName(), expectedPartition.getTableName());
}
}
@Test
public void testGetPartitionNamesUnpartitioned()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableUnpartitioned);
ConnectorTableLayout tableLayout = getTableLayout(newSession(), metadata, tableHandle, Constraint.alwaysTrue(), transaction);
assertEquals(getAllPartitions(tableLayout.getHandle()).size(), 1);
assertExpectedTableLayout(tableLayout, unpartitionedTableLayout);
}
}
@Test(expectedExceptions = TableNotFoundException.class)
public void testGetPartitionNamesException()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
getTableLayout(newSession(), metadata, invalidTableHandle, Constraint.alwaysTrue(), transaction);
}
}
@Test
public void testGetTableSchemaPartitionFormat()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(newSession(), getTableHandle(metadata, tablePartitionFormat));
Map<String, ColumnMetadata> map = uniqueIndex(tableMetadata.getColumns(), ColumnMetadata::getName);
assertPrimitiveField(map, "t_string", createUnboundedVarcharType(), false);
assertPrimitiveField(map, "t_tinyint", TINYINT, false);
assertPrimitiveField(map, "t_smallint", SMALLINT, false);
assertPrimitiveField(map, "t_int", INTEGER, false);
assertPrimitiveField(map, "t_bigint", BIGINT, false);
assertPrimitiveField(map, "t_float", REAL, false);
assertPrimitiveField(map, "t_double", DOUBLE, false);
assertPrimitiveField(map, "t_boolean", BOOLEAN, false);
assertPrimitiveField(map, "ds", createUnboundedVarcharType(), true);
assertPrimitiveField(map, "file_format", createUnboundedVarcharType(), true);
assertPrimitiveField(map, "dummy", INTEGER, true);
}
}
@Test
public void testGetTableSchemaUnpartitioned()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableUnpartitioned);
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(newSession(), tableHandle);
Map<String, ColumnMetadata> map = uniqueIndex(tableMetadata.getColumns(), ColumnMetadata::getName);
assertPrimitiveField(map, "t_string", createUnboundedVarcharType(), false);
assertPrimitiveField(map, "t_tinyint", TINYINT, false);
}
}
@Test
public void testGetTableSchemaOffline()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Map<SchemaTableName, List<ColumnMetadata>> columns = metadata.listTableColumns(newSession(), tableOffline.toSchemaTablePrefix());
assertEquals(columns.size(), 1);
Map<String, ColumnMetadata> map = uniqueIndex(getOnlyElement(columns.values()), ColumnMetadata::getName);
assertPrimitiveField(map, "t_string", createUnboundedVarcharType(), false);
}
}
@Test
public void testGetTableSchemaOfflinePartition()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableOfflinePartition);
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(newSession(), tableHandle);
Map<String, ColumnMetadata> map = uniqueIndex(tableMetadata.getColumns(), ColumnMetadata::getName);
assertPrimitiveField(map, "t_string", createUnboundedVarcharType(), false);
}
}
@Test
public void testGetTableSchemaNotReadablePartition()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableNotReadable);
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(newSession(), tableHandle);
Map<String, ColumnMetadata> map = uniqueIndex(tableMetadata.getColumns(), ColumnMetadata::getName);
assertPrimitiveField(map, "t_string", createUnboundedVarcharType(), false);
}
}
@Test
public void testGetTableSchemaException()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
assertNull(metadata.getTableHandle(newSession(), invalidTable));
}
}
@Test
public void testGetTableStatsBucketedStringInt()
{
assertTableStatsComputed(
tableBucketedStringInt,
ImmutableSet.of(
"t_bigint",
"t_boolean",
"t_double",
"t_float",
"t_int",
"t_smallint",
"t_string",
"t_tinyint",
"ds"));
}
@Test
public void testGetTableStatsUnpartitioned()
{
assertTableStatsComputed(
tableUnpartitioned,
ImmutableSet.of("t_string", "t_tinyint"));
}
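// Verifies that table statistics contain a known row count and, for every expected column,
// known nulls fraction and distinct-value counts; data size is expected to be known only for
// varchar columns.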
private void assertTableStatsComputed(
SchemaTableName tableName,
Set<String> expectedColumnStatsColumns)
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> allColumnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
TableStatistics tableStatistics = metadata.getTableStatistics(session, tableHandle, Optional.empty(), allColumnHandles, Constraint.alwaysTrue());
assertFalse(tableStatistics.getRowCount().isUnknown(), "row count is unknown");
Map<String, ColumnStatistics> columnsStatistics = tableStatistics
.getColumnStatistics()
.entrySet()
.stream()
.collect(
toImmutableMap(
entry -> ((HiveColumnHandle) entry.getKey()).getName(),
Map.Entry::getValue));
assertEquals(columnsStatistics.keySet(), expectedColumnStatsColumns, "columns with statistics");
Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle);
columnsStatistics.forEach((columnName, columnStatistics) -> {
ColumnHandle columnHandle = columnHandles.get(columnName);
Type columnType = metadata.getColumnMetadata(session, tableHandle, columnHandle).getType();
assertFalse(
columnStatistics.getNullsFraction().isUnknown(),
"unknown nulls fraction for " + columnName);
assertFalse(
columnStatistics.getDistinctValuesCount().isUnknown(),
"unknown distinct values count for " + columnName);
if (isVarcharType(columnType)) {
assertFalse(
columnStatistics.getDataSize().isUnknown(),
"unknown data size for " + columnName);
}
else {
assertTrue(
columnStatistics.getDataSize().isUnknown(),
"unknown data size for" + columnName);
}
});
}
}
@Test
public void testGetPartitionSplitsBatch()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionFormat);
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT);
assertEquals(getSplitCount(splitSource), partitionCount);
}
}
// @Test
public void testGetEncryptionInformationInPartitionedTable()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_encrypt_with_partitions");
ConnectorTableHandle tableHandle = new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
try {
doInsertIntoNewPartition(ORC, tableName, TEST_HIVE_PAGE_SINK_CONTEXT);
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT);
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertTrue(allSplits.size() >= 1, "There should be at least 1 split");
for (ConnectorSplit split : allSplits) {
HiveSplit hiveSplit = (HiveSplit) split;
assertTrue(hiveSplit.getEncryptionInformation().isPresent());
assertTrue(hiveSplit.getEncryptionInformation().get().getDwrfEncryptionMetadata().isPresent());
}
}
}
finally {
dropTable(tableName);
}
}
// @Test
public void testGetEncryptionInformationInUnpartitionedTable()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_encrypt_with_no_partitions");
ConnectorTableHandle tableHandle = new HiveTableHandle(tableName.getSchemaName(), tableName.getTableName());
try {
doInsert(ORC, tableName, TEST_HIVE_PAGE_SINK_CONTEXT);
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT);
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertTrue(allSplits.size() >= 1, "There should be at least 1 split");
for (ConnectorSplit split : allSplits) {
HiveSplit hiveSplit = (HiveSplit) split;
assertTrue(hiveSplit.getEncryptionInformation().isPresent());
assertTrue(hiveSplit.getEncryptionInformation().get().getDwrfEncryptionMetadata().isPresent());
}
}
}
finally {
dropTable(tableName);
}
}
@Test
public void testGetPartitionSplitsBatchUnpartitioned()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableUnpartitioned);
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT);
assertEquals(getSplitCount(splitSource), 1);
}
}
@Test(expectedExceptions = TableNotFoundException.class)
public void testGetPartitionSplitsBatchInvalidTable()
{
try (Transaction transaction = newTransaction()) {
splitManager.getSplits(transaction.getTransactionHandle(), newSession(), invalidTableLayoutHandle, SPLIT_SCHEDULING_CONTEXT);
}
}
@Test
public void testGetPartitionTableOffline()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
try {
getTableHandle(metadata, tableOffline);
fail("expected TableOfflineException");
}
catch (TableOfflineException e) {
assertEquals(e.getTableName(), tableOffline);
}
}
}
@Test
public void testGetPartitionSplitsTableOfflinePartition()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableOfflinePartition);
assertNotNull(tableHandle);
ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
assertNotNull(dsColumn);
Domain domain = Domain.singleValue(createUnboundedVarcharType(), utf8Slice("2012-12-30"));
TupleDomain<ColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(dsColumn, domain));
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, new Constraint<>(tupleDomain), transaction);
try {
getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT));
fail("Expected PartitionOfflineException");
}
catch (PartitionOfflineException e) {
assertEquals(e.getTableName(), tableOfflinePartition);
assertEquals(e.getPartition(), "ds=2012-12-30");
}
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession(ImmutableMap.of(OFFLINE_DATA_DEBUG_MODE_ENABLED, true));
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableOfflinePartition);
assertNotNull(tableHandle);
ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
assertNotNull(dsColumn);
Domain domain = Domain.singleValue(createUnboundedVarcharType(), utf8Slice("2012-12-30"));
TupleDomain<ColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(dsColumn, domain));
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, new Constraint<>(tupleDomain), transaction);
getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT));
}
}
@Test
public void testGetPartitionSplitsTableNotReadablePartition()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableNotReadable);
assertNotNull(tableHandle);
ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
assertNotNull(dsColumn);
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
try {
getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT));
fail("Expected HiveNotReadableException");
}
catch (HiveNotReadableException e) {
assertThat(e).hasMessageMatching("Table '.*\\.presto_test_not_readable' is not readable: reason for not readable");
assertEquals(e.getTableName(), tableNotReadable);
assertEquals(e.getPartition(), Optional.empty());
}
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession(ImmutableMap.of(OFFLINE_DATA_DEBUG_MODE_ENABLED, true));
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableNotReadable);
assertNotNull(tableHandle);
ColumnHandle dsColumn = metadata.getColumnHandles(session, tableHandle).get("ds");
assertNotNull(dsColumn);
ConnectorTableLayout tableLayout = getTableLayout(session, metadata, tableHandle, Constraint.alwaysTrue(), transaction);
getSplitCount(splitManager.getSplits(transaction.getTransactionHandle(), session, tableLayout.getHandle(), SPLIT_SCHEDULING_CONTEXT));
}
}
@Test
public void testBucketedTableStringInt()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableBucketedStringInt);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
assertTableIsBucketed(transaction, tableHandle);
String testString = "test";
Integer testInt = 13;
Short testSmallint = 12;
// Reverse the order of bindings as compared to bucketing order
ImmutableMap<ColumnHandle, NullableValue> bindings = ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(columnHandles.get(columnIndex.get("t_int")), NullableValue.of(INTEGER, (long) testInt))
.put(columnHandles.get(columnIndex.get("t_string")), NullableValue.of(createUnboundedVarcharType(), utf8Slice(testString)))
.put(columnHandles.get(columnIndex.get("t_smallint")), NullableValue.of(SMALLINT, (long) testSmallint))
.build();
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.fromFixedValues(bindings), OptionalInt.of(1), Optional.empty());
boolean rowFound = false;
for (MaterializedRow row : result) {
if (testString.equals(row.getField(columnIndex.get("t_string"))) &&
testInt.equals(row.getField(columnIndex.get("t_int"))) &&
testSmallint.equals(row.getField(columnIndex.get("t_smallint")))) {
rowFound = true;
}
}
assertTrue(rowFound);
}
}
@Test
public void testBucketedTableBigintBoolean()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableBucketedBigintBoolean);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
assertTableIsBucketed(transaction, tableHandle);
String testString = "test";
Long testBigint = 89L;
Boolean testBoolean = true;
ImmutableMap<ColumnHandle, NullableValue> bindings = ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(columnHandles.get(columnIndex.get("t_string")), NullableValue.of(createUnboundedVarcharType(), utf8Slice(testString)))
.put(columnHandles.get(columnIndex.get("t_bigint")), NullableValue.of(BIGINT, testBigint))
.put(columnHandles.get(columnIndex.get("t_boolean")), NullableValue.of(BOOLEAN, testBoolean))
.build();
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.fromFixedValues(bindings), OptionalInt.of(1), Optional.empty());
boolean rowFound = false;
for (MaterializedRow row : result) {
if (testString.equals(row.getField(columnIndex.get("t_string"))) &&
testBigint.equals(row.getField(columnIndex.get("t_bigint"))) &&
testBoolean.equals(row.getField(columnIndex.get("t_boolean")))) {
rowFound = true;
break;
}
}
assertTrue(rowFound);
}
}
@Test
public void testBucketedTableDoubleFloat()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableBucketedDoubleFloat);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
assertTableIsBucketed(transaction, tableHandle);
ImmutableMap<ColumnHandle, NullableValue> bindings = ImmutableMap.<ColumnHandle, NullableValue>builder()
.put(columnHandles.get(columnIndex.get("t_float")), NullableValue.of(REAL, (long) floatToRawIntBits(87.1f)))
.put(columnHandles.get(columnIndex.get("t_double")), NullableValue.of(DOUBLE, 88.2))
.build();
// bucket filtering is not supported on floats and doubles, so no splits are pruned and we should see all of them
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.fromFixedValues(bindings), OptionalInt.of(32), Optional.empty());
assertEquals(result.getRowCount(), 100);
}
}
@Test
public void testBucketedTableEvolutionWithDifferentReadBucketCount()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryBucketEvolutionTable = temporaryTable("bucket_evolution");
try {
doTestBucketedTableEvolutionWithDifferentReadCount(storageFormat, temporaryBucketEvolutionTable);
}
finally {
dropTable(temporaryBucketEvolutionTable);
}
}
}
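// Creates a table bucketed by "id" with 4 buckets, writes one partition, alters the table
// bucket count to 16, and then reads through a layout whose bucket handle declares a read
// bucket count of 2, verifying the split count and that each row's bucket matches id % 16.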
private void doTestBucketedTableEvolutionWithDifferentReadCount(HiveStorageFormat storageFormat, SchemaTableName tableName)
throws Exception
{
int rowCount = 100;
int bucketCount = 16;
// Produce a table whose partition has a bucket count different from, but compatible with, the table bucket count
createEmptyTable(
tableName,
storageFormat,
ImmutableList.of(
new Column("id", HIVE_LONG, Optional.empty(), Optional.empty()),
new Column("name", HIVE_STRING, Optional.empty(), Optional.empty())),
ImmutableList.of(new Column("pk", HIVE_STRING, Optional.empty(), Optional.empty())),
Optional.of(new HiveBucketProperty(ImmutableList.of("id"), 4, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())));
// write a 4-bucket partition
MaterializedResult.Builder bucket4Builder = MaterializedResult.resultBuilder(SESSION, BIGINT, VARCHAR, VARCHAR);
IntStream.range(0, rowCount).forEach(i -> bucket4Builder.row((long) i, String.valueOf(i), "four"));
insertData(tableName, bucket4Builder.build());
// Alter the bucket count to 16
alterBucketProperty(tableName, Optional.of(new HiveBucketProperty(ImmutableList.of("id"), bucketCount, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())));
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableName);
// select all columns in table, including synthetic columns such as $path, except for $row_id.
// $row_id will throw some exceptions if there is not a real metastore running to supply
// the row ID partition components.
List<ColumnHandle> columnHandles = metadata.getColumnHandles(session, hiveTableHandle).values()
.stream()
.map(e -> (HiveColumnHandle) e)
.filter(e -> !HiveColumnHandle.isRowIdColumnHandle(e))
.collect(Collectors.toList());
HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) getTableLayout(session, transaction.getMetadata(), hiveTableHandle, Constraint.alwaysTrue(), transaction).getHandle();
HiveBucketHandle bucketHandle = layoutHandle.getBucketHandle().get();
HiveTableLayoutHandle modifiedReadBucketCountLayoutHandle = layoutHandle.builder()
.setBucketHandle(Optional.of(new HiveBucketHandle(bucketHandle.getColumns(), bucketHandle.getTableBucketCount(), 2)))
.build();
List<ConnectorSplit> splits = getAllSplits(session, transaction, modifiedReadBucketCountLayoutHandle);
assertEquals(splits.size(), 16);
TableHandle tableHandle = toTableHandle(transaction, hiveTableHandle, modifiedReadBucketCountLayoutHandle);
ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder();
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE, new RuntimeStats())) {
MaterializedResult intermediateResult = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
allRows.addAll(intermediateResult.getMaterializedRows());
}
}
MaterializedResult result = new MaterializedResult(allRows.build(), getTypes(columnHandles));
assertEquals(result.getRowCount(), rowCount);
Map<String, Integer> columnIndex = indexColumns(columnHandles);
int nameColumnIndex = columnIndex.get("name");
int bucketColumnIndex = columnIndex.get(BUCKET_COLUMN_NAME);
for (MaterializedRow row : result.getMaterializedRows()) {
String name = (String) row.getField(nameColumnIndex);
int bucket = (int) row.getField(bucketColumnIndex);
assertEquals(bucket, Integer.parseInt(name) % bucketCount);
}
}
}
@Test
public void testBucketedTableEvolution()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryBucketEvolutionTable = temporaryTable("bucket_evolution");
try {
doTestBucketedTableEvolution(storageFormat, temporaryBucketEvolutionTable);
}
finally {
dropTable(temporaryBucketEvolutionTable);
}
}
}
private void doTestBucketedTableEvolution(HiveStorageFormat storageFormat, SchemaTableName tableName)
throws Exception
{
int rowCount = 100;
//
// Produce a table with 8 buckets.
// The table has 3 partitions with 3 different bucket counts (4, 8, 16).
createEmptyTable(
tableName,
storageFormat,
ImmutableList.of(
new Column("id", HIVE_LONG, Optional.empty(), Optional.empty()),
new Column("name", HIVE_STRING, Optional.empty(), Optional.empty())),
ImmutableList.of(new Column("pk", HIVE_STRING, Optional.empty(), Optional.empty())),
Optional.of(new HiveBucketProperty(ImmutableList.of("id"), 4, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())));
// write a 4-bucket partition
MaterializedResult.Builder bucket4Builder = MaterializedResult.resultBuilder(SESSION, BIGINT, VARCHAR, VARCHAR);
IntStream.range(0, rowCount).forEach(i -> bucket4Builder.row((long) i, String.valueOf(i), "four"));
insertData(tableName, bucket4Builder.build());
// write a 16-bucket partition
alterBucketProperty(tableName, Optional.of(new HiveBucketProperty(ImmutableList.of("id"), 16, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())));
MaterializedResult.Builder bucket16Builder = MaterializedResult.resultBuilder(SESSION, BIGINT, VARCHAR, VARCHAR);
IntStream.range(0, rowCount).forEach(i -> bucket16Builder.row((long) i, String.valueOf(i), "sixteen"));
insertData(tableName, bucket16Builder.build());
// write an 8-bucket partition
alterBucketProperty(tableName, Optional.of(new HiveBucketProperty(ImmutableList.of("id"), 8, ImmutableList.of(), HIVE_COMPATIBLE, Optional.empty())));
MaterializedResult.Builder bucket8Builder = MaterializedResult.resultBuilder(SESSION, BIGINT, VARCHAR, VARCHAR);
IntStream.range(0, rowCount).forEach(i -> bucket8Builder.row((long) i, String.valueOf(i), "eight"));
insertData(tableName, bucket8Builder.build());
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
// select all columns in table, including synthetic columns such as $path, except for $row_id.
// $row_id will throw some exceptions if there is not a real metastore running to supply
// the row ID partition components.
List<ColumnHandle> columnHandles = metadata.getColumnHandles(session, tableHandle).values()
.stream()
.map(e -> (HiveColumnHandle) e)
.filter(e -> !HiveColumnHandle.isRowIdColumnHandle(e))
.collect(Collectors.toList());
MaterializedResult result = readTable(
transaction,
tableHandle,
columnHandles,
session,
TupleDomain.all(),
OptionalInt.empty(),
Optional.empty());
assertBucketTableEvolutionResult(result, columnHandles, ImmutableSet.of(0, 1, 2, 3, 4, 5, 6, 7), rowCount);
// read single bucket (table/logical bucket)
NullableValue singleBucket = NullableValue.of(INTEGER, 6L);
ConnectorTableLayoutHandle layoutHandle;
if (HiveSessionProperties.isPushdownFilterEnabled(session)) {
TupleDomain<VariableReferenceExpression> bucketDomain = TupleDomain.fromFixedValues(ImmutableMap.of(new VariableReferenceExpression(Optional.empty(), BUCKET_COLUMN_NAME, BIGINT), singleBucket));
RowExpression predicate = ROW_EXPRESSION_SERVICE.getDomainTranslator().toPredicate(bucketDomain);
layoutHandle = pushdownFilter(
session,
metadata,
transaction.getMetastore(),
ROW_EXPRESSION_SERVICE,
FUNCTION_RESOLUTION,
hivePartitionManager,
METADATA.getFunctionAndTypeManager(),
tableHandle,
predicate,
Optional.empty()).getLayout().getHandle();
}
else {
layoutHandle = getOnlyElement(metadata.getTableLayouts(session, tableHandle, new Constraint<>(TupleDomain.fromFixedValues(ImmutableMap.of(bucketColumnHandle(), singleBucket))), Optional.empty())).getTableLayout().getHandle();
}
result = readTable(
transaction,
tableHandle,
layoutHandle,
columnHandles,
session,
OptionalInt.empty(),
Optional.empty());
assertBucketTableEvolutionResult(result, columnHandles, ImmutableSet.of(6), rowCount);
// read single bucket, without selecting the bucketing column (i.e. id column) or the $row_id column
ImmutableList<ColumnHandle> nonBucketColumnHandles = ImmutableList.<ColumnHandle>builder()
.addAll(metadata.getColumnHandles(session, tableHandle).values().stream()
.filter(columnHandle -> !"id".equals(((HiveColumnHandle) columnHandle).getName()))
.filter(columnHandle -> !HiveColumnHandle.isRowIdColumnHandle((HiveColumnHandle) columnHandle))
.collect(toImmutableList()))
.build();
result = readTable(
transaction,
tableHandle,
layoutHandle,
nonBucketColumnHandles,
session,
OptionalInt.empty(),
Optional.empty());
assertBucketTableEvolutionResult(result, nonBucketColumnHandles, ImmutableSet.of(6), rowCount);
}
}
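// Runs the connector's subfield-extraction filter pushdown against the given table handle
// and returns the resulting pushdown result, from which tests extract the filtered layout.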
private static ConnectorPushdownFilterResult pushdownFilter(
ConnectorSession session,
ConnectorMetadata metadata,
SemiTransactionalHiveMetastore metastore,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
HivePartitionManager partitionManager,
FunctionMetadataManager functionMetadataManager,
ConnectorTableHandle connectorTableHandle,
RowExpression filter,
Optional<ConnectorTableLayoutHandle> currentLayoutHandle)
{
HiveTransactionManager hiveTransactionManager = new HiveTransactionManager();
SubfieldExtractionRewriter filterPushdown = new SubfieldExtractionRewriter(
session,
new PlanNodeIdAllocator(),
rowExpressionService,
functionResolution,
functionMetadataManager,
hiveTransactionManager,
partitionManager,
tableHandle -> HiveFilterPushdown.getConnectorMetadata(hiveTransactionManager, tableHandle));
return filterPushdown.pushdownFilter(session, metadata, connectorTableHandle, filter, currentLayoutHandle);
}
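// Checks that the result contains exactly the ids whose value modulo the bucket count falls in the given
// bucket set, that each id appears exactly three times, and that the $bucket column matches id % bucketCount.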
private static void assertBucketTableEvolutionResult(MaterializedResult result, List<ColumnHandle> columnHandles, Set<Integer> bucketIds, int rowCount)
{
// Assert that only elements in the specified buckets show up, and that each element shows up 3 times.
int bucketCount = 8;
Set<Long> expectedIds = LongStream.range(0, rowCount)
.filter(x -> bucketIds.contains(toIntExact(x % bucketCount)))
.boxed()
.collect(toImmutableSet());
// assert that the content from all three buckets is the same
Map<String, Integer> columnIndex = indexColumns(columnHandles);
OptionalInt idColumnIndex = columnIndex.containsKey("id") ? OptionalInt.of(columnIndex.get("id")) : OptionalInt.empty();
int nameColumnIndex = columnIndex.get("name");
int bucketColumnIndex = columnIndex.get(BUCKET_COLUMN_NAME);
Map<Long, Integer> idCount = new HashMap<>();
for (MaterializedRow row : result.getMaterializedRows()) {
String name = (String) row.getField(nameColumnIndex);
int bucket = (int) row.getField(bucketColumnIndex);
idCount.compute(Long.parseLong(name), (key, oldValue) -> oldValue == null ? 1 : oldValue + 1);
assertEquals(bucket, Integer.parseInt(name) % bucketCount);
if (idColumnIndex.isPresent()) {
long id = (long) row.getField(idColumnIndex.getAsInt());
assertEquals(Integer.parseInt(name), id);
}
}
assertEquals(
(int) idCount.values().stream()
.distinct()
.collect(onlyElement()),
3);
assertEquals(idCount.keySet(), expectedIds);
}
private void assertTableIsBucketed(Transaction transaction, ConnectorTableHandle tableHandle)
{
// the bucketed test tables should have exactly 32 splits
List<ConnectorSplit> splits = getAllSplits(transaction, tableHandle, TupleDomain.all());
assertEquals(splits.size(), 32);
// verify all paths are unique
Set<String> paths = new HashSet<>();
for (ConnectorSplit split : splits) {
assertTrue(paths.add(((HiveSplit) split).getFileSplit().getPath()));
}
}
@Test
public void testGetRecords()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tablePartitionFormat);
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, hiveTableHandle);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, hiveTableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, hiveTableHandle, TupleDomain.all());
List<ConnectorSplit> splits = getAllSplits(session, transaction, layoutHandle);
assertEquals(splits.size(), partitionCount);
for (ConnectorSplit split : splits) {
HiveSplit hiveSplit = (HiveSplit) split;
List<HivePartitionKey> partitionKeys = hiveSplit.getPartitionKeys();
String ds = partitionKeys.get(0).getValue().orElse(null);
String fileFormat = partitionKeys.get(1).getValue().orElse(null);
HiveStorageFormat fileType = HiveStorageFormat.valueOf(fileFormat.toUpperCase());
int dummyPartition = Integer.parseInt(partitionKeys.get(2).getValue().orElse(null));
long rowNumber = 0;
long completedBytes = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
assertPageSourceType(pageSource, fileType);
for (MaterializedRow row : result) {
try {
assertValueTypes(row, tableMetadata.getColumns());
}
catch (RuntimeException e) {
throw new RuntimeException("row " + rowNumber, e);
}
rowNumber++;
Object value;
value = row.getField(columnIndex.get("t_string"));
if (rowNumber % 19 == 0) {
assertNull(value);
}
else if (rowNumber % 19 == 1) {
assertEquals(value, "");
}
else {
assertEquals(value, "test");
}
assertEquals(row.getField(columnIndex.get("t_tinyint")), (byte) (1 + rowNumber));
assertEquals(row.getField(columnIndex.get("t_smallint")), (short) (2 + rowNumber));
assertEquals(row.getField(columnIndex.get("t_int")), 3 + (int) rowNumber);
if (rowNumber % 13 == 0) {
assertNull(row.getField(columnIndex.get("t_bigint")));
}
else {
assertEquals(row.getField(columnIndex.get("t_bigint")), 4 + rowNumber);
}
assertEquals((Float) row.getField(columnIndex.get("t_float")), 5.1f + rowNumber, 0.001);
assertEquals(row.getField(columnIndex.get("t_double")), 6.2 + rowNumber);
if (rowNumber % 3 == 2) {
assertNull(row.getField(columnIndex.get("t_boolean")));
}
else {
assertEquals(row.getField(columnIndex.get("t_boolean")), rowNumber % 3 != 0);
}
assertEquals(row.getField(columnIndex.get("ds")), ds);
assertEquals(row.getField(columnIndex.get("file_format")), fileFormat);
assertEquals(row.getField(columnIndex.get("dummy")), dummyPartition);
long newCompletedBytes = pageSource.getCompletedBytes();
assertTrue(newCompletedBytes >= completedBytes);
assertTrue(newCompletedBytes <= hiveSplit.getFileSplit().getLength());
completedBytes = newCompletedBytes;
}
assertTrue(completedBytes <= hiveSplit.getFileSplit().getLength());
assertEquals(rowNumber, 100);
}
}
}
}
@Test
public void testGetPartialRecords()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tablePartitionFormat);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, hiveTableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, hiveTableHandle, TupleDomain.all());
List<ConnectorSplit> splits = getAllSplits(session, transaction, layoutHandle);
assertEquals(splits.size(), partitionCount);
for (ConnectorSplit split : splits) {
HiveSplit hiveSplit = (HiveSplit) split;
List<HivePartitionKey> partitionKeys = hiveSplit.getPartitionKeys();
String ds = partitionKeys.get(0).getValue().orElse(null);
String fileFormat = partitionKeys.get(1).getValue().orElse(null);
HiveStorageFormat fileType = HiveStorageFormat.valueOf(fileFormat.toUpperCase());
int dummyPartition = Integer.parseInt(partitionKeys.get(2).getValue().orElse(null));
long rowNumber = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
assertPageSourceType(pageSource, fileType);
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
for (MaterializedRow row : result) {
rowNumber++;
assertEquals(row.getField(columnIndex.get("t_double")), 6.2 + rowNumber);
assertEquals(row.getField(columnIndex.get("ds")), ds);
assertEquals(row.getField(columnIndex.get("file_format")), fileFormat);
assertEquals(row.getField(columnIndex.get("dummy")), dummyPartition);
}
}
assertEquals(rowNumber, 100);
}
}
}
@Test
public void testGetRecordsUnpartitioned()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, tableUnpartitioned);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, hiveTableHandle).values());
Map<String, Integer> columnIndex = indexColumns(columnHandles);
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, hiveTableHandle, TupleDomain.all());
List<ConnectorSplit> splits = getAllSplits(session, transaction, layoutHandle);
assertEquals(splits.size(), 1);
for (ConnectorSplit split : splits) {
HiveSplit hiveSplit = (HiveSplit) split;
assertEquals(hiveSplit.getPartitionKeys(), ImmutableList.of());
long rowNumber = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
assertPageSourceType(pageSource, TEXTFILE);
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
for (MaterializedRow row : result) {
rowNumber++;
if (rowNumber % 19 == 0) {
assertNull(row.getField(columnIndex.get("t_string")));
}
else if (rowNumber % 19 == 1) {
assertEquals(row.getField(columnIndex.get("t_string")), "");
}
else {
assertEquals(row.getField(columnIndex.get("t_string")), "unpartitioned");
}
assertEquals(row.getField(columnIndex.get("t_tinyint")), (byte) (1 + rowNumber));
}
}
assertEquals(rowNumber, 100);
}
}
}
@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*" + INVALID_COLUMN + ".*")
public void testGetRecordsInvalidColumn()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata connectorMetadata = transaction.getMetadata();
ConnectorTableHandle table = getTableHandle(connectorMetadata, tableUnpartitioned);
readTable(transaction, table, ImmutableList.of(invalidColumnHandle), newSession(), TupleDomain.all(), OptionalInt.empty(), Optional.empty());
}
}
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = ".*The column 't_data' in table '.*\\.presto_test_partition_schema_change' is declared as type 'double', but partition 'ds=2012-12-29' declared column 't_data' as type 'string'.")
public void testPartitionSchemaMismatch()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle table = getTableHandle(metadata, tablePartitionSchemaChange);
readTable(transaction, table, ImmutableList.of(dsColumn), newSession(), TupleDomain.all(), OptionalInt.empty(), Optional.empty());
}
}
// TODO coercion of non-canonical values should be supported
@Test(enabled = false)
public void testPartitionSchemaNonCanonical()
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle table = getTableHandle(metadata, tablePartitionSchemaChangeNonCanonical);
ColumnHandle column = metadata.getColumnHandles(session, table).get("t_boolean");
assertNotNull(column);
ConnectorTableLayoutHandle layoutHandle = getTableLayout(session, metadata, table, new Constraint<>(withColumnDomains(ImmutableMap.of(intColumn, Domain.singleValue(BIGINT, 5L)))), transaction).getHandle();
assertEquals(getAllPartitions(layoutHandle).size(), 1);
assertEquals(getPartitionId(getAllPartitions(layoutHandle).get(0)), "t_boolean=0");
ConnectorSplitSource splitSource = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);
ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));
ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column);
try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
fail("expected exception");
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), HIVE_INVALID_PARTITION_VALUE.toErrorCode());
}
}
}
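// Resolves the table layout for the given constraint; with filter pushdown enabled only an all-inclusive
// constraint is supported and the layout comes from pushing down TRUE_CONSTANT.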
protected ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorMetadata metadata, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint, Transaction transaction)
{
if (HiveSessionProperties.isPushdownFilterEnabled(session)) {
assertTrue(constraint.getSummary().isAll());
return pushdownFilter(
session,
metadata,
transaction.getMetastore(),
ROW_EXPRESSION_SERVICE,
FUNCTION_RESOLUTION,
hivePartitionManager,
METADATA.getFunctionAndTypeManager(),
tableHandle,
TRUE_CONSTANT,
Optional.empty()).getLayout();
}
List<ConnectorTableLayoutResult> tableLayoutResults = metadata.getTableLayouts(session, tableHandle, constraint, Optional.empty());
return getOnlyElement(tableLayoutResults).getTableLayout();
}
@Test
public void testTypesTextFile()
throws Exception
{
assertGetRecords("presto_test_types_textfile", TEXTFILE);
}
@Test
public void testTypesSequenceFile()
throws Exception
{
assertGetRecords("presto_test_types_sequencefile", SEQUENCEFILE);
}
@Test
public void testTypesRcText()
throws Exception
{
assertGetRecords("presto_test_types_rctext", RCTEXT);
}
@Test
public void testTypesRcBinary()
throws Exception
{
assertGetRecords("presto_test_types_rcbinary", RCBINARY);
}
@Test
public void testTypesOrc()
throws Exception
{
assertGetRecordsOptional("presto_test_types_orc", ORC);
}
@Test
public void testTypesParquet()
throws Exception
{
assertGetRecordsOptional("presto_test_types_parquet", PARQUET);
}
@Test
public void testEmptyTextFile()
throws Exception
{
checkSupportedStorageFormat(TEXTFILE);
assertEmptyFile(TEXTFILE);
}
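// Skips the calling test when the given storage format is not in the set of formats enabled for table creation.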
private void checkSupportedStorageFormat(HiveStorageFormat storageFormat)
{
if (!createTableFormats.contains(storageFormat)) {
throw new SkipException(storageFormat + " format is not supported");
}
}
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "Error opening Hive split .*SequenceFile.*EOFException")
public void testEmptySequenceFile()
throws Exception
{
checkSupportedStorageFormat(SEQUENCEFILE);
assertEmptyFile(SEQUENCEFILE);
}
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "RCFile is empty: .*")
public void testEmptyRcTextFile()
throws Exception
{
checkSupportedStorageFormat(RCTEXT);
assertEmptyFile(RCTEXT);
}
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "RCFile is empty: .*")
public void testEmptyRcBinaryFile()
throws Exception
{
checkSupportedStorageFormat(RCBINARY);
assertEmptyFile(RCBINARY);
}
@Test
public void testEmptyOrcFile()
throws Exception
{
checkSupportedStorageFormat(ORC);
assertEmptyFile(ORC);
}
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "ORC file is empty: .*")
public void testEmptyDwrfFile()
throws Exception
{
checkSupportedStorageFormat(DWRF);
assertEmptyFile(DWRF);
}
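// Creates an empty table in the given format, verifies that an empty table directory reads as zero rows,
// then adds a zero-length file and reads the table again (callers expect either zero rows or a format-specific failure).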
private void assertEmptyFile(HiveStorageFormat format)
throws Exception
{
SchemaTableName tableName = temporaryTable("empty_file");
try {
List<Column> columns = ImmutableList.of(new Column("test", HIVE_STRING, Optional.empty(), Optional.empty()));
createEmptyTable(tableName, format, columns, ImmutableList.of());
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
Table table = transaction.getMetastore()
.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(AssertionError::new);
// verify directory is empty
HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), table.getStorage().getLocation(), false);
Path location = new Path(table.getStorage().getLocation());
assertTrue(listDirectory(context, location).isEmpty());
// read table with empty directory
readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.of(0), Optional.of(ORC));
// create empty file
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, location);
assertTrue(fileSystem.createNewFile(new Path(location, "empty-file")));
assertEquals(listDirectory(context, location), ImmutableList.of("empty-file"));
// read table with empty file
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.of(1), Optional.empty());
assertEquals(result.getRowCount(), 0);
}
}
finally {
dropTable(tableName);
}
}
@Test
public void testHiveViewsAreNotSupported()
{
try (Transaction transaction = newTransaction()) {
try {
ConnectorMetadata metadata = transaction.getMetadata();
getTableHandle(metadata, view);
fail("Expected HiveViewNotSupportedException");
}
catch (HiveViewNotSupportedException e) {
assertEquals(e.getTableName(), view);
}
}
}
@Test
public void testHiveViewsHaveNoColumns()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
assertEquals(metadata.listTableColumns(newSession(), new SchemaTablePrefix(view.getSchemaName(), view.getTableName())), ImmutableMap.of());
}
}
@Test
public void testRenameTable()
{
SchemaTableName temporaryRenameTableOld = temporaryTable("rename_old");
SchemaTableName temporaryRenameTableNew = temporaryTable("rename_new");
try {
createDummyTable(temporaryRenameTableOld);
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
metadata.renameTable(session, getTableHandle(metadata, temporaryRenameTableOld), temporaryRenameTableNew);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
assertNull(metadata.getTableHandle(session, temporaryRenameTableOld));
assertNotNull(metadata.getTableHandle(session, temporaryRenameTableNew));
}
}
finally {
dropTable(temporaryRenameTableOld);
dropTable(temporaryRenameTableNew);
}
}
@Test
public void testTableCreation()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryCreateTable = temporaryTable("create");
SchemaTableName temporaryCreateTableForPageSinkCommit = temporaryTable("create_table_page_sink_commit");
try {
doCreateTable(temporaryCreateTable, storageFormat, TEST_HIVE_PAGE_SINK_CONTEXT);
doCreateTable(temporaryCreateTableForPageSinkCommit, storageFormat, PageSinkContext.builder().setCommitRequired(true).setConnectorMetadataUpdater(new HiveMetadataUpdater(EXECUTOR)).build());
}
finally {
dropTable(temporaryCreateTable);
dropTable(temporaryCreateTableForPageSinkCommit);
}
}
}
@Test
public void testTableCreationRollback()
throws Exception
{
SchemaTableName temporaryCreateRollbackTable = temporaryTable("create_rollback");
try {
Path stagingPathRoot;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
// begin creating the table
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(temporaryCreateRollbackTable, CREATE_TABLE_COLUMNS, createTableProperties(RCBINARY));
HiveOutputTableHandle outputHandle = (HiveOutputTableHandle) metadata.beginCreateTable(session, tableMetadata, Optional.empty());
// write the data
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, outputHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
sink.appendPage(CREATE_TABLE_DATA.toPage());
getFutureValue(sink.finish());
// verify we have data files
stagingPathRoot = getStagingPathRoot(outputHandle);
HdfsContext context = new HdfsContext(
session,
temporaryCreateRollbackTable.getSchemaName(),
temporaryCreateRollbackTable.getTableName(),
outputHandle.getLocationHandle().getTargetPath().toString(),
true);
assertFalse(listAllDataFiles(context, stagingPathRoot).isEmpty());
// rollback the table
transaction.rollback();
}
// verify all files have been deleted
HdfsContext context = new HdfsContext(
newSession(),
temporaryCreateRollbackTable.getSchemaName(),
temporaryCreateRollbackTable.getTableName(),
"test_path",
false);
assertTrue(listAllDataFiles(context, stagingPathRoot).isEmpty());
// verify table is not in the metastore
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
assertNull(metadata.getTableHandle(session, temporaryCreateRollbackTable));
}
}
finally {
dropTable(temporaryCreateRollbackTable);
}
}
@Test
public void testTableCreationIgnoreExisting()
{
List<Column> columns = ImmutableList.of(new Column("dummy", HiveType.valueOf("uniontype<smallint,tinyint>"), Optional.empty(), Optional.empty()));
SchemaTableName schemaTableName = temporaryTable("create");
ConnectorSession session = newSession();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
PrincipalPrivileges privileges = testingPrincipalPrivilege(session);
Path targetPath;
try {
try (Transaction transaction = newTransaction()) {
LocationService locationService = getLocationService();
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false);
targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
Table table = createSimpleTable(schemaTableName, columns, session, targetPath, "q1");
transaction.getMetastore()
.createTable(session, table, privileges, Optional.empty(), false, EMPTY_TABLE_STATISTICS, emptyList());
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
Optional<Table> tableHandle = transaction.getMetastore().getTable(metastoreContext, schemaName, tableName);
assertTrue(tableHandle.isPresent());
transaction.commit();
}
// try creating it again from another transaction with ignoreExisting=false
try (Transaction transaction = newTransaction()) {
Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_2"), "q2");
transaction.getMetastore()
.createTable(session, table, privileges, Optional.empty(), false, EMPTY_TABLE_STATISTICS, emptyList());
transaction.commit();
fail("Expected exception");
}
catch (PrestoException e) {
assertInstanceOf(e, TableAlreadyExistsException.class);
}
// try creating it again from another transaction with ignoreExisting=true
try (Transaction transaction = newTransaction()) {
Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_3"), "q3");
transaction.getMetastore()
.createTable(session, table, privileges, Optional.empty(), true, EMPTY_TABLE_STATISTICS, emptyList());
transaction.commit();
}
// at this point the table should exist; now try creating it again with a different table definition
columns = ImmutableList.of(new Column("new_column", HiveType.valueOf("string"), Optional.empty(), Optional.empty()));
try (Transaction transaction = newTransaction()) {
Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_4"), "q4");
transaction.getMetastore()
.createTable(session, table, privileges, Optional.empty(), true, EMPTY_TABLE_STATISTICS, emptyList());
transaction.commit();
fail("Expected exception");
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), TRANSACTION_CONFLICT.toErrorCode());
assertEquals(e.getMessage(), format("Table already exists with a different schema: '%s'", schemaTableName.getTableName()));
}
}
finally {
dropTable(schemaTableName);
}
}
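// Builds a managed ORC table definition with the given columns at the target path, tagged with the test
// server version and the supplied query id.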
private static Table createSimpleTable(SchemaTableName schemaTableName, List<Column> columns, ConnectorSession session, Path targetPath, String queryId)
{
String tableOwner = session.getUser();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
return Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(tableOwner)
.setTableType(MANAGED_TABLE)
.setParameters(ImmutableMap.of(
PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
PRESTO_QUERY_ID_NAME, queryId))
.setDataColumns(columns)
.withStorage(storage -> storage
.setLocation(targetPath.toString())
.setStorageFormat(fromHiveStorageFormat(ORC))
.setSerdeParameters(ImmutableMap.of()))
.build();
}
@Test
public void testBucketSortedTables()
throws Exception
{
SchemaTableName table = temporaryTable("create_sorted");
try {
doTestBucketSortedTables(table, false, ORC);
}
finally {
dropTable(table);
}
}
@Test
public void testBucketSortedTablesTempPath()
throws Exception
{
SchemaTableName tableWithTempPath = temporaryTable("create_sorted_with_temp_path");
try {
doTestBucketSortedTables(tableWithTempPath, true, ORC);
}
finally {
dropTable(tableWithTempPath);
}
}
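// Creates a partitioned, bucketed table sorted by (value_asc ASC, value_desc DESC), writes enough rows to
// force multi-pass sort file merging, and then verifies that every bucket file is read back in sorted order.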
private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath, HiveStorageFormat storageFormat)
throws IOException
{
int bucketCount = 3;
int expectedRowCount = 0;
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession(ImmutableMap.of(SORTED_WRITE_TO_TEMP_PATH_ENABLED, useTempPath));
// begin creating the table
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(
table,
ImmutableList.<ColumnMetadata>builder()
.add(ColumnMetadata.builder().setName("id").setType(VARCHAR).build())
.add(ColumnMetadata.builder().setName("value_asc").setType(VARCHAR).build())
.add(ColumnMetadata.builder().setName("value_desc").setType(BIGINT).build())
.add(ColumnMetadata.builder().setName("ds").setType(VARCHAR).build())
.build(),
ImmutableMap.<String, Object>builder()
.put(STORAGE_FORMAT_PROPERTY, storageFormat)
.put(PARTITIONED_BY_PROPERTY, ImmutableList.of("ds"))
.put(BUCKETED_BY_PROPERTY, ImmutableList.of("id"))
.put(BUCKET_COUNT_PROPERTY, bucketCount)
.put(SORTED_BY_PROPERTY, ImmutableList.builder()
.add(new SortingColumn("value_asc", SortingColumn.Order.ASCENDING))
.add(new SortingColumn("value_desc", SortingColumn.Order.DESCENDING))
.build())
.build());
HiveOutputTableHandle outputHandle = (HiveOutputTableHandle) metadata.beginCreateTable(session, tableMetadata, Optional.empty());
// write the data
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, outputHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
List<Type> types = tableMetadata.getColumns().stream()
.map(ColumnMetadata::getType)
.collect(toList());
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < 50; i++) {
MaterializedResult.Builder builder = MaterializedResult.resultBuilder(session, types);
for (int j = 0; j < 1000; j++) {
builder.row(
sha256().hashLong(random.nextLong()).toString(),
"test" + random.nextInt(100),
random.nextLong(100_000),
"2018-04-01");
expectedRowCount++;
}
sink.appendPage(builder.build().toPage());
}
// verify we have enough temporary files per bucket to require multiple passes
Path path = useTempPath ? getTempFilePathRoot(outputHandle).get() : getStagingPathRoot(outputHandle);
HdfsContext context = new HdfsContext(
session,
table.getSchemaName(),
table.getTableName(),
outputHandle.getLocationHandle().getTargetPath().toString(),
true);
assertThat(listAllDataFiles(context, path))
.filteredOn(file -> file.contains(".tmp-sort"))
.size().isGreaterThan(bucketCount * getSortingFileWriterConfig().getMaxOpenSortFiles() * 2);
// finish the write
Collection<Slice> fragments = getFutureValue(sink.finish());
// verify there are no temporary files
for (String file : listAllDataFiles(context, path)) {
assertThat(file).doesNotContain(".tmp-sort.");
}
// finish creating table
metadata.finishCreateTable(session, outputHandle, fragments, ImmutableList.of());
// TODO: in normal usage, creating the table sends it to the metastore and we read it back later;
// here we appear to reuse the handle without going through the metastore, so the row ID
// partition component is never set.
transaction.commit();
}
// verify that bucket files are sorted
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession(ImmutableMap.of(SORTED_WRITE_TO_TEMP_PATH_ENABLED, useTempPath));
ConnectorTableHandle hiveTableHandle = getTableHandle(metadata, table);
// select all columns in the table, including synthetic columns such as $path, except for $row_id.
// $row_id will throw exceptions if there is not a real metastore running to supply
// the row ID partition components.
List<ColumnHandle> columnHandles = metadata.getColumnHandles(session, hiveTableHandle).values()
.stream()
.map(e -> (HiveColumnHandle) e)
.filter(e -> !HiveColumnHandle.isRowIdColumnHandle(e))
.collect(Collectors.toList());
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, hiveTableHandle, TupleDomain.all());
List<ConnectorSplit> splits = getAllSplits(session, transaction, layoutHandle);
assertThat(splits).hasSize(bucketCount);
int actualRowCount = 0;
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
String lastValueAsc = null;
long lastValueDesc = -1;
while (!pageSource.isFinished()) {
Page page = pageSource.getNextPage();
if (page == null) {
continue;
}
for (int i = 0; i < page.getPositionCount(); i++) {
Block blockAsc = page.getBlock(1);
Block blockDesc = page.getBlock(2);
assertFalse(blockAsc.isNull(i));
assertFalse(blockDesc.isNull(i));
String valueAsc = VARCHAR.getSlice(blockAsc, i).toStringUtf8();
if (lastValueAsc != null) {
assertGreaterThanOrEqual(valueAsc, lastValueAsc);
if (valueAsc.equals(lastValueAsc)) {
long valueDesc = BIGINT.getLong(blockDesc, i);
if (lastValueDesc != -1) {
assertLessThanOrEqual(valueDesc, lastValueDesc);
}
lastValueDesc = valueDesc;
}
else {
lastValueDesc = -1;
}
}
lastValueAsc = valueAsc;
actualRowCount++;
}
}
}
}
assertThat(actualRowCount).isEqualTo(expectedRowCount);
}
}
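// Wraps the connector-level table and layout handles into an engine TableHandle for this connector and transaction.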
private TableHandle toTableHandle(Transaction transaction, ConnectorTableHandle connectorTableHandle, ConnectorTableLayoutHandle connectorLayoutHandle)
{
return new TableHandle(
new ConnectorId(clientId),
connectorTableHandle,
transaction.getTransactionHandle(),
Optional.of(connectorLayoutHandle));
}
@Test
public void testInsert()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryInsertTable = temporaryTable("insert");
SchemaTableName temporaryInsertTableForPageSinkCommit = temporaryTable("insert_table_page_sink_commit");
try {
doInsert(storageFormat, temporaryInsertTable, TEST_HIVE_PAGE_SINK_CONTEXT);
doInsert(storageFormat, temporaryInsertTableForPageSinkCommit, PageSinkContext.builder().setCommitRequired(true).setConnectorMetadataUpdater(new HiveMetadataUpdater(EXECUTOR)).build());
}
finally {
dropTable(temporaryInsertTable);
dropTable(temporaryInsertTableForPageSinkCommit);
}
}
}
@Test
public void testInsertIntoNewPartition()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryInsertIntoNewPartitionTable = temporaryTable("insert_new_partitioned");
SchemaTableName temporaryInsertIntoNewPartitionTableForPageSinkCommit = temporaryTable("insert_new_partitioned_page_sink_commit");
try {
doInsertIntoNewPartition(storageFormat, temporaryInsertIntoNewPartitionTable, TEST_HIVE_PAGE_SINK_CONTEXT);
doInsertIntoNewPartition(storageFormat, temporaryInsertIntoNewPartitionTableForPageSinkCommit, PageSinkContext.builder().setCommitRequired(true).setConnectorMetadataUpdater(new HiveMetadataUpdater(EXECUTOR)).build());
}
finally {
dropTable(temporaryInsertIntoNewPartitionTable);
dropTable(temporaryInsertIntoNewPartitionTableForPageSinkCommit);
}
}
}
@Test
public void testInsertIntoExistingPartition()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryInsertIntoExistingPartitionTable = temporaryTable("insert_existing_partitioned");
SchemaTableName temporaryInsertIntoExistingPartitionTableForPageSinkCommit = temporaryTable("insert_existing_partitioned_page_sink_commit");
try {
doInsertIntoExistingPartition(storageFormat, temporaryInsertIntoExistingPartitionTable, TEST_HIVE_PAGE_SINK_CONTEXT);
doInsertIntoExistingPartition(storageFormat, temporaryInsertIntoExistingPartitionTableForPageSinkCommit, PageSinkContext.builder().setCommitRequired(true).setConnectorMetadataUpdater(new HiveMetadataUpdater(EXECUTOR)).build());
}
finally {
dropTable(temporaryInsertIntoExistingPartitionTable);
dropTable(temporaryInsertIntoExistingPartitionTableForPageSinkCommit);
}
}
}
@Test
public void testInsertIntoExistingPartitionEmptyStatistics()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryInsertIntoExistingPartitionTable = temporaryTable("insert_existing_partitioned_empty_statistics");
try {
doInsertIntoExistingPartitionEmptyStatistics(storageFormat, temporaryInsertIntoExistingPartitionTable);
}
finally {
dropTable(temporaryInsertIntoExistingPartitionTable);
}
}
}
@Test
public void testInsertUnsupportedWriteType()
throws Exception
{
SchemaTableName temporaryInsertUnsupportedWriteType = temporaryTable("insert_unsupported_type");
try {
doInsertUnsupportedWriteType(ORC, temporaryInsertUnsupportedWriteType);
}
finally {
dropTable(temporaryInsertUnsupportedWriteType);
}
}
@Test
public void testMetadataDelete()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryMetadataDeleteTable = temporaryTable("metadata_delete");
try {
doTestMetadataDelete(storageFormat, temporaryMetadataDeleteTable);
}
finally {
dropTable(temporaryMetadataDeleteTable);
}
}
}
@Test
public void testEmptyTableCreation()
throws Exception
{
for (HiveStorageFormat storageFormat : createTableFormats) {
SchemaTableName temporaryCreateEmptyTable = temporaryTable("create_empty");
try {
doCreateEmptyTable(temporaryCreateEmptyTable, storageFormat, CREATE_TABLE_COLUMNS);
}
finally {
dropTable(temporaryCreateEmptyTable);
}
}
}
@Test
public void testViewCreation()
{
SchemaTableName temporaryCreateView = temporaryTable("create_view");
try {
verifyViewCreation(temporaryCreateView);
}
finally {
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.dropView(newSession(), temporaryCreateView);
transaction.commit();
}
catch (RuntimeException e) {
// this usually occurs because the view was not created
}
}
}
@Test
public void testCreateTableUnsupportedType()
{
for (HiveStorageFormat storageFormat : createTableFormats) {
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
List<ColumnMetadata> columns = ImmutableList.of(ColumnMetadata.builder()
.setName("dummy")
.setType(HYPER_LOG_LOG)
.build());
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(invalidTable, columns, createTableProperties(storageFormat));
metadata.beginCreateTable(session, tableMetadata, Optional.empty());
fail("create table with unsupported type should fail for storage format " + storageFormat);
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
}
}
}
@Test
public void testCreateBucketedTemporaryTableWithMissingBuckets()
{
List<ColumnMetadata> columns = TEMPORARY_TABLE_COLUMNS;
List<String> bucketingColumns = TEMPORARY_TABLE_BUCKET_COLUMNS;
int bucketCount = TEMPORARY_TABLE_BUCKET_COUNT;
MaterializedResult singleRow = MaterializedResult.resultBuilder(SESSION, VARCHAR, VARCHAR)
.row("1", "value1")
.build();
ConnectorSession session = newSession();
HiveTableHandle tableHandle;
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
// prepare temporary table schema
List<Type> types = columns.stream()
.map(ColumnMetadata::getType)
.collect(toImmutableList());
ConnectorPartitioningMetadata partitioning = new ConnectorPartitioningMetadata(
metadata.getPartitioningHandleForExchange(session, bucketCount, types),
bucketingColumns);
// create temporary table
tableHandle = (HiveTableHandle) metadata.createTemporaryTable(session, columns, Optional.of(partitioning));
// begin insert into temporary table
HiveInsertTableHandle insert = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
// insert into temporary table
ConnectorPageSink firstSink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insert, TEST_HIVE_PAGE_SINK_CONTEXT);
firstSink.appendPage(singleRow.toPage());
Collection<Slice> fragments = getFutureValue(firstSink.finish());
if (singleRow.getRowCount() == 0) {
assertThat(fragments).isEmpty();
}
// finish insert
metadata.finishInsert(session, insert, fragments, ImmutableList.of());
// Only one split, since there is only one non-empty bucket
assertEquals(getAllSplits(transaction, tableHandle, TupleDomain.all()).size(), 1);
transaction.rollback();
}
}
@Test
public void testCreateBucketedTemporaryTable()
throws Exception
{
testCreateBucketedTemporaryTable(newSession());
}
protected void testCreateBucketedTemporaryTable(ConnectorSession session)
throws Exception
{
testCreateBucketedTemporaryTable(session, true);
testCreateBucketedTemporaryTable(session, false);
}
private void testCreateBucketedTemporaryTable(ConnectorSession session, boolean commit)
throws Exception
{
// with data
testCreateTemporaryTable(TEMPORARY_TABLE_COLUMNS, TEMPORARY_TABLE_BUCKET_COUNT, TEMPORARY_TABLE_BUCKET_COLUMNS, TEMPORARY_TABLE_DATA, session, commit);
// empty
testCreateTemporaryTable(
TEMPORARY_TABLE_COLUMNS,
TEMPORARY_TABLE_BUCKET_COUNT,
TEMPORARY_TABLE_BUCKET_COLUMNS,
MaterializedResult.resultBuilder(SESSION, VARCHAR, VARCHAR).build(),
session,
commit);
// bucketed on zero columns
testCreateTemporaryTable(TEMPORARY_TABLE_COLUMNS, TEMPORARY_TABLE_BUCKET_COUNT, ImmutableList.of(), TEMPORARY_TABLE_DATA, session, commit);
}
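// Creates a bucketed temporary table, performs two inserts but finishes only the second, verifies that the
// committed rows round-trip, and then checks that the table and all insert write/target paths are gone.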
private void testCreateTemporaryTable(
List<ColumnMetadata> columns,
int bucketCount,
List<String> bucketingColumns,
MaterializedResult inputRows,
ConnectorSession session,
boolean commit)
throws Exception
{
List<Path> insertLocations = new ArrayList<>();
HiveTableHandle tableHandle;
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
// prepare temporary table schema
List<Type> types = columns.stream()
.map(ColumnMetadata::getType)
.collect(toImmutableList());
ConnectorPartitioningMetadata partitioning = new ConnectorPartitioningMetadata(
metadata.getPartitioningHandleForExchange(session, bucketCount, types),
bucketingColumns);
// create temporary table
tableHandle = (HiveTableHandle) metadata.createTemporaryTable(session, columns, Optional.of(partitioning));
// begin insert into temporary table
HiveInsertTableHandle firstInsert = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
insertLocations.add(firstInsert.getLocationHandle().getTargetPath());
insertLocations.add(firstInsert.getLocationHandle().getWritePath());
// insert into temporary table
ConnectorPageSink firstSink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, firstInsert, TEST_HIVE_PAGE_SINK_CONTEXT);
firstSink.appendPage(inputRows.toPage());
Collection<Slice> firstFragments = getFutureValue(firstSink.finish());
if (inputRows.getRowCount() == 0) {
assertThat(firstFragments).isEmpty();
}
// begin second insert into temporary table
HiveInsertTableHandle secondInsert = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
insertLocations.add(secondInsert.getLocationHandle().getTargetPath());
insertLocations.add(secondInsert.getLocationHandle().getWritePath());
// insert into temporary table
ConnectorPageSink secondSink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, secondInsert, TEST_HIVE_PAGE_SINK_CONTEXT);
secondSink.appendPage(inputRows.toPage());
Collection<Slice> secondFragments = getFutureValue(secondSink.finish());
if (inputRows.getRowCount() == 0) {
assertThat(secondFragments).isEmpty();
}
// finish only second insert
metadata.finishInsert(session, secondInsert, secondFragments, ImmutableList.of());
// no splits for empty buckets when a zero-row file is not created
assertLessThanOrEqual(getAllSplits(transaction, tableHandle, TupleDomain.all()).size(), bucketCount);
// verify written data
Map<String, ColumnHandle> allColumnHandles = metadata.getColumnHandles(session, tableHandle);
List<ColumnHandle> dataColumnHandles = columns.stream()
.map(ColumnMetadata::getName)
.map(allColumnHandles::get)
.collect(toImmutableList());
// check that all columns are regular columns (not partition columns)
dataColumnHandles.stream()
.map(HiveColumnHandle.class::cast)
.forEach(handle -> {
if (handle.isPartitionKey()) {
fail("partitioning column found: " + handle.getName());
}
});
MaterializedResult outputRows = readTable(transaction, tableHandle, dataColumnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(inputRows.getMaterializedRows(), outputRows.getMaterializedRows());
if (commit) {
transaction.commit();
}
else {
transaction.rollback();
}
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
assertThatThrownBy(() -> metadata.getColumnHandles(session, tableHandle))
.isInstanceOf(TableNotFoundException.class);
}
HdfsContext context = new HdfsContext(session, tableHandle.getSchemaName(), tableHandle.getTableName(), "test_path", false);
for (Path location : insertLocations) {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, location);
assertFalse(fileSystem.exists(location));
}
}
@Test
public void testUpdateBasicTableStatistics()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_basic_table_statistics");
try {
doCreateEmptyTable(tableName, ORC, STATISTICS_TABLE_COLUMNS);
testUpdateTableStatistics(tableName, EMPTY_TABLE_STATISTICS, BASIC_STATISTICS_1, BASIC_STATISTICS_2);
}
finally {
dropTable(tableName);
}
}
@Test
public void testUpdateTableColumnStatistics()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_table_column_statistics");
try {
doCreateEmptyTable(tableName, ORC, STATISTICS_TABLE_COLUMNS);
testUpdateTableStatistics(tableName, EMPTY_TABLE_STATISTICS, STATISTICS_1_1, STATISTICS_1_2, STATISTICS_2);
}
finally {
dropTable(tableName);
}
}
@Test
public void testUpdateTableColumnStatisticsEmptyOptionalFields()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_table_column_statistics_empty_optional_fields");
try {
doCreateEmptyTable(tableName, ORC, STATISTICS_TABLE_COLUMNS);
testUpdateTableStatistics(tableName, EMPTY_TABLE_STATISTICS, STATISTICS_EMPTY_OPTIONAL_FIELDS);
}
finally {
dropTable(tableName);
}
}
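// Applies each statistics value in turn, asserting the metastore hands back the previously stored statistics
// before every update, and finally restores the initial statistics.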
protected void testUpdateTableStatistics(SchemaTableName tableName, PartitionStatistics initialStatistics, PartitionStatistics... statistics)
{
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
assertThat(metastoreClient.getTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName()))
.isEqualTo(initialStatistics);
AtomicReference<PartitionStatistics> expectedStatistics = new AtomicReference<>(initialStatistics);
for (PartitionStatistics partitionStatistics : statistics) {
metastoreClient.updateTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), actualStatistics -> {
assertThat(actualStatistics).isEqualTo(expectedStatistics.get());
return partitionStatistics;
});
assertThat(metastoreClient.getTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName()))
.isEqualTo(partitionStatistics);
expectedStatistics.set(partitionStatistics);
}
assertThat(metastoreClient.getTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName()))
.isEqualTo(expectedStatistics.get());
metastoreClient.updateTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), actualStatistics -> {
assertThat(actualStatistics).isEqualTo(expectedStatistics.get());
return initialStatistics;
});
assertThat(metastoreClient.getTableStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName()))
.isEqualTo(initialStatistics);
}
@Test
public void testUpdateBasicPartitionStatistics()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_basic_partition_statistics");
try {
createDummyPartitionedTable(tableName, STATISTICS_PARTITIONED_TABLE_COLUMNS);
testUpdatePartitionStatistics(
tableName,
EMPTY_TABLE_STATISTICS,
ImmutableList.of(BASIC_STATISTICS_1, BASIC_STATISTICS_2),
ImmutableList.of(BASIC_STATISTICS_2, BASIC_STATISTICS_1));
}
finally {
dropTable(tableName);
}
}
@Test
public void testUpdatePartitionColumnStatistics()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_partition_column_statistics");
try {
createDummyPartitionedTable(tableName, STATISTICS_PARTITIONED_TABLE_COLUMNS);
testUpdatePartitionStatistics(
tableName,
EMPTY_TABLE_STATISTICS,
ImmutableList.of(STATISTICS_1_1, STATISTICS_1_2, STATISTICS_2),
ImmutableList.of(STATISTICS_1_2, STATISTICS_1_1, STATISTICS_2));
}
finally {
dropTable(tableName);
}
}
@Test
public void testUpdatePartitionColumnStatisticsEmptyOptionalFields()
throws Exception
{
SchemaTableName tableName = temporaryTable("update_partition_column_statistics");
try {
createDummyPartitionedTable(tableName, STATISTICS_PARTITIONED_TABLE_COLUMNS);
testUpdatePartitionStatistics(
tableName,
EMPTY_TABLE_STATISTICS,
ImmutableList.of(STATISTICS_EMPTY_OPTIONAL_FIELDS),
ImmutableList.of(STATISTICS_EMPTY_OPTIONAL_FIELDS));
}
finally {
dropTable(tableName);
}
}
/**
* During a table scan, an illegal storage format on a specific table should not fail the whole scan
*/
@Test
public void testIllegalStorageFormatDuringTableScan()
{
SchemaTableName schemaTableName = temporaryTable("test_illegal_storage_format");
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
List<Column> columns = ImmutableList.of(new Column("pk", HIVE_STRING, Optional.empty(), Optional.empty()));
String tableOwner = session.getUser();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
LocationHandle locationHandle = getLocationService().forNewTable(transaction.getMetastore(), session, schemaName, tableName, false);
Path targetPath = getLocationService().getQueryWriteInfo(locationHandle).getTargetPath();
// create a table whose storage format is null
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(tableOwner)
.setTableType(PrestoTableType.MANAGED_TABLE)
.setParameters(ImmutableMap.of(
PRESTO_VERSION_NAME, TEST_SERVER_VERSION,
PRESTO_QUERY_ID_NAME, session.getQueryId()))
.setDataColumns(columns)
.withStorage(storage -> storage
.setLocation(targetPath.toString())
.setStorageFormat(StorageFormat.createNullable(null, null, null))
.setSerdeParameters(ImmutableMap.of()));
PrincipalPrivileges principalPrivileges = testingPrincipalPrivilege(tableOwner, session.getUser());
transaction.getMetastore().createTable(session, tableBuilder.build(), principalPrivileges, Optional.empty(), true, EMPTY_TABLE_STATISTICS, emptyList());
transaction.commit();
}
// We retrieve the table whose storageFormat has null serde/inputFormat/outputFormat
// to make sure it can still be retrieved instead of throwing an exception.
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Map<SchemaTableName, List<ColumnMetadata>> allColumns = metadata.listTableColumns(newSession(), new SchemaTablePrefix(schemaTableName.getSchemaName(), schemaTableName.getTableName()));
assertTrue(allColumns.containsKey(schemaTableName));
}
finally {
dropTable(schemaTableName);
}
}
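// Creates a trivial single-column TEXTFILE table with no data.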
private void createDummyTable(SchemaTableName tableName)
{
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
List<ColumnMetadata> columns = ImmutableList.of(ColumnMetadata.builder()
.setName("dummy")
.setType(createUnboundedVarcharType())
.build());
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, columns, createTableProperties(TEXTFILE));
ConnectorOutputTableHandle handle = metadata.beginCreateTable(session, tableMetadata, Optional.empty());
metadata.finishCreateTable(session, handle, ImmutableList.of(), ImmutableList.of());
transaction.commit();
}
}
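// Creates an empty ORC table with the given columns and adds two dummy partitions
// (ds=2016-01-01 and ds=2016-01-02) with empty statistics.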
protected void createDummyPartitionedTable(SchemaTableName tableName, List<ColumnMetadata> columns)
throws Exception
{
doCreateEmptyTable(tableName, ORC, columns);
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
Table table = metastoreClient.getTable(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
List<String> firstPartitionValues = ImmutableList.of("2016-01-01");
List<String> secondPartitionValues = ImmutableList.of("2016-01-02");
String firstPartitionName = makePartName(ImmutableList.of("ds"), firstPartitionValues);
String secondPartitionName = makePartName(ImmutableList.of("ds"), secondPartitionValues);
List<PartitionWithStatistics> partitions = ImmutableList.of(firstPartitionName, secondPartitionName)
.stream()
.map(partitionName -> new PartitionWithStatistics(createDummyPartition(table, partitionName), partitionName, PartitionStatistics.empty()))
.collect(toImmutableList());
metastoreClient.addPartitions(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), partitions);
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), firstPartitionName, currentStatistics -> EMPTY_TABLE_STATISTICS);
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), secondPartitionName, currentStatistics -> EMPTY_TABLE_STATISTICS);
}
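// Walks the two test partitions through the supplied statistics sequences in lockstep, asserting the metastore
// returns the previously stored values before each update, and finally restores the initial statistics.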
protected void testUpdatePartitionStatistics(
SchemaTableName tableName,
PartitionStatistics initialStatistics,
List<PartitionStatistics> firstPartitionStatistics,
List<PartitionStatistics> secondPartitionStatistics)
{
verify(firstPartitionStatistics.size() == secondPartitionStatistics.size());
String firstPartitionName = "ds=2016-01-01";
String secondPartitionName = "ds=2016-01-02";
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(firstPartitionName, secondPartitionName)))
.isEqualTo(ImmutableMap.of(firstPartitionName, initialStatistics, secondPartitionName, initialStatistics));
AtomicReference<PartitionStatistics> expectedStatisticsPartition1 = new AtomicReference<>(initialStatistics);
AtomicReference<PartitionStatistics> expectedStatisticsPartition2 = new AtomicReference<>(initialStatistics);
for (int i = 0; i < firstPartitionStatistics.size(); i++) {
PartitionStatistics statisticsPartition1 = firstPartitionStatistics.get(i);
PartitionStatistics statisticsPartition2 = secondPartitionStatistics.get(i);
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), firstPartitionName, actualStatistics -> {
assertThat(actualStatistics).isEqualTo(expectedStatisticsPartition1.get());
return statisticsPartition1;
});
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), secondPartitionName, actualStatistics -> {
assertThat(actualStatistics).isEqualTo(expectedStatisticsPartition2.get());
return statisticsPartition2;
});
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(firstPartitionName, secondPartitionName)))
.isEqualTo(ImmutableMap.of(firstPartitionName, statisticsPartition1, secondPartitionName, statisticsPartition2));
expectedStatisticsPartition1.set(statisticsPartition1);
expectedStatisticsPartition2.set(statisticsPartition2);
}
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(firstPartitionName, secondPartitionName)))
.isEqualTo(ImmutableMap.of(firstPartitionName, expectedStatisticsPartition1.get(), secondPartitionName, expectedStatisticsPartition2.get()));
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), firstPartitionName, currentStatistics -> {
assertThat(currentStatistics).isEqualTo(expectedStatisticsPartition1.get());
return initialStatistics;
});
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), secondPartitionName, currentStatistics -> {
assertThat(currentStatistics).isEqualTo(expectedStatisticsPartition2.get());
return initialStatistics;
});
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(firstPartitionName, secondPartitionName)))
.isEqualTo(ImmutableMap.of(firstPartitionName, initialStatistics, secondPartitionName, initialStatistics));
}
@Test
public void testStorePartitionWithStatistics()
throws Exception
{
testStorePartitionWithStatistics(STATISTICS_PARTITIONED_TABLE_COLUMNS, STATISTICS_1, STATISTICS_2, STATISTICS_1_1, EMPTY_TABLE_STATISTICS);
}
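// Delegates to the full variant with no delay between successive partition alterations.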
protected void testStorePartitionWithStatistics(
List<ColumnMetadata> columns,
PartitionStatistics statsForAllColumns1,
PartitionStatistics statsForAllColumns2,
PartitionStatistics statsForSubsetOfColumns,
PartitionStatistics emptyStatistics)
throws Exception
{
testStorePartitionWithStatistics(columns, statsForAllColumns1, statsForAllColumns2, statsForSubsetOfColumns, emptyStatistics, new Duration(0, SECONDS));
}
protected void testStorePartitionWithStatistics(
List<ColumnMetadata> columns,
PartitionStatistics statsForAllColumns1,
PartitionStatistics statsForAllColumns2,
PartitionStatistics statsForSubsetOfColumns,
PartitionStatistics emptyStatistics,
Duration delayBetweenAlters)
throws Exception
{
SchemaTableName tableName = temporaryTable("store_partition_with_statistics");
try {
doCreateEmptyTable(tableName, ORC, columns);
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
Table table = metastoreClient.getTable(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
List<String> partitionValues = ImmutableList.of("2016-01-01");
String partitionName = makePartName(ImmutableList.of("ds"), partitionValues);
Partition partition = createDummyPartition(table, partitionName);
// create partition with stats for all columns
metastoreClient.addPartitions(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableList.of(new PartitionWithStatistics(partition, partitionName, statsForAllColumns1)));
assertEquals(
metastoreClient.getPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), partitionValues).get().getStorage().getStorageFormat(),
fromHiveStorageFormat(ORC));
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(partitionName)))
.isEqualTo(ImmutableMap.of(partitionName, statsForAllColumns1));
sleep(delayBetweenAlters.toMillis());
// alter the partition into one with other stats
Partition modifiedPartition = Partition.builder(partition)
.withStorage(storage -> storage
.setStorageFormat(fromHiveStorageFormat(DWRF))
.setLocation(partitionTargetPath(tableName, partitionName)))
.build();
metastoreClient.alterPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), new PartitionWithStatistics(modifiedPartition, partitionName, statsForAllColumns2));
assertEquals(
metastoreClient.getPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), partitionValues).get().getStorage().getStorageFormat(),
fromHiveStorageFormat(DWRF));
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(partitionName)))
.isEqualTo(ImmutableMap.of(partitionName, statsForAllColumns2));
sleep(delayBetweenAlters.toMillis());
// alter the partition into one with stats for only a subset of columns
modifiedPartition = Partition.builder(partition)
.withStorage(storage -> storage
.setStorageFormat(fromHiveStorageFormat(TEXTFILE))
.setLocation(partitionTargetPath(tableName, partitionName)))
.build();
metastoreClient.alterPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), new PartitionWithStatistics(modifiedPartition, partitionName, statsForSubsetOfColumns));
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(partitionName)))
.isEqualTo(ImmutableMap.of(partitionName, statsForSubsetOfColumns));
sleep(delayBetweenAlters.toMillis());
// alter the partition into one without stats
modifiedPartition = Partition.builder(partition)
.withStorage(storage -> storage
.setStorageFormat(fromHiveStorageFormat(TEXTFILE))
.setLocation(partitionTargetPath(tableName, partitionName)))
.build();
metastoreClient.alterPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), new PartitionWithStatistics(modifiedPartition, partitionName, emptyStatistics));
assertThat(metastoreClient.getPartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(partitionName)))
.isEqualTo(ImmutableMap.of(partitionName, emptyStatistics));
}
finally {
dropTable(tableName);
}
}
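// Builds a minimal test partition for the given name: data columns copied from the table, a fixed
// row ID partition component, ORC storage at the LocationService-derived target path, and test
// version/query ID parameters.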
protected Partition createDummyPartition(Table table, String partitionName)
{
return createDummyPartition(table, partitionName, Optional.empty());
}
protected Partition createDummyPartition(Table table, String partitionName, Optional<HiveBucketProperty> bucketProperty)
{
byte[] rowIdPartitionComponent = {98, 45};
return Partition.builder()
.setCatalogName(table.getCatalogName())
.setDatabaseName(table.getDatabaseName())
.setTableName(table.getTableName())
.setColumns(table.getDataColumns())
.setValues(toPartitionValues(partitionName))
.setRowIdPartitionComponent(Optional.of(rowIdPartitionComponent))
.withStorage(storage -> storage
.setStorageFormat(fromHiveStorageFormat(HiveStorageFormat.ORC))
.setLocation(partitionTargetPath(new SchemaTableName(table.getDatabaseName(), table.getTableName()), partitionName))
.setBucketProperty(bucketProperty))
.setParameters(ImmutableMap.of(
PRESTO_VERSION_NAME, "testversion",
PRESTO_QUERY_ID_NAME, "20180101_123456_00001_x1y2z"))
.build();
}
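// Resolves the path a partition with the given name would be written to, by asking the
// LocationService for the existing table's write info within a short-lived transaction.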
protected String partitionTargetPath(SchemaTableName schemaTableName, String partitionName)
{
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
SemiTransactionalHiveMetastore metastore = transaction.getMetastore();
LocationService locationService = getLocationService();
Table table = metastore.getTable(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()), schemaTableName.getSchemaName(), schemaTableName.getTableName()).get();
LocationHandle handle = locationService.forExistingTable(metastore, session, table, false);
return locationService.getPartitionWriteInfo(handle, Optional.empty(), partitionName).getTargetPath().toString();
}
}
@Test
public void testAddColumn()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_add_column");
try {
doCreateEmptyTable(tableName, ORC, CREATE_TABLE_COLUMNS);
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
metastoreClient.addColumn(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), "new_col", HIVE_LONG, null);
Optional<Table> table = metastoreClient.getTable(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName());
assertTrue(table.isPresent());
List<Column> columns = table.get().getDataColumns();
assertEquals(columns.get(columns.size() - 1).getName(), "new_col");
}
finally {
dropTable(tableName);
}
}
@Test
public void testDropColumn()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_drop_column");
try {
doCreateEmptyTable(tableName, ORC, CREATE_TABLE_COLUMNS);
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
metastoreClient.dropColumn(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), CREATE_TABLE_COLUMNS.get(0).getName());
Optional<Table> table = metastoreClient.getTable(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName());
assertTrue(table.isPresent());
List<Column> columns = table.get().getDataColumns();
assertEquals(columns.get(0).getName(), CREATE_TABLE_COLUMNS.get(1).getName());
assertFalse(columns.stream().map(Column::getName).anyMatch(colName -> colName.equals(CREATE_TABLE_COLUMNS.get(0).getName())));
}
finally {
dropTable(tableName);
}
}
/**
* This test creates two identical partitions and verifies that the statistics projected from
* a single-partition sample are equal to the statistics computed over all partitions.
*/
@Test
public void testPartitionStatisticsSampling()
throws Exception
{
testPartitionStatisticsSampling(STATISTICS_PARTITIONED_TABLE_COLUMNS, STATISTICS_1);
}
protected void testPartitionStatisticsSampling(List<ColumnMetadata> columns, PartitionStatistics statistics)
throws Exception
{
SchemaTableName tableName = temporaryTable("test_partition_statistics_sampling");
try {
createDummyPartitionedTable(tableName, columns);
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), "ds=2016-01-01", actualStatistics -> statistics);
metastoreClient.updatePartitionStatistics(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), "ds=2016-01-02", actualStatistics -> statistics);
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = metadata.getTableHandle(session, tableName);
List<ColumnHandle> allColumnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
TableStatistics unsampledStatistics = metadata.getTableStatistics(sampleSize(2), tableHandle, Optional.empty(), allColumnHandles, Constraint.alwaysTrue());
TableStatistics sampledStatistics = metadata.getTableStatistics(sampleSize(1), tableHandle, Optional.empty(), allColumnHandles, Constraint.alwaysTrue());
assertEquals(sampledStatistics, unsampledStatistics);
}
}
finally {
dropTable(tableName);
}
}
private ConnectorSession sampleSize(int sampleSize)
{
return newSession(getHiveClientConfig().setPartitionStatisticsSampleSize(sampleSize), getHiveCommonClientConfig());
}
private void verifyViewCreation(SchemaTableName temporaryCreateView)
{
// replace works for new view
doCreateView(temporaryCreateView, true);
// replace works for existing view
doCreateView(temporaryCreateView, true);
// create fails for existing view
try {
doCreateView(temporaryCreateView, false);
fail("create existing should fail");
}
catch (ViewAlreadyExistsException e) {
assertEquals(e.getViewName(), temporaryCreateView);
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
// drop works when view exists
metadata.dropView(newSession(), temporaryCreateView);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
assertEquals(metadata.getViews(newSession(), temporaryCreateView.toSchemaTablePrefix()).size(), 0);
assertFalse(metadata.listViews(newSession(), temporaryCreateView.getSchemaName()).contains(temporaryCreateView));
}
// drop fails when view does not exist
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
metadata.dropView(newSession(), temporaryCreateView);
fail("drop non-existing should fail");
}
catch (ViewNotFoundException e) {
assertEquals(e.getViewName(), temporaryCreateView);
}
// create works for new view
doCreateView(temporaryCreateView, false);
}
private void doCreateView(SchemaTableName viewName, boolean replace)
{
String viewData = "test data";
try (Transaction transaction = newTransaction()) {
ConnectorTableMetadata viewMetadata1 = new ConnectorTableMetadata(
viewName,
ImmutableList.of(ColumnMetadata.builder().setName("a").setType(BIGINT).build()));
transaction.getMetadata().createView(newSession(), viewMetadata1, viewData, replace);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
Map<SchemaTableName, ConnectorViewDefinition> views = metadata.getViews(newSession(), viewName.toSchemaTablePrefix());
assertEquals(views.size(), 1);
assertEquals(views.get(viewName).getViewData(), viewData);
assertTrue(metadata.listViews(newSession(), viewName.getSchemaName()).contains(viewName));
}
}
protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storageFormat, PageSinkContext pageSinkContext)
throws Exception
{
String queryId;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
queryId = session.getQueryId();
// begin creating the table
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, CREATE_TABLE_COLUMNS, createTableProperties(storageFormat));
HiveOutputTableHandle outputHandle = (HiveOutputTableHandle) metadata.beginCreateTable(session, tableMetadata, Optional.empty());
// write the data
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, outputHandle, pageSinkContext);
sink.appendPage(CREATE_TABLE_DATA.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkContext.isCommitRequired()) {
assertValidPageSinkCommitFragments(fragments);
metadata.commitPageSinkAsync(session, outputHandle, fragments).get();
}
// verify all new files start with the unique prefix
HdfsContext context = new HdfsContext(
session,
tableName.getSchemaName(),
tableName.getTableName(),
outputHandle.getLocationHandle().getTargetPath().toString(),
true);
for (String filePath : listAllDataFiles(context, getStagingPathRoot(outputHandle))) {
assertTrue(new Path(filePath).getName().startsWith(session.getQueryId()));
}
// commit the table
metadata.finishCreateTable(session, outputHandle, fragments, ImmutableList.of());
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// load the new table
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), CREATE_TABLE_COLUMNS);
// verify the data
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(result.getMaterializedRows(), CREATE_TABLE_DATA.getMaterializedRows());
// verify the node version and query ID in table
Table table = getMetastoreClient().getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()).get();
assertEquals(table.getParameters().get(PRESTO_VERSION_NAME), TEST_SERVER_VERSION);
assertEquals(table.getParameters().get(PRESTO_QUERY_ID_NAME), queryId);
// verify basic statistics
HiveBasicStatistics statistics = getBasicStatisticsForTable(metastoreContext, transaction, tableName);
assertEquals(statistics.getRowCount().getAsLong(), CREATE_TABLE_DATA.getRowCount());
assertEquals(statistics.getFileCount().getAsLong(), 1L);
assertGreaterThan(statistics.getInMemoryDataSizeInBytes().getAsLong(), 0L);
assertGreaterThan(statistics.getOnDiskDataSizeInBytes().getAsLong(), 0L);
}
}
protected void doCreateEmptyTable(SchemaTableName tableName, HiveStorageFormat storageFormat, List<ColumnMetadata> createTableColumns)
throws Exception
{
List<String> partitionedBy = createTableColumns.stream()
.map(ColumnMetadata::getName)
.filter(PARTITION_COLUMN_FILTER)
.collect(toList());
doCreateEmptyTable(tableName, storageFormat, createTableColumns, partitionedBy);
}
protected void doCreateEmptyTable(SchemaTableName tableName, HiveStorageFormat storageFormat, List<ColumnMetadata> createTableColumns, List<String> partitionedBy)
throws Exception
{
String queryId;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
queryId = session.getQueryId();
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, createTableColumns, createTableProperties(storageFormat, partitionedBy));
metadata.createTable(session, tableMetadata, false);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// load the new table
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
List<ColumnMetadata> expectedColumns = createTableColumns.stream()
.map(column -> ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.setComment(column.getComment().orElse(null))
.setExtraInfo(columnExtraInfo(partitionedBy.contains(column.getName())))
.setHidden(false)
.build())
.collect(toList());
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), expectedColumns);
// verify table format
Table table = transaction.getMetastore().getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName()).get();
assertEquals(table.getStorage().getStorageFormat().getInputFormat(), storageFormat.getInputFormat());
// verify the node version and query ID
assertEquals(table.getParameters().get(PRESTO_VERSION_NAME), TEST_SERVER_VERSION);
assertEquals(table.getParameters().get(PRESTO_QUERY_ID_NAME), queryId);
// verify the table is empty
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEquals(result.getRowCount(), 0);
// verify basic statistics
if (partitionedBy.isEmpty()) {
assertEmptyTableStatistics(metastoreContext, tableName, transaction);
}
}
}
protected void assertEmptyTableStatistics(MetastoreContext metastoreContext, SchemaTableName tableName, Transaction transaction)
{
HiveBasicStatistics statistics = getBasicStatisticsForTable(metastoreContext, transaction, tableName);
assertEquals(statistics.getRowCount().getAsLong(), 0L);
assertEquals(statistics.getFileCount().getAsLong(), 0L);
assertEquals(statistics.getInMemoryDataSizeInBytes().getAsLong(), 0L);
assertEquals(statistics.getOnDiskDataSizeInBytes().getAsLong(), 0L);
}
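// Inserts CREATE_TABLE_DATA three times, checking data and basic statistics after each insert,
// then stages another insert (two more pages) and rolls it back, verifying that the data files
// and statistics are unchanged.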
private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName, PageSinkContext pageSinkContext)
throws Exception
{
// creating the table
doCreateEmptyTable(tableName, storageFormat, CREATE_TABLE_COLUMNS);
MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(SESSION, CREATE_TABLE_DATA.getTypes());
for (int i = 0; i < 3; i++) {
insertData(tableName, CREATE_TABLE_DATA);
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// load the new table
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
// verify the metadata
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, getTableHandle(metadata, tableName));
assertEquals(filterNonHiddenColumnMetadata(tableMetadata.getColumns()), CREATE_TABLE_COLUMNS);
// verify the data
resultBuilder.rows(CREATE_TABLE_DATA.getMaterializedRows());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), resultBuilder.build().getMaterializedRows());
// statistics
HiveBasicStatistics tableStatistics = getBasicStatisticsForTable(metastoreContext, transaction, tableName);
assertEquals(tableStatistics.getRowCount().getAsLong(), CREATE_TABLE_DATA.getRowCount() * (i + 1));
assertEquals(tableStatistics.getFileCount().getAsLong(), i + 1L);
assertGreaterThan(tableStatistics.getInMemoryDataSizeInBytes().getAsLong(), 0L);
assertGreaterThan(tableStatistics.getOnDiskDataSizeInBytes().getAsLong(), 0L);
}
}
// test rollback
Set<String> existingFiles;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
existingFiles = listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName());
assertFalse(existingFiles.isEmpty());
}
Path stagingPathRoot;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
MetastoreContext metastoreContext = new MetastoreContext(
session.getIdentity(),
session.getQueryId(),
session.getClientInfo(),
session.getClientTags(),
session.getSource(),
getMetastoreHeaders(session),
false,
DEFAULT_COLUMN_CONVERTER_PROVIDER,
session.getWarningCollector(),
session.getRuntimeStats());
// "stage" insert data
HiveInsertTableHandle insertTableHandle = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, pageSinkContext);
sink.appendPage(CREATE_TABLE_DATA.toPage());
sink.appendPage(CREATE_TABLE_DATA.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkContext.isCommitRequired()) {
assertValidPageSinkCommitFragments(fragments);
metadata.commitPageSinkAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
// statistics are visible from within the transaction
HiveBasicStatistics tableStatistics = getBasicStatisticsForTable(metastoreContext, transaction, tableName);
assertEquals(tableStatistics.getRowCount().getAsLong(), CREATE_TABLE_DATA.getRowCount() * 5L);
try (Transaction otherTransaction = newTransaction()) {
// statistics are not visible from outside the transaction
HiveBasicStatistics otherTableStatistics = getBasicStatisticsForTable(metastoreContext, otherTransaction, tableName);
assertEquals(otherTableStatistics.getRowCount().getAsLong(), CREATE_TABLE_DATA.getRowCount() * 3L);
}
// verify we did not modify the table target directory
HdfsContext context = new HdfsContext(
session,
tableName.getSchemaName(),
tableName.getTableName(),
insertTableHandle.getLocationHandle().getTargetPath().toString(),
false);
Path targetPathRoot = getTargetPathRoot(insertTableHandle);
assertEquals(listAllDataFiles(context, targetPathRoot), existingFiles);
// verify all temp files start with the unique prefix
stagingPathRoot = getStagingPathRoot(insertTableHandle);
Set<String> tempFiles = listAllDataFiles(context, stagingPathRoot);
assertFalse(tempFiles.isEmpty());
for (String filePath : tempFiles) {
assertTrue(new Path(filePath).getName().startsWith(session.getQueryId()));
}
// rollback insert
transaction.rollback();
}
// verify temp directory is empty
HdfsContext context = new HdfsContext(newSession(), tableName.getSchemaName(), tableName.getTableName(), "temp_path", false);
assertTrue(listAllDataFiles(context, stagingPathRoot).isEmpty());
// verify the data is unchanged
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
MetastoreContext metastoreContext = new MetastoreContext(
session.getIdentity(),
session.getQueryId(),
session.getClientInfo(),
session.getClientTags(),
session.getSource(),
getMetastoreHeaders(session),
false,
DEFAULT_COLUMN_CONVERTER_PROVIDER,
session.getWarningCollector(),
session.getRuntimeStats());
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), resultBuilder.build().getMaterializedRows());
// verify we did not modify the table directory
assertEquals(listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName()), existingFiles);
}
// verify statistics unchanged
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(
session.getIdentity(),
session.getQueryId(),
session.getClientInfo(),
session.getClientTags(),
session.getSource(),
getMetastoreHeaders(session),
false,
DEFAULT_COLUMN_CONVERTER_PROVIDER,
session.getWarningCollector(),
session.getRuntimeStats());
HiveBasicStatistics statistics = getBasicStatisticsForTable(metastoreContext, transaction, tableName);
assertEquals(statistics.getRowCount().getAsLong(), CREATE_TABLE_DATA.getRowCount() * 3L);
assertEquals(statistics.getFileCount().getAsLong(), 3L);
}
}
// These are protected so extensions to the Hive connector can replace the handle classes
protected Path getStagingPathRoot(ConnectorInsertTableHandle insertTableHandle)
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) insertTableHandle;
WriteInfo writeInfo = getLocationService().getQueryWriteInfo(handle.getLocationHandle());
if (writeInfo.getWriteMode() != STAGE_AND_MOVE_TO_TARGET_DIRECTORY) {
throw new AssertionError("writeMode is not STAGE_AND_MOVE_TO_TARGET_DIRECTORY");
}
return writeInfo.getWritePath();
}
protected Path getStagingPathRoot(ConnectorOutputTableHandle outputTableHandle)
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) outputTableHandle;
return getLocationService()
.getQueryWriteInfo(handle.getLocationHandle())
.getWritePath();
}
protected Path getTargetPathRoot(ConnectorInsertTableHandle insertTableHandle)
{
HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) insertTableHandle;
return getLocationService()
.getQueryWriteInfo(hiveInsertTableHandle.getLocationHandle())
.getTargetPath();
}
protected Optional<Path> getTempFilePathRoot(ConnectorOutputTableHandle outputTableHandle)
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) outputTableHandle;
return getLocationService()
.getQueryWriteInfo(handle.getLocationHandle())
.getTempPath();
}
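// Lists every data file under every data path of the table (the table location plus any
// partition locations that live outside it).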
protected Set<String> listAllDataFiles(MetastoreContext metastoreContext, Transaction transaction, String schemaName, String tableName)
throws IOException
{
HdfsContext context = new HdfsContext(newSession(), schemaName, tableName, "test_path", false);
Set<String> existingFiles = new HashSet<>();
for (String location : listAllDataPaths(metastoreContext, transaction.getMetastore(), schemaName, tableName)) {
existingFiles.addAll(listAllDataFiles(context, new Path(location)));
}
return existingFiles;
}
public static List<String> listAllDataPaths(MetastoreContext metastoreContext, SemiTransactionalHiveMetastore metastore, String schemaName, String tableName)
{
ImmutableList.Builder<String> locations = ImmutableList.builder();
Table table = metastore.getTable(metastoreContext, schemaName, tableName).get();
if (table.getStorage().getLocation() != null) {
// For a partitioned table, there should be nothing directly under this directory.
// Including this location in the set nevertheless makes the directory content
// assertions more thorough, which is desirable.
locations.add(table.getStorage().getLocation());
}
Optional<List<PartitionNameWithVersion>> partitionNames = metastore.getPartitionNames(metastoreContext, schemaName, tableName);
if (partitionNames.isPresent()) {
metastore.getPartitionsByNames(metastoreContext, schemaName, tableName, partitionNames.get()).values().stream()
.map(Optional::get)
.map(partition -> partition.getStorage().getLocation())
.filter(location -> !location.startsWith(table.getStorage().getLocation()))
.forEach(locations::add);
}
return locations.build();
}
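// Recursively lists all data files under the given path, skipping hidden files whose names
// start with ".presto".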
protected Set<String> listAllDataFiles(HdfsContext context, Path path)
throws IOException
{
Set<String> result = new HashSet<>();
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, path);
if (fileSystem.exists(path)) {
for (FileStatus fileStatus : fileSystem.listStatus(path)) {
if (fileStatus.getPath().getName().startsWith(".presto")) {
// skip hidden files (names starting with ".presto")
}
else if (fileStatus.isFile()) {
result.add(fileStatus.getPath().toString());
}
else if (fileStatus.isDirectory()) {
result.addAll(listAllDataFiles(context, fileStatus.getPath()));
}
}
}
return result;
}
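// Creates a partitioned table, inserts rows that create new partitions (verifying partition
// metadata, data, and statistics), then stages a second insert and rolls it back, verifying
// that the existing files and data are unchanged and the staging directory is empty.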
private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTableName tableName, PageSinkContext pageSinkContext)
throws Exception
{
// creating the table
doCreateEmptyTable(tableName, storageFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
// insert the data
String queryId = insertData(tableName, CREATE_TABLE_PARTITIONED_DATA);
Set<String> existingFiles;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// verify partitions were created
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore().getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
assertEqualsIgnoreOrder(getPartitionNames(partitionNamesWithVersion), CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows().stream()
.map(row -> "ds=" + row.getField(CREATE_TABLE_PARTITIONED_DATA.getTypes().size() - 1))
.collect(toList()));
// verify the node versions in partitions
Map<String, Optional<Partition>> partitions = getMetastoreClient().getPartitionsByNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), partitionNamesWithVersion);
assertEquals(partitions.size(), partitionNamesWithVersion.size());
for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
Partition partition = partitions.get(partitionNameWithVersion.getPartitionName()).get();
assertEquals(partition.getParameters().get(PRESTO_VERSION_NAME), TEST_SERVER_VERSION);
assertEquals(partition.getParameters().get(PRESTO_QUERY_ID_NAME), queryId);
}
// load the new table
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
// verify the data
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(result.getMaterializedRows(), CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows());
// test rollback
existingFiles = listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName());
assertFalse(existingFiles.isEmpty());
// test statistics
for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
HiveBasicStatistics partitionStatistics = getBasicStatisticsForPartition(metastoreContext, transaction, tableName, partitionNameWithVersion.getPartitionName());
assertEquals(partitionStatistics.getRowCount().getAsLong(), 1L);
assertEquals(partitionStatistics.getFileCount().getAsLong(), 1L);
assertGreaterThan(partitionStatistics.getInMemoryDataSizeInBytes().getAsLong(), 0L);
assertGreaterThan(partitionStatistics.getOnDiskDataSizeInBytes().getAsLong(), 0L);
}
}
Path stagingPathRoot;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
// "stage" insert data
HiveInsertTableHandle insertTableHandle = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
stagingPathRoot = getStagingPathRoot(insertTableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, pageSinkContext);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA_2ND.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkContext.isCommitRequired()) {
assertValidPageSinkCommitFragments(fragments);
metadata.commitPageSinkAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
// verify all temp files start with the unique prefix
HdfsContext context = new HdfsContext(
session,
tableName.getSchemaName(),
tableName.getTableName(),
insertTableHandle.getLocationHandle().getTargetPath().toString(),
false);
Set<String> tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle));
assertFalse(tempFiles.isEmpty());
for (String filePath : tempFiles) {
assertTrue(new Path(filePath).getName().startsWith(session.getQueryId()));
}
// rollback insert
transaction.rollback();
}
// verify the data is unchanged
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, newSession(), TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows());
// verify we did not modify the table directory
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
assertEquals(listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName()), existingFiles);
// verify temp directory is empty
HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), stagingPathRoot.toString(), false);
assertTrue(listAllDataFiles(context, stagingPathRoot).isEmpty());
}
}
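// Verifies that beginInsert fails with a clear error when the table has a column of an
// unsupported write type (Hive uniontype).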
private void doInsertUnsupportedWriteType(HiveStorageFormat storageFormat, SchemaTableName tableName)
throws Exception
{
List<Column> columns = ImmutableList.of(new Column("dummy", HiveType.valueOf("uniontype<smallint,tinyint>"), Optional.empty(), Optional.empty()));
List<Column> partitionColumns = ImmutableList.of(new Column("name", HIVE_STRING, Optional.empty(), Optional.empty()));
createEmptyTable(tableName, storageFormat, columns, partitionColumns);
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
metadata.beginInsert(session, tableHandle);
fail("expected failure");
}
catch (PrestoException e) {
assertThat(e).hasMessageMatching("Inserting into Hive table .* with column type uniontype<smallint,tinyint> not supported");
}
}
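// Inserts into the same partitions three times (verifying data and per-partition statistics
// after each insert), then stages another insert and rolls it back, verifying that data,
// files, and statistics are restored.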
private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, SchemaTableName tableName, PageSinkContext pageSinkContext)
throws Exception
{
// creating the table
doCreateEmptyTable(tableName, storageFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(SESSION, CREATE_TABLE_PARTITIONED_DATA.getTypes());
for (int i = 0; i < 3; i++) {
// insert the data
insertData(tableName, CREATE_TABLE_PARTITIONED_DATA);
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// verify partitions were created
List<PartitionNameWithVersion> partitionsNameWithVersion = transaction.getMetastore().getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
assertEqualsIgnoreOrder(getPartitionNames(partitionsNameWithVersion), CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows().stream()
.map(row -> "ds=" + row.getField(CREATE_TABLE_PARTITIONED_DATA.getTypes().size() - 1))
.collect(toList()));
// load the new table
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
// verify the data
resultBuilder.rows(CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(result.getMaterializedRows(), resultBuilder.build().getMaterializedRows());
// test statistics
for (PartitionNameWithVersion partitionNameWithVersion : partitionsNameWithVersion) {
HiveBasicStatistics statistics = getBasicStatisticsForPartition(metastoreContext, transaction, tableName, partitionNameWithVersion.getPartitionName());
assertEquals(statistics.getRowCount().getAsLong(), i + 1L);
assertEquals(statistics.getFileCount().getAsLong(), i + 1L);
assertGreaterThan(statistics.getInMemoryDataSizeInBytes().getAsLong(), 0L);
assertGreaterThan(statistics.getOnDiskDataSizeInBytes().getAsLong(), 0L);
}
}
}
// test rollback
Set<String> existingFiles;
Path stagingPathRoot;
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
existingFiles = listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName());
assertFalse(existingFiles.isEmpty());
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
// "stage" insert data
HiveInsertTableHandle insertTableHandle = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
stagingPathRoot = getStagingPathRoot(insertTableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, pageSinkContext);
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage());
sink.appendPage(CREATE_TABLE_PARTITIONED_DATA.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkContext.isCommitRequired()) {
assertValidPageSinkCommitFragments(fragments);
metadata.commitPageSinkAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
// verify all temp files start with the unique prefix
HdfsContext context = new HdfsContext(
session,
tableName.getSchemaName(),
tableName.getTableName(),
insertTableHandle.getLocationHandle().getTargetPath().toString(),
false);
Set<String> tempFiles = listAllDataFiles(context, getStagingPathRoot(insertTableHandle));
assertFalse(tempFiles.isEmpty());
for (String filePath : tempFiles) {
assertTrue(new Path(filePath).getName().startsWith(session.getQueryId()));
}
// verify statistics are visible from within the current transaction
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore().getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
HiveBasicStatistics partitionStatistics = getBasicStatisticsForPartition(metastoreContext, transaction, tableName, partitionNameWithVersion.getPartitionName());
assertEquals(partitionStatistics.getRowCount().getAsLong(), 5L);
}
// rollback insert
transaction.rollback();
}
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// verify the data is unchanged
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, newSession(), TupleDomain.all(), OptionalInt.empty(), Optional.empty());
assertEqualsIgnoreOrder(result.getMaterializedRows(), resultBuilder.build().getMaterializedRows());
// verify we did not modify the table directory
assertEquals(listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName()), existingFiles);
// verify temp directory is empty
HdfsContext context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), stagingPathRoot.toString(), false);
assertTrue(listAllDataFiles(context, stagingPathRoot).isEmpty());
// verify statistics have been rolled back
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore().getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
HiveBasicStatistics partitionStatistics = getBasicStatisticsForPartition(metastoreContext, transaction, tableName, partitionNameWithVersion.getPartitionName());
assertEquals(partitionStatistics.getRowCount().getAsLong(), 3L);
}
}
}
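// Decodes each fragment as a PartitionUpdate and asserts that every file's write (staging)
// name differs from its target name, presumably because the rename to the target name only
// happens at page sink commit.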
private static void assertValidPageSinkCommitFragments(Collection<Slice> fragments)
{
fragments.stream()
.map(Slice::getBytes)
.map(HiveTestUtils.PARTITION_UPDATE_CODEC::fromJson)
.map(PartitionUpdate::getFileWriteInfos)
.flatMap(List::stream)
.forEach(fileWriteInfo -> assertNotEquals(fileWriteInfo.getWriteFileName(), fileWriteInfo.getTargetFileName()));
}
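// Verifies that inserting into a partition after its statistics were erased leaves the row
// count and in-memory data size statistics absent.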
private void doInsertIntoExistingPartitionEmptyStatistics(HiveStorageFormat storageFormat, SchemaTableName tableName)
throws Exception
{
doCreateEmptyTable(tableName, storageFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
insertData(tableName, CREATE_TABLE_PARTITIONED_DATA);
eraseStatistics(tableName);
insertData(tableName, CREATE_TABLE_PARTITIONED_DATA);
ConnectorSession session = newSession();
try (Transaction transaction = newTransaction()) {
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore().getPartitionNames(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()), tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
HiveBasicStatistics statistics = getBasicStatisticsForPartition(new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats()), transaction, tableName, partitionNameWithVersion.getPartitionName());
assertThat(statistics.getRowCount()).isNotPresent();
assertThat(statistics.getInMemoryDataSizeInBytes()).isNotPresent();
// fileCount and rawSize statistics are computed on the fly by the metastore, thus cannot be erased
}
}
}
private static HiveBasicStatistics getBasicStatisticsForTable(MetastoreContext metastoreContext, Transaction transaction, SchemaTableName table)
{
return transaction
.getMetastore()
.getTableStatistics(metastoreContext, table.getSchemaName(), table.getTableName())
.getBasicStatistics();
}
private static HiveBasicStatistics getBasicStatisticsForPartition(MetastoreContext metastoreContext, Transaction transaction, SchemaTableName table, String partitionName)
{
return transaction
.getMetastore()
.getPartitionStatistics(metastoreContext, table.getSchemaName(), table.getTableName(), ImmutableSet.of(partitionName))
.get(partitionName)
.getBasicStatistics();
}
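// Resets the table-level statistics to empty and, if the table is partitioned, resets the
// statistics of every partition as well.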
private void eraseStatistics(SchemaTableName schemaTableName)
{
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
metastoreClient.updateTableStatistics(METASTORE_CONTEXT, schemaTableName.getSchemaName(), schemaTableName.getTableName(), statistics -> new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()));
Table table = metastoreClient.getTable(METASTORE_CONTEXT, schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
List<String> partitionColumns = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toImmutableList());
if (!table.getPartitionColumns().isEmpty()) {
List<PartitionNameWithVersion> partitionNames = metastoreClient.getPartitionNames(METASTORE_CONTEXT, schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElse(ImmutableList.of());
List<Partition> partitions = metastoreClient
.getPartitionsByNames(METASTORE_CONTEXT, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionNames)
.entrySet()
.stream()
.map(Map.Entry::getValue)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());
for (Partition partition : partitions) {
metastoreClient.updatePartitionStatistics(
METASTORE_CONTEXT,
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
makePartName(partitionColumns, partition.getValues()),
statistics -> new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()));
}
}
}
/**
* @return query id
*/
protected String insertData(SchemaTableName tableName, MaterializedResult data)
throws Exception
{
return insertData(tableName, data, newSession());
}
protected String insertData(SchemaTableName tableName, MaterializedResult data, ConnectorSession session)
throws Exception
{
Path writePath;
Path targetPath;
String queryId;
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
HiveInsertTableHandle insertTableHandle = (HiveInsertTableHandle) metadata.beginInsert(session, tableHandle);
queryId = session.getQueryId();
writePath = getStagingPathRoot(insertTableHandle);
targetPath = getTargetPathRoot(insertTableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
// write data
sink.appendPage(data.toPage());
Collection<Slice> fragments = getFutureValue(sink.finish());
// commit the insert
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
transaction.commit();
}
// check that temporary files are removed
if (!writePath.equals(targetPath)) {
HdfsContext context = new HdfsContext(
newSession(),
tableName.getSchemaName(),
tableName.getTableName(),
targetPath.toString(),
false);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, writePath);
assertFalse(fileSystem.exists(writePath));
}
return queryId;
}
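// Creates a partitioned table, inserts data, then removes partitions via metadata-only deletes
// (first ds=2015-07-03, then the ds range 2015-07-01..2015-07-02), verifying the remaining rows
// after each delete and that the table directory ends up empty.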
private void doTestMetadataDelete(HiveStorageFormat storageFormat, SchemaTableName tableName)
throws Exception
{
// creating the table
doCreateEmptyTable(tableName, storageFormat, CREATE_TABLE_COLUMNS_PARTITIONED);
insertData(tableName, CREATE_TABLE_PARTITIONED_DATA);
MaterializedResult.Builder expectedResultBuilder = MaterializedResult.resultBuilder(SESSION, CREATE_TABLE_PARTITIONED_DATA.getTypes());
expectedResultBuilder.rows(CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows());
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// verify partitions were created
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore().getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
assertEqualsIgnoreOrder(getPartitionNames(partitionNamesWithVersion), CREATE_TABLE_PARTITIONED_DATA.getMaterializedRows().stream()
.map(row -> "ds=" + row.getField(CREATE_TABLE_PARTITIONED_DATA.getTypes().size() - 1))
.collect(toList()));
// verify table directory is not empty
Set<String> filesAfterInsert = listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName());
assertFalse(filesAfterInsert.isEmpty());
// verify the data
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(result.getMaterializedRows(), expectedResultBuilder.build().getMaterializedRows());
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
// get ds column handle
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
HiveColumnHandle dsColumnHandle = (HiveColumnHandle) metadata.getColumnHandles(session, tableHandle).get("ds");
// delete ds=2015-07-03
session = newSession();
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.fromFixedValues(ImmutableMap.of(dsColumnHandle, NullableValue.of(createUnboundedVarcharType(), utf8Slice("2015-07-03"))));
Constraint<ColumnHandle> constraint = new Constraint<>(tupleDomain, convertToPredicate(tupleDomain));
ConnectorTableLayoutHandle tableLayoutHandle = getTableLayout(session, metadata, tableHandle, constraint, transaction).getHandle();
metadata.metadataDelete(session, tableHandle, tableLayoutHandle);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
HiveColumnHandle dsColumnHandle = (HiveColumnHandle) metadata.getColumnHandles(session, tableHandle).get("ds");
int dsColumnOrdinalPosition = columnHandles.indexOf(dsColumnHandle);
// verify the data
session = newSession();
ImmutableList<MaterializedRow> expectedRows = expectedResultBuilder.build().getMaterializedRows().stream()
.filter(row -> !"2015-07-03".equals(row.getField(dsColumnOrdinalPosition)))
.collect(toImmutableList());
MaterializedResult actualAfterDelete = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(actualAfterDelete.getMaterializedRows(), expectedRows);
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
HiveColumnHandle dsColumnHandle = (HiveColumnHandle) metadata.getColumnHandles(session, tableHandle).get("ds");
// delete ds=2015-07-01 and 2015-07-02
session = newSession();
TupleDomain<ColumnHandle> tupleDomain2 = withColumnDomains(
ImmutableMap.of(dsColumnHandle, Domain.create(ValueSet.ofRanges(Range.range(createUnboundedVarcharType(), utf8Slice("2015-07-01"), true, utf8Slice("2015-07-02"), true)), false)));
Constraint<ColumnHandle> constraint2 = new Constraint<>(tupleDomain2, convertToPredicate(tupleDomain2));
ConnectorTableLayoutHandle tableLayoutHandle2 = getTableLayout(session, metadata, tableHandle, constraint2, transaction).getHandle();
metadata.metadataDelete(session, tableHandle, tableLayoutHandle2);
transaction.commit();
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
// verify the data
session = newSession();
MaterializedResult actualAfterDelete2 = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(actualAfterDelete2.getMaterializedRows(), ImmutableList.of());
// verify table directory is empty
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
Set<String> filesAfterDelete = listAllDataFiles(metastoreContext, transaction, tableName.getSchemaName(), tableName.getTableName());
assertTrue(filesAfterDelete.isEmpty());
}
}
protected void assertGetRecordsOptional(String tableName, HiveStorageFormat hiveStorageFormat)
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
if (metadata.getTableHandle(newSession(), new SchemaTableName(database, tableName)) != null) {
assertGetRecords(tableName, hiveStorageFormat);
}
}
}
protected void assertGetRecords(String tableName, HiveStorageFormat hiveStorageFormat)
throws Exception
{
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, new SchemaTableName(database, tableName));
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle);
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, tableHandle, TupleDomain.all());
List<ConnectorSplit> splits = getAllSplits(session, transaction, layoutHandle);
assertEquals(splits.size(), 1);
HiveSplit hiveSplit = (HiveSplit) getOnlyElement(splits);
List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());
ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats());
assertGetRecords(hiveStorageFormat, tableMetadata, hiveSplit, pageSource, columnHandles);
}
}
protected void assertGetRecords(
HiveStorageFormat hiveStorageFormat,
ConnectorTableMetadata tableMetadata,
HiveSplit hiveSplit,
ConnectorPageSource pageSource,
List<? extends ColumnHandle> columnHandles)
throws IOException
{
// Some page sources may need to read additional bytes (e.g., the metadata footer) in addition to the split data
long initialPageSourceCompletedBytes = pageSource.getCompletedBytes();
try {
MaterializedResult result = materializeSourceDataStream(newSession(), pageSource, getTypes(columnHandles));
assertPageSourceType(pageSource, hiveStorageFormat);
ImmutableMap<String, Integer> columnIndex = indexColumns(tableMetadata);
long rowNumber = 0;
long completedBytes = 0;
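// The assertions below rely on the test data assigning NULLs per column by a distinct modulus
// of the row number, so NULL rows are staggered across columns.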
for (MaterializedRow row : result) {
try {
assertValueTypes(row, tableMetadata.getColumns());
}
catch (RuntimeException e) {
throw new RuntimeException("row " + rowNumber, e);
}
rowNumber++;
Integer index;
Object value;
// STRING
index = columnIndex.get("t_string");
value = row.getField(index);
if (rowNumber % 19 == 0) {
assertNull(value);
}
else if (rowNumber % 19 == 1) {
assertEquals(value, "");
}
else {
assertEquals(value, "test");
}
// NUMBERS
assertEquals(row.getField(columnIndex.get("t_tinyint")), (byte) (1 + rowNumber));
assertEquals(row.getField(columnIndex.get("t_smallint")), (short) (2 + rowNumber));
assertEquals(row.getField(columnIndex.get("t_int")), (int) (3 + rowNumber));
index = columnIndex.get("t_bigint");
if ((rowNumber % 13) == 0) {
assertNull(row.getField(index));
}
else {
assertEquals(row.getField(index), 4 + rowNumber);
}
assertEquals((Float) row.getField(columnIndex.get("t_float")), 5.1f + rowNumber, 0.001);
assertEquals(row.getField(columnIndex.get("t_double")), 6.2 + rowNumber);
// BOOLEAN
index = columnIndex.get("t_boolean");
if ((rowNumber % 3) == 2) {
assertNull(row.getField(index));
}
else {
assertEquals(row.getField(index), (rowNumber % 3) != 0);
}
// TIMESTAMP
index = columnIndex.get("t_timestamp");
if (index != null) {
if ((rowNumber % 17) == 0) {
assertNull(row.getField(index));
}
else {
SqlTimestamp expected = sqlTimestampOf(2011, 5, 6, 7, 8, 9, 123, timeZone, UTC_KEY, SESSION);
assertEquals(row.getField(index), expected);
}
}
// BINARY
index = columnIndex.get("t_binary");
if (index != null) {
if ((rowNumber % 23) == 0) {
assertNull(row.getField(index));
}
else {
assertEquals(row.getField(index), new SqlVarbinary("test binary".getBytes(UTF_8)));
}
}
// DATE
index = columnIndex.get("t_date");
if (index != null) {
if ((rowNumber % 37) == 0) {
assertNull(row.getField(index));
}
else {
SqlDate expected = new SqlDate(toIntExact(MILLISECONDS.toDays(new DateTime(2013, 8, 9, 0, 0, 0, UTC).getMillis())));
assertEquals(row.getField(index), expected);
}
}
// VARCHAR(50)
index = columnIndex.get("t_varchar");
if (index != null) {
value = row.getField(index);
if (rowNumber % 39 == 0) {
assertNull(value);
}
else if (rowNumber % 39 == 1) {
// https://issues.apache.org/jira/browse/HIVE-13289
// RCBINARY reads empty VARCHAR as null
if (hiveStorageFormat == RCBINARY) {
assertNull(value);
}
else {
assertEquals(value, "");
}
}
else {
assertEquals(value, "test varchar");
}
}
// CHAR(25)
index = columnIndex.get("t_char");
if (index != null) {
value = row.getField(index);
if ((rowNumber % 41) == 0) {
assertNull(value);
}
else {
assertEquals(value, (rowNumber % 41) == 1 ? " " : "test char ");
}
}
// MAP<STRING, STRING>
index = columnIndex.get("t_map");
if (index != null) {
if ((rowNumber % 27) == 0) {
assertNull(row.getField(index));
}
else {
assertEquals(row.getField(index), ImmutableMap.of("test key", "test value"));
}
}
// ARRAY<STRING>
index = columnIndex.get("t_array_string");
if (index != null) {
if ((rowNumber % 29) == 0) {
assertNull(row.getField(index));
}
else {
assertEquals(row.getField(index), ImmutableList.of("abc", "xyz", "data"));
}
}
// ARRAY<STRUCT<s_string: STRING, s_double:DOUBLE>>
index = columnIndex.get("t_array_struct");
if (index != null) {
if ((rowNumber % 31) == 0) {
assertNull(row.getField(index));
}
else {
List<Object> expected1 = ImmutableList.of("test abc", 0.1);
List<Object> expected2 = ImmutableList.of("test xyz", 0.2);
assertEquals(row.getField(index), ImmutableList.of(expected1, expected2));
}
}
// STRUCT<s_string: STRING, s_double:DOUBLE>
index = columnIndex.get("t_struct");
if (index != null) {
if ((rowNumber % 31) == 0) {
assertNull(row.getField(index));
}
else {
assertTrue(row.getField(index) instanceof List);
List values = (List) row.getField(index);
assertEquals(values.size(), 2);
assertEquals(values.get(0), "test abc");
assertEquals(values.get(1), 0.1);
}
}
// MAP<INT, ARRAY<STRUCT<s_string: STRING, s_double:DOUBLE>>>
index = columnIndex.get("t_complex");
if (index != null) {
if ((rowNumber % 33) == 0) {
assertNull(row.getField(index));
}
else {
List<Object> expected1 = ImmutableList.of("test abc", 0.1);
List<Object> expected2 = ImmutableList.of("test xyz", 0.2);
assertEquals(row.getField(index), ImmutableMap.of(1, ImmutableList.of(expected1, expected2)));
}
}
// NEW COLUMN
assertNull(row.getField(columnIndex.get("new_column")));
long newCompletedBytes = pageSource.getCompletedBytes();
assertTrue(newCompletedBytes >= completedBytes);
assertTrue(newCompletedBytes <= hiveSplit.getFileSplit().getLength() + initialPageSourceCompletedBytes);
completedBytes = newCompletedBytes;
}
assertTrue(completedBytes <= hiveSplit.getFileSplit().getLength() + initialPageSourceCompletedBytes);
assertEquals(rowNumber, 100);
}
finally {
pageSource.close();
}
}
protected void dropTable(SchemaTableName table)
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorSession session = newSession();
ConnectorTableHandle handle = metadata.getTableHandle(session, table);
if (handle == null) {
return;
}
metadata.dropTable(session, handle);
try {
// todo I have no idea why this is needed... maybe there is a propagation delay in the metastore?
metadata.dropTable(session, handle);
fail("expected NotFoundException");
}
catch (TableNotFoundException expected) {
}
transaction.commit();
}
catch (Exception e) {
Logger.get(getClass()).warn(e, "failed to drop table");
}
}
protected ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName)
{
ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName);
checkArgument(handle != null, "table not found: %s", tableName);
return handle;
}
protected MaterializedResult readTable(
Transaction transaction,
ConnectorTableHandle hiveTableHandle,
List<ColumnHandle> columnHandles,
ConnectorSession session,
TupleDomain<ColumnHandle> tupleDomain,
OptionalInt expectedSplitCount,
Optional<HiveStorageFormat> expectedStorageFormat)
throws Exception
{
ConnectorTableLayoutHandle layoutHandle = getTableLayout(session, transaction.getMetadata(), hiveTableHandle, new Constraint<>(tupleDomain), transaction).getHandle();
return readTable(transaction, hiveTableHandle, layoutHandle, columnHandles, session, expectedSplitCount, expectedStorageFormat);
}
protected MaterializedResult readTable(
Transaction transaction,
ConnectorTableHandle hiveTableHandle,
ConnectorTableLayoutHandle hiveTableLayoutHandle,
List<ColumnHandle> columnHandles,
ConnectorSession session,
OptionalInt expectedSplitCount,
Optional<HiveStorageFormat> expectedStorageFormat)
throws Exception
{
List<ConnectorSplit> splits = getAllSplits(session, transaction, hiveTableLayoutHandle);
if (expectedSplitCount.isPresent()) {
assertEquals(splits.size(), expectedSplitCount.getAsInt());
}
TableHandle tableHandle = toTableHandle(transaction, hiveTableHandle, hiveTableLayoutHandle);
ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder();
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE, new RuntimeStats())) {
expectedStorageFormat.ifPresent(format -> assertPageSourceType(pageSource, format));
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
allRows.addAll(result.getMaterializedRows());
}
}
return new MaterializedResult(allRows.build(), getTypes(columnHandles));
}
public ExtendedHiveMetastore getMetastoreClient()
{
return metastoreClient;
}
public LocationService getLocationService()
{
return locationService;
}
protected static int getSplitCount(ConnectorSplitSource splitSource)
{
int splitCount = 0;
while (!splitSource.isFinished()) {
splitCount += getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, 1000)).getSplits().size();
}
return splitCount;
}
private List<ConnectorSplit> getAllSplits(Transaction transaction, ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
ConnectorSession session = newSession();
ConnectorTableLayoutHandle layoutHandle = getLayout(session, transaction, tableHandle, tupleDomain);
return getAllSplits(session, transaction, layoutHandle);
}
private List<ConnectorSplit> getAllSplits(ConnectorSession session, Transaction transaction, ConnectorTableLayoutHandle layoutHandle)
{
ConnectorSplitSource splits = splitManager.getSplits(transaction.getTransactionHandle(), session, layoutHandle, SPLIT_SCHEDULING_CONTEXT);
return getAllSplits(splits);
}
private ConnectorTableLayoutHandle getLayout(ConnectorSession session, Transaction transaction, ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
ConnectorMetadata metadata = transaction.getMetadata();
return getTableLayout(session, metadata, tableHandle, new Constraint<>(tupleDomain), transaction).getHandle();
}
protected static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
{
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
while (!splitSource.isFinished()) {
splits.addAll(getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, 1000)).getSplits());
}
return splits.build();
}
protected List<?> getAllPartitions(ConnectorTableLayoutHandle layoutHandle)
{
return ((HiveTableLayoutHandle) layoutHandle).getPartitions()
.orElseThrow(() -> new AssertionError("layout has no partitions"));
}
protected String getPartitionId(Object partition)
{
return ((HivePartition) partition).getPartitionId().getPartitionName();
}
private static void assertPageSourceType(ConnectorPageSource pageSource, HiveStorageFormat hiveStorageFormat)
{
if (pageSource instanceof OrcSelectivePageSource) {
assertTrue(hiveStorageFormat == ORC || hiveStorageFormat == DWRF);
}
else if (pageSource instanceof RecordPageSource) {
RecordCursor hiveRecordCursor = ((RecordPageSource) pageSource).getCursor();
hiveRecordCursor = ((HiveRecordCursor) hiveRecordCursor).getRegularColumnRecordCursor();
if (hiveRecordCursor instanceof HiveCoercionRecordCursor) {
hiveRecordCursor = ((HiveCoercionRecordCursor) hiveRecordCursor).getRegularColumnRecordCursor();
}
assertInstanceOf(hiveRecordCursor, recordCursorType(hiveStorageFormat), hiveStorageFormat.name());
}
else {
assertInstanceOf(((HivePageSource) pageSource).getPageSource(), pageSourceType(hiveStorageFormat), hiveStorageFormat.name());
}
}
private static Class<? extends RecordCursor> recordCursorType(HiveStorageFormat hiveStorageFormat)
{
return GenericHiveRecordCursor.class;
}
private static Class<? extends ConnectorPageSource> pageSourceType(HiveStorageFormat hiveStorageFormat)
{
switch (hiveStorageFormat) {
case RCTEXT:
case RCBINARY:
return RcFilePageSource.class;
case ORC:
case DWRF:
return OrcBatchPageSource.class;
case PARQUET:
return ParquetPageSource.class;
case PAGEFILE:
return PageFilePageSource.class;
default:
throw new AssertionError("File type does not use a PageSource: " + hiveStorageFormat);
}
}
private static void assertValueTypes(MaterializedRow row, List<ColumnMetadata> schema)
{
for (int columnIndex = 0; columnIndex < schema.size(); columnIndex++) {
ColumnMetadata column = schema.get(columnIndex);
Object value = row.getField(columnIndex);
if (value != null) {
if (BOOLEAN.equals(column.getType())) {
assertInstanceOf(value, Boolean.class);
}
else if (TINYINT.equals(column.getType())) {
assertInstanceOf(value, Byte.class);
}
else if (SMALLINT.equals(column.getType())) {
assertInstanceOf(value, Short.class);
}
else if (INTEGER.equals(column.getType())) {
assertInstanceOf(value, Integer.class);
}
else if (BIGINT.equals(column.getType())) {
assertInstanceOf(value, Long.class);
}
else if (DOUBLE.equals(column.getType())) {
assertInstanceOf(value, Double.class);
}
else if (REAL.equals(column.getType())) {
assertInstanceOf(value, Float.class);
}
else if (isVarcharType(column.getType())) {
assertInstanceOf(value, String.class);
}
else if (isCharType(column.getType())) {
assertInstanceOf(value, String.class);
}
else if (VARBINARY.equals(column.getType())) {
assertInstanceOf(value, SqlVarbinary.class);
}
else if (TIMESTAMP.equals(column.getType())) {
assertInstanceOf(value, SqlTimestamp.class);
}
else if (DATE.equals(column.getType())) {
assertInstanceOf(value, SqlDate.class);
}
else if (column.getType() instanceof ArrayType || column.getType() instanceof RowType) {
assertInstanceOf(value, List.class);
}
else if (column.getType() instanceof MapType) {
assertInstanceOf(value, Map.class);
}
else {
fail("Unknown primitive type " + columnIndex);
}
}
}
}
private static void assertPrimitiveField(Map<String, ColumnMetadata> map, String name, Type type, boolean partitionKey)
{
assertTrue(map.containsKey(name));
ColumnMetadata column = map.get(name);
assertEquals(column.getType(), type, name);
assertEquals(column.getExtraInfo().orElse(null), columnExtraInfo(partitionKey));
}
protected static ImmutableMap<String, Integer> indexColumns(List<ColumnHandle> columnHandles)
{
ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
int i = 0;
for (ColumnHandle columnHandle : columnHandles) {
HiveColumnHandle hiveColumnHandle = (HiveColumnHandle) columnHandle;
index.put(hiveColumnHandle.getName(), i);
i++;
}
return index.build();
}
protected static ImmutableMap<String, Integer> indexColumns(ConnectorTableMetadata tableMetadata)
{
ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
int i = 0;
for (ColumnMetadata columnMetadata : tableMetadata.getColumns()) {
index.put(columnMetadata.getName(), i);
i++;
}
return index.build();
}
protected SchemaTableName temporaryTable(String tableName)
{
return temporaryTable(database, tableName);
}
protected static SchemaTableName temporaryTable(String database, String tableName)
{
String randomName = UUID.randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
return new SchemaTableName(database, TEMPORARY_TABLE_PREFIX + tableName + "_" + randomName);
}
protected static Map<String, Object> createTableProperties(HiveStorageFormat storageFormat)
{
return createTableProperties(storageFormat, ImmutableList.of());
}
private static Map<String, Object> createTableProperties(HiveStorageFormat storageFormat, Iterable<String> partitionedBy)
{
return ImmutableMap.<String, Object>builder()
.put(STORAGE_FORMAT_PROPERTY, storageFormat)
.put(PARTITIONED_BY_PROPERTY, ImmutableList.copyOf(partitionedBy))
.put(BUCKETED_BY_PROPERTY, ImmutableList.of())
.put(BUCKET_COUNT_PROPERTY, 0)
.put(SORTED_BY_PROPERTY, ImmutableList.of())
.build();
}
protected static List<ColumnHandle> filterNonHiddenColumnHandles(Collection<ColumnHandle> columnHandles)
{
return columnHandles.stream()
.filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden())
.collect(toList());
}
protected static List<ColumnMetadata> filterNonHiddenColumnMetadata(Collection<ColumnMetadata> columnMetadatas)
{
return columnMetadatas.stream()
.filter(columnMetadata -> !columnMetadata.isHidden())
.collect(toList());
}
protected Table createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns)
throws Exception
{
return createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, Optional.empty());
}
protected Table createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns, Optional<HiveBucketProperty> bucketProperty)
throws Exception
{
return createEmptyTable(schemaTableName, hiveStorageFormat, columns, partitionColumns, bucketProperty, ImmutableMap.of());
}
protected Table createEmptyTable(SchemaTableName schemaTableName, HiveStorageFormat hiveStorageFormat, List<Column> columns, List<Column> partitionColumns, Optional<HiveBucketProperty> bucketProperty, Map<String, String> parameters)
throws Exception
{
Path targetPath;
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
String tableOwner = session.getUser();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
LocationService locationService = getLocationService();
LocationHandle locationHandle = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName, false);
targetPath = locationService.getQueryWriteInfo(locationHandle).getTargetPath();
Table.Builder tableBuilder = Table.builder()
.setDatabaseName(schemaName)
.setTableName(tableName)
.setOwner(tableOwner)
.setTableType(MANAGED_TABLE)
.setParameters(ImmutableMap.<String, String>builder()
.put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION)
.put(PRESTO_QUERY_ID_NAME, session.getQueryId())
.putAll(parameters)
.build())
.setDataColumns(columns)
.setPartitionColumns(partitionColumns);
tableBuilder.getStorageBuilder()
.setLocation(targetPath.toString())
.setStorageFormat(StorageFormat.create(hiveStorageFormat.getSerDe(), hiveStorageFormat.getInputFormat(), hiveStorageFormat.getOutputFormat()))
.setBucketProperty(bucketProperty)
.setSerdeParameters(ImmutableMap.of());
PrincipalPrivileges principalPrivileges = testingPrincipalPrivilege(tableOwner, session.getUser());
transaction.getMetastore().createTable(session, tableBuilder.build(), principalPrivileges, Optional.empty(), true, EMPTY_TABLE_STATISTICS, emptyList());
transaction.commit();
}
HdfsContext context = new HdfsContext(newSession(), schemaTableName.getSchemaName(), schemaTableName.getTableName(), targetPath.toString(), false);
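// The newly created table is expected to have no files under its target path
// (listDirectory ignores names starting with ".presto")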
List<String> targetDirectoryList = listDirectory(context, targetPath);
assertEquals(targetDirectoryList, ImmutableList.of());
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
return transaction.getMetastore().getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName()).get();
}
}
private void alterBucketProperty(SchemaTableName schemaTableName, Optional<HiveBucketProperty> bucketProperty)
{
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
String tableOwner = session.getUser();
String schemaName = schemaTableName.getSchemaName();
String tableName = schemaTableName.getTableName();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
Optional<Table> table = transaction.getMetastore().getTable(metastoreContext, schemaName, tableName);
Table.Builder tableBuilder = Table.builder(table.get());
tableBuilder.getStorageBuilder().setBucketProperty(bucketProperty);
PrincipalPrivileges principalPrivileges = testingPrincipalPrivilege(tableOwner, session.getUser());
// hack: replaceView can be used as replaceTable despite its name
transaction.getMetastore().replaceView(metastoreContext, schemaName, tableName, tableBuilder.build(), principalPrivileges);
transaction.commit();
}
}
private PrincipalPrivileges testingPrincipalPrivilege(ConnectorSession session)
{
return testingPrincipalPrivilege(session.getUser(), session.getUser());
}
private PrincipalPrivileges testingPrincipalPrivilege(String tableOwner, String grantor)
{
return new PrincipalPrivileges(
ImmutableMultimap.<String, HivePrivilegeInfo>builder()
.put(tableOwner, new HivePrivilegeInfo(HivePrivilege.SELECT, true, new PrestoPrincipal(USER, grantor), new PrestoPrincipal(USER, grantor)))
.put(tableOwner, new HivePrivilegeInfo(HivePrivilege.INSERT, true, new PrestoPrincipal(USER, grantor), new PrestoPrincipal(USER, grantor)))
.put(tableOwner, new HivePrivilegeInfo(HivePrivilege.UPDATE, true, new PrestoPrincipal(USER, grantor), new PrestoPrincipal(USER, grantor)))
.put(tableOwner, new HivePrivilegeInfo(HivePrivilege.DELETE, true, new PrestoPrincipal(USER, grantor), new PrestoPrincipal(USER, grantor)))
.build(),
ImmutableMultimap.of());
}
private List<String> listDirectory(HdfsContext context, Path path)
throws IOException
{
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, path);
return Arrays.stream(fileSystem.listStatus(path))
.map(FileStatus::getPath)
.map(Path::getName)
.filter(name -> !name.startsWith(".presto"))
.collect(toList());
}
@Test
public void testTransactionDeleteInsert()
throws Exception
{
doTestTransactionDeleteInsert(
RCBINARY,
true,
ImmutableList.<TransactionDeleteInsertTestCase>builder()
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_RIGHT_AWAY, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_AFTER_DELETE, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_AFTER_BEGIN_INSERT, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_AFTER_APPEND_PAGE, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_AFTER_SINK_FINISH, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, ROLLBACK_AFTER_FINISH_INSERT, Optional.empty()))
.add(new TransactionDeleteInsertTestCase(false, false, COMMIT, Optional.of(new AddPartitionFailure())))
.add(new TransactionDeleteInsertTestCase(false, false, COMMIT, Optional.of(new DirectoryRenameFailure())))
.add(new TransactionDeleteInsertTestCase(false, false, COMMIT, Optional.of(new FileRenameFailure())))
.add(new TransactionDeleteInsertTestCase(true, false, COMMIT, Optional.of(new DropPartitionFailure())))
.add(new TransactionDeleteInsertTestCase(true, true, COMMIT, Optional.empty()))
.build());
}
protected void doTestTransactionDeleteInsert(HiveStorageFormat storageFormat, boolean allowInsertExisting, List<TransactionDeleteInsertTestCase> testCases)
throws Exception
{
// There are 4 types of operations on a partition: add, drop, alter (drop then add), and insert existing.
// There are 12 partitions in this test, 3 of each type.
// Three of each type are used to verify that cleanups, commit aborts, and rollbacks are always as complete as possible regardless of where a failure occurs.
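// For reference (derived from beforeData, domainToDrop, and insertData below), the pk2 values map to the
// four operation types roughly as follows:
//   alter (dropped by the delete, re-created by the insert): alter1, alter2, alter3
//   drop (dropped by the delete only):                       drop1, drop2, drop3
//   insert existing (kept, appended to when allowed):        insert1, insert2, insert3
//   add (created by the insert only):                        add1, add2, add3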
MaterializedResult beforeData =
MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), createUnboundedVarcharType())
.row(110L, "a", "alter1")
.row(120L, "a", "insert1")
.row(140L, "a", "drop1")
.row(210L, "b", "drop2")
.row(310L, "c", "alter2")
.row(320L, "c", "alter3")
.row(510L, "e", "drop3")
.row(610L, "f", "insert2")
.row(620L, "f", "insert3")
.build();
Domain domainToDrop = Domain.create(ValueSet.of(
createUnboundedVarcharType(),
utf8Slice("alter1"), utf8Slice("alter2"), utf8Slice("alter3"), utf8Slice("drop1"), utf8Slice("drop2"), utf8Slice("drop3")),
false);
List<MaterializedRow> extraRowsForInsertExisting = ImmutableList.of();
if (allowInsertExisting) {
extraRowsForInsertExisting = MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), createUnboundedVarcharType())
.row(121L, "a", "insert1")
.row(611L, "f", "insert2")
.row(621L, "f", "insert3")
.build()
.getMaterializedRows();
}
MaterializedResult insertData =
MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), createUnboundedVarcharType())
.row(111L, "a", "alter1")
.row(131L, "a", "add1")
.row(221L, "b", "add2")
.row(311L, "c", "alter2")
.row(321L, "c", "alter3")
.row(411L, "d", "add3")
.rows(extraRowsForInsertExisting)
.build();
MaterializedResult afterData =
MaterializedResult.resultBuilder(SESSION, BIGINT, createUnboundedVarcharType(), createUnboundedVarcharType())
.row(120L, "a", "insert1")
.row(610L, "f", "insert2")
.row(620L, "f", "insert3")
.rows(insertData.getMaterializedRows())
.build();
for (TransactionDeleteInsertTestCase testCase : testCases) {
SchemaTableName temporaryDeleteInsert = temporaryTable("delete_insert");
try {
createEmptyTable(
temporaryDeleteInsert,
storageFormat,
ImmutableList.of(new Column("col1", HIVE_LONG, Optional.empty(), Optional.empty())),
ImmutableList.of(new Column("pk1", HIVE_STRING, Optional.empty(), Optional.empty()), new Column("pk2", HIVE_STRING, Optional.empty(), Optional.empty())));
insertData(temporaryDeleteInsert, beforeData);
try {
doTestTransactionDeleteInsert(
storageFormat,
temporaryDeleteInsert,
domainToDrop,
insertData,
testCase.isExpectCommittedData() ? afterData : beforeData,
testCase.getTag(),
testCase.isExpectQuerySucceed(),
testCase.getConflictTrigger());
}
catch (AssertionError e) {
throw new AssertionError(format("Test case: %s", testCase.toString()), e);
}
}
finally {
dropTable(temporaryDeleteInsert);
}
}
}
private void doTestTransactionDeleteInsert(
HiveStorageFormat storageFormat,
SchemaTableName tableName,
Domain domainToDrop,
MaterializedResult insertData,
MaterializedResult expectedData,
TransactionDeleteInsertTestTag tag,
boolean expectQuerySucceed,
Optional<ConflictTrigger> conflictTrigger)
throws Exception
{
Path writePath = null;
Path targetPath = null;
try (Transaction transaction = newTransaction()) {
try {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
ConnectorSession session;
rollbackIfEquals(tag, ROLLBACK_RIGHT_AWAY);
// Query 1: delete
session = newSession();
HiveColumnHandle dsColumnHandle = (HiveColumnHandle) metadata.getColumnHandles(session, tableHandle).get("pk2");
TupleDomain<ColumnHandle> tupleDomain = withColumnDomains(ImmutableMap.of(
dsColumnHandle, domainToDrop));
Constraint<ColumnHandle> constraint = new Constraint<>(tupleDomain, convertToPredicate(tupleDomain));
ConnectorTableLayoutHandle tableLayoutHandle = getTableLayout(session, metadata, tableHandle, constraint, transaction).getHandle();
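// metadataDelete drops the partitions matching the constraint (here, those whose pk2 value is in domainToDrop)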
metadata.metadataDelete(session, tableHandle, tableLayoutHandle);
rollbackIfEquals(tag, ROLLBACK_AFTER_DELETE);
// Query 2: insert
session = newSession();
ConnectorInsertTableHandle insertTableHandle = metadata.beginInsert(session, tableHandle);
rollbackIfEquals(tag, ROLLBACK_AFTER_BEGIN_INSERT);
writePath = getStagingPathRoot(insertTableHandle);
targetPath = getTargetPathRoot(insertTableHandle);
ConnectorPageSink sink = pageSinkProvider.createPageSink(transaction.getTransactionHandle(), session, insertTableHandle, TEST_HIVE_PAGE_SINK_CONTEXT);
sink.appendPage(insertData.toPage());
rollbackIfEquals(tag, ROLLBACK_AFTER_APPEND_PAGE);
Collection<Slice> fragments = getFutureValue(sink.finish());
rollbackIfEquals(tag, ROLLBACK_AFTER_SINK_FINISH);
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());
rollbackIfEquals(tag, ROLLBACK_AFTER_FINISH_INSERT);
assertEquals(tag, COMMIT);
if (conflictTrigger.isPresent()) {
List<PartitionUpdate> partitionUpdates = fragments.stream()
.map(Slice::getBytes)
.map(HiveTestUtils.PARTITION_UPDATE_CODEC::fromJson)
.collect(toList());
conflictTrigger.get().triggerConflict(session, tableName, insertTableHandle, partitionUpdates);
}
transaction.commit();
if (conflictTrigger.isPresent()) {
assertTrue(expectQuerySucceed);
conflictTrigger.get().verifyAndCleanup(tableName);
}
}
catch (TestingRollbackException e) {
transaction.rollback();
}
catch (PrestoException e) {
assertFalse(expectQuerySucceed);
if (conflictTrigger.isPresent()) {
conflictTrigger.get().verifyAndCleanup(tableName);
}
}
}
// check that temporary files are removed
if (writePath != null && !writePath.equals(targetPath)) {
HdfsContext context = new HdfsContext(newSession(), tableName.getSchemaName(), tableName.getTableName(), writePath.toString(), false);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, writePath);
assertFalse(fileSystem.exists(writePath));
}
try (Transaction transaction = newTransaction()) {
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
// verify partitions
List<PartitionNameWithVersion> partitionNamesWithVersion = transaction.getMetastore()
.getPartitionNames(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new AssertionError("Table does not exist: " + tableName));
assertEqualsIgnoreOrder(
getPartitionNames(partitionNamesWithVersion),
expectedData.getMaterializedRows().stream()
.map(row -> format("pk1=%s/pk2=%s", row.getField(1), row.getField(2)))
.distinct()
.collect(toList()));
// load the new table
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tableName);
List<ColumnHandle> columnHandles = filterNonHiddenColumnHandles(metadata.getColumnHandles(session, tableHandle).values());
// verify the data
MaterializedResult result = readTable(transaction, tableHandle, columnHandles, session, TupleDomain.all(), OptionalInt.empty(), Optional.of(storageFormat));
assertEqualsIgnoreOrder(result.getMaterializedRows(), expectedData.getMaterializedRows());
}
}
@Test
public void testTableConstraints()
{
for (SchemaTableName table : constraintsTableList) {
List<TableConstraint<String>> tableConstraints = getTableConstraints(table);
List<TableConstraint<String>> expectedConstraints = tableConstraintsMap.get(table);
compareTableConstraints(tableConstraints, expectedConstraints);
}
}
protected Set<HiveStorageFormat> getSupportedCreateTableHiveStorageFormats()
{
return difference(
ImmutableSet.copyOf(HiveStorageFormat.values()),
// exclude formats that change table schema with serde
// exclude ALPHA because it does not support DML yet
ImmutableSet.of(AVRO, CSV, ALPHA));
}
private List<TableConstraint<String>> getTableConstraints(SchemaTableName tableName)
{
ConnectorSession session = newSession();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
return metastoreClient.getTableConstraints(metastoreContext, tableName.getSchemaName(), tableName.getTableName());
}
private void compareTableConstraints(List<TableConstraint<String>> tableConstraints, List<TableConstraint<String>> expectedConstraints)
{
assertEquals(tableConstraints.size(), expectedConstraints.size());
for (int i = 0; i < tableConstraints.size(); i++) {
TableConstraint<String> constraint = tableConstraints.get(i);
TableConstraint<String> expectedConstraint = expectedConstraints.get(i);
// The Hive primary key name is auto-generated, so compare the members explicitly, excluding the name
if (constraint instanceof PrimaryKeyConstraint) {
assertEquals(constraint.getColumns(), expectedConstraint.getColumns());
assertEquals(constraint.isEnabled(), expectedConstraint.isEnabled());
assertEquals(constraint.isRely(), expectedConstraint.isRely());
}
else {
assertEquals(constraint, expectedConstraint);
}
}
}
private static void rollbackIfEquals(TransactionDeleteInsertTestTag tag, TransactionDeleteInsertTestTag expectedTag)
{
if (expectedTag == tag) {
throw new TestingRollbackException();
}
}
private static class TestingRollbackException
extends RuntimeException
{
}
protected static class TransactionDeleteInsertTestCase
{
private final boolean expectCommittedData;
private final boolean expectQuerySucceed;
private final TransactionDeleteInsertTestTag tag;
private final Optional<ConflictTrigger> conflictTrigger;
public TransactionDeleteInsertTestCase(boolean expectCommittedData, boolean expectQuerySucceed, TransactionDeleteInsertTestTag tag, Optional<ConflictTrigger> conflictTrigger)
{
this.expectCommittedData = expectCommittedData;
this.expectQuerySucceed = expectQuerySucceed;
this.tag = tag;
this.conflictTrigger = conflictTrigger;
}
public boolean isExpectCommittedData()
{
return expectCommittedData;
}
public boolean isExpectQuerySucceed()
{
return expectQuerySucceed;
}
public TransactionDeleteInsertTestTag getTag()
{
return tag;
}
public Optional<ConflictTrigger> getConflictTrigger()
{
return conflictTrigger;
}
@Override
public String toString()
{
return toStringHelper(this)
.add("tag", tag)
.add("conflictTrigger", conflictTrigger.map(conflictTrigger -> conflictTrigger.getClass().getName()))
.add("expectCommittedData", expectCommittedData)
.add("expectQuerySucceed", expectQuerySucceed)
.toString();
}
}
protected enum TransactionDeleteInsertTestTag
{
ROLLBACK_RIGHT_AWAY,
ROLLBACK_AFTER_DELETE,
ROLLBACK_AFTER_BEGIN_INSERT,
ROLLBACK_AFTER_APPEND_PAGE,
ROLLBACK_AFTER_SINK_FINISH,
ROLLBACK_AFTER_FINISH_INSERT,
COMMIT,
}
protected interface ConflictTrigger
{
void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List<PartitionUpdate> partitionUpdates)
throws IOException;
void verifyAndCleanup(SchemaTableName tableName)
throws IOException;
}
protected class AddPartitionFailure
implements ConflictTrigger
{
private final ImmutableList<String> copyPartitionFrom = ImmutableList.of("a", "insert1");
private final String partitionNameToConflict = "pk1=b/pk2=add2";
private Partition conflictPartition;
@Override
public void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List<PartitionUpdate> partitionUpdates)
{
// This method bypasses the transaction interface because it is inherently hacky and does not work well with the transaction abstraction.
// It is not itself part of a test; its purpose is to set up the environment for another test.
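// A sketch of the intent: copy the metadata (including the storage location) of partition ["a", "insert1"] and
// register it under the name pk1=b/pk2=add2, whose values toPartitionValues should parse as ["b", "add2"],
// so that the transaction's own attempt to add that partition conflicts.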
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
MetastoreContext metastoreContext = new MetastoreContext(session.getIdentity(), session.getQueryId(), session.getClientInfo(), session.getClientTags(), session.getSource(), getMetastoreHeaders(session), false, DEFAULT_COLUMN_CONVERTER_PROVIDER, session.getWarningCollector(), session.getRuntimeStats());
Optional<Partition> partition = metastoreClient.getPartition(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), copyPartitionFrom);
conflictPartition = Partition.builder(partition.get())
.setValues(toPartitionValues(partitionNameToConflict))
.build();
metastoreClient.addPartitions(
metastoreContext,
tableName.getSchemaName(),
tableName.getTableName(),
ImmutableList.of(new PartitionWithStatistics(conflictPartition, partitionNameToConflict, PartitionStatistics.empty())));
}
@Override
public void verifyAndCleanup(SchemaTableName tableName)
{
// This method bypasses the transaction interface because it is inherently hacky and does not work well with the transaction abstraction.
// It is not itself part of a test; its purpose is to verify and clean up the environment used by another test.
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
Optional<Partition> actualPartition = metastoreClient.getPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), toPartitionValues(partitionNameToConflict));
// Make sure the partition inserted to trigger the conflict was not overwritten
// Checking the storage location is sufficient because the implementation never uses a path like .../pk1=a/pk2=insert1 as the directory for partition [b, add2].
assertEquals(actualPartition.get().getStorage().getLocation(), conflictPartition.getStorage().getLocation());
metastoreClient.dropPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), conflictPartition.getValues(), false);
}
}
protected class DropPartitionFailure
implements ConflictTrigger
{
private final ImmutableList<String> partitionValueToConflict = ImmutableList.of("b", "drop2");
@Override
public void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List<PartitionUpdate> partitionUpdates)
{
// This method bypasses the transaction interface because it is inherently hacky and does not work well with the transaction abstraction.
// It is not itself part of a test; its purpose is to set up the environment for another test.
ExtendedHiveMetastore metastoreClient = getMetastoreClient();
metastoreClient.dropPartition(METASTORE_CONTEXT, tableName.getSchemaName(), tableName.getTableName(), partitionValueToConflict, false);
}
@Override
public void verifyAndCleanup(SchemaTableName tableName)
{
// Do not add back the deleted partition because the implementation is expected to move forward instead of backward when delete fails
}
}
protected class DirectoryRenameFailure
implements ConflictTrigger
{
private HdfsContext context;
private Path path;
@Override
public void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List<PartitionUpdate> partitionUpdates)
{
Path writePath = getStagingPathRoot(insertTableHandle);
Path targetPath = getTargetPathRoot(insertTableHandle);
if (writePath.equals(targetPath)) {
// This conflict does not apply. Trigger a rollback right away so that this test case passes.
throw new TestingRollbackException();
}
path = new Path(targetPath + "/pk1=b/pk2=add2");
context = new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), path.toString(), true);
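// Pre-creating the target partition directory (.../pk1=b/pk2=add2) should make the commit's directory rename
// for the newly added partition fail, which is the conflict this trigger is meant to produce.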
createDirectory(context, hdfsEnvironment, path);
}
@Override
public void verifyAndCleanup(SchemaTableName tableName)
throws IOException
{
assertEquals(listDirectory(context, path), ImmutableList.of());
hdfsEnvironment.getFileSystem(context, path).delete(path, false);
}
}
protected class FileRenameFailure
implements ConflictTrigger
{
private HdfsContext context;
private Path path;
@Override
public void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List<PartitionUpdate> partitionUpdates)
throws IOException
{
for (PartitionUpdate partitionUpdate : partitionUpdates) {
if ("pk2=insert2".equals(partitionUpdate.getTargetPath().getName())) {
path = new Path(partitionUpdate.getTargetPath(), partitionUpdate.getFileWriteInfos().get(0).getTargetFileName());
break;
}
}
assertNotNull(path);
context = new HdfsContext(
session,
tableName.getSchemaName(),
tableName.getTableName(),
path.toString(),
true);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, path);
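// Pre-creating an empty file at the target file name should make the commit's file rename collide with an
// existing file, which is the conflict this trigger is meant to produce.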
fileSystem.createNewFile(path);
}
@Override
public void verifyAndCleanup(SchemaTableName tableName)
throws IOException
{
// The file we added to trigger a conflict was cleaned up because it matches the query prefix.
// Treat this the same as a network failure where the file was created successfully but the success was never reported to the caller.
assertFalse(hdfsEnvironment.getFileSystem(context, path).exists(path));
}
}
}