TestPrestoNativeAsyncDataCacheCleanupAPI.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.nativeworker;

import com.facebook.presto.Session;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.testing.ExpectedQueryRunner;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.facebook.presto.nativeworker.NativeApiEndpointUtils.fetchScalarLongMetrics;
import static com.facebook.presto.nativeworker.NativeApiEndpointUtils.sendWorkerRequest;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createCustomer;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

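/**
 * End-to-end tests for the native worker's async data cache cleanup APIs.
 * The main test populates the in-memory cache with repeated scans, persists
 * it to the SSD cache via the writeSsd endpoint, and verifies through the
 * worker metrics endpoint that the clearCache endpoint empties both the
 * memory and SSD caches. Both tests are disabled by default (enabled = false).
 */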
public class TestPrestoNativeAsyncDataCacheCleanupAPI
        extends AbstractTestQueryFramework
{
    private static final String MEMORY_CACHE_CLEANUP_ENDPOINT = "/v1/operation/server/clearCache?type=memory";
    private static final String SSD_CACHE_CLEANUP_ENDPOINT = "/v1/operation/server/clearCache?type=ssd";
    private static final String METRICS_ENDPOINT = "/v1/info/metrics";
    private static final String WRITE_TO_SSD_ENDPOINT = "/v1/operation/server/writeSsd";

    @Override
    protected QueryRunner createQueryRunner() throws Exception
    {
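        // Native cluster with the async data cache and SSD cache enabled, and
        // runtime metrics collection turned on so the worker metrics endpoint
        // exposes the cache counters queried below.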
        return PrestoNativeQueryRunnerUtils.nativeHiveQueryRunnerBuilder()
                .setCacheMaxSize(4096)
                .setUseThrift(true)
                .setAddStorageFormatToPath(true)
                .setEnableRuntimeMetricsCollection(true)
                .setEnableSsdCache(true)
                .build();
    }

    @Override
    protected ExpectedQueryRunner createExpectedQueryRunner() throws Exception
    {
        return PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder()
                .setAddStorageFormatToPath(true)
                .build();
    }

    @Override
    protected void createTables()
    {
        QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner();
        createCustomer(queryRunner);
    }

    @Test(groups = {"async_data_cache"}, enabled = false)
    public void testAsyncDataCacheCleanup() throws Exception
    {
        Session session = Session.builder(super.getSession())
                .setCatalogSessionProperty("hive", "node_selection_strategy", "SOFT_AFFINITY")
                .build();

        QueryRunner queryRunner = getQueryRunner();
        DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
        Set<InternalNode> workerNodes = getWorkerNodes(distributedQueryRunner);

        // 1. Collect initial cache metrics
        Metrics initialMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertEquals(initialMetrics.memoryCacheHits, 0, "Cache hits should be zero initially.");
        assertEquals(initialMetrics.memoryCacheEntries, 0, "Cache entries should be zero initially.");

        // 2. Execute queries to populate cache
        for (int i = 0; i < 4; i++) {
            queryRunner.execute(session, "SELECT count(*) FROM customer");
        }
        TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates

        // 3. Collect cache metrics after queries
        Metrics populatedMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertNotEquals(populatedMetrics.memoryCacheHits, 0, "Cache should have hits after queries.");
        assertNotEquals(populatedMetrics.memoryCacheEntries, 0, "Cache should have entries after queries.");

        // 4. Write cached data to the SSD cache
        workerNodes.parallelStream().forEach(worker -> {
            int responseCode = sendWorkerRequest(worker.getInternalUri().toString(), WRITE_TO_SSD_ENDPOINT);
            assertEquals(responseCode, 200, "Expected a 200 OK response for writing to the SSD cache.");
        });
        TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates

        // 5. Collect SSD metrics after the SSD write
        Metrics ssdMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertNotEquals(ssdMetrics.ssdCacheWriteEntries, 0, "SSD cache should have write entries after the write API call.");
        assertEquals(ssdMetrics.ssdCacheReadEntries, 0, "SSD cache should have no read entries yet.");

        // 6. Trigger async memory cache cleanup
        workerNodes.parallelStream().forEach(worker -> {
            int responseCode = sendWorkerRequest(worker.getInternalUri().toString(), MEMORY_CACHE_CLEANUP_ENDPOINT);
            assertEquals(responseCode, 200, "Expected a 200 OK response for memory cache cleanup.");
        });
        TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates

        // 7. Validate memory cache is cleared
        Metrics finalMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertEquals(finalMetrics.memoryCacheEntries, 0, "Cache should be empty after cleanup.");

        // 8. Execute queries to read from SSD cache
        for (int i = 0; i < 4; i++) {
            queryRunner.execute(session, "SELECT count(*) FROM customer");
        }
        TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates

        // 9. Collect SSD metrics to check the read-entries counter
        Metrics populatedSsdMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertNotEquals(populatedSsdMetrics.ssdCacheReadEntries, 0, "SSD cache should have non-zero read entries.");

        // 10. Trigger SSD cache cleanup
        workerNodes.parallelStream().forEach(worker -> {
            int responseCode = sendWorkerRequest(worker.getInternalUri().toString(), SSD_CACHE_CLEANUP_ENDPOINT);
            assertEquals(responseCode, 200, "Expected a 200 OK response for SSD cache cleanup.");
        });
        TimeUnit.SECONDS.sleep(60); // Sleep to allow cache updates

        // 11. Validate SSD cache is cleared
        Metrics finalSsdCacheMetrics = collectCacheMetrics(workerNodes, METRICS_ENDPOINT);
        assertEquals(finalSsdCacheMetrics.ssdCacheCachedEntries, 0, "SSD cache should be empty after cleanup.");
    }
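
    // The fixed 60-second sleeps above are a coarse way to wait for the async
    // cache state to settle. A bounded poll, sketched below, could make the test
    // faster and less flaky. awaitCondition is a hypothetical helper, not part
    // of the existing test utilities, and is left unused by the tests as written.
    private static void awaitCondition(java.util.function.BooleanSupplier condition, long timeoutSeconds)
            throws InterruptedException
    {
        long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new AssertionError("Condition not met within " + timeoutSeconds + " seconds");
            }
            // Poll once per second until the deadline expires.
            TimeUnit.SECONDS.sleep(1);
        }
    }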

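    /**
     * Fetches the scalar runtime metrics from each worker and sums the memory
     * and SSD cache counters across all workers. The metric names are the
     * Velox async data cache and SSD cache counters reported at the metrics
     * endpoint.
     */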
    private Metrics collectCacheMetrics(Set<InternalNode> workerNodes, String endpoint)
            throws Exception
    {
        int memoryCacheHits = 0;
        int memoryCacheEntries = 0;
        int ssdCacheReadEntries = 0;
        int ssdCacheWriteEntries = 0;
        int ssdCacheCachedEntries = 0;

        for (InternalNode worker : workerNodes) {
            Map<String, Long> metrics = fetchScalarLongMetrics(worker.getInternalUri().toString(), endpoint, "GET");
            memoryCacheHits += metrics.get("velox_memory_cache_num_hits");
            memoryCacheEntries += metrics.get("velox_memory_cache_num_entries");
            ssdCacheReadEntries += metrics.get("velox_ssd_cache_read_entries");
            ssdCacheWriteEntries += metrics.get("velox_ssd_cache_written_entries");
            ssdCacheCachedEntries += metrics.get("velox_ssd_cache_cached_entries");
        }
        return new Metrics(memoryCacheHits, memoryCacheEntries, ssdCacheReadEntries, ssdCacheWriteEntries, ssdCacheCachedEntries);
    }

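    // Simple value holder for the cache counters summed across all worker nodes.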
    private static class Metrics
    {
        final int memoryCacheHits;
        final int memoryCacheEntries;
        final int ssdCacheReadEntries;
        final int ssdCacheWriteEntries;
        final int ssdCacheCachedEntries;

        Metrics(int memoryCacheHits, int memoryCacheEntries, int ssdCacheReadEntries, int ssdCacheWriteEntries, int ssdCacheCachedEntries)
        {
            this.memoryCacheHits = memoryCacheHits;
            this.memoryCacheEntries = memoryCacheEntries;
            this.ssdCacheReadEntries = ssdCacheReadEntries;
            this.ssdCacheWriteEntries = ssdCacheWriteEntries;
            this.ssdCacheCachedEntries = ssdCacheCachedEntries;
        }
    }

    private boolean isCoordinator(DistributedQueryRunner distributedQueryRunner, InternalNode node)
    {
        return distributedQueryRunner.getCoordinator().getNodeManager().getCoordinators().contains(node);
    }

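    // Returns the active nodes minus the coordinator, i.e. the native workers whose caches the test exercises.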
    private Set<InternalNode> getWorkerNodes(DistributedQueryRunner queryRunner)
    {
        return queryRunner.getCoordinator()
                .getNodeManager()
                .getAllNodes()
                .getActiveNodes()
                .stream()
                .filter(node -> !isCoordinator(queryRunner, node))
                .collect(Collectors.toSet());
    }

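    /**
     * Exercises the clearCache endpoint contract directly against a single
     * worker: a well-formed request returns 200 OK, while a request for a
     * non-existent operation returns 500.
     */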
    @Test(groups = {"async_data_cache"}, enabled = false)
    public void testAsyncDataCacheCleanupApiFormat()
    {
        QueryRunner queryRunner = getQueryRunner();
        DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
        Set<InternalNode> workerNodes = getWorkerNodes(distributedQueryRunner);
        InternalNode workerNode = workerNodes.iterator().next();

        int responseCode = sendWorkerRequest(workerNode.getInternalUri().toString(), MEMORY_CACHE_CLEANUP_ENDPOINT);
        assertEquals(responseCode, 200, "Expected a 200 OK response for a valid cache cleanup request.");

        String invalidEndPoint = "/v1/operation/server/clearCacheNonExisting?name=hive&id=hive";
        responseCode = sendWorkerRequest(workerNode.getInternalUri().toString(), invalidEndPoint);
        assertEquals(responseCode, 500, "Expected a 500 response for a non-existent endpoint.");
    }
}