// DataVerification.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.verifier.framework;

import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.jdbc.QueryStats;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.sql.tree.Query;
import com.facebook.presto.verifier.checksum.ChecksumResult;
import com.facebook.presto.verifier.checksum.ChecksumValidator;
import com.facebook.presto.verifier.event.DeterminismAnalysisDetails;
import com.facebook.presto.verifier.event.DeterminismAnalysisRun;
import com.facebook.presto.verifier.event.QueryInfo;
import com.facebook.presto.verifier.prestoaction.QueryActions;
import com.facebook.presto.verifier.prestoaction.SqlExceptionClassifier;
import com.facebook.presto.verifier.resolver.FailureResolverManager;
import com.facebook.presto.verifier.rewrite.QueryRewriter;
import com.facebook.presto.verifier.source.SnapshotQueryConsumer;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.verifier.framework.DataMatchResult.DataType.DATA;
import static com.facebook.presto.verifier.framework.DataMatchResult.MatchType.MATCH;
import static com.facebook.presto.verifier.framework.DataMatchResult.MatchType.SNAPSHOT_DOES_NOT_EXIST;
import static com.facebook.presto.verifier.framework.DataVerificationUtil.getColumns;
import static com.facebook.presto.verifier.framework.DataVerificationUtil.match;
import static com.facebook.presto.verifier.framework.QueryStage.CONTROL_CHECKSUM;
import static com.facebook.presto.verifier.framework.QueryStage.TEST_CHECKSUM;
import static com.facebook.presto.verifier.framework.VerifierConfig.QUERY_BANK_MODE;
import static com.facebook.presto.verifier.framework.VerifierUtil.callAndConsume;
import static com.facebook.presto.verifier.source.AbstractJdbiSnapshotQuerySupplier.VERIFIER_SNAPSHOT_KEY_PATTERN;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * Verification that compares a control query and a test query by running checksum
 * queries against the objects named in their {@link QueryObjectBundle}s.
 *
 * <p>Source queries are rewritten through the supplied {@link QueryRewriter}. When the control
 * side is enabled, its checksum is computed live and may instead be persisted as a snapshot;
 * when the control side is disabled and the verifier runs in {@code QUERY_BANK_MODE}, a
 * previously stored snapshot is used in place of the control checksum.
 */
public class DataVerification
        extends AbstractVerification<QueryObjectBundle, DataMatchResult, Void>
{
    protected final QueryRewriter queryRewriter;
    protected final DeterminismAnalyzer determinismAnalyzer;
    protected final FailureResolverManager failureResolverManager;
    protected final TypeManager typeManager;
    protected final ChecksumValidator checksumValidator;

    /**
     * Creates a data verification for a single source query.
     *
     * @throws NullPointerException if {@code queryRewriter}, {@code determinismAnalyzer},
     * {@code failureResolverManager}, {@code typeManager} or {@code checksumValidator} is null
     */
    public DataVerification(
            QueryActions queryActions,
            SourceQuery sourceQuery,
            QueryRewriter queryRewriter,
            DeterminismAnalyzer determinismAnalyzer,
            FailureResolverManager failureResolverManager,
            SqlExceptionClassifier exceptionClassifier,
            VerificationContext verificationContext,
            VerifierConfig verifierConfig,
            TypeManager typeManager,
            ChecksumValidator checksumValidator,
            ListeningExecutorService executor,
            SnapshotQueryConsumer snapshotQueryConsumer,
            Map<String, SnapshotQuery> snapshotQueries)
    {
        super(
                queryActions,
                sourceQuery,
                exceptionClassifier,
                verificationContext,
                Optional.empty(),
                verifierConfig,
                executor,
                snapshotQueryConsumer,
                snapshotQueries);
        this.queryRewriter = requireNonNull(queryRewriter, "queryRewriter is null");
        this.determinismAnalyzer = requireNonNull(determinismAnalyzer, "determinismAnalyzer is null");
        this.failureResolverManager = requireNonNull(failureResolverManager, "failureResolverManager is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
        this.checksumValidator = requireNonNull(checksumValidator, "checksumValidator is null");
    }

    /**
     * Rewrites the source query of the given cluster through the query rewriter.
     *
     * <p>The last argument handed to the rewriter is {@code true} only when the verification
     * context reports a resubmission count of zero.
     */
    @Override
    protected QueryObjectBundle getQueryRewrite(ClusterType clusterType)
    {
        SourceQuery source = getSourceQuery();
        boolean notResubmitted = getVerificationContext().getResubmissionCount() == 0;
        return queryRewriter.rewriteQuery(
                source.getQuery(clusterType),
                source.getQueryConfiguration(clusterType),
                clusterType,
                notResubmitted);
    }

    /**
     * Extends the base query info with the formatted query text, the output table name and the
     * reuse-table flag taken from the bundle. An absent bundle yields empty optionals and
     * {@code false} for the reuse flag.
     */
    @Override
    protected void updateQueryInfoWithQueryBundle(QueryInfo.Builder queryInfo, Optional<QueryObjectBundle> queryBundle)
    {
        super.updateQueryInfoWithQueryBundle(queryInfo, queryBundle);

        Optional<String> outputTableName = queryBundle
                .map(QueryObjectBundle::getObjectName)
                .map(QualifiedName::toString);
        boolean reuseTable = queryBundle
                .map(QueryObjectBundle::isReuseTable)
                .orElse(false);

        queryInfo.setQuery(queryBundle.map(bundle -> formatSql(bundle.getQuery(), bundle.getRewrittenFunctionCalls())))
                .setOutputTableName(outputTableName)
                .setIsReuseTable(reuseTable);
    }

    /**
     * Compares the control and test outputs through their checksums.
     *
     * <ul>
     * <li>Control enabled: the control checksum query is executed. If snapshots are being saved,
     * the checksum is handed to the snapshot consumer as JSON and a {@code MATCH} result is
     * returned without executing the test checksum query.</li>
     * <li>Control disabled in {@code QUERY_BANK_MODE}: the stored snapshot for this query replaces
     * the control checksum and the test columns stand in for the control columns. A missing
     * snapshot yields {@code SNAPSHOT_DOES_NOT_EXIST}.</li>
     * <li>Otherwise: {@code null} control columns and a {@code null} control checksum are
     * forwarded to the matcher.</li>
     * </ul>
     *
     * <p>{@code controlQueryResult} and {@code testQueryResult} are not consulted here.
     */
    @Override
    public DataMatchResult verify(
            QueryObjectBundle control,
            QueryObjectBundle test,
            Optional<QueryResult<Void>> controlQueryResult,
            Optional<QueryResult<Void>> testQueryResult,
            ChecksumQueryContext controlChecksumQueryContext,
            ChecksumQueryContext testChecksumQueryContext)
    {
        // The test checksum query is prepared up front, but only executed once the control side is settled.
        List<Column> testColumns = getColumns(getHelperAction(), typeManager, test.getObjectName());
        Query testChecksumQuery = checksumValidator.generateChecksumQuery(test.getObjectName(), testColumns, test.getPartitionsPredicate());
        testChecksumQueryContext.setChecksumQuery(formatSql(testChecksumQuery));

        List<Column> controlColumns;
        ChecksumResult controlChecksumResult;

        if (isControlEnabled()) {
            controlColumns = getColumns(getHelperAction(), typeManager, control.getObjectName());
            Query controlChecksumQuery = checksumValidator.generateChecksumQuery(control.getObjectName(), controlColumns, control.getPartitionsPredicate());
            controlChecksumQueryContext.setChecksumQuery(formatSql(controlChecksumQuery));
            controlChecksumResult = runChecksumAndRecordQueryId(controlChecksumQuery, CONTROL_CHECKSUM, controlChecksumQueryContext);

            if (saveSnapshot) {
                // Snapshot recording: persist the control checksum and report a match right away.
                String serializedChecksum = ChecksumResult.toJson(controlChecksumResult);
                snapshotQueryConsumer.accept(new SnapshotQuery(getSourceQuery().getSuite(), getSourceQuery().getName(), isExplain, serializedChecksum));
                return new DataMatchResult(
                        DATA,
                        MATCH,
                        Optional.empty(),
                        Optional.empty(),
                        OptionalLong.empty(),
                        OptionalLong.empty(),
                        ImmutableList.of());
            }
        }
        else if (QUERY_BANK_MODE.equals(runningMode)) {
            String snapshotKey = format(VERIFIER_SNAPSHOT_KEY_PATTERN, getSourceQuery().getSuite(), getSourceQuery().getName(), isExplain);
            SnapshotQuery storedSnapshot = snapshotQueries.get(snapshotKey);
            if (storedSnapshot == null) {
                return new DataMatchResult(
                        DATA,
                        SNAPSHOT_DOES_NOT_EXIST,
                        Optional.empty(),
                        Optional.empty(),
                        OptionalLong.empty(),
                        OptionalLong.empty(),
                        Collections.emptyList());
            }
            // No control object was produced, so the test columns double as the control columns.
            controlColumns = testColumns;
            controlChecksumResult = ChecksumResult.fromJson(storedSnapshot.getSnapshot());
        }
        else {
            controlColumns = null;
            controlChecksumResult = null;
        }

        ChecksumResult testChecksumResult = runChecksumAndRecordQueryId(testChecksumQuery, TEST_CHECKSUM, testChecksumQueryContext);
        return match(DATA, checksumValidator, controlColumns, testColumns, controlChecksumResult, testChecksumResult);
    }

    /**
     * Executes a checksum query through the helper action, stores the executed query's id on
     * {@code context} when query stats are reported, and returns the only element of the results.
     */
    private ChecksumResult runChecksumAndRecordQueryId(Query checksumQuery, QueryStage stage, ChecksumQueryContext context)
    {
        QueryResult<ChecksumResult> checksum = callAndConsume(
                () -> getHelperAction().execute(checksumQuery, stage, ChecksumResult::fromResultSet),
                stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(context::setChecksumQueryId));
        return getOnlyElement(checksum.getResults());
    }

    /**
     * Runs determinism analysis for a mismatch.
     *
     * <p>By default the analysis runs against the control object. When analysis on test is
     * switched on, the test object is analyzed first and that result is returned unless it is
     * flagged as non-deterministic; in that case the control object is analyzed as well and the
     * returned details carry the control verdict together with the runs of both analyses.
     */
    @Override
    protected DeterminismAnalysisDetails analyzeDeterminism(QueryObjectBundle controlObject, QueryObjectBundle testObject, DataMatchResult matchResult)
    {
        if (!isRunDeterminismAnalysisOnTest) {
            return determinismAnalyzer.analyze(getControlAction(), controlObject, matchResult.getControlChecksum());
        }

        DeterminismAnalysisDetails testAnalysis = determinismAnalyzer.analyze(getTestAction(), testObject, matchResult.getTestChecksum());
        if (!testAnalysis.getDeterminismAnalysis().isNonDeterministic()) {
            return testAnalysis;
        }

        // The control side is analyzed again, but the test runs are kept for stats.
        List<DeterminismAnalysisRun> testRuns = testAnalysis.getRuns();
        DeterminismAnalysisDetails controlAnalysis = determinismAnalyzer.analyze(getControlAction(), controlObject, matchResult.getControlChecksum());
        List<DeterminismAnalysisRun> allRuns = Stream.concat(testRuns.stream(), controlAnalysis.getRuns().stream())
                .collect(Collectors.toList());

        return new DeterminismAnalysisDetails(
                controlAnalysis.getDeterminismAnalysis(),
                allRuns,
                LimitQueryDeterminismAnalysis.valueOf(controlAnalysis.getLimitQueryAnalysis()),
                Optional.ofNullable(controlAnalysis.getLimitQueryAnalysisQueryId()));
    }

    /**
     * Asks the failure resolver manager to resolve a failed verification.
     *
     * <p>A present, unmatched result is resolved as a result mismatch and requires the control
     * bundle. Otherwise, a throwable is resolved as an exception only if the control query state
     * is {@code SUCCEEDED} or {@code REUSE}, which requires the control main query stats.
     *
     * @return the resolver's answer, or empty when neither case applies
     * @throws IllegalStateException if the control bundle or control query stats needed for
     * resolution are absent
     */
    @Override
    protected Optional<String> resolveFailure(
            Optional<QueryObjectBundle> control,
            Optional<QueryObjectBundle> test,
            QueryContext controlQueryContext,
            Optional<DataMatchResult> matchResult,
            Optional<Throwable> throwable)
    {
        Optional<DataMatchResult> mismatch = matchResult.filter(result -> !result.isMatched());
        if (mismatch.isPresent()) {
            checkState(control.isPresent(), "control is missing");
            return failureResolverManager.resolveResultMismatch(mismatch.get(), control.get());
        }

        boolean failedAfterControlCompleted = throwable.isPresent()
                && ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(controlQueryContext.getState());
        if (!failedAfterControlCompleted) {
            return Optional.empty();
        }

        checkState(controlQueryContext.getMainQueryStats().isPresent(), "controlQueryStats is missing");
        return failureResolverManager.resolveException(controlQueryContext.getMainQueryStats().get(), throwable.get(), test);
    }
}