TestLocalDispatchQuery.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.Session;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.cost.HistoryBasedOptimizationConfig;
import com.facebook.presto.cost.HistoryBasedPlanStatisticsManager;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.event.QueryMonitorConfig;
import com.facebook.presto.eventlistener.EventListenerConfig;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.ClusterSizeMonitor;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.MockQueryExecution;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.resourceGroups.QueryQueueFullException;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.operator.OperatorInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.QueryProgressEvent;
import com.facebook.presto.spi.eventlistener.QueryUpdatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessDeniedException;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.transaction.TransactionManager;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.presto.client.NodeVersion.UNKNOWN;
import static com.facebook.presto.execution.QueryState.DISPATCHING;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.execution.TaskTestUtils.createQueryStateMachine;
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_TASK;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static com.facebook.presto.transaction.InMemoryTransactionManager.createTestTransactionManager;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class TestLocalDispatchQuery
{
private static final QueryPrerequisites QUERY_PREREQUISITES = new DefaultQueryPrerequisites();
private final MetadataManager metadata = MetadataManager.createTestMetadataManager();
@Test
public void testSimpleExecutionCreationFailure()
{
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
createStateMachine(),
createQueryMonitor(eventListener),
immediateFailedFuture(new IllegalStateException("abc")),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), GENERIC_INTERNAL_ERROR.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), GENERIC_INTERNAL_ERROR.toErrorCode());
}
@Test
public void testQueryQueuedExceptionBeforeDispatch()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
SettableFuture<QueryExecution> queryExecutionFuture = SettableFuture.create();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
queryExecutionFuture,
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {
throw new QueryQueueFullException(new ResourceGroupId("global"));
},
execution -> {},
false,
QUERY_PREREQUISITES);
query.startWaitingForPrerequisites();
queryExecutionFuture.setException(new IllegalStateException("abc"));
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), QUERY_QUEUE_FULL.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), QUERY_QUEUE_FULL.toErrorCode());
}
@Test
public void testErrorInPrerequisitesFuture()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {
throw new AccessDeniedException("sdf");
},
false,
(queryId, context, warningCollector) -> {
CompletableFuture<?> future = new CompletableFuture<>();
future.completeExceptionally(new PrestoException(ABANDONED_TASK, "something went wrong"));
return future;
});
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForPrerequisites();
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), ABANDONED_TASK.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), ABANDONED_TASK.toErrorCode());
}
@Test
public void testErrorInPrerequisitesSubmission()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {
throw new AccessDeniedException("sdf");
},
false,
(queryId, context, warningCollector) -> {
throw new PrestoException(ABANDONED_QUERY, "something went wrong");
});
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
try {
query.startWaitingForPrerequisites();
fail("Exception should be thrown");
}
catch (Throwable t) {
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), ABANDONED_QUERY.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), ABANDONED_QUERY.toErrorCode());
}
}
@Test
public void testPrerequisitesQueryFinishedCalled()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
CompletableFuture<?> prerequisitesFuture = new CompletableFuture<>();
AtomicBoolean queryFinishedCalled = new AtomicBoolean();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
new QueryPrerequisites()
{
@Override
public CompletableFuture<?> waitForPrerequisites(QueryId queryId, QueryPrerequisitesContext context, WarningCollector warningCollector)
{
return prerequisitesFuture;
}
@Override
public void queryFinished(QueryId queryId)
{
queryFinishedCalled.set(true);
}
});
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForPrerequisites();
prerequisitesFuture.complete(null);
query.fail(new PrestoException(ABANDONED_QUERY, "foo"));
assertTrue(queryFinishedCalled.get());
}
@Test
public void testPrerequisiteFutureCancellationWhenQueryCancelled()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
CompletableFuture<?> prerequisitesFuture = new CompletableFuture<>();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
(queryId, context, warningCollector) -> prerequisitesFuture);
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForPrerequisites();
query.fail(new PrestoException(ABANDONED_QUERY, "foo"));
assertTrue(prerequisitesFuture.isCancelled());
}
@Test
public void testQueryQueueSubmission()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
AtomicBoolean queryQueuerCalled = new AtomicBoolean();
CompletableFuture<?> prerequisitesFuture = new CompletableFuture<>();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {
queryQueuerCalled.compareAndSet(false, true);
},
execution -> {},
false,
(queryId, context, warningCollector) -> prerequisitesFuture);
assertEquals(stateMachine.getBasicQueryInfo(Optional.empty()).getState(), WAITING_FOR_PREREQUISITES);
query.startWaitingForPrerequisites();
assertEquals(stateMachine.getBasicQueryInfo(Optional.empty()).getState(), WAITING_FOR_PREREQUISITES);
assertFalse(queryQueuerCalled.get());
prerequisitesFuture.complete(null);
assertEquals(stateMachine.getBasicQueryInfo(Optional.empty()).getState(), QUEUED);
assertTrue(queryQueuerCalled.get());
}
@Test
public void testErrorInQuerySubmitter()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(new MockQueryExecution()),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {
throw new AccessDeniedException("sdf");
},
false,
QUERY_PREREQUISITES);
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForResources();
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), PERMISSION_DENIED.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), PERMISSION_DENIED.toErrorCode());
}
@Test
public void testTimeOutWaitingForClusterResources()
throws Exception
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(1),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForResources();
Thread.sleep(2000); // Sleep long enough to ensure resource exhaustion error
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), GENERIC_INSUFFICIENT_RESOURCES.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), GENERIC_INSUFFICIENT_RESOURCES.toErrorCode());
}
@Test
public void testQueryCancellation()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(null),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.cancel();
assertEquals(query.getBasicQueryInfo().getState(), FAILED);
assertEquals(query.getBasicQueryInfo().getErrorCode(), USER_CANCELED.toErrorCode());
assertTrue(eventListener.getQueryCompletedEvent().isPresent());
assertTrue(eventListener.getQueryCompletedEvent().get().getFailureInfo().isPresent());
assertEquals(eventListener.getQueryCompletedEvent().get().getFailureInfo().get().getErrorCode(), USER_CANCELED.toErrorCode());
}
@Test
public void testQueryDispatched()
{
QueryStateMachine stateMachine = createStateMachine();
CountingEventListener eventListener = new CountingEventListener();
LocalDispatchQuery query = new LocalDispatchQuery(
stateMachine,
createQueryMonitor(eventListener),
immediateFuture(new MockQueryExecution()),
createClusterSizeMonitor(0),
directExecutor(),
dispatchQuery -> {},
execution -> {},
false,
QUERY_PREREQUISITES);
assertEquals(query.getBasicQueryInfo().getState(), WAITING_FOR_PREREQUISITES);
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
query.startWaitingForResources();
assertEquals(query.getBasicQueryInfo().getState(), DISPATCHING);
assertNull(query.getBasicQueryInfo().getErrorCode());
assertFalse(eventListener.getQueryCompletedEvent().isPresent());
}
private ClusterSizeMonitor createClusterSizeMonitor(int minimumNodes)
{
return new ClusterSizeMonitor(new InMemoryNodeManager(), true, minimumNodes, minimumNodes, new Duration(10, MILLISECONDS), 1, 1, new Duration(1, SECONDS), new Duration(1, SECONDS), 0, false);
}
private QueryMonitor createQueryMonitor(CountingEventListener eventListener)
{
EventListenerManager eventListenerManager = createEventListenerManager(eventListener);
return new QueryMonitor(
jsonCodec(StageInfo.class),
jsonCodec(ExecutionFailureInfo.class),
jsonCodec(OperatorInfo.class),
eventListenerManager,
new NodeInfo("test"),
UNKNOWN,
createTestingSessionPropertyManager(),
metadata,
new QueryMonitorConfig(),
new HistoryBasedPlanStatisticsManager(new ObjectMapper(), createTestingSessionPropertyManager(), metadata, new HistoryBasedOptimizationConfig(), new FeaturesConfig(), new NodeVersion("1")),
new FeaturesConfig());
}
private EventListenerManager createEventListenerManager(CountingEventListener countingEventListener)
{
EventListenerManager eventListenerManager = new EventListenerManager(new EventListenerConfig());
eventListenerManager.addEventListenerFactory(new TestEventListenerFactory(countingEventListener));
eventListenerManager.loadConfiguredEventListener(ImmutableMap.of("event-listener.name", TestEventListenerFactory.NAME));
return eventListenerManager;
}
private QueryStateMachine createStateMachine()
{
TransactionManager transactionManager = createTestTransactionManager();
Session session = testSessionBuilder()
.setCatalog("tpch")
.setSchema(TINY_SCHEMA_NAME)
.setTransactionId(transactionManager.beginTransaction(false))
.build();
return createQueryStateMachine("COMMIT", session, true, transactionManager, directExecutor(), metadata);
}
private static class TestEventListenerFactory
implements EventListenerFactory
{
public static final String NAME = "name";
private final CountingEventListener countingEventListener;
public TestEventListenerFactory(CountingEventListener countingEventListener)
{
this.countingEventListener = requireNonNull(countingEventListener, "countingEventListener is null");
}
@Override
public String getName()
{
return NAME;
}
@Override
public EventListener create(Map<String, String> config)
{
return countingEventListener;
}
}
private static class CountingEventListener
implements EventListener
{
private final AtomicReference<QueryCompletedEvent> queryCompletedEvent = new AtomicReference<>();
@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
fail("Query creation events should not be created in this test");
}
@Override
public void queryUpdated(QueryUpdatedEvent queryUpdatedEvent)
{
fail("Query update events should not be created in this test");
}
@Override
public void publishQueryProgress(QueryProgressEvent queryProgressEvent)
{
fail("Query Progress events should not be created in this test");
}
@Override
public void queryCompleted(QueryCompletedEvent event)
{
assertTrue(queryCompletedEvent.compareAndSet(null, requireNonNull(event, "event is null")), "Duplicate completion event sent");
}
@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent)
{
fail("splitCompleted should never be called");
}
public Optional<QueryCompletedEvent> getQueryCompletedEvent()
{
return Optional.ofNullable(queryCompletedEvent.get());
}
}
}