DIFFSyncTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class DIFFSyncTest extends QuorumPeerTestBase {
private static final int SERVER_COUNT = 3;
private static final String PATH_PREFIX = "/test_";
private int[] clientPorts;
private MainThread[] mt;
private ZooKeeper[] zkClients;
@BeforeEach
public void start() throws Exception {
clientPorts = new int[SERVER_COUNT];
mt = startQuorum(clientPorts);
zkClients = new ZooKeeper[SERVER_COUNT];
}
@AfterEach
public void tearDown() throws Exception{
for (final ZooKeeper zk : zkClients) {
try {
if (zk != null) {
zk.close();
}
} catch (final InterruptedException e) {
LOG.warn("ZooKeeper interrupted while shutting it down", e);
}
}
for (final MainThread mainThread : mt) {
try {
mainThread.shutdown();
} catch (final InterruptedException e) {
LOG.warn("Quorum Peer interrupted while shutting it down", e);
}
}
}
@Test
@Timeout(value = 120)
public void testTxnLoss_FailToPersistAndCommitTxns() throws Exception {
final List<String> paths = new ArrayList<>();
assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
// create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
createZKClient(2);
// create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
paths.add(createNode(zkClients[2], PATH_PREFIX + "0"));
// shut down S0
mt[0].shutdown();
LOG.info("S0 shutdown.");
// create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
paths.add(createNode(zkClients[2], PATH_PREFIX + "1"));
logEpochsAndLastLoggedTxnForAllServers();
// shut down S1
mt[1].shutdown();
LOG.info("S1 shutdown.");
// restart S0 and trigger a new leader election (currentEpoch=2)
// S0 starts with MockSyncRequestProcessor and MockCommitProcessor to simulate it writes the
// currentEpoch and sends NEWLEADER ACK but fails to persist and commit txns afterwards
// in DIFF sync
mt[0].start(new MockTestQPMain());
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
"waiting for server 0 being up");
LOG.info("S0 restarted.");
logEpochsAndLastLoggedTxnForAllServers();
// validate S2 is still the leader
assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
// shut down the leader (i.e. S2). This causes S0 disconnects from leader, performs partial
// shutdown, fast forwards its database to the latest persisted tnx (i.e. <1, 3>) and change
// its state to LOOKING
mt[2].shutdown();
LOG.info("S2 shutdown.");
// start S1 and trigger a leader election (currentEpoch=3)
mt[1].start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1], CONNECTION_TIMEOUT),
"waiting for server 1 being up");
LOG.info("S1 restarted.");
logEpochsAndLastLoggedTxnForAllServers();
// validate S0 is the new leader because of it has higher epoch
assertEquals(0, mt[0].getQuorumPeer().getLeaderId());
// connect to the new leader (i.e. S0) (currentEpoch=3, lastLoggedZxid=<3, 1>
createZKClient(0);
// create a znode (currentEpoch=3, lastLoggedZxid=<3, 2>)
paths.add(createNode(zkClients[0], PATH_PREFIX + "3"));
// start S2 which is the old leader
mt[2].start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT),
"waiting for server " + 2 + " being up");
LOG.info("S2 restarted.");
logEpochsAndLastLoggedTxnForAllServers();
// validate all the znodes exist from all the clients
validateDataFromAllClients(paths);
}
@Test
@Timeout(value = 120)
public void testLeaderShutdown_AckProposalBeforeAckNewLeader() throws Exception {
assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
// create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
createZKClient(2);
// create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
createNode(zkClients[2], PATH_PREFIX + "0");
// shut down S0
mt[0].shutdown();
LOG.info("S0 shutdown.");
// create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
createNode(zkClients[2], PATH_PREFIX + "1");
logEpochsAndLastLoggedTxnForAllServers();
// shut down S1
mt[1].shutdown();
LOG.info("S1 shutdown.");
// restart S0 and trigger a new leader election and DIFF sync (currentEpoch=2)
mt[0].start();
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
"waiting for server 0 being up");
LOG.info("S0 restarted.");
// create a znode (currentEpoch=2, lastLoggedZxid=<2, 1>)
createNode(zkClients[2], PATH_PREFIX + "2");
// validate quorum is up without additional round of leader election
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != 1) {
final QuorumPeer qp = mt[i].getQuorumPeer();
assertNotNull(qp);
assertEquals(2, qp.getCurrentEpoch());
assertEquals(2, qp.getAcceptedEpoch());
assertEquals("200000001", Long.toHexString(qp.getLastLoggedZxid()));
}
}
}
private MainThread[] startQuorum(final int[] clientPorts) throws IOException {
final StringBuilder sb = new StringBuilder();
String server;
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server);
sb.append("\n");
}
final MainThread[] mt = new MainThread[SERVER_COUNT];
// start all the servers
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], sb.toString(), false);
mt[i].start();
}
// ensure all servers started
for (int i = 0; i < SERVER_COUNT; i++) {
assertTrue(
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
"waiting for server " + i + " being up");
}
return mt;
}
private void createZKClient(final int idx) throws Exception {
zkClients[idx] = null;
final ClientBase.CountdownWatcher watch = new ClientBase.CountdownWatcher();
zkClients[idx] = new ZooKeeper("127.0.0.1:" + clientPorts[idx], ClientBase.CONNECTION_TIMEOUT, watch);
watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
}
private String createNode(final ZooKeeper zk, final String path) throws Exception {
final String fullPath = zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertNotNull(zk.exists(path, false));
return fullPath;
}
private static class MockTestQPMain extends TestQPMain {
@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new TestQuorumPeer();
}
}
private static class TestQuorumPeer extends QuorumPeer {
public TestQuorumPeer() throws SaslException {
}
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
final FollowerZooKeeperServer followerZookeeperServer = new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new MockSyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
};
return new Follower(this, followerZookeeperServer);
}
}
private static class MockSyncRequestProcessor extends SyncRequestProcessor {
public MockSyncRequestProcessor(final ZooKeeperServer zks, final RequestProcessor nextProcessor) {
super(zks, nextProcessor);
}
@Override
public void processRequest(final Request request) {
LOG.info("Sync request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
}
}
private static class MockCommitProcessor extends CommitProcessor {
public MockCommitProcessor(final RequestProcessor nextProcessor, final String id,
final boolean matchSyncs, final ZooKeeperServerListener listener) {
super(nextProcessor, id, matchSyncs, listener);
}
@Override
public void commit(final Request request) {
LOG.info("Commit request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
}
}
private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
final QuorumPeer qp = mt[i].getQuorumPeer();
if (qp != null) {
LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s",
qp.getMyId(), qp.getAcceptedEpoch(),
qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid())));
}
}
}
private void validateDataFromAllClients(final List<String> paths) throws Exception{
for (int i = 0; i < SERVER_COUNT; i++) {
if (zkClients[i] == null) {
createZKClient(i);
}
for (final String path : paths) {
assertNotNull(zkClients[i].exists(path, false), "znode " + path + " is missing");
}
assertEquals(3, paths.size());
}
}
}