// TestQueryTrackerQueuedTime.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.execution.QueryTracker.TrackedQuery;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import io.airlift.units.Duration;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static io.airlift.units.Duration.succinctDuration;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestQueryTrackerQueuedTime
{
    private ScheduledExecutorService executor;
    private QueryTracker<MockTrackedQuery> queryTracker;

    @BeforeMethod
    public void setUp()
    {
        executor = newSingleThreadScheduledExecutor();
        QueryManagerConfig config = new QueryManagerConfig();
        queryTracker = new QueryTracker<>(config, executor, Optional.empty());
        queryTracker.start();
    }

    @AfterMethod
    public void tearDown()
    {
        if (queryTracker != null) {
            queryTracker.stop();
        }
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    @Test
    public void testQueryExceedsQueuedTimeLimit()
            throws Exception
    {
        // Create a session with 1 second queued time limit
        Session session = Session.builder(TEST_SESSION)
                .setSystemProperty(SystemSessionProperties.QUERY_MAX_QUEUED_TIME, "1s")
                .build();

        AtomicReference<PrestoException> failureException = new AtomicReference<>();
        AtomicBoolean queryFailed = new AtomicBoolean(false);

        // Create a mock query that has been queued for 2 seconds (exceeds limit)
        long currentTime = System.currentTimeMillis();
        MockTrackedQuery query = new MockTrackedQuery(
                new QueryId("test_query_1"),
                session,
                currentTime - 2000, // Created 2 seconds ago
                0, // Not started execution yet
                currentTime,
                failureException,
                queryFailed);

        queryTracker.addQuery(query);

        // Manually trigger time limit enforcement
        queryTracker.enforceTimeLimits();

        // Verify the query was failed due to exceeding queued time limit
        assertTrue(queryFailed.get(), "Query should have been failed");
        assertNotNull(failureException.get(), "Failure exception should be set");
        assertEquals(failureException.get().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode());
        assertTrue(failureException.get().getMessage().contains("Query exceeded maximum queued time limit"));
    }

    @Test
    public void testQueryWithinQueuedTimeLimit()
            throws Exception
    {
        // Create a session with 5 second queued time limit
        Session session = Session.builder(TEST_SESSION)
                .setSystemProperty(SystemSessionProperties.QUERY_MAX_QUEUED_TIME, "5s")
                .build();

        AtomicReference<PrestoException> failureException = new AtomicReference<>();
        AtomicBoolean queryFailed = new AtomicBoolean(false);

        // Create a mock query that has been queued for 1 second (within limit)
        long currentTime = System.currentTimeMillis();
        MockTrackedQuery query = new MockTrackedQuery(
                new QueryId("test_query_2"),
                session,
                currentTime - 1000, // Created 1 second ago
                0, // Not started execution yet
                currentTime,
                failureException,
                queryFailed);

        queryTracker.addQuery(query);

        // Manually trigger time limit enforcement
        queryTracker.enforceTimeLimits();

        // Verify the query was not failed
        assertFalse(queryFailed.get(), "Query should not have been failed");
    }

    @Test
    public void testQueryStartedExecutionQueuedTimeCalculation()
            throws Exception
    {
        // Create a session with 1 second queued time limit
        Session session = Session.builder(TEST_SESSION)
                .setSystemProperty(SystemSessionProperties.QUERY_MAX_QUEUED_TIME, "1s")
                .build();

        AtomicReference<PrestoException> failureException = new AtomicReference<>();
        AtomicBoolean queryFailed = new AtomicBoolean(false);

        // Create a mock query that was queued for 2 seconds but started execution
        long currentTime = System.currentTimeMillis();
        MockTrackedQuery query = new MockTrackedQuery(
                new QueryId("test_query_3"),
                session,
                currentTime - 3000, // Created 3 seconds ago
                currentTime - 1000, // Started execution 1 second ago (queued for 2 seconds)
                currentTime,
                failureException,
                queryFailed);

        queryTracker.addQuery(query);

        // Manually trigger time limit enforcement
        queryTracker.enforceTimeLimits();

        // Verify the query was failed because it was queued for 2 seconds (exceeds 1s limit)
        assertTrue(queryFailed.get(), "Query should have been failed");
        assertNotNull(failureException.get(), "Failure exception should be set");
        assertEquals(failureException.get().getErrorCode(), EXCEEDED_TIME_LIMIT.toErrorCode());
        assertTrue(failureException.get().getMessage().contains("Query exceeded maximum queued time limit"));
    }

    @Test
    public void testQueryStartedExecutionWithinQueuedTimeLimit()
            throws Exception
    {
        // Create a session with 5 second queued time limit
        Session session = Session.builder(TEST_SESSION)
                .setSystemProperty(SystemSessionProperties.QUERY_MAX_QUEUED_TIME, "5s")
                .build();

        AtomicReference<PrestoException> failureException = new AtomicReference<>();
        AtomicBoolean queryFailed = new AtomicBoolean(false);

        // Create a mock query that was queued for 1 second and started execution
        long currentTime = System.currentTimeMillis();
        MockTrackedQuery query = new MockTrackedQuery(
                new QueryId("test_query_4"),
                session,
                currentTime - 2000, // Created 2 seconds ago
                currentTime - 1000, // Started execution 1 second ago (queued for 1 second)
                currentTime,
                failureException,
                queryFailed);

        queryTracker.addQuery(query);

        // Manually trigger time limit enforcement
        queryTracker.enforceTimeLimits();

        // Verify the query was not failed
        assertFalse(queryFailed.get(), "Query should not have been failed");
    }

    @Test
    public void testCompletedQueryNotChecked()
            throws Exception
    {
        // Create a session with 1 second queued time limit
        Session session = Session.builder(TEST_SESSION)
                .setSystemProperty(SystemSessionProperties.QUERY_MAX_QUEUED_TIME, "1s")
                .build();

        AtomicReference<PrestoException> failureException = new AtomicReference<>();
        AtomicBoolean queryFailed = new AtomicBoolean(false);

        // Create a mock query that is already completed
        long currentTime = System.currentTimeMillis();
        MockTrackedQuery query = new MockTrackedQuery(
                new QueryId("test_query_5"),
                session,
                currentTime - 5000, // Created 5 seconds ago
                0, // Not started execution yet
                currentTime,
                failureException,
                queryFailed);
        query.setDone(true); // Mark as completed

        queryTracker.addQuery(query);

        // Manually trigger time limit enforcement
        queryTracker.enforceTimeLimits();

        // Verify the completed query was not failed
        assertFalse(queryFailed.get(), "Completed query should not be checked for time limits");
    }

    private static class MockTrackedQuery
            implements TrackedQuery
    {
        private final QueryId queryId;
        private final Session session;
        private final long createTimeInMillis;
        private final long executionStartTimeInMillis;
        private final long lastHeartbeatInMillis;
        private final AtomicReference<PrestoException> failureException;
        private final AtomicBoolean queryFailed;
        private boolean done;

        public MockTrackedQuery(
                QueryId queryId,
                Session session,
                long createTimeInMillis,
                long executionStartTimeInMillis,
                long lastHeartbeatInMillis,
                AtomicReference<PrestoException> failureException,
                AtomicBoolean queryFailed)
        {
            this.queryId = queryId;
            this.session = session;
            this.createTimeInMillis = createTimeInMillis;
            this.executionStartTimeInMillis = executionStartTimeInMillis;
            this.lastHeartbeatInMillis = lastHeartbeatInMillis;
            this.failureException = failureException;
            this.queryFailed = queryFailed;
        }

        public void setDone(boolean done)
        {
            this.done = done;
        }

        @Override
        public QueryId getQueryId()
        {
            return queryId;
        }

        @Override
        public boolean isDone()
        {
            return done;
        }

        @Override
        public Session getSession()
        {
            return session;
        }

        @Override
        public long getCreateTimeInMillis()
        {
            return createTimeInMillis;
        }

        @Override
        public Duration getQueuedTime()
        {
            long queuedTimeInMillis;
            if (executionStartTimeInMillis > 0) {
                queuedTimeInMillis = executionStartTimeInMillis - createTimeInMillis;
            }
            else {
                queuedTimeInMillis = System.currentTimeMillis() - createTimeInMillis;
            }
            return succinctDuration(queuedTimeInMillis, MILLISECONDS);
        }

        @Override
        public long getExecutionStartTimeInMillis()
        {
            return executionStartTimeInMillis;
        }

        @Override
        public long getLastHeartbeatInMillis()
        {
            return lastHeartbeatInMillis;
        }

        @Override
        public long getEndTimeInMillis()
        {
            return done ? System.currentTimeMillis() : 0;
        }

        @Override
        public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
        {
            return Optional.empty();
        }

        @Override
        public void fail(Throwable cause)
        {
            if (cause instanceof PrestoException) {
                failureException.set((PrestoException) cause);
            }
            queryFailed.set(true);
            done = true;
        }

        @Override
        public void pruneExpiredQueryInfo()
        {
            // No-op for test
        }

        @Override
        public void pruneFinishedQueryInfo()
        {
            // No-op for test
        }
    }
}