RequestPathMetricsCollectorTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.util;

import static org.apache.zookeeper.ZooDefs.OpCode.create;
import static org.apache.zookeeper.ZooDefs.OpCode.create2;
import static org.apache.zookeeper.ZooDefs.OpCode.delete;
import static org.apache.zookeeper.ZooDefs.OpCode.exists;
import static org.apache.zookeeper.ZooDefs.OpCode.getChildren;
import static org.apache.zookeeper.ZooDefs.OpCode.getChildren2;
import static org.apache.zookeeper.ZooDefs.OpCode.getData;
import static org.apache.zookeeper.ZooDefs.OpCode.setData;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class RequestPathMetricsCollectorTest {

    @BeforeEach
    public void setUp() {
        System.setProperty("zookeeper.pathStats.enabled", "true");
        System.setProperty("zookeeper.pathStats.slotCapacity", "60");
        System.setProperty("zookeeper.pathStats.slotDuration", "1");
        System.setProperty("zookeeper.pathStats.maxDepth", "6");
        System.setProperty("zookeeper.pathStats.sampleRate", "1.0");
    }

    @AfterEach
    public void tearDown() {
        System.clearProperty("zookeeper.pathStats.enabled");
        System.clearProperty("zookeeper.pathStats.slotCapacity");
        System.clearProperty("zookeeper.pathStats.slotDuration");
        System.clearProperty("zookeeper.pathStats.maxDepth");
        System.clearProperty("zookeeper.pathStats.sampleRate");
    }

    @Test
    public void testTrimPath() {
        //normal cases
        String trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 1);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1"));
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 2);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1/p2"));
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 3);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1/p2/p3"));
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("/p1/p2/p3", 4);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1/p2/p3"));
        //some extra symbols
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 3);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1 next/p2.index/p3:next"));
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 2);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1 next/p2.index"));
        trimmedPath = RequestPathMetricsCollector.trimPathDepth("//p1 next/p2.index/p3:next", 6);
        assertTrue(trimmedPath.equalsIgnoreCase("/p1 next/p2.index/p3:next"));
    }

    @Test
    public void testQueueMapReduce() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue = requestPathMetricsCollector.new PathStatsQueue(create2);
        Thread path7 = new Thread(() -> {
            for (int i = 0; i < 1000000; i++) {
                pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6/path7" + "_" + i);
            }
        });
        path7.start();
        Thread path6 = new Thread(() -> {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
            for (int i = 1; i < 100000; i++) {
                pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6" + "_" + i);
            }
        });
        path6.start();
        for (int i = 0; i < 1; i++) {
            pathStatsQueue.registerRequest("/path1");
        }
        for (int i = 0; i < 10; i++) {
            pathStatsQueue.registerRequest("/path1/path2" + "_" + i);
        }
        for (int i = 0; i < 100; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3" + "_" + i);
        }
        for (int i = 0; i < 1000; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4" + "_" + i);
        }
        for (int i = 0; i < 10000; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5" + "_" + i);
        }
        path6.join();
        path7.join();
        Map<String, Integer> newSlot = pathStatsQueue.mapReducePaths(1, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 1);
        assertTrue(newSlot.get("/path1").compareTo(1111111) == 0);
        //cut up to 2
        newSlot = pathStatsQueue.mapReducePaths(2, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 12);
        assertTrue(newSlot.get("/path1").compareTo(1) == 0);
        assertTrue(newSlot.get("/path1/path2").compareTo(1111100) == 0);
        //cut up to 3
        newSlot = pathStatsQueue.mapReducePaths(3, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 112);
        assertTrue(newSlot.get("/path1").compareTo(1) == 0);
        assertTrue(newSlot.get("/path1/path2/path3").compareTo(1111000) == 0);
        //cut up to 4
        newSlot = pathStatsQueue.mapReducePaths(4, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 1112);
        assertTrue(newSlot.get("/path1/path2/path3/path4").compareTo(1110000) == 0);
        //cut up to 5
        newSlot = pathStatsQueue.mapReducePaths(5, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 11112);
        assertTrue(newSlot.get("/path1/path2/path3/path4/path5").compareTo(1100000) == 0);
        //cut up to 6
        newSlot = pathStatsQueue.mapReducePaths(6, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 111111);
        assertTrue(newSlot.get("/path1/path2/path3/path4/path5/path6").compareTo(1000001) == 0);
        //cut up to 7
        newSlot = pathStatsQueue.mapReducePaths(7, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.size() == 1111111);
    }

    @Test
    public void testCollectEmptyStats() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue = requestPathMetricsCollector.new PathStatsQueue(getChildren);
        Thread.sleep(5000);
        Map<String, Integer> newSlot = pathStatsQueue.mapReducePaths(3, pathStatsQueue.getCurrentSlot());
        assertTrue(newSlot.isEmpty());
        pathStatsQueue.start();
        Thread.sleep(15000);
        newSlot = pathStatsQueue.collectStats(1);
        assertTrue(newSlot.size() == 0);
        newSlot = pathStatsQueue.collectStats(2);
        assertTrue(newSlot.size() == 0);
        newSlot = pathStatsQueue.collectStats(5);
        assertTrue(newSlot.size() == 0);
    }

    @Test
    @Disabled
    public void testCollectStats() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector(true);
        RequestPathMetricsCollector.PathStatsQueue pathStatsQueue = requestPathMetricsCollector.new PathStatsQueue(getChildren);
        pathStatsQueue.start();
        Thread path7 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 100000; j++) {
                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6/path7" + "_" + i + "_" + j);
                }
            }
        });
        path7.start();
        Thread path6 = new Thread(() -> {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6");
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 10000; j++) {
                    pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5/path6" + "_" + i + "_" + j);
                }
            }
        });
        path6.start();
        for (int i = 0; i < 1; i++) {
            pathStatsQueue.registerRequest("/path1");
        }
        for (int i = 0; i < 10; i++) {
            pathStatsQueue.registerRequest("/path1/path2" + "_" + i);
        }
        for (int i = 0; i < 100; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3" + "_" + i);
        }
        for (int i = 0; i < 1000; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4" + "_" + i);
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 10000; i++) {
            pathStatsQueue.registerRequest("/path1/path2/path3/path4/path5" + "_" + i);
        }
        path6.join();
        path7.join();
        Map<String, Integer> newSlot = pathStatsQueue.collectStats(1);
        assertEquals(newSlot.size(), 1);
        assertEquals(newSlot.get("/path1").intValue(), 1111112);
        //cut up to 2
        newSlot = pathStatsQueue.collectStats(2);
        assertEquals(newSlot.size(), 12);
        assertEquals(newSlot.get("/path1").intValue(), 1);
        assertEquals(newSlot.get("/path1/path2").intValue(), 1111101);
        //cut up to 3
        newSlot = pathStatsQueue.collectStats(3);
        assertEquals(newSlot.size(), 112);
        assertEquals(newSlot.get("/path1").intValue(), 1);
        assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1111001);
        //cut up to 4
        newSlot = pathStatsQueue.collectStats(4);
        assertEquals(newSlot.size(), 1112);
        assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
        //cut up to 5
        newSlot = pathStatsQueue.collectStats(5);
        assertEquals(newSlot.size(), 11112);
        assertEquals(newSlot.get("/path1/path2/path3/path4/path5").intValue(), 1100001);
        //cut up to 6
        newSlot = pathStatsQueue.collectStats(6);
        assertEquals(newSlot.size(), 111112);
        assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(), 1000001);
    }

    @Test
    public void testAggregate() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector(true);
        Thread path7 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 100000; j++) {
                    requestPathMetricsCollector.registerRequest(getData, "/path1/path2/path3/path4/path5/path6/path7"
                                                                                 + "_"
                                                                                 + i
                                                                                 + "_"
                                                                                 + j);
                }
            }
        });
        path7.start();
        Thread path6 = new Thread(() -> {
            requestPathMetricsCollector.registerRequest(getChildren2, "/path1/path2/path3/path4/path5/path6");
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 10000; j++) {
                    requestPathMetricsCollector.registerRequest(getChildren, "/path1/path2/path3/path4/path5/path6"
                                                                                     + "_"
                                                                                     + i
                                                                                     + "_"
                                                                                     + j);
                }
            }
        });
        path6.start();
        for (int i = 0; i < 1; i++) {
            requestPathMetricsCollector.registerRequest(create2, "/path1");
        }
        for (int i = 0; i < 10; i++) {
            requestPathMetricsCollector.registerRequest(create, "/path1/path2" + "_" + i);
        }
        for (int i = 0; i < 100; i++) {
            requestPathMetricsCollector.registerRequest(delete, "/path1/path2/path3" + "_" + i);
        }
        for (int i = 0; i < 1000; i++) {
            requestPathMetricsCollector.registerRequest(setData, "/path1/path2/path3/path4" + "_" + i);
        }
        for (int i = 0; i < 10000; i++) {
            requestPathMetricsCollector.registerRequest(exists, "/path1/path2/path3/path4/path5" + "_" + i);
        }
        path6.join();
        path7.join();
        Map<String, Integer> newSlot = requestPathMetricsCollector.aggregatePaths(2, queue -> true);
        assertEquals(newSlot.size(), 12);
        assertEquals(newSlot.get("/path1").intValue(), 1);
        assertEquals(newSlot.get("/path1/path2").intValue(), 1111101);
        //cut up to 3
        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> true);
        assertEquals(newSlot.size(), 112);
        assertEquals(newSlot.get("/path1").intValue(), 1);
        assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1111001);
        //cut up to 4
        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
        assertEquals(newSlot.size(), 1112);
        assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
        //cut up to 5
        newSlot = requestPathMetricsCollector.aggregatePaths(5, queue -> true);
        assertEquals(newSlot.size(), 11112);
        assertEquals(newSlot.get("/path1/path2/path3/path4/path5").intValue(), 1100001);
        //cut up to 6
        newSlot = requestPathMetricsCollector.aggregatePaths(6, queue -> true);
        assertEquals(newSlot.size(), 111112);
        assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(), 1000001);
        //cut up to 7 but the initial mapReduce kept only 6
        newSlot = requestPathMetricsCollector.aggregatePaths(7, queue -> true);
        assertEquals(newSlot.size(), 111112);
        assertEquals(newSlot.get("/path1/path2/path3/path4/path5/path6").intValue(), 1000001);
        //test predicate
        //cut up to 4 for all the reads
        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> !queue.isWriteOperation());
        assertEquals(newSlot.size(), 1);
        assertEquals(newSlot.get("/path1/path2/path3/path4").intValue(), 1110001);
        //cut up to 4 for all the write
        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> queue.isWriteOperation());
        assertEquals(newSlot.size(), 1111);
        //cut up to 3 for all the write
        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> queue.isWriteOperation());
        assertEquals(newSlot.size(), 112);
        assertEquals(newSlot.get("/path1/path2/path3").intValue(), 1000);
    }

    @Test
    public void testTopPath() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector(true);
        Thread path7 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 100000; j++) {
                    requestPathMetricsCollector.registerRequest(getData, "/path1/path2/path3/path4/path5/path6/path7"
                                                                                 + "_"
                                                                                 + i
                                                                                 + "_"
                                                                                 + j);
                }
            }
        });
        path7.start();
        Thread path6 = new Thread(() -> {
            requestPathMetricsCollector.registerRequest(getChildren2, "/path1/path2/path3/path4/path5/path6");
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 10000; j++) {
                    requestPathMetricsCollector.registerRequest(getChildren, "/path1/path2/path3/path4/path5/path6"
                                                                                     + "_"
                                                                                     + i
                                                                                     + "_"
                                                                                     + j);
                }
            }
        });
        path6.start();
        for (int i = 0; i < 1; i++) {
            requestPathMetricsCollector.registerRequest(create2, "/path1");
        }
        for (int i = 0; i < 10; i++) {
            requestPathMetricsCollector.registerRequest(create, "/path1/path2" + "_" + i);
        }
        for (int i = 0; i < 100; i++) {
            requestPathMetricsCollector.registerRequest(delete, "/path1/path2/path3" + "_" + i);
        }
        for (int i = 0; i < 1000; i++) {
            requestPathMetricsCollector.registerRequest(setData, "/path1/path2/path3/path4" + "_" + i);
        }
        for (int i = 0; i < 10000; i++) {
            requestPathMetricsCollector.registerRequest(exists, "/path1/path2/path3/path4/path5" + "_" + i);
        }
        path6.join();
        path7.join();
        StringBuilder sb1 = new StringBuilder();
        Map<String, Integer> newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> queue.isWriteOperation());
        requestPathMetricsCollector.logTopPaths(newSlot, entry -> sb1.append(entry.getKey()
                                                                                     + " : "
                                                                                     + entry.getValue()
                                                                                     + "\n"));
        assertTrue(sb1.toString().startsWith("/path1/path2/path3 : 1000"));
        StringBuilder sb2 = new StringBuilder();
        newSlot = requestPathMetricsCollector.aggregatePaths(3, queue -> !queue.isWriteOperation());
        requestPathMetricsCollector.logTopPaths(newSlot, entry -> sb2.append(entry.getKey()
                                                                                     + " : "
                                                                                     + entry.getValue()
                                                                                     + "\n"));
        assertTrue(sb2.toString().startsWith("/path1/path2/path3 : 1110001"));
        StringBuilder sb3 = new StringBuilder();
        newSlot = requestPathMetricsCollector.aggregatePaths(4, queue -> true);
        requestPathMetricsCollector.logTopPaths(newSlot, entry -> sb3.append(entry.getKey()
                                                                                     + " : "
                                                                                     + entry.getValue()
                                                                                     + "\n"));
        assertTrue(sb3.toString().startsWith("/path1/path2/path3/path4 : 1110001"));
    }

    @Test
    public void testMultiThreadPerf() throws InterruptedException {
        RequestPathMetricsCollector requestPathMetricsCollector = new RequestPathMetricsCollector();
        Random rand = new Random(System.currentTimeMillis());
        Long startTime = System.currentTimeMillis();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        //call 100k get Data
        for (int i = 0; i < 100000; i++) {
            executor.submit(
                () -> requestPathMetricsCollector.registerRequest(getData, "/path1/path2/path" + rand.nextInt(10)));
        }
        //5K create
        for (int i = 0; i < 5000; i++) {
            executor.submit(
                () -> requestPathMetricsCollector.registerRequest(create2, "/path1/path2/path" + rand.nextInt(10)));
        }
        //5K delete
        for (int i = 0; i < 5000; i++) {
            executor.submit(
                () -> requestPathMetricsCollector.registerRequest(delete, "/path1/path2/path" + rand.nextInt(10)));
        }
        //40K getChildren
        for (int i = 0; i < 40000; i++) {
            executor.submit(
                () -> requestPathMetricsCollector.registerRequest(getChildren, "/path1/path2/path" + rand.nextInt(10)));
        }
        executor.shutdown();
        //wait for at most 10 mill seconds
        executor.awaitTermination(10, TimeUnit.MILLISECONDS);
        assertTrue(executor.isTerminated());
        Long endTime = System.currentTimeMillis();
        //less than 2 seconds total time
        assertTrue(TimeUnit.MILLISECONDS.toSeconds(endTime - startTime) < 3);
    }

}