DIFFSyncConsistencyTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Map;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
public class DIFFSyncConsistencyTest extends QuorumPeerTestBase {
private static int SERVER_COUNT = 3;
private MainThread[] mt = new MainThread[SERVER_COUNT];
/**
 * Checks that a transaction which the old leader exposed to clients after a DIFF sync is still visible
 * once the old leader is gone and one of the former followers has taken over.
 *
 * <p>Scenario: start a three node quorum, stop both followers, issue a create on the leader (it cannot
 * be committed but is ACKed by the leader itself), restart the followers with an injected failure on
 * UPTODATE, confirm the leader serves the new znode, then stop everything and restart only the former
 * followers. The new leader must still expose the znode.
 */
@Test
@Timeout(value = 120)
public void testInconsistentDueToUncommittedLog() throws Exception {
    final int LEADER_TIMEOUT_MS = 10_000;
    // Upper bound on how long we poll for the leader's own ACK of the create proposal.
    final int SYNC_TIMEOUT_MS = 2_000;
    final int[] clientPorts = new int[SERVER_COUNT];
    StringBuilder sb = new StringBuilder();
    String server;

    for (int i = 0; i < SERVER_COUNT; i++) {
        clientPorts[i] = PortAssignment.unique();
        server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                 + ":participant;127.0.0.1:" + clientPorts[i];
        sb.append(server + "\n");
    }
    String currentQuorumCfgSection = sb.toString();

    for (int i = 0; i < SERVER_COUNT; i++) {
        mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
            @Override
            public TestQPMain getTestQPMain() {
                return new MockTestQPMain();
            }
        };
        mt[i].start();
    }

    for (int i = 0; i < SERVER_COUNT; i++) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                   "waiting for server " + i + " being up");
    }

    int leader = findLeader(mt);
    // Without this check a missing leader would surface as an ArrayIndexOutOfBoundsException below.
    assertTrue(leader >= 0, "no leader found although all servers are up");

    CountdownWatcher watch = new CountdownWatcher();
    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[leader], ClientBase.CONNECTION_TIMEOUT, watch);
    try {
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

        Map<Long, Proposal> outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
        // Increase the tick time to delay the leader going to looking, which allows us to propose a
        // transaction while the other followers are offline.
        int previousTick = mt[leader].main.quorumPeer.tickTime;
        mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
        // Let the previous tick on the leader exhaust itself so the new tick time takes effect
        Thread.sleep(previousTick);

        LOG.info("LEADER ELECTED {}", leader);

        // Shutdown followers to make sure we don't accidentally send the proposal we are going to make
        // to a follower. In other words, we want to make sure the followers get the proposal later
        // through DIFF sync.
        for (int i = 0; i < SERVER_COUNT; i++) {
            if (i != leader) {
                mt[i].shutdown();
            }
        }

        // Send a create request to old leader and make sure it's synced to disk.
        try {
            zk.create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            fail("create /zk" + leader + " should have failed");
        } catch (KeeperException expected) {
            // Expected: the leader has no followers, so the create cannot complete successfully.
        }

        // Make sure that we actually did get it in process at the leader; there can be extra
        // sessionClose proposals.
        assertTrue(outstanding.size() > 0);
        Proposal p = findProposalOfType(outstanding, OpCode.create);
        LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
        assertNotNull(p, "Old leader doesn't have 'create' proposal");

        // Make sure leader sync the proposal to disk.
        int sleepTime = 0;
        Long longLeader = (long) leader;
        while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
            if (sleepTime > SYNC_TIMEOUT_MS) {
                fail("Transaction not synced to disk within " + SYNC_TIMEOUT_MS + " ms "
                     + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
            }
            Thread.sleep(100);
            sleepTime += 100;
        }

        // Start controlled followers where we deliberately make the follower fail once it receives the
        // UPTODATE message from the leader. Because followers only persist proposals from DIFF sync
        // after UPTODATE, this can deterministically simulate the situation where followers ACK
        // NEWLEADER (which makes the leader think she has the quorum support, but actually not
        // afterwards) but immediately fail afterwards without persisting the proposals from DIFF sync.
        startFollowers(leader, true, "Follower {} started.");

        // Verify leader can see it. The fact that leader can see it implies that
        // leader should, at this point in time, get a quorum of ACK of NEWLEADER
        // from two followers so leader can start serving requests; this also implies
        // that DIFF sync from leader to followers are finished at this point in time.
        // We then verify later that followers should have the same view after we shutdown
        // this leader, otherwise it's a violation of ZAB / sequential consistency.
        boolean leaderSeesNode = false;
        for (int attempt = 0; attempt < 100 && !leaderSeesNode; attempt++) {
            try {
                Stat stat = zk.exists("/zk" + leader, false);
                assertNotNull(stat, "server " + leader + " should have /zk");
                leaderSeesNode = true;
            } catch (KeeperException.ConnectionLossException e) {
                // The client has no usable connection yet; back off briefly and retry.
                Thread.sleep(100);
            }
        }
        // Running out of retries means the precondition for the rest of the test was never established.
        assertTrue(leaderSeesNode, "server " + leader + " never answered exists for /zk" + leader);

        // Shutdown all servers
        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i].shutdown();
        }
        waitForOne(zk, States.CONNECTING);

        // Now restart all servers except the old leader. Only old leader has the transaction sync to
        // disk. The old followers only had in memory view of the transaction, and they didn't have a
        // chance to sync to disk because we made them fail at UPTODATE.
        startFollowers(leader, false, "Follower {} started again.");

        // The followers were only just started, so give the election time to finish; otherwise no
        // leader is found and the consistency check below would never run.
        int newLeader = awaitLeader(CONNECTION_TIMEOUT);
        assertTrue(newLeader >= 0, "no new leader was elected among the restarted followers");
        assertNotEquals(newLeader, leader, "new leader is still the old leader " + leader + " !!");

        // This simulates the case where clients connected to the old leader had a view of the data
        // "/zkX", but clients connected to the new leader do not have the same view of data (missing
        // "/zkX"). This inconsistent view of the quorum exposed from leaders is a violation of ZAB.
        zk.close();
        zk = new ZooKeeper("127.0.0.1:" + clientPorts[newLeader], ClientBase.CONNECTION_TIMEOUT, watch);
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
        Stat val = zk.exists("/zk" + leader, false);
        assertNotNull(val, "Data inconsistency detected! "
                           + "Server " + newLeader + " should have a view of /zk" + leader + "!");
    } finally {
        // Release the client even when an assertion above fails.
        zk.close();
    }
}

/**
 * Starts every server except {@code excluded}, waits (up to 100 polls of 100 ms each) until its quorum
 * peer object exists, and then sets the error injection flag on it.
 *
 * @param excluded index of the server that must stay down
 * @param injectError value passed to {@link CustomQuorumPeer#setInjectError(boolean)}
 * @param startedLogFormat log format with a single {@code {}} placeholder for the server index
 */
private void startFollowers(int excluded, boolean injectError, String startedLogFormat) throws Exception {
    for (int i = 0; i < SERVER_COUNT; i++) {
        if (i == excluded) {
            continue;
        }
        mt[i].start();
        int sleepCount = 0;
        while (mt[i].getQuorumPeer() == null) {
            ++sleepCount;
            if (sleepCount > 100) {
                fail("Can't start follower " + i + " !");
            }
            Thread.sleep(100);
        }
        ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(injectError);
        LOG.info(startedLogFormat, i);
    }
}

/**
 * Polls {@code findLeader(mt)} every 100 ms until it reports a leader or the timeout elapses.
 *
 * @param timeoutMs maximum time to keep polling, in milliseconds
 * @return the index of the leading server, or -1 if none was found in time
 */
private int awaitLeader(long timeoutMs) throws InterruptedException {
    long waitedMs = 0;
    int found = findLeader(mt);
    while (found < 0 && waitedMs < timeoutMs) {
        Thread.sleep(100);
        waitedMs += 100;
        found = findLeader(mt);
    }
    return found;
}
/**
 * Shuts down every server the test created. Slots that were never populated (the test failed during
 * setup) are skipped so that tearDown does not add a NullPointerException on top of the real failure.
 */
@AfterEach
public void tearDown() {
    for (int i = 0; i < mt.length; i++) {
        if (mt[i] == null) {
            // The test aborted before this server was constructed; nothing to stop.
            continue;
        }
        try {
            mt[i].shutdown();
        } catch (InterruptedException e) {
            LOG.warn("Quorum Peer interrupted while shutting it down", e);
        }
    }
}
/**
 * Quorum peer whose followers can be told to fail with a {@link SocketTimeoutException} right after
 * they have read an UPTODATE packet.
 */
static class CustomQuorumPeer extends QuorumPeer {

    /** When true, followers created by this peer throw after reading an UPTODATE packet. */
    private volatile boolean injectError = false;

    public CustomQuorumPeer() throws SaslException {
    }

    /** Turns the injected failure on or off. */
    public void setInjectError(boolean injectError) {
        this.injectError = injectError;
    }

    @Override
    protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
        FollowerZooKeeperServer followerServer = new FollowerZooKeeperServer(logFactory, this, this.getZkDb());
        return new Follower(this, followerServer) {
            @Override
            void readPacket(QuorumPacket pp) throws IOException {
                // In a real deployment the SocketTimeoutException would come from a network problem
                // while reading from the leader; here it is raised on demand, and only after the
                // packet itself has been read normally.
                super.readPacket(pp);
                boolean shouldFail = injectError && pp.getType() == Leader.UPTODATE;
                if (!shouldFail) {
                    return;
                }
                throw new SocketTimeoutException("Socket timeout while reading the packet for operation "
                                                 + LearnerHandler.packetToString(pp));
            }
        };
    }
}
/**
 * Test main that supplies a {@link CustomQuorumPeer} as its quorum peer, so the test can inject
 * follower failures.
 */
static class MockTestQPMain extends TestQPMain {

    @Override
    protected QuorumPeer getQuorumPeer() throws SaslException {
        final QuorumPeer faultInjectingPeer = new CustomQuorumPeer();
        return faultInjectingPeer;
    }
}
/**
 * Returns the first proposal, in the map's iteration order, whose request header has the given type.
 *
 * @param proposals proposals to search
 * @param type request header type to look for
 * @return the first matching proposal, or {@code null} when none matches
 */
private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
    return proposals.values()
                    .stream()
                    .filter(candidate -> candidate.request.getHdr().getType() == type)
                    .findFirst()
                    .orElse(null);
}
/**
 * Finds the first server whose quorum peer currently has a non-null {@code leader} field.
 *
 * @param mt servers to inspect, in index order
 * @return the index of the first such server, or -1 when there is none
 */
private int findLeader(MainThread[] mt) {
    int index = 0;
    while (index < mt.length) {
        boolean leading = mt[index].main.quorumPeer.leader != null;
        if (leading) {
            return index;
        }
        index++;
    }
    return -1;
}
}