BlueThrottleTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumUtil;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlueThrottleTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
    private static final int RAPID_TIMEOUT = 10000;

    class MockRandom extends Random {

        int flag = 0;
        BlueThrottle throttle;

        @Override
        public double nextDouble() {
            if (throttle.getDropChance() > 0) {
                flag = 1 - flag;
                return flag;
            } else {
                return 1;
            }
        }

    }

    class BlueThrottleWithMockRandom extends BlueThrottle {

        public BlueThrottleWithMockRandom(MockRandom random) {
            super();
            this.rng = random;
            random.throttle = this;
        }

    }

    @Test
    public void testThrottleDisabled() {
        BlueThrottle throttler = new BlueThrottle();
        assertTrue(throttler.checkLimit(1), "Throttle should be disabled by default");
    }

    @Test
    public void testThrottleWithoutRefill() {
        BlueThrottle throttler = new BlueThrottle();
        throttler.setMaxTokens(1);
        throttler.setFillTime(2000);
        assertTrue(throttler.checkLimit(1), "First request should be allowed");
        assertFalse(throttler.checkLimit(1), "Second request should be denied");
    }

    @Test
    public void testThrottleWithRefill() throws InterruptedException {
        BlueThrottle throttler = new BlueThrottle();
        throttler.setMaxTokens(1);
        throttler.setFillTime(500);
        assertTrue(throttler.checkLimit(1), "First request should be allowed");
        assertFalse(throttler.checkLimit(1), "Second request should be denied");

        //wait for the bucket to be refilled
        Thread.sleep(750);
        assertTrue(throttler.checkLimit(1), "Third request should be allowed since we've got a new token");
    }

    @Test
    public void testThrottleWithoutRandomDropping() throws InterruptedException {
        int maxTokens = 5;
        BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom());
        throttler.setMaxTokens(maxTokens);
        throttler.setFillCount(maxTokens);
        throttler.setFillTime(1000);

        for (int i = 0; i < maxTokens; i++) {
            throttler.checkLimit(1);
        }
        assertEquals(throttler.getMaxTokens(), throttler.getDeficit(), "All tokens should be used up by now");

        Thread.sleep(110);
        throttler.checkLimit(1);
        assertFalse(throttler.getDropChance() > 0, "Dropping probability should still be zero");

        //allow bucket to be refilled
        Thread.sleep(1500);

        for (int i = 0; i < maxTokens; i++) {
            assertTrue(throttler.checkLimit(1), "The first " + maxTokens + " requests should be allowed");
        }

        for (int i = 0; i < maxTokens; i++) {
            assertFalse(throttler.checkLimit(1), "The latter " + maxTokens + " requests should be denied");
        }
    }

    @Test
    public void testThrottleWithRandomDropping() throws InterruptedException {
        int maxTokens = 5;
        BlueThrottle throttler = new BlueThrottleWithMockRandom(new MockRandom());
        throttler.setMaxTokens(maxTokens);
        throttler.setFillCount(maxTokens);
        throttler.setFillTime(1000);
        throttler.setFreezeTime(100);
        throttler.setDropIncrease(0.5);

        for (int i = 0; i < maxTokens; i++) {
            throttler.checkLimit(1);
        }
        assertEquals(throttler.getMaxTokens(), throttler.getDeficit(), "All tokens should be used up by now");

        Thread.sleep(120);
        //this will trigger dropping probability being increased
        throttler.checkLimit(1);
        assertTrue(throttler.getDropChance() > 0, "Dropping probability should be increased");
        LOG.info("Dropping probability is {}", throttler.getDropChance());

        //allow bucket to be refilled
        Thread.sleep(1100);
        LOG.info("Bucket is refilled with {} tokens.", maxTokens);

        int accepted = 0;
        for (int i = 0; i < maxTokens; i++) {
            if (throttler.checkLimit(1)) {
                accepted++;
            }
        }

        LOG.info("Send {} requests, {} are accepted", maxTokens, accepted);
        assertTrue(accepted < maxTokens, "The dropping should be distributed");

        accepted = 0;

        for (int i = 0; i < maxTokens; i++) {
            if (throttler.checkLimit(1)) {
                accepted++;
            }
        }

        LOG.info("Send another {} requests, {} are accepted", maxTokens, accepted);
        assertTrue(accepted > 0, "Later requests should have a chance");
    }

    private QuorumUtil quorumUtil = new QuorumUtil(1);
    private ClientBase.CountdownWatcher[] watchers;
    private ZooKeeper[] zks;

    private int connect(int n) throws Exception {
        String connStr = quorumUtil.getConnectionStringForServer(1);
        int connected = 0;

        zks = new ZooKeeper[n];
        watchers = new ClientBase.CountdownWatcher[n];
        for (int i = 0; i < n; i++){
            watchers[i] = new ClientBase.CountdownWatcher();
            zks[i] = new ZooKeeper(connStr, 3000, watchers[i]);
            try {
                watchers[i].waitForConnected(RAPID_TIMEOUT);
                connected++;
            } catch (TimeoutException e) {
                LOG.info("Connection denied by the throttler due to insufficient tokens");
                break;
            }
        }

        return connected;
    }

    private void shutdownQuorum() throws Exception{
        for (ZooKeeper zk : zks) {
            if (zk != null) {
                zk.close();
            }
        }

        quorumUtil.shutdownAll();
    }

    @Test
    public void testNoThrottling() throws Exception {
        quorumUtil.startAll();

        //disable throttling
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0);

        int connected = connect(10);

        assertEquals(10, connected);
        shutdownQuorum();
    }

    @Test
    public void testThrottling() throws Exception {
        quorumUtil.enableLocalSession(true);
        quorumUtil.startAll();

        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
        //no refill, makes testing easier
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);


        int connected = connect(3);
        assertEquals(2, connected);
        shutdownQuorum();

        quorumUtil.enableLocalSession(false);
        quorumUtil.startAll();

        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
        //no refill, makes testing easier
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);


        connected = connect(3);
        assertEquals(2, connected);
        shutdownQuorum();
    }

    @Test
    public void testWeighedThrottling() throws Exception {
        // this test depends on the session weights set to the default values
        // 3 for global session, 2 for renew sessions, 1 for local sessions
        BlueThrottle.setConnectionWeightEnabled(true);

        quorumUtil.enableLocalSession(true);
        quorumUtil.startAll();
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);

        //try to create 11 local sessions, 10 created, because we have only 10 tokens
        int connected = connect(11);
        assertEquals(10, connected);
        shutdownQuorum();

        quorumUtil.enableLocalSession(false);
        quorumUtil.startAll();
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
        //tyr to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3
        connected = connect(11);
        assertEquals(3, connected);
        shutdownQuorum();

        quorumUtil.startAll();
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
        connected = connect(2);
        assertEquals(2, connected);

        quorumUtil.shutdown(1);
        watchers[0].waitForDisconnected(RAPID_TIMEOUT);
        watchers[1].waitForDisconnected(RAPID_TIMEOUT);

        quorumUtil.restart(1);
        //client will try to reconnect
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3);
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
        int reconnected = 0;
        for (int i = 0; i < 2; i++){
            try {
                watchers[i].waitForConnected(RAPID_TIMEOUT);
                reconnected++;
            } catch (TimeoutException e) {
                LOG.info("One reconnect fails due to insufficient tokens");
            }
        }
        //each reconnect takes two tokens, we have 3, so only one reconnects
        LOG.info("reconnected {}", reconnected);
        assertEquals(1, reconnected);
        shutdownQuorum();
    }
}