BlueThrottleTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumUtil;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BlueThrottleTest extends ZKTestCase {
// Logger used by the tests below for progress messages.
private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
// Bound handed to the watcher waitForConnected/waitForDisconnected calls;
// presumably milliseconds -- TODO confirm against ClientBase.CountdownWatcher.
private static final int RAPID_TIMEOUT = 10_000;
/**
 * Deterministic stand-in for {@link Random} whose {@code nextDouble()} depends
 * on the drop chance reported by the attached throttle.
 *
 * While the throttle reports a drop chance that is not greater than zero this
 * always yields 1. Once the drop chance is positive the returned values
 * alternate 1, 0, 1, 0, ... (starting from the initial {@code flag} of 0).
 */
class MockRandom extends Random {
    // Last value handed out while the drop chance is positive.
    int flag = 0;
    // Throttle consulted on every call; assigned by BlueThrottleWithMockRandom.
    BlueThrottle throttle;

    @Override
    public double nextDouble() {
        final boolean dropping = throttle.getDropChance() > 0;
        if (!dropping) {
            return 1;
        }
        // flip between 0 and 1 so consecutive calls alternate
        flag = 1 - flag;
        return flag;
    }
}
/**
 * {@link BlueThrottle} whose random source is replaced by a {@link MockRandom},
 * with the mock wired back to this throttle so it can read the drop chance.
 */
class BlueThrottleWithMockRandom extends BlueThrottle {
    /**
     * @param random the mock generator to install as this throttle's {@code rng}
     */
    public BlueThrottleWithMockRandom(MockRandom random) {
        // the no-arg BlueThrottle constructor runs implicitly before these assignments
        rng = random;
        random.throttle = this;
    }
}
/**
 * A throttle created with the no-arg constructor and no further
 * configuration must let a request through.
 */
@Test
public void testThrottleDisabled() {
    final BlueThrottle unconfigured = new BlueThrottle();
    final boolean admitted = unconfigured.checkLimit(1);
    assertTrue(admitted, "Throttle should be disabled by default");
}
/**
 * With a single token and a fill time of 2000, two back-to-back requests
 * must yield one admission followed by one rejection.
 */
@Test
public void testThrottleWithoutRefill() {
    final BlueThrottle bucket = new BlueThrottle();
    bucket.setMaxTokens(1);
    bucket.setFillTime(2000);

    final boolean first = bucket.checkLimit(1);
    assertTrue(first, "First request should be allowed");

    final boolean second = bucket.checkLimit(1);
    assertFalse(second, "Second request should be denied");
}
/**
 * With a single token and a fill time of 500, a request issued after
 * sleeping 750 ms must be admitted again once the token has been used up.
 *
 * @throws InterruptedException if the sleep is interrupted
 */
@Test
public void testThrottleWithRefill() throws InterruptedException {
    final BlueThrottle bucket = new BlueThrottle();
    bucket.setMaxTokens(1);
    bucket.setFillTime(500);

    final boolean first = bucket.checkLimit(1);
    assertTrue(first, "First request should be allowed");

    final boolean second = bucket.checkLimit(1);
    assertFalse(second, "Second request should be denied");

    //wait for the bucket to be refilled
    Thread.sleep(750);

    final boolean third = bucket.checkLimit(1);
    assertTrue(third, "Third request should be allowed since we've got a new token");
}
/**
 * Uses the mock random source but configures neither a freeze time nor a
 * drop increase. After the bucket is drained the drop chance must not be
 * positive, and after a refill exactly the first {@code maxTokens} requests
 * are admitted while the following {@code maxTokens} are rejected.
 *
 * @throws InterruptedException if one of the sleeps is interrupted
 */
@Test
public void testThrottleWithoutRandomDropping() throws InterruptedException {
    final int capacity = 5;
    final BlueThrottle bucket = new BlueThrottleWithMockRandom(new MockRandom());
    bucket.setMaxTokens(capacity);
    bucket.setFillCount(capacity);
    bucket.setFillTime(1000);

    // drain the bucket; the individual results are not inspected
    int remaining = capacity;
    while (remaining-- > 0) {
        bucket.checkLimit(1);
    }
    assertEquals(bucket.getMaxTokens(), bucket.getDeficit(), "All tokens should be used up by now");

    Thread.sleep(110);
    bucket.checkLimit(1);
    assertFalse(bucket.getDropChance() > 0, "Dropping probability should still be zero");

    //allow bucket to be refilled
    Thread.sleep(1500);

    final String allowedMessage = "The first " + capacity + " requests should be allowed";
    for (int request = 1; request <= capacity; request++) {
        assertTrue(bucket.checkLimit(1), allowedMessage);
    }

    final String deniedMessage = "The latter " + capacity + " requests should be denied";
    for (int request = 1; request <= capacity; request++) {
        assertFalse(bucket.checkLimit(1), deniedMessage);
    }
}
/**
 * Configures a freeze time of 100 and a drop increase of 0.5 on a throttle
 * driven by the mock random source. After the bucket is drained and the
 * freeze time has elapsed, the drop chance must become positive. After a
 * refill, the first batch of requests must not all be admitted, while the
 * second batch must still see at least one admission.
 *
 * @throws InterruptedException if one of the sleeps is interrupted
 */
@Test
public void testThrottleWithRandomDropping() throws InterruptedException {
    final int capacity = 5;
    final BlueThrottle bucket = new BlueThrottleWithMockRandom(new MockRandom());
    bucket.setMaxTokens(capacity);
    bucket.setFillCount(capacity);
    bucket.setFillTime(1000);
    bucket.setFreezeTime(100);
    bucket.setDropIncrease(0.5);

    // drain the bucket; the individual results are not inspected
    for (int used = 0; used != capacity; used++) {
        bucket.checkLimit(1);
    }
    assertEquals(bucket.getMaxTokens(), bucket.getDeficit(), "All tokens should be used up by now");

    Thread.sleep(120);
    //this will trigger dropping probability being increased
    bucket.checkLimit(1);
    assertTrue(bucket.getDropChance() > 0, "Dropping probability should be increased");
    LOG.info("Dropping probability is {}", bucket.getDropChance());

    //allow bucket to be refilled
    Thread.sleep(1100);
    LOG.info("Bucket is refilled with {} tokens.", capacity);

    int firstBatchAccepted = 0;
    for (int request = 0; request < capacity; request++) {
        firstBatchAccepted += bucket.checkLimit(1) ? 1 : 0;
    }
    LOG.info("Send {} requests, {} are accepted", capacity, firstBatchAccepted);
    assertTrue(firstBatchAccepted < capacity, "The dropping should be distributed");

    int secondBatchAccepted = 0;
    for (int request = 0; request < capacity; request++) {
        secondBatchAccepted += bucket.checkLimit(1) ? 1 : 0;
    }
    LOG.info("Send another {} requests, {} are accepted", capacity, secondBatchAccepted);
    assertTrue(secondBatchAccepted > 0, "Later requests should have a chance");
}
// Client handles created by connect(), one per attempt; slots after the first
// timed-out attempt are never assigned.
private ZooKeeper[] zks;
// Watchers paired index-for-index with zks.
private ClientBase.CountdownWatcher[] watchers;
// Quorum harness used by the server-backed tests, built with QuorumUtil(1).
private QuorumUtil quorumUtil = new QuorumUtil(1);
/**
 * Opens up to {@code n} client connections, one after another, against
 * server 1 of the quorum and stops at the first attempt that does not
 * report "connected" within {@code RAPID_TIMEOUT}.
 *
 * The {@code zks} and {@code watchers} fields are replaced with fresh arrays
 * of length {@code n}; the client of a timed-out attempt stays in {@code zks}.
 *
 * @param n maximum number of connections to attempt
 * @return number of connections established before the first timeout
 * @throws Exception if creating a client or waiting on a watcher fails for
 *         a reason other than a timeout
 */
private int connect(int n) throws Exception {
    final String serverAddress = quorumUtil.getConnectionStringForServer(1);
    zks = new ZooKeeper[n];
    watchers = new ClientBase.CountdownWatcher[n];

    // Attempts stop at the first failure, so the number of established
    // connections doubles as the index of the next slot.
    int established = 0;
    while (established < n) {
        final ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
        watchers[established] = watcher;
        zks[established] = new ZooKeeper(serverAddress, 3000, watcher);
        try {
            watcher.waitForConnected(RAPID_TIMEOUT);
        } catch (TimeoutException e) {
            LOG.info("Connection denied by the throttler due to insufficient tokens");
            return established;
        }
        established++;
    }
    return established;
}
/**
 * Closes every client created by {@code connect} and then shuts the whole
 * quorum down.
 *
 * Safe to call when {@code connect} never ran ({@code zks} still null). The
 * quorum is shut down even if closing a client throws, so a failing client
 * cannot leave servers running for subsequent tests.
 *
 * @throws Exception if closing a client or shutting the quorum down fails
 */
private void shutdownQuorum() throws Exception {
    try {
        if (zks != null) {
            for (ZooKeeper zk : zks) {
                // slots after a refused attempt were never filled
                if (zk != null) {
                    zk.close();
                }
            }
        }
    } finally {
        quorumUtil.shutdownAll();
    }
}
/**
 * Starts the quorum, turns connection throttling off by setting the max
 * tokens to 0, and checks that all ten connection attempts succeed.
 *
 * Cleanup runs in a {@code finally} block so a failed assertion or a failed
 * connection attempt does not leave the quorum and clients running.
 *
 * @throws Exception if the quorum cannot be started or stopped, or a client
 *         fails for a reason other than a connect timeout
 */
@Test
public void testNoThrottling() throws Exception {
    quorumUtil.startAll();
    try {
        //disable throttling
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0);
        int connected = connect(10);
        assertEquals(10, connected);
    } finally {
        if (zks != null) {
            shutdownQuorum();
        } else {
            // connect() never allocated any clients, so only the servers need stopping
            quorumUtil.shutdownAll();
        }
    }
}
/**
 * With two tokens and no refill, only two of three connection attempts may
 * succeed. The scenario runs twice: first with local sessions enabled, then
 * with them disabled.
 *
 * Each run shuts the quorum down in a {@code finally} block so a failed
 * assertion does not leave servers and clients running.
 *
 * @throws Exception if the quorum cannot be started or stopped, or a client
 *         fails for a reason other than a connect timeout
 */
@Test
public void testThrottling() throws Exception {
    for (boolean localSession : new boolean[] {true, false}) {
        quorumUtil.enableLocalSession(localSession);
        quorumUtil.startAll();
        try {
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
            //no refill, makes testing easier
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
            int connected = connect(3);
            assertEquals(2, connected, "connections accepted with localSession=" + localSession);
        } finally {
            if (zks != null) {
                shutdownQuorum();
            } else {
                // connect() never allocated any clients, so only the servers need stopping
                quorumUtil.shutdownAll();
            }
        }
    }
}
/**
 * Exercises weighted connection throttling in three phases:
 * <ol>
 *   <li>local sessions: 10 tokens, 11 attempts, 10 connections expected;</li>
 *   <li>global sessions: 10 tokens, 11 attempts, 3 connections expected;</li>
 *   <li>reconnects: two clients connect, server 1 is restarted with 3 tokens,
 *       and exactly one client is expected to reconnect.</li>
 * </ol>
 *
 * The static connection-weight switch is turned off again in a
 * {@code finally} block so it cannot leak into other tests, and every phase
 * shuts the quorum down even when an assertion fails.
 *
 * @throws Exception if the quorum cannot be started, restarted or stopped, or
 *         a client fails for a reason other than a connect timeout
 */
@Test
public void testWeighedThrottling() throws Exception {
    // this test depends on the session weights set to the default values
    // 3 for global session, 2 for renew sessions, 1 for local sessions
    BlueThrottle.setConnectionWeightEnabled(true);
    try {
        //try to create 11 local sessions, 10 created, because we have only 10 tokens
        assertWeighedConnections(true, 11, 10);

        //try to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3
        assertWeighedConnections(false, 11, 3);

        // local sessions are still disabled from the previous phase
        quorumUtil.startAll();
        try {
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
            int connected = connect(2);
            assertEquals(2, connected);

            quorumUtil.shutdown(1);
            watchers[0].waitForDisconnected(RAPID_TIMEOUT);
            watchers[1].waitForDisconnected(RAPID_TIMEOUT);
            quorumUtil.restart(1);

            //client will try to reconnect
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3);
            quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
            int reconnected = 0;
            for (int i = 0; i < 2; i++) {
                try {
                    watchers[i].waitForConnected(RAPID_TIMEOUT);
                    reconnected++;
                } catch (TimeoutException e) {
                    LOG.info("One reconnect fails due to insufficient tokens");
                }
            }
            //each reconnect takes two tokens, we have 3, so only one reconnects
            LOG.info("reconnected {}", reconnected);
            assertEquals(1, reconnected);
        } finally {
            // zks is non-null here: both earlier phases completed a connect() call
            shutdownQuorum();
        }
    } finally {
        // The switch is static and would otherwise stay on for later tests.
        // NOTE(review): assumes weighting is off by default -- the counts expected by
        // testThrottling (2 global sessions out of 2 tokens) only hold in that case.
        BlueThrottle.setConnectionWeightEnabled(false);
    }
}

/**
 * Runs one start/connect/shutdown cycle with 10 tokens and no refill and
 * checks how many of the attempted connections were accepted.
 *
 * @param localSession value passed to {@code quorumUtil.enableLocalSession}
 * @param attempts number of connections to attempt
 * @param expected number of connections that must be accepted
 * @throws Exception if the quorum cannot be started or stopped, or a client
 *         fails for a reason other than a connect timeout
 */
private void assertWeighedConnections(boolean localSession, int attempts, int expected) throws Exception {
    quorumUtil.enableLocalSession(localSession);
    quorumUtil.startAll();
    try {
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
        int connected = connect(attempts);
        assertEquals(expected, connected);
    } finally {
        if (zks != null) {
            shutdownQuorum();
        } else {
            // connect() never allocated any clients, so only the servers need stopping
            quorumUtil.shutdownAll();
        }
    }
}
}