RequestThrottlerTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestThrottlerTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottlerTest.class);

    private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
    private static String GLOBAL_OUTSTANDING_LIMIT = "1";
    private static final int TOTAL_REQUESTS = 5;
    private static final int STALL_TIME = 5000;

    // latch to hold requests in the PrepRequestProcessor to
    // keep them from going down the pipeline to reach the final
    // request processor, where the number of in process requests
    // will be decreased
    CountDownLatch resumeProcess = null;

    // latch to make sure all requests are submitted
    CountDownLatch submitted = null;

    // latch to make sure all requests entered the pipeline
    CountDownLatch entered = null;

    // latch to make sure requests finished the pipeline
    CountDownLatch finished = null;

    CountDownLatch disconnected = null;

    CountDownLatch throttled = null;
    CountDownLatch throttling = null;

    ZooKeeperServer zks = null;
    ServerCnxnFactory f = null;
    ZooKeeper zk = null;
    int connectionLossCount = 0;

    @BeforeEach
    public void setup() throws Exception {
        // start a server and create a client
        File tmpDir = ClientBase.createTmpDir();
        ClientBase.setupTestEnv();
        zks = new TestZooKeeperServer(tmpDir, tmpDir, 3000);
        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
        f = ServerCnxnFactory.createFactory(PORT, -1);
        f.startup(zks);
        LOG.info("starting up the zookeeper server .. waiting");
        assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT), "waiting for server being up");

        resumeProcess = null;
        submitted = null;

        zk = ClientBase.createZKClient(HOSTPORT);
    }

    @AfterEach
    public void tearDown() throws Exception {
        // shut down the server and the client
        if (null != zk) {
            zk.close();
        }

        if (null != f) {
            f.shutdown();
        }
        if (null != zks) {
            zks.shutdown();
        }
    }

    // TestZooKeeperServer
    // 1. uses our version of PrepRequestProcessor, which can hold the request as long as we want
    // 2. count the number of submitted requests
    class TestZooKeeperServer extends ZooKeeperServer {

        public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
            super(snapDir, logDir, tickTime);
        }

        @Override
        protected RequestThrottler createRequestThrottler() {
            return new TestRequestThrottler(this);
        }

        @Override
        protected void setupRequestProcessors() {
            RequestProcessor finalProcessor = new FinalRequestProcessor(this);
            RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
            ((SyncRequestProcessor) syncProcessor).start();
            firstProcessor = new TestPrepRequestProcessor(this, syncProcessor);
            ((TestPrepRequestProcessor) firstProcessor).start();
        }

        @Override
        public void submitRequest(Request si) {
            if (null != submitted) {
                submitted.countDown();
            }
            super.submitRequest(si);
        }

        @Override
        public void requestFinished(Request request) {
            if (null != finished){
                finished.countDown();
            }
            super.requestFinished(request);
        }
    }

    class TestRequestThrottler extends RequestThrottler {
        public TestRequestThrottler(ZooKeeperServer zks) {
            super(zks);
        }

        @Override
        synchronized void throttleSleep(int stallTime) throws InterruptedException {
            if (throttling != null) {
                throttling.countDown();
            }
            super.throttleSleep(stallTime);
            // Defend against unstable timing and potential spurious wakeup.
            if (throttled != null) {
                assertTrue(throttled.await(20, TimeUnit.SECONDS));
            }
        }
    }

    class TestPrepRequestProcessor extends PrepRequestProcessor {

        public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
            super(zks, syncProcessor);
        }

        @Override
        protected void pRequest(Request request) throws RequestProcessorException {
            // keep the request in the processor as long as we want
            if (resumeProcess != null) {
                try {
                    resumeProcess.await(20, TimeUnit.SECONDS);
                } catch (Exception e) {

                }
            }

            if (entered != null) {
                entered.countDown();
            }

            super.pRequest(request);
        }

    }

    @Test
    public void testRequestThrottler() throws Exception {
        ServerMetrics.getMetrics().resetAll();

        // we only allow two requests in the pipeline
        RequestThrottler.setMaxRequests(2);

        RequestThrottler.setStallTime(STALL_TIME);
        RequestThrottler.setDropStaleRequests(false);

        // no requests can go through the pipeline unless we raise the latch
        resumeProcess = new CountDownLatch(1);
        submitted = new CountDownLatch(TOTAL_REQUESTS);
        entered = new CountDownLatch(TOTAL_REQUESTS);

        // send 5 requests asynchronously
        for (int i = 0; i < TOTAL_REQUESTS; i++) {
            zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
                                                               + i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
            }, null);
        }

        // make sure the server received all 5 requests
        submitted.await(5, TimeUnit.SECONDS);

        // but only two requests can get into the pipeline because of the throttler
        waitForMetric("prep_processor_request_queued", is(2L));
        waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L));

        // let the requests go through the pipeline and the throttler will be waken up to allow more requests
        // to enter the pipeline
        resumeProcess.countDown();

        // wait for more than one STALL_TIME to reduce timeout before wakeup
        assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS));

        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued"));
    }

    @Test
    public void testDropStaleRequests() throws Exception {
        ServerMetrics.getMetrics().resetAll();

        // we only allow two requests in the pipeline
        RequestThrottler.setMaxRequests(2);

        RequestThrottler.setStallTime(STALL_TIME);

        RequestThrottler.setDropStaleRequests(true);

        // no requests can go through the pipeline unless we raise the latch
        resumeProcess = new CountDownLatch(1);
        submitted = new CountDownLatch(TOTAL_REQUESTS);

        throttled = new CountDownLatch(1);
        throttling = new CountDownLatch(1);

        // send 5 requests asynchronously
        for (int i = 0; i < TOTAL_REQUESTS; i++) {
            zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
                                                               + i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
            }, null);
        }

        // make sure the server received all 5 requests
        assertTrue(submitted.await(5, TimeUnit.SECONDS));

        // stale throttled requests
        assertTrue(throttling.await(5, TimeUnit.SECONDS));
        for (ServerCnxn cnxn : f.cnxns) {
            cnxn.setStale();
        }
        throttled.countDown();
        zk = null;

        // only first three requests are counted as finished
        finished = new CountDownLatch(3);

        // let the requests go through the pipeline
        resumeProcess.countDown();
        LOG.info("raise the latch");

        while (zks.getInflight() > 0) {
            Thread.sleep(50);
        }

        assertTrue(finished.await(5, TimeUnit.SECONDS));

        // assert after all requests processed to avoid concurrent issues as metrics are
        // counted in different threads.
        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();

        // only two requests can get into the pipeline because of the throttler
        assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));

        // the rest of the 3 requests will be dropped
        // but only the first one for a connection will be counted
        assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
        assertEquals(1, (long) metrics.get("stale_requests_dropped"));
    }

    @Test
    public void testLargeRequestThrottling() throws Exception {
        ServerMetrics.getMetrics().resetAll();

        AsyncCallback.StringCallback createCallback = (rc, path, ctx, name) -> {
            if (KeeperException.Code.get(rc) == KeeperException.Code.CONNECTIONLOSS) {
                connectionLossCount++;
                disconnected.countDown();
            }
        };

        // the total length of the request is about 170-180 bytes, so only two requests are allowed
        byte[] data = new byte[100];
        // the third request will incur throttle. We don't send more requests to avoid reconnecting
        // due to unstable test environment(e.g. slow sending).
        int number_requests = 3;

        // we allow more requests in the pipeline
        RequestThrottler.setMaxRequests(number_requests + 2);

        // request could become stale in processor threads due to throttle in io thread
        RequestThrottler.setDropStaleRequests(false);

        // enable large request throttling
        zks.setLargeRequestThreshold(150);
        zks.setLargeRequestMaxBytes(400);

        // no requests can go through the pipeline unless we raise the latch
        resumeProcess = new CountDownLatch(1);
        // the connection will be close when large requests exceed the limit
        // we can't use the submitted latch because requests after close won't be submitted
        disconnected = new CountDownLatch(number_requests);

        // send requests asynchronously
        for (int i = 0; i < number_requests; i++) {
            zk.create("/request_throttle_test- " + i , data,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createCallback, null);
        }

        // make sure the server received all requests
        assertTrue(disconnected.await(30, TimeUnit.SECONDS));

        finished = new CountDownLatch(2);
        // let the requests go through the pipeline
        resumeProcess.countDown();
        assertTrue(finished.await(5, TimeUnit.SECONDS));

        // assert metrics after finished so metrics in no io threads are set also.
        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();

        // but only two requests can get into the pipeline because they are large requests
        // the connection will be closed
        assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
        assertEquals(1L, (long) metrics.get("large_requests_rejected"));
        assertEquals(number_requests, connectionLossCount);

        // when the two requests finish, they are stale because the connection is closed already
        assertEquals(2, (long) metrics.get("stale_replies"));
    }

    @Test
    public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() throws Exception {
        try {
            System.setProperty(ZooKeeperServer.GLOBAL_OUTSTANDING_LIMIT, GLOBAL_OUTSTANDING_LIMIT);

            ServerMetrics.getMetrics().resetAll();

            // Here we disable RequestThrottler and let incoming requests queued at first request processor.
            RequestThrottler.setMaxRequests(0);
            resumeProcess = new CountDownLatch(1);
            int totalRequests = 10;

            for (int i = 0; i < totalRequests; i++) {
                zk.create("/request_throttle_test- " + i, ("/request_throttle_test- "
                        + i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
                }, null);
            }

            // We should start throttling instead of queuing more requests.
            //
            // We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline
            // before throttling. For the next request, we will throttle by disabling receiving future requests but we still
            // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would
            // be GLOBAL_OUTSTANDING_LIMIT + 2.
            //
            // But due to leak of consistent view of number of outstanding requests, the number could be larger.
            waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2));

            resumeProcess.countDown();
        } catch (Exception e) {
            throw e;
        } finally {
            System.clearProperty(ZooKeeperServer.GLOBAL_OUTSTANDING_LIMIT);
        }
    }
}