FollowerResyncConcurrencyTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FollowerResyncConcurrencyTest extends ZKTestCase {
private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger errors = new AtomicInteger(0);
/**
* Keep track of pending async operations, we shouldn't start verifying
* the state until pending operation is 0
*/
private AtomicInteger pending = new AtomicInteger(0);
@BeforeEach
public void setUp() throws Exception {
pending.set(0);
errors.set(0);
counter.set(0);
}
@AfterEach
public void tearDown() throws Exception {
LOG.info("Error count {}", errors.get());
}
/**
* See ZOOKEEPER-1319 - verify that a lagging follower resyncs correctly
*
* 1) start with down quorum
* 2) start leader/follower1, add some data
* 3) restart leader/follower1
* 4) start follower2
* 5) verify data consistency across the ensemble
*
* @throws Exception
*/
@Test
public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
CountdownWatcher watcher1 = new CountdownWatcher();
CountdownWatcher watcher2 = new CountdownWatcher();
CountdownWatcher watcher3 = new CountdownWatcher();
QuorumUtil qu = new QuorumUtil(1);
qu.shutdownAll();
qu.start(1);
qu.start(2);
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server up");
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server up");
ZooKeeper zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
final String resyncPath = "/resyncundernewepoch";
zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.close();
qu.shutdown(1);
qu.shutdown(2);
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server down");
assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server down");
qu.start(1);
qu.start(2);
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server up");
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server up");
qu.start(3);
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT),
"Waiting for server up");
zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
assertNotNull(zk1.exists(resyncPath, false), "zk1 has data");
final ZooKeeper zk2 = createClient(qu.getPeer(2).peer.getClientPort(), watcher2);
LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
assertNotNull(zk2.exists(resyncPath, false), "zk2 has data");
final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
assertNotNull(zk3.exists(resyncPath, false), "zk3 has data");
zk1.close();
zk2.close();
zk3.close();
qu.shutdownAll();
}
/**
* See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
* setting the ZXID of the SNAP packet
* Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down
* The non-leader ZKs are writing to cluster
* Shut down F1 again
* Restart after sessions are expired, expect to get a snap file
* Shut down, run some transactions through.
* Restart to a diff while transactions are running in leader
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void testResyncBySnapThenDiffAfterFollowerCrashes() throws Throwable {
followerResyncCrashTest(false);
}
/**
* Same as testResyncBySnapThenDiffAfterFollowerCrashes() but we resync
* follower using txnlog
*
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void testResyncByTxnlogThenDiffAfterFollowerCrashes() throws Throwable {
followerResyncCrashTest(true);
}
public void followerResyncCrashTest(boolean useTxnLogResync) throws Throwable {
final Semaphore sem = new Semaphore(0);
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
CountdownWatcher watcher1 = new CountdownWatcher();
CountdownWatcher watcher2 = new CountdownWatcher();
CountdownWatcher watcher3 = new CountdownWatcher();
int index = 1;
while (qu.getPeer(index).peer.leader == null) {
index++;
}
Leader leader = qu.getPeer(index).peer.leader;
assertNotNull(leader);
if (useTxnLogResync) {
// Set the factor to high value so that this test case always
// resync using txnlog
qu.getPeer(index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(1000);
} else {
// Disable sending DIFF using txnlog, so that this test still
// testing the ZOOKEEPER-962 bug
qu.getPeer(index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(-1);
}
/* Reusing the index variable to select a follower to connect to */
index = (index == 1) ? 2 : 1;
LOG.info("Connecting to follower: {}", index);
qu.shutdown(index);
final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
qu.restart(index);
final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Prepare a thread that will create znodes.
Thread mytestfooThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 3000; i++) {
// Here we create 3000 znodes
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() == 16200) {
sem.release();
}
}, null);
pending.incrementAndGet();
if (i % 10 == 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
}
}
}
}
});
// Here we start populating the server and shutdown the follower after
// initial data is written.
for (int i = 0; i < 13000; i++) {
// Here we create 13000 znodes
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() == 16200) {
sem.release();
}
}, null);
pending.incrementAndGet();
if (i == 5000) {
qu.shutdown(index);
LOG.info("Shutting down s1");
}
if (i == 12000) {
// Start the prepared thread so that it is writing znodes while
// the follower is restarting. On the first restart, the follow
// should use txnlog to catchup. For subsequent restart, the
// follower should use a diff to catchup.
mytestfooThread.start();
LOG.info("Restarting follower: {}", index);
qu.restart(index);
Thread.sleep(300);
LOG.info("Shutdown follower: {}", index);
qu.shutdown(index);
Thread.sleep(300);
LOG.info("Restarting follower: {}", index);
qu.restart(index);
LOG.info("Setting up server: {}", index);
}
if ((i % 1000) == 0) {
Thread.sleep(1000);
}
if (i % 50 == 0) {
zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() == 16200) {
sem.release();
}
}, null);
pending.incrementAndGet();
}
}
// Wait until all updates return
if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Did not acquire semaphore fast enough");
}
mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
if (mytestfooThread.isAlive()) {
LOG.error("mytestfooThread is still alive");
}
assertTrue(waitForPendingRequests(60));
assertTrue(waitForSync(qu, index, 10));
verifyState(qu, index, leader);
zk1.close();
zk2.close();
zk3.close();
qu.shutdownAll();
}
/**
* This test:
* Starts up 3 ZKs. The non-leader ZKs are writing to cluster
* Shut down one of the non-leader ZKs.
* Restart after sessions have expired but less than 500 txns have taken place (get a diff)
* Shut down immediately after restarting, start running separate thread with other transactions
* Restart to a diff while transactions are running in leader
*
*
* Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
* completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
* were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
* would be missed
*
* This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
* however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
* during the leader's diff forwarding.
*
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* @throws Throwable
*/
@Test
public void testResyncByDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
final Semaphore sem = new Semaphore(0);
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
CountdownWatcher watcher1 = new CountdownWatcher();
CountdownWatcher watcher2 = new CountdownWatcher();
CountdownWatcher watcher3 = new CountdownWatcher();
int index = 1;
while (qu.getPeer(index).peer.leader == null) {
index++;
}
Leader leader = qu.getPeer(index).peer.leader;
assertNotNull(leader);
/* Reusing the index variable to select a follower to connect to */
index = (index == 1) ? 2 : 1;
LOG.info("Connecting to follower: {}", index);
final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
final AtomicBoolean runNow = new AtomicBoolean(false);
Thread mytestfooThread = new Thread(new Runnable() {
@Override
public void run() {
int inSyncCounter = 0;
while (inSyncCounter < 400) {
if (runNow.get()) {
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() > 7300) {
sem.release();
}
}, null);
pending.incrementAndGet();
try {
Thread.sleep(10);
} catch (Exception e) {
}
inSyncCounter++;
} else {
Thread.yield();
}
}
}
});
mytestfooThread.start();
for (int i = 0; i < 5000; i++) {
zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() > 7300) {
sem.release();
}
}, null);
pending.incrementAndGet();
if (i == 1000) {
qu.shutdown(index);
Thread.sleep(1100);
LOG.info("Shutting down s1");
}
if (i == 1100 || i == 1150 || i == 1200) {
Thread.sleep(1000);
}
if (i == 1200) {
qu.startThenShutdown(index);
runNow.set(true);
qu.restart(index);
LOG.info("Setting up server: {}", index);
}
if (i >= 1000 && i % 2 == 0) {
zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
pending.decrementAndGet();
counter.incrementAndGet();
if (rc != 0) {
errors.incrementAndGet();
}
if (counter.get() > 7300) {
sem.release();
}
}, null);
pending.incrementAndGet();
}
if (i == 1050 || i == 1100 || i == 1150) {
Thread.sleep(1000);
}
}
// Wait until all updates return
if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Did not acquire semaphore fast enough");
}
mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
if (mytestfooThread.isAlive()) {
LOG.error("mytestfooThread is still alive");
}
assertTrue(waitForPendingRequests(60));
assertTrue(waitForSync(qu, index, 10));
// Verify that server is following and has the same epoch as the leader
verifyState(qu, index, leader);
zk1.close();
zk2.close();
zk3.close();
qu.shutdownAll();
}
private static DisconnectableZooKeeper createClient(int port, CountdownWatcher watcher) throws IOException, TimeoutException, InterruptedException {
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
"127.0.0.1:" + port,
ClientBase.CONNECTION_TIMEOUT,
watcher);
watcher.waitForConnected(CONNECTION_TIMEOUT);
return zk;
}
/**
* Wait for all async operation to return. So we know that we can start
* verifying the state
*/
private boolean waitForPendingRequests(int timeout) throws InterruptedException {
LOG.info("Wait for pending requests: {}", pending.get());
for (int i = 0; i < timeout; ++i) {
Thread.sleep(1000);
if (pending.get() == 0) {
return true;
}
}
LOG.info("Timeout waiting for pending requests: {}", pending.get());
return false;
}
/**
* Wait for all server to have the same lastProcessedZxid. Timeout in seconds
*/
private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException {
LOG.info("Wait for server to sync");
int leaderIndex = (index == 1) ? 2 : 1;
ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
long leadZxid = 0;
long cleanZxid = 0;
long restartedZxid = 0;
for (int i = 0; i < timeout; ++i) {
leadZxid = leadDb.getDataTreeLastProcessedZxid();
cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
return true;
}
Thread.sleep(1000);
}
LOG.info(
"Timeout waiting for zxid to sync: leader 0x{} clean 0x{} restarted 0x{}",
Long.toHexString(leadZxid),
Long.toHexString(cleanZxid),
Long.toHexString(restartedZxid));
return false;
}
private static TestableZooKeeper createTestableClient(String hp) throws IOException, TimeoutException, InterruptedException {
CountdownWatcher watcher = new CountdownWatcher();
return createTestableClient(watcher, hp);
}
private static TestableZooKeeper createTestableClient(
CountdownWatcher watcher, String hp) throws IOException, TimeoutException, InterruptedException {
TestableZooKeeper zk = new TestableZooKeeper(hp, ClientBase.CONNECTION_TIMEOUT, watcher);
watcher.waitForConnected(CONNECTION_TIMEOUT);
return zk;
}
private void verifyState(QuorumUtil qu, int index, Leader leader) {
LOG.info("Verifying state");
assertTrue(qu.getPeer(index).peer.follower != null, "Not following");
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
long epochL = (leader.getEpoch() >> 32L);
assertTrue(epochF == epochL,
"Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid()
+ "Current epoch: " + epochF);
int leaderIndex = (index == 1) ? 2 : 1;
Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
for (Long l : sessionsRestarted) {
assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
}
assertEquals(sessionsNotRestarted.size(), sessionsRestarted.size(), "Should have same number of sessions");
ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
for (Long l : sessionsRestarted) {
LOG.info("Validating ephemeral for session id 0x{}", Long.toHexString(l));
assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
Set<String> ephemerals = restarted.getEphemerals(l);
Set<String> cleanEphemerals = clean.getEphemerals(l);
for (String o : cleanEphemerals) {
if (!ephemerals.contains(o)) {
LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}", o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
}
}
for (String o : ephemerals) {
if (!cleanEphemerals.contains(o)) {
LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}", o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid()));
}
}
Set<String> leadEphemerals = lead.getEphemerals(l);
for (String o : leadEphemerals) {
if (!cleanEphemerals.contains(o)) {
LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}", o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid()));
}
}
for (String o : cleanEphemerals) {
if (!leadEphemerals.contains(o)) {
LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}", o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
}
}
assertEquals(ephemerals.size(), cleanEphemerals.size(), "Should have same number of ephemerals in both followers");
assertEquals(lead.getEphemerals(l).size(), cleanEphemerals.size(), "Leader should equal follower");
}
}
/**
* Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
*/
@Test
public void testFollowerSendsLastZxid() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
int index = 1;
while (qu.getPeer(index).peer.follower == null) {
index++;
}
LOG.info("Connecting to follower: {}", index);
TestableZooKeeper zk = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
assertEquals(0L, zk.testableLastZxid());
zk.exists("/", false);
long lzxid = zk.testableLastZxid();
assertTrue(lzxid > 0, "lzxid:" + lzxid + " > 0");
zk.close();
qu.shutdownAll();
}
private class MyWatcher extends CountdownWatcher {
LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
public void process(WatchedEvent event) {
super.process(event);
if (event.getType() != Event.EventType.None) {
try {
events.put(event);
} catch (InterruptedException e) {
LOG.warn("ignoring interrupt during event.put");
}
}
}
}
/**
* Verify that the server is sending the proper zxid, and as a result
* the watch doesn't fire. See ZOOKEEPER-1412.
*/
@Test
public void testFollowerWatcherResync() throws Exception {
QuorumUtil qu = new QuorumUtil(1);
qu.startAll();
int index = 1;
while (qu.getPeer(index).peer.follower == null) {
index++;
}
LOG.info("Connecting to follower: {}", index);
TestableZooKeeper zk1 = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher watcher = new MyWatcher();
TestableZooKeeper zk2 = createTestableClient(watcher, "localhost:" + qu.getPeer(index).peer.getClientPort());
zk2.exists("/foo", true);
watcher.reset();
zk2.testableConnloss();
if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
fail("Unable to connect to server");
}
assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
zk1.close();
zk2.close();
qu.shutdownAll();
}
}