ReconfigRollingRestartCompatibilityTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ReconfigTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
/**
* ReconfigRollingRestartCompatibilityTest - we want to make sure that users
* can continue using the rolling restart approach when the reconfig feature is disabled.
* It is important to stay compatible with rolling restart because dynamic reconfig
* has a limitation: it requires a quorum of servers to work. When no quorum can be formed,
* rolling restart is the only way to reconfigure the ensemble (e.g. removing bad nodes
* so that a new quorum with a smaller number of nodes can be formed).
*
* See ZOOKEEPER-2819 for more details.
*/
public class ReconfigRollingRestartCompatibilityTest extends QuorumPeerTestBase {
// Name of the static-config backup file; the tests assert it is absent from the server directory.
private static final String ZOO_CFG_BAK_FILE = "zoo.cfg.bak";
// Server id -> client port, populated by generateNewQuorumConfig / updateExistingQuorumConfig.
Map<Integer, Integer> clientPorts = new HashMap<>(5);
// Server id -> full "server.<id>=..." config line, kept in step with clientPorts.
Map<Integer, String> serverAddress = new HashMap<>(5);
// Builds the quorum section for servers 0 .. serverCount - 1, one "server.<id>=..." line each.
// Every generated line and its client port are also recorded in serverAddress / clientPorts.
// Returns the concatenated lines, each terminated by a newline.
private String generateNewQuorumConfig(int serverCount) {
    StringBuilder quorumSection = new StringBuilder();
    for (int sid = 0; sid < serverCount; sid++) {
        // Keep the allocation order: client port first, then the two peer ports.
        int clientPort = PortAssignment.unique();
        clientPorts.put(sid, clientPort);
        int quorumPort = PortAssignment.unique();
        int electionPort = PortAssignment.unique();
        String entry = "server." + sid + "=localhost:" + quorumPort + ":" + electionPort
                + ":participant;localhost:" + clientPort;
        serverAddress.put(sid, entry);
        quorumSection.append(entry).append("\n");
    }
    return quorumSection.toString();
}
// Applies a membership change to the recorded ensemble and renders the resulting quorum section.
// sidsToAdd: ids that get freshly allocated ports and a new "server.<id>=..." line.
// sidsToRemove: ids dropped from both clientPorts and serverAddress.
// Returns one line per remaining server, each terminated by a newline.
private String updateExistingQuorumConfig(List<Integer> sidsToAdd, List<Integer> sidsToRemove) {
    for (Integer sid : sidsToAdd) {
        // Keep the allocation order: client port first, then the two peer ports.
        int clientPort = PortAssignment.unique();
        clientPorts.put(sid, clientPort);
        int quorumPort = PortAssignment.unique();
        int electionPort = PortAssignment.unique();
        String entry = "server." + sid + "=localhost:" + quorumPort + ":" + electionPort
                + ":participant;localhost:" + clientPort;
        serverAddress.put(sid, entry);
    }

    // Removals happen after additions, so an id listed in both ends up removed.
    clientPorts.keySet().removeAll(sidsToRemove);
    serverAddress.keySet().removeAll(sidsToRemove);

    StringBuilder quorumSection = new StringBuilder();
    for (String entry : serverAddress.values()) {
        quorumSection.append(entry).append("\n");
    }
    return quorumSection.toString();
}
// With the reconfig feature flag left at its default (off), a running server must have
// neither a zoo.cfg.dynamic.* file nor a zoo.cfg.bak file locally, and its static config
// file must still contain its own server entry.
@Test
@Timeout(value = 60)
public void testNoLocalDynamicConfigAndBackupFiles() throws InterruptedException, IOException {
    final int ensembleSize = 3;
    String quorumConfig = generateNewQuorumConfig(ensembleSize);
    QuorumPeerTestBase.MainThread[] peers = new QuorumPeerTestBase.MainThread[ensembleSize];

    // Start every server before waiting on any of them so a quorum can form.
    for (int sid = 0; sid < ensembleSize; sid++) {
        peers[sid] = new QuorumPeerTestBase.MainThread(sid, clientPorts.get(sid), quorumConfig, false);
        peers[sid].start();
    }

    for (int sid = 0; sid < ensembleSize; sid++) {
        QuorumPeerTestBase.MainThread peer = peers[sid];
        String hostPort = "127.0.0.1:" + clientPorts.get(sid);
        assertTrue(ClientBase.waitForServerUp(hostPort, CONNECTION_TIMEOUT), "waiting for server " + sid + " being up");

        assertNull(peer.getFileByName(ZOO_CFG_BAK_FILE), "static file backup (zoo.cfg.bak) shouldn't exist!");
        String nextDynamicConfigName = peer.getQuorumPeer().getNextDynamicConfigFilename();
        assertNull(peer.getFileByName(nextDynamicConfigName), "dynamic configuration file (zoo.cfg.dynamic.*) shouldn't exist!");

        // The static file is flattened to one string so a simple substring check is enough.
        String staticConfig = Files.readAllLines(peer.confFile.toPath(), StandardCharsets.UTF_8).toString();
        String ownEntry = serverAddress.get(sid);
        assertTrue(staticConfig.contains(ownEntry), "static config file should contain server entry " + ownEntry);
    }

    for (QuorumPeerTestBase.MainThread peer : peers) {
        peer.shutdown();
    }
}
// Simulates the usual rolling restart with no membership change:
// 1. A node is shut down (e.g. to upgrade software or hardware, or to clean up local data).
// 2. The node is started again.
// 3. This is repeated for every node, one at a time, checking the node's config and
//    member view after each restart.
@Test
@Timeout(value = 60)
public void testRollingRestartWithoutMembershipChange() throws Exception {
    final int ensembleSize = 3;
    String quorumConfig = generateNewQuorumConfig(ensembleSize);
    QuorumPeerTestBase.MainThread[] peers = new QuorumPeerTestBase.MainThread[ensembleSize];
    List<String> expectedServers = new ArrayList<>();

    for (int sid = 0; sid < ensembleSize; sid++) {
        peers[sid] = new QuorumPeerTestBase.MainThread(sid, clientPorts.get(sid), quorumConfig, false);
        peers[sid].start();
        expectedServers.add(serverAddress.get(sid));
    }

    for (int sid = 0; sid < ensembleSize; sid++) {
        String hostPort = "127.0.0.1:" + clientPorts.get(sid);
        assertTrue(ClientBase.waitForServerUp(hostPort, CONNECTION_TIMEOUT), "waiting for server " + sid + " being up");
    }

    // Restart one node at a time; the other two keep the quorum alive meanwhile.
    for (int sid = 0; sid < ensembleSize; sid++) {
        QuorumPeerTestBase.MainThread peer = peers[sid];
        peer.shutdown();
        peer.start();
        verifyQuorumConfig(sid, expectedServers, null);
        verifyQuorumMembers(peer);
    }

    for (QuorumPeerTestBase.MainThread peer : peers) {
        peer.shutdown();
    }
}
// Simulates a membership change made by starting new servers, without dynamic reconfig.
// A 3 node ensemble is expanded to a 5 node ensemble, and the test verifies that each node
// has the configuration that was pushed to it through its own local zoo.cfg file.
@Test
@Timeout(value = 90)
public void testExtendingQuorumWithNewMembers() throws Exception {
    int serverCount = 3;
    String config = generateNewQuorumConfig(serverCount);
    QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
    List<String> joiningServers = new ArrayList<>();
    for (int i = 0; i < serverCount; ++i) {
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        joiningServers.add(serverAddress.get(i));
    }
    for (int i = 0; i < serverCount; ++i) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
    }
    for (int i = 0; i < serverCount; ++i) {
        verifyQuorumConfig(i, joiningServers, null);
        verifyQuorumMembers(mt[i]);
    }

    // Snapshot the 3-server view before serverAddress is extended with servers 3 and 4.
    Map<Integer, String> oldServerAddress = new HashMap<>(serverAddress);
    List<String> newServers = new ArrayList<>(joiningServers);
    config = updateExistingQuorumConfig(Arrays.asList(3, 4), new ArrayList<>());
    newServers.add(serverAddress.get(3));
    newServers.add(serverAddress.get(4));
    serverCount = serverAddress.size();
    // assertEquals takes (expected, actual, message); keeping that order makes a failure
    // report read "expected: <5> but was: <n>" rather than the reverse.
    assertEquals(5, serverCount, "Server count should be 5 after config update.");

    // We are adding two new servers to the ensemble. These two servers should have the config which includes
    // all five servers (the old three servers, plus the two servers added). The old three servers should only
    // have the old three server config, because disabling reconfig will prevent synchronizing configs between
    // peers.
    mt = Arrays.copyOf(mt, mt.length + 2);
    for (int i = 3; i < 5; ++i) {
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
        verifyQuorumConfig(i, newServers, null);
        verifyQuorumMembers(mt[i]);
    }

    Set<String> expectedConfigs = new HashSet<>();
    for (String conf : oldServerAddress.values()) {
        // Remove "server.x=" prefix which quorum peer does not include.
        expectedConfigs.add(conf.substring(conf.indexOf('=') + 1));
    }

    // The original three servers were never restarted, so they must still see only themselves.
    for (int i = 0; i < 3; ++i) {
        verifyQuorumConfig(i, joiningServers, null);
        verifyQuorumMembers(mt[i], expectedConfigs);
    }

    for (int i = 0; i < serverCount; ++i) {
        mt[i].shutdown();
    }
}
// Performs a rolling restart with an extended quorum config (see ZOOKEEPER-3829):
// a 3 member quorum gains a fourth server, then the original three are restarted one by one
// with the 4 member config, and finally every node is checked for normal client traffic.
@Test
public void testRollingRestartWithExtendedMembershipConfig() throws Exception {
    // Start a quorum with 3 members
    int serverCount = 3;
    String config = generateNewQuorumConfig(serverCount);
    QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
    List<String> joiningServers = new ArrayList<>();
    for (int i = 0; i < serverCount; i++) {
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        joiningServers.add(serverAddress.get(i));
    }
    for (int i = 0; i < serverCount; i++) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
    }
    for (int i = 0; i < serverCount; i++) {
        verifyQuorumConfig(i, joiningServers, null);
        verifyQuorumMembers(mt[i]);
    }

    // Create updated config with 4 members
    List<String> newServers = new ArrayList<>(joiningServers);
    config = updateExistingQuorumConfig(Arrays.asList(3), new ArrayList<>());
    newServers.add(serverAddress.get(3));
    serverCount = serverAddress.size();
    // assertEquals takes (expected, actual, message).
    assertEquals(4, serverCount, "Server count should be 4 after config update.");

    // We are adding one new server to the ensemble. The new server should be started with the new config
    mt = Arrays.copyOf(mt, mt.length + 1);
    mt[3] = new QuorumPeerTestBase.MainThread(3, clientPorts.get(3), config, false);
    mt[3].start();
    assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(3), CONNECTION_TIMEOUT), "waiting for server 3 being up");
    verifyQuorumConfig(3, newServers, null);
    verifyQuorumMembers(mt[3]);

    // Now we restart the first 3 servers, one-by-one with the new config
    for (int i = 0; i < 3; i++) {
        mt[i].shutdown();
        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(i), ClientBase.CONNECTION_TIMEOUT),
                String.format("Timeout during waiting for server %d to go down", i));
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
        verifyQuorumConfig(i, newServers, null);
        verifyQuorumMembers(mt[i]);
    }

    // now verify that all nodes can handle traffic
    for (int i = 0; i < 4; ++i) {
        ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(i));
        try {
            ReconfigTest.testNormalOperation(zk, zk, false);
        } finally {
            // Close the client even when the check fails, so no session outlives the test.
            zk.close();
        }
    }

    for (int i = 0; i < 4; ++i) {
        mt[i].shutdown();
    }
}
// Performs a rolling restart with a new quorum config that drops one node and adds another:
// server 2 is stopped and removed, server 3 is added, the two remaining original servers are
// restarted with the new config, and all three resulting members are checked for traffic.
@Test
public void testRollingRestartWithHostAddedAndRemoved() throws Exception {
    // Start a quorum with 3 members
    int serverCount = 3;
    String config = generateNewQuorumConfig(serverCount);
    QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
    List<String> originalServers = new ArrayList<>();
    for (int i = 0; i < serverCount; i++) {
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        originalServers.add(serverAddress.get(i));
    }
    for (int i = 0; i < serverCount; i++) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
    }
    for (int i = 0; i < serverCount; i++) {
        verifyQuorumConfig(i, originalServers, null);
        verifyQuorumMembers(mt[i]);
    }

    // we are stopping the third server (myid=2)
    mt[2].shutdown();
    assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(2), ClientBase.CONNECTION_TIMEOUT),
            String.format("Timeout during waiting for server %d to go down", 2));
    String leavingServer = originalServers.get(2);

    // Create updated config with the first 2 existing members, but we remove 3rd and add one with different myid
    config = updateExistingQuorumConfig(Arrays.asList(3), Arrays.asList(2));
    List<String> newServers = new ArrayList<>(serverAddress.values());
    serverCount = serverAddress.size();
    // assertEquals takes (expected, actual, message).
    assertEquals(3, serverCount, "Server count should be 3 after config update.");

    // We are adding one new server to the ensemble. The new server should be started with the new config
    mt = Arrays.copyOf(mt, mt.length + 1);
    mt[3] = new QuorumPeerTestBase.MainThread(3, clientPorts.get(3), config, false);
    mt[3].start();
    assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(3), CONNECTION_TIMEOUT), "waiting for server 3 being up");
    verifyQuorumConfig(3, newServers, Arrays.asList(leavingServer));
    verifyQuorumMembers(mt[3]);

    // Now we restart the first 2 servers, one-by-one with the new config
    for (int i = 0; i < 2; i++) {
        mt[i].shutdown();
        assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(i), ClientBase.CONNECTION_TIMEOUT),
                String.format("Timeout during waiting for server %d to go down", i));
        mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
        mt[i].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
        verifyQuorumConfig(i, newServers, null);
        verifyQuorumMembers(mt[i]);
    }

    // now verify that all three nodes can handle traffic
    for (int i : serverAddress.keySet()) {
        ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(i));
        try {
            ReconfigTest.testNormalOperation(zk, zk, false);
        } finally {
            // Close the client even when the check fails, so no session outlives the test.
            zk.close();
        }
    }

    // Only the current members are shut down here; server 2 was already stopped above.
    for (int i : serverAddress.keySet()) {
        mt[i].shutdown();
    }
}
// Verify the quorum peer with the given sid has the expected config in its config zNode.
// sid: server id whose recorded client port is used to connect.
// joiningServers / leavingServers: passed through to ReconfigTest.testServerHasConfig;
// callers in this class pass null for leavingServers when no server is expected to have left.
// The client is closed even when one of the checks throws, so a failing verification
// does not leak a ZooKeeper session.
private void verifyQuorumConfig(int sid, List<String> joiningServers, List<String> leavingServers) throws Exception {
    ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(sid));
    try {
        ReconfigTest.testNormalOperation(zk, zk);
        ReconfigTest.testServerHasConfig(zk, joiningServers, leavingServers);
    } finally {
        zk.close();
    }
}
// Verify the quorum peer's member view matches every server currently recorded in
// serverAddress. The leading "server.<id>=" part of each recorded line is stripped before
// the comparison is delegated to the two-argument overload.
private void verifyQuorumMembers(QuorumPeerTestBase.MainThread mt) {
    Set<String> membersWithoutPrefix = new HashSet<>();
    for (String entry : serverAddress.values()) {
        int valueStart = entry.indexOf('=') + 1;
        membersWithoutPrefix.add(entry.substring(valueStart));
    }
    verifyQuorumMembers(mt, membersWithoutPrefix);
}
// Verify the quorum peer's member view is exactly the given set of server configs.
// mt: the server thread whose quorum verifier is inspected.
// expectedConfigs: expected member strings, compared against QuorumServer.toString().
// Sizes are compared first, then each actual member must appear in the expected set;
// together these establish set equality as long as the actual member strings are distinct.
private void verifyQuorumMembers(QuorumPeerTestBase.MainThread mt, Set<String> expectedConfigs) {
    Map<Long, QuorumPeer.QuorumServer> members = mt.getQuorumPeer().getQuorumVerifier().getAllMembers();
    // assertEquals reports both sizes on failure, unlike assertTrue(a == b).
    assertEquals(expectedConfigs.size(), members.size(), "Quorum member should not change.");
    for (QuorumPeer.QuorumServer qs : members.values()) {
        String actualConfig = qs.toString();
        assertTrue(expectedConfigs.contains(actualConfig), "Unexpected config " + actualConfig + " found!");
    }
}
}