TestQueryStateMachine.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution;
import com.facebook.airlift.testing.TestingTicker;
import com.facebook.presto.Session;
import com.facebook.presto.client.FailureInfo;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.analyzer.UpdateInfo;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.transaction.DelegatingTransactionManager;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import static com.facebook.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.execution.QueryState.DISPATCHING;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.FINISHED;
import static com.facebook.presto.execution.QueryState.FINISHING;
import static com.facebook.presto.execution.QueryState.PLANNING;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.execution.QueryState.STARTING;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
import static com.facebook.presto.transaction.InMemoryTransactionManager.createTestTransactionManager;
import static com.google.common.util.concurrent.Futures.allAsList;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
public class TestQueryStateMachine
{
// Query text and self URI passed to every QueryStateMachine built by this class; assertState() compares them with the reported QueryInfo.
private static final String QUERY = "sql";
private static final URI LOCATION = URI.create("fake://fake-query");
// Cause handed to transitionToFailed(...) whenever a test drives a machine into FAILED.
private static final SQLException FAILED_CAUSE = new SQLException("FAILED");
// Values installed on each machine through setInputs/setOutput/setColumns/setMemoryPool in the factory method below.
private static final List<Input> INPUTS = ImmutableList.of(new Input(new ConnectorId("connector"), "schema", "table", Optional.empty(), ImmutableList.of(new Column("a", "varchar")), Optional.empty(), ""));
private static final Optional<Output> OUTPUT = Optional.empty();
private static final List<String> OUTPUT_FIELD_NAMES = ImmutableList.of("a", "b", "c");
private static final List<Type> OUTPUT_FIELD_TYPES = ImmutableList.of(BIGINT, BIGINT, BIGINT);
private static final VersionedMemoryPoolId MEMORY_POOL = new VersionedMemoryPoolId(new MemoryPoolId("pool"), 42);
// Session property changes registered on each machine via addSetSessionProperties / addResetSessionProperties.
private static final Map<String, String> SET_SESSION_PROPERTIES = ImmutableMap.<String, String>builder()
.put("fruit", "apple")
.put("drink", "coffee")
.build();
private static final List<String> RESET_SESSION_PROPERTIES = ImmutableList.of("candy");
private static final Optional<QueryType> QUERY_TYPE = Optional.of(QueryType.SELECT);
// Executor passed to every state machine; one instance per test-class instance, shut down in tearDown().
private final ExecutorService executor = newCachedThreadPool();
/**
 * Shuts down the executor shared by all state machines once the tests of this class are done.
 */
@AfterClass(alwaysRun = true)
public void tearDown()
{
executor.shutdownNow();
}
/**
 * Walks a fresh machine through the regular lifecycle one transition at a time
 * and verifies the reported state after every step.
 */
@Test
public void testBasicStateChanges()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertState(machine, WAITING_FOR_PREREQUISITES);

    // admission
    assertTrue(machine.transitionToQueued());
    assertState(machine, QUEUED);

    assertTrue(machine.transitionToDispatching());
    assertState(machine, DISPATCHING);

    // planning and execution
    assertTrue(machine.transitionToPlanning());
    assertState(machine, PLANNING);

    assertTrue(machine.transitionToStarting());
    assertState(machine, STARTING);

    assertTrue(machine.transitionToRunning());
    assertState(machine, RUNNING);

    // completion: give the machine up to two seconds to move past FINISHING
    assertTrue(machine.transitionToFinishing());
    tryGetFutureValue(machine.getStateChange(FINISHING), 2, SECONDS);
    assertState(machine, FINISHED);
}
/**
 * Same walk as the basic lifecycle test, but with the extra WAITING_FOR_RESOURCES
 * stop between QUEUED and DISPATCHING.
 */
@Test
public void testStateChangesWithResourceWaiting()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertState(machine, WAITING_FOR_PREREQUISITES);

    // admission, including the resource wait
    assertTrue(machine.transitionToQueued());
    assertState(machine, QUEUED);

    assertTrue(machine.transitionToWaitingForResources());
    assertState(machine, WAITING_FOR_RESOURCES);

    assertTrue(machine.transitionToDispatching());
    assertState(machine, DISPATCHING);

    // planning and execution
    assertTrue(machine.transitionToPlanning());
    assertState(machine, PLANNING);

    assertTrue(machine.transitionToStarting());
    assertState(machine, STARTING);

    assertTrue(machine.transitionToRunning());
    assertState(machine, RUNNING);

    // completion: give the machine up to two seconds to move past FINISHING
    assertTrue(machine.transitionToFinishing());
    tryGetFutureValue(machine.getStateChange(FINISHING), 2, SECONDS);
    assertState(machine, FINISHED);
}
/**
 * For every state that can be entered directly from the initial state, checks that the
 * time elapsed before that first transition is booked as waiting-for-prerequisites time.
 */
@Test
public void testWaitingForPrerequisites()
{
    // all time before the first state transition is accounted to waiting for prerequisites
    assertAllTimeSpentInWaitingForPrerequisites(WAITING_FOR_PREREQUISITES, untouched -> {});
    assertAllTimeSpentInWaitingForPrerequisites(QUEUED, QueryStateMachine::transitionToQueued);
    assertAllTimeSpentInWaitingForPrerequisites(WAITING_FOR_RESOURCES, QueryStateMachine::transitionToWaitingForResources);
    assertAllTimeSpentInWaitingForPrerequisites(DISPATCHING, QueryStateMachine::transitionToDispatching);
    assertAllTimeSpentInWaitingForPrerequisites(PLANNING, QueryStateMachine::transitionToPlanning);
    assertAllTimeSpentInWaitingForPrerequisites(STARTING, QueryStateMachine::transitionToStarting);
    assertAllTimeSpentInWaitingForPrerequisites(RUNNING, QueryStateMachine::transitionToRunning);

    // FINISHED is only reached after the machine has moved past FINISHING
    Consumer<QueryStateMachine> finishAndWait = machine -> {
        machine.transitionToFinishing();
        tryGetFutureValue(machine.getStateChange(FINISHING), 2, SECONDS);
    };
    assertAllTimeSpentInWaitingForPrerequisites(FINISHED, finishAndWait);

    Consumer<QueryStateMachine> fail = machine -> machine.transitionToFailed(FAILED_CAUSE);
    assertAllTimeSpentInWaitingForPrerequisites(FAILED, fail);
}
/**
 * Checks the two exits from QUEUED exercised here: on to WAITING_FOR_RESOURCES, and straight to FAILED.
 */
@Test
public void testQueued()
{
    // QUEUED -> WAITING_FOR_RESOURCES
    QueryStateMachine waitingMachine = createQueryStateMachine();
    assertTrue(waitingMachine.transitionToQueued());
    assertState(waitingMachine, QUEUED);
    assertTrue(waitingMachine.transitionToWaitingForResources());
    assertState(waitingMachine, WAITING_FOR_RESOURCES);

    // Ensure that we can directly move to failed from queued
    QueryStateMachine failingMachine = createQueryStateMachine();
    assertTrue(failingMachine.transitionToQueued());
    assertState(failingMachine, QUEUED);
    assertTrue(failingMachine.transitionToFailed(FAILED_CAUSE));
    assertState(failingMachine, FAILED, FAILED_CAUSE);
}
/**
 * Builds a machine on a manual ticker, advances the ticker by 7 ms, applies {@code stateTransition}
 * and asserts that the machine is in {@code expectedState} with the 7 ms reported as
 * waiting-for-prerequisites time and every other tracked duration at zero.
 *
 * @param expectedState state the machine must report after the transition was applied
 * @param stateTransition action performing the first (and only) transition on the new machine
 */
private void assertAllTimeSpentInWaitingForPrerequisites(QueryState expectedState, Consumer<QueryStateMachine> stateTransition)
{
    Duration elapsed = new Duration(7, MILLISECONDS);
    Duration none = new Duration(0, MILLISECONDS);

    TestingTicker clock = new TestingTicker();
    QueryStateMachine machine = createQueryStateMachineWithTicker(clock);
    clock.increment(7, MILLISECONDS);

    stateTransition.accept(machine);
    assertEquals(machine.getQueryState(), expectedState);

    QueryStats stats = machine.getQueryInfo(Optional.empty()).getQueryStats();
    assertEquals(stats.getWaitingForPrerequisitesTime(), elapsed);
    assertEquals(stats.getQueuedTime(), none);
    assertEquals(stats.getResourceWaitingTime(), none);
    assertEquals(stats.getSemanticAnalyzingTime(), none);
    assertEquals(stats.getColumnAccessPermissionCheckingTime(), none);
    assertEquals(stats.getDispatchingTime(), none);
    assertEquals(stats.getTotalPlanningTime(), none);
    assertEquals(stats.getExecutionTime(), none);
    assertEquals(stats.getFinishingTime(), none);
}
/**
 * Checks which transitions are accepted and which are rejected once a machine is in PLANNING.
 */
@Test
public void testPlanning()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertTrue(machine.transitionToPlanning());
    assertState(machine, PLANNING);

    // moving back to DISPATCHING or re-entering PLANNING is rejected and leaves the state untouched
    assertFalse(machine.transitionToDispatching());
    assertState(machine, PLANNING);
    assertFalse(machine.transitionToPlanning());
    assertState(machine, PLANNING);

    // PLANNING -> STARTING
    assertTrue(machine.transitionToStarting());
    assertState(machine, STARTING);

    // PLANNING -> RUNNING
    QueryStateMachine toRunning = createQueryStateMachine();
    toRunning.transitionToPlanning();
    assertTrue(toRunning.transitionToRunning());
    assertState(toRunning, RUNNING);

    // PLANNING -> FINISHING -> FINISHED
    QueryStateMachine toFinished = createQueryStateMachine();
    toFinished.transitionToPlanning();
    assertTrue(toFinished.transitionToFinishing());
    tryGetFutureValue(toFinished.getStateChange(FINISHING), 2, SECONDS);
    assertState(toFinished, FINISHED);

    // PLANNING -> FAILED
    QueryStateMachine toFailed = createQueryStateMachine();
    toFailed.transitionToPlanning();
    assertTrue(toFailed.transitionToFailed(FAILED_CAUSE));
    assertState(toFailed, FAILED, FAILED_CAUSE);
}
/**
 * Checks which transitions are accepted and which are rejected once a machine is in STARTING.
 */
@Test
public void testStarting()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertTrue(machine.transitionToStarting());
    assertState(machine, STARTING);

    // earlier states and STARTING itself are rejected and leave the state untouched
    assertFalse(machine.transitionToDispatching());
    assertState(machine, STARTING);
    assertFalse(machine.transitionToPlanning());
    assertState(machine, STARTING);
    assertFalse(machine.transitionToStarting());
    assertState(machine, STARTING);

    // STARTING -> RUNNING
    assertTrue(machine.transitionToRunning());
    assertState(machine, RUNNING);

    // STARTING -> FINISHING -> FINISHED
    QueryStateMachine toFinished = createQueryStateMachine();
    toFinished.transitionToStarting();
    assertTrue(toFinished.transitionToFinishing());
    tryGetFutureValue(toFinished.getStateChange(FINISHING), 2, SECONDS);
    assertState(toFinished, FINISHED);

    // STARTING -> FAILED
    QueryStateMachine toFailed = createQueryStateMachine();
    toFailed.transitionToStarting();
    assertTrue(toFailed.transitionToFailed(FAILED_CAUSE));
    assertState(toFailed, FAILED, FAILED_CAUSE);
}
/**
 * Checks which transitions are accepted and which are rejected once a machine is in RUNNING.
 */
@Test
public void testRunning()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertTrue(machine.transitionToRunning());
    assertState(machine, RUNNING);

    // earlier states and RUNNING itself are rejected and leave the state untouched
    assertFalse(machine.transitionToDispatching());
    assertState(machine, RUNNING);
    assertFalse(machine.transitionToPlanning());
    assertState(machine, RUNNING);
    assertFalse(machine.transitionToStarting());
    assertState(machine, RUNNING);
    assertFalse(machine.transitionToRunning());
    assertState(machine, RUNNING);

    // RUNNING -> FINISHING -> FINISHED
    assertTrue(machine.transitionToFinishing());
    tryGetFutureValue(machine.getStateChange(FINISHING), 2, SECONDS);
    assertState(machine, FINISHED);

    // RUNNING -> FAILED
    QueryStateMachine toFailed = createQueryStateMachine();
    toFailed.transitionToRunning();
    assertTrue(toFailed.transitionToFailed(FAILED_CAUSE));
    assertState(toFailed, FAILED, FAILED_CAUSE);
}
/**
 * Finishes a fresh machine and verifies that FINISHED rejects every further transition.
 */
@Test
public void testFinished()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertTrue(machine.transitionToFinishing());

    // give the machine up to two seconds to move past FINISHING
    tryGetFutureValue(machine.getStateChange(FINISHING), 2, SECONDS);
    assertFinalState(machine, FINISHED);
}
/**
 * Fails a fresh machine and verifies that FAILED rejects every further transition
 * while still reporting the original cause.
 */
@Test
public void testFailed()
{
    QueryStateMachine machine = createQueryStateMachine();
    boolean failed = machine.transitionToFailed(FAILED_CAUSE);
    assertTrue(failed);
    assertFinalState(machine, FAILED, FAILED_CAUSE);
}
/**
 * Cancels a fresh machine and verifies it ends up FAILED with a USER_CANCELED PrestoException.
 */
@Test
public void testCanceled()
{
    QueryStateMachine machine = createQueryStateMachine();
    assertTrue(machine.transitionToCanceled());

    // only the exception type and error code are compared by assertState, not the message
    PrestoException expectedFailure = new PrestoException(USER_CANCELED, "canceled");
    assertFinalState(machine, FAILED, expectedFailure);
}
/**
 * Drives a machine through every state on a manually advanced ticker and checks how the
 * elapsed time is split across the individual QueryStats durations.
 */
@Test
public void testPlanningTimeDuration()
{
TestingTicker mockTicker = new TestingTicker();
QueryStateMachine stateMachine = createQueryStateMachineWithTicker(mockTicker);
assertState(stateMachine, WAITING_FOR_PREREQUISITES);
mockTicker.increment(30, MILLISECONDS);
// semantic analysis and the permission check are started while still waiting for prerequisites
stateMachine.beginSemanticAnalyzing();
assertState(stateMachine, WAITING_FOR_PREREQUISITES);
mockTicker.increment(30, MILLISECONDS);
stateMachine.beginColumnAccessPermissionChecking();
assertState(stateMachine, WAITING_FOR_PREREQUISITES);
mockTicker.increment(15, MILLISECONDS);
assertTrue(stateMachine.transitionToQueued());
assertState(stateMachine, QUEUED);
mockTicker.increment(30, MILLISECONDS);
// the permission check ends only after the query has already been queued
stateMachine.endColumnAccessPermissionChecking();
assertState(stateMachine, QUEUED);
mockTicker.increment(25, MILLISECONDS);
assertTrue(stateMachine.transitionToWaitingForResources());
assertState(stateMachine, WAITING_FOR_RESOURCES);
mockTicker.increment(50, MILLISECONDS);
assertTrue(stateMachine.transitionToDispatching());
assertState(stateMachine, DISPATCHING);
mockTicker.increment(100, MILLISECONDS);
assertTrue(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);
mockTicker.increment(200, MILLISECONDS);
assertTrue(stateMachine.transitionToStarting());
assertState(stateMachine, STARTING);
mockTicker.increment(300, MILLISECONDS);
assertTrue(stateMachine.transitionToRunning());
assertState(stateMachine, RUNNING);
mockTicker.increment(400, MILLISECONDS);
assertTrue(stateMachine.transitionToFinishing());
tryGetFutureValue(stateMachine.getStateChange(FINISHING), 2, SECONDS);
assertState(stateMachine, FINISHED);
QueryStats queryStats = stateMachine.getQueryInfo(Optional.empty()).getQueryStats();
// sum of all ticker increments: 30 + 30 + 15 + 30 + 25 + 50 + 100 + 200 + 300 + 400
assertEquals(queryStats.getElapsedTime().toMillis(), 1180);
// increments made before transitionToQueued(): 30 + 30 + 15
assertEquals(queryStats.getWaitingForPrerequisitesTime().toMillis(), 75);
// increment between beginSemanticAnalyzing() and beginColumnAccessPermissionChecking()
assertEquals(queryStats.getSemanticAnalyzingTime().toMillis(), 30);
// increments between beginColumnAccessPermissionChecking() and endColumnAccessPermissionChecking(): 15 + 30
assertEquals(queryStats.getColumnAccessPermissionCheckingTime().toMillis(), 45);
// increments between transitionToQueued() and transitionToWaitingForResources(): 30 + 25
assertEquals(queryStats.getQueuedTime().toMillis(), 55);
// 50, 100 and 200 are the increments made while in WAITING_FOR_RESOURCES, DISPATCHING and PLANNING respectively
assertEquals(queryStats.getResourceWaitingTime().toMillis(), 50);
assertEquals(queryStats.getDispatchingTime().toMillis(), 100);
assertEquals(queryStats.getTotalPlanningTime().toMillis(), 200);
// there is no way to induce finishing time without a transaction and connector
assertEquals(queryStats.getFinishingTime().toMillis(), 0);
// query execution time starts when the query transitions to planning: 200 + 300 + 400
assertEquals(queryStats.getExecutionTime().toMillis(), 900);
}
/**
 * Feeds a series of memory updates into a machine and checks the peak values it reports.
 * Judging by the expected values, the first two arguments accumulate into a running total
 * whose maximum is reported, while the last three are kept as plain maxima.
 */
@Test
public void testUpdateMemoryUsage()
{
    QueryStateMachine machine = createQueryStateMachine();

    // first update establishes all peaks
    machine.updateMemoryUsage(5, 10, 1, 3, 3);
    assertEquals(machine.getPeakUserMemoryInBytes(), 5);
    assertEquals(machine.getPeakTotalMemoryInBytes(), 10);
    assertEquals(machine.getPeakTaskUserMemory(), 1);
    assertEquals(machine.getPeakTaskTotalMemory(), 3);
    assertEquals(machine.getPeakNodeTotalMemory(), 3);

    // only the task user memory exceeds its previous peak
    machine.updateMemoryUsage(0, 0, 2, 2, 2);
    assertEquals(machine.getPeakUserMemoryInBytes(), 5);
    assertEquals(machine.getPeakTotalMemoryInBytes(), 10);
    assertEquals(machine.getPeakTaskUserMemory(), 2);
    assertEquals(machine.getPeakTaskTotalMemory(), 3);
    assertEquals(machine.getPeakNodeTotalMemory(), 3);

    // user/total grow by one; task total and node total reach a new peak
    machine.updateMemoryUsage(1, 1, 1, 5, 5);
    assertEquals(machine.getPeakUserMemoryInBytes(), 6);
    assertEquals(machine.getPeakTotalMemoryInBytes(), 11);
    assertEquals(machine.getPeakTaskUserMemory(), 2);
    assertEquals(machine.getPeakTaskTotalMemory(), 5);
    assertEquals(machine.getPeakNodeTotalMemory(), 5);

    // user/total grow by three; task user memory reaches a new peak
    machine.updateMemoryUsage(3, 3, 5, 2, 2);
    assertEquals(machine.getPeakUserMemoryInBytes(), 9);
    assertEquals(machine.getPeakTotalMemoryInBytes(), 14);
    assertEquals(machine.getPeakTaskUserMemory(), 5);
    assertEquals(machine.getPeakTaskTotalMemory(), 5);
    assertEquals(machine.getPeakNodeTotalMemory(), 5);
}
/**
 * Verifies that a failure reported while the commit is still pending does not move the
 * machine out of FINISHING, and that the query reaches FINISHED once the commit completes.
 */
@Test
public void testTransitionToFailedAfterTransitionToFinishing()
{
    // the commit stays pending until the test completes this future
    SettableFuture<?> commitFuture = SettableFuture.create();
    TransactionManager transactionManager = new DelegatingTransactionManager(createTestTransactionManager())
    {
        @Override
        public ListenableFuture<?> asyncCommit(TransactionId transactionId)
        {
            return allAsList(commitFuture, super.asyncCommit(transactionId));
        }
    };

    QueryStateMachine stateMachine = createQueryStateMachine(transactionManager);
    stateMachine.transitionToFinishing();
    assertEquals(stateMachine.getQueryState(), FINISHING);

    // failing is rejected while the commit is in flight
    assertFalse(stateMachine.transitionToFailed(new RuntimeException("failed")));
    assertEquals(stateMachine.getQueryState(), FINISHING);

    commitFuture.set(null);
    // As in the other tests of this class, wait for the machine to leave the state it is
    // currently in (FINISHING); waiting on the not-yet-reached FINISHED state would not
    // cover a commit callback that completes asynchronously.
    tryGetFutureValue(stateMachine.getStateChange(FINISHING), 2, SECONDS);
    assertEquals(stateMachine.getQueryState(), FINISHED);
}
/**
 * Verifies that canceling while the commit is still pending fails the query immediately
 * and that a commit completing afterwards does not change that outcome.
 */
@Test
public void testTransitionToCanceledAfterTransitionToFinishing()
{
    // the commit stays pending until the test completes this future
    SettableFuture<?> pendingCommit = SettableFuture.create();
    TransactionManager delayedCommitManager = new DelegatingTransactionManager(createTestTransactionManager())
    {
        @Override
        public ListenableFuture<?> asyncCommit(TransactionId transactionId)
        {
            return allAsList(pendingCommit, super.asyncCommit(transactionId));
        }
    };

    QueryStateMachine machine = createQueryStateMachine(delayedCommitManager);
    machine.transitionToFinishing();
    assertEquals(machine.getQueryState(), FINISHING);

    // cancellation is accepted even though the machine is already finishing
    assertTrue(machine.transitionToCanceled());
    assertEquals(machine.getQueryState(), FAILED);

    // the late commit must not move the machine out of FAILED
    pendingCommit.set(null);
    assertEquals(machine.getQueryState(), FAILED);
    assertEquals(machine.getFailureInfo().get().getMessage(), "Query was canceled");
}
/**
 * Verifies that a commit which fails while the machine is FINISHING moves the query to
 * FAILED and that the commit's exception message is the one reported.
 */
@Test
public void testCommitFailure()
{
    // the commit stays pending until the test fails this future
    SettableFuture<?> commitFuture = SettableFuture.create();
    TransactionManager transactionManager = new DelegatingTransactionManager(createTestTransactionManager())
    {
        @Override
        public ListenableFuture<?> asyncCommit(TransactionId transactionId)
        {
            return allAsList(commitFuture, super.asyncCommit(transactionId));
        }
    };

    QueryStateMachine stateMachine = createQueryStateMachine(transactionManager);
    stateMachine.transitionToFinishing();
    // after transitioning to finishing, the transaction is gone
    assertEquals(stateMachine.getQueryState(), FINISHING);
    assertFalse(stateMachine.transitionToFailed(new RuntimeException("failed")));
    assertEquals(stateMachine.getQueryState(), FINISHING);

    commitFuture.setException(new RuntimeException("transaction failed"));
    // As in the other tests of this class, wait for the machine to leave the state it is
    // currently in (FINISHING); waiting on the not-yet-reached FAILED state would not
    // cover a commit callback that completes asynchronously.
    tryGetFutureValue(stateMachine.getStateChange(FINISHING), 2, SECONDS);
    assertEquals(stateMachine.getQueryState(), FAILED);
    assertEquals(stateMachine.getFailureInfo().get().getMessage(), "transaction failed");
}
/**
 * Asserts that the machine sits in the given done state, which is expected to carry no failure.
 */
private static void assertFinalState(QueryStateMachine stateMachine, QueryState expectedState)
{
assertFinalState(stateMachine, expectedState, null);
}
/**
 * Asserts that the machine sits in the given done state and that every transition attempted
 * from there is rejected, leaving both the state and the reported failure unchanged.
 *
 * @param stateMachine machine under test
 * @param expectedState state the machine must be in; must satisfy {@code isDone()}
 * @param expectedException exception the failure info must match, forwarded to assertState (null when no failure is expected)
 */
private static void assertFinalState(QueryStateMachine stateMachine, QueryState expectedState, Exception expectedException)
{
assertTrue(expectedState.isDone());
assertState(stateMachine, expectedState, expectedException);
// every transition below must be refused and must leave the machine exactly as it was
assertFalse(stateMachine.transitionToDispatching());
assertState(stateMachine, expectedState, expectedException);
assertFalse(stateMachine.transitionToPlanning());
assertState(stateMachine, expectedState, expectedException);
assertFalse(stateMachine.transitionToStarting());
assertState(stateMachine, expectedState, expectedException);
assertFalse(stateMachine.transitionToRunning());
assertState(stateMachine, expectedState, expectedException);
assertFalse(stateMachine.transitionToFinishing());
assertState(stateMachine, expectedState, expectedException);
assertFalse(stateMachine.transitionToFailed(FAILED_CAUSE));
assertState(stateMachine, expectedState, expectedException);
// attempt to fail with another exception, which will fail
assertFalse(stateMachine.transitionToFailed(new IOException("failure after finish")));
assertState(stateMachine, expectedState, expectedException);
}
/**
 * Asserts that the machine is in the given state, which is expected to carry no failure.
 * Must not be used with FAILED; use the three-argument overload for that.
 */
private static void assertState(QueryStateMachine stateMachine, QueryState expectedState)
{
    assertState(stateMachine, expectedState, null);
}

/**
 * Asserts that the machine and the QueryInfo it produces still carry the fixture values
 * installed by the factory method, that the timing stats are populated consistently with
 * the current state, and that the state and failure info match the expectation.
 *
 * @param stateMachine machine under test
 * @param expectedState state both the machine and its QueryInfo must report
 * @param expectedException exception whose class (and, for PrestoException, error code) the
 *        failure info must match; required when {@code expectedState} is FAILED, ignored otherwise
 */
private static void assertState(QueryStateMachine stateMachine, QueryState expectedState, Exception expectedException)
{
    // values exposed directly by the state machine
    assertEquals(stateMachine.getQueryId(), TEST_SESSION.getQueryId());
    assertEqualSessionsWithoutTransactionId(stateMachine.getSession(), TEST_SESSION);
    assertSame(stateMachine.getMemoryPool(), MEMORY_POOL);
    assertEquals(stateMachine.getSetSessionProperties(), SET_SESSION_PROPERTIES);
    assertEquals(stateMachine.getResetSessionProperties(), RESET_SESSION_PROPERTIES);

    // the same values as reflected in the QueryInfo snapshot
    QueryInfo queryInfo = stateMachine.getQueryInfo(Optional.empty());
    assertEquals(queryInfo.getQueryId(), TEST_SESSION.getQueryId());
    assertEquals(queryInfo.getSelf(), LOCATION);
    assertFalse(queryInfo.getOutputStage().isPresent());
    assertEquals(queryInfo.getQuery(), QUERY);
    assertEquals(queryInfo.getInputs(), INPUTS);
    assertEquals(queryInfo.getOutput(), OUTPUT);
    assertEquals(queryInfo.getFieldNames(), OUTPUT_FIELD_NAMES);
    assertEquals(queryInfo.getUpdateInfo(), new UpdateInfo("UPDATE TYPE", ""));
    assertEquals(queryInfo.getMemoryPool(), MEMORY_POOL.getId());
    assertEquals(queryInfo.getQueryType(), QUERY_TYPE);

    // every duration must be present regardless of the state
    QueryStats queryStats = queryInfo.getQueryStats();
    assertNotNull(queryStats.getElapsedTime());
    assertNotNull(queryStats.getQueuedTime());
    assertNotNull(queryStats.getResourceWaitingTime());
    assertNotNull(queryStats.getSemanticAnalyzingTime());
    assertNotNull(queryStats.getColumnAccessPermissionCheckingTime());
    assertNotNull(queryStats.getDispatchingTime());
    assertNotNull(queryStats.getExecutionTime());
    assertNotNull(queryStats.getTotalPlanningTime());
    assertNotNull(queryStats.getFinishingTime());
    assertTrue(queryStats.getCreateTimeInMillis() > 0);

    // the execution start time is only set once the query has left the pre-planning states
    QueryState reportedState = queryInfo.getState();
    boolean beforeExecution = reportedState == WAITING_FOR_PREREQUISITES
            || reportedState == QUEUED
            || reportedState == WAITING_FOR_RESOURCES
            || reportedState == DISPATCHING;
    if (beforeExecution) {
        assertEquals(queryStats.getExecutionStartTimeInMillis(), 0L);
    }
    else {
        assertTrue(queryStats.getExecutionStartTimeInMillis() > 0);
    }

    // the end time is only set for done states
    if (reportedState.isDone()) {
        assertTrue(queryStats.getEndTimeInMillis() > 0);
    }
    else {
        assertEquals(queryStats.getEndTimeInMillis(), 0);
    }

    assertEquals(stateMachine.getQueryState(), expectedState);
    assertEquals(reportedState, expectedState);
    assertEquals(stateMachine.isDone(), expectedState.isDone());

    if (expectedState == FAILED) {
        // fail with a readable message instead of a NullPointerException when a caller
        // expects FAILED but did not say which exception it expects
        assertNotNull(expectedException, "expectedException must be provided when the expected state is FAILED");
        assertNotNull(queryInfo.getFailureInfo());
        FailureInfo failure = queryInfo.getFailureInfo().toFailureInfo();
        assertNotNull(failure);
        assertEquals(failure.getType(), expectedException.getClass().getName());
        if (expectedException instanceof PrestoException) {
            assertEquals(queryInfo.getErrorCode(), ((PrestoException) expectedException).getErrorCode());
        }
        else {
            // anything that is not a PrestoException is reported as a generic internal error
            assertEquals(queryInfo.getErrorCode(), GENERIC_INTERNAL_ERROR.toErrorCode());
        }
    }
    else {
        assertNull(queryInfo.getFailureInfo());
    }
}
/**
 * Creates a fixture machine on the system ticker with a fresh test transaction manager.
 */
private QueryStateMachine createQueryStateMachine()
{
    return createQueryStateMachineWithTicker(Ticker.systemTicker(), createTestTransactionManager());
}

/**
 * Creates a fixture machine on the system ticker using the given transaction manager.
 */
private QueryStateMachine createQueryStateMachine(TransactionManager transactionManager)
{
    Ticker systemTicker = Ticker.systemTicker();
    return createQueryStateMachineWithTicker(systemTicker, transactionManager);
}

/**
 * Creates a fixture machine on the given ticker with a fresh test transaction manager.
 */
private QueryStateMachine createQueryStateMachineWithTicker(Ticker ticker)
{
    TransactionManager testTransactionManager = createTestTransactionManager();
    return createQueryStateMachineWithTicker(ticker, testTransactionManager);
}

/**
 * Creates a machine for the test session and installs the fixture inputs, output, columns,
 * update info, memory pool and session property changes that assertState() later checks.
 *
 * @param ticker time source handed to the machine
 * @param transactionManager transaction manager handed to the machine and to its access control
 */
private QueryStateMachine createQueryStateMachineWithTicker(Ticker ticker, TransactionManager transactionManager)
{
    Metadata testMetadata = MetadataManager.createTestMetadataManager();
    AccessControl testAccessControl = new AccessControlManager(transactionManager);
    QueryStateMachine machine = QueryStateMachine.beginWithTicker(
            QUERY,
            Optional.empty(),
            TEST_SESSION,
            LOCATION,
            new ResourceGroupId("test"),
            QUERY_TYPE,
            false,
            transactionManager,
            testAccessControl,
            executor,
            ticker,
            testMetadata,
            WarningCollector.NOOP);

    // query shape
    machine.setInputs(INPUTS);
    machine.setOutput(OUTPUT);
    machine.setColumns(OUTPUT_FIELD_NAMES, OUTPUT_FIELD_TYPES);
    machine.setUpdateInfo(new UpdateInfo("UPDATE TYPE", ""));
    machine.setMemoryPool(MEMORY_POOL);

    // session property changes
    for (Entry<String, String> property : SET_SESSION_PROPERTIES.entrySet()) {
        machine.addSetSessionProperties(property.getKey(), property.getValue());
    }
    for (String propertyName : RESET_SESSION_PROPERTIES) {
        machine.addResetSessionProperties(propertyName);
    }
    return machine;
}
/**
 * Compares two sessions attribute by attribute over the getters listed below; as the method
 * name says, the transaction id is deliberately left out of the comparison.
 *
 * @param actual session reported by the state machine
 * @param expected session the attributes must match
 */
private static void assertEqualSessionsWithoutTransactionId(Session actual, Session expected)
{
assertEquals(actual.getQueryId(), expected.getQueryId());
assertEquals(actual.getIdentity(), expected.getIdentity());
assertEquals(actual.getSource(), expected.getSource());
assertEquals(actual.getCatalog(), expected.getCatalog());
assertEquals(actual.getSchema(), expected.getSchema());
assertEquals(actual.getTimeZoneKey(), expected.getTimeZoneKey());
assertEquals(actual.getLocale(), expected.getLocale());
assertEquals(actual.getRemoteUserAddress(), expected.getRemoteUserAddress());
assertEquals(actual.getUserAgent(), expected.getUserAgent());
assertEquals(actual.getStartTime(), expected.getStartTime());
assertEquals(actual.getSystemProperties(), expected.getSystemProperties());
assertEquals(actual.getConnectorProperties(), expected.getConnectorProperties());
}
}