// LeaderWithObserverTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.PortAssignment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
 * Checks that a {@link Leader} does not count an observer's vote towards a quorum in
 * {@code getEpochToPropose}, {@code waitForEpochAck} and {@code waitForNewLeaderAck}.
 *
 * <p>Every test calls the leader method directly, three times in a row: once on behalf of the
 * leader's own peer, once on behalf of an observer and once on behalf of a participant. The
 * tracked vote set must grow on the first and third call only, and the "quorum reached" flag
 * must flip only after the third call.
 */
public class LeaderWithObserverTest {

    QuorumPeer peer;
    Leader leader;

    @TempDir
    File tmpDir;

    /** Server id used for the participant that completes the quorum. */
    long participantId;

    /** Server id of the observer registered by {@link #setUp()}. */
    long observerId;

    /**
     * Builds a quorum peer and a leader on top of {@link #tmpDir}, then registers one extra
     * member of type {@code OBSERVER} in the peer's member map.
     *
     * @throws Exception if the peer or the leader cannot be created
     */
    @BeforeEach
    public void setUp() throws Exception {
        peer = createQuorumPeer(tmpDir);
        participantId = 1;
        Map<Long, QuorumPeer.QuorumServer> peers = peer.getQuorumVerifier().getAllMembers();
        // Use the current member count as the observer's id.
        // NOTE(review): assumes the existing member ids are 0..n-1 so this id is unused - confirm in ZabUtils.
        observerId = peers.size();
        leader = createLeader(tmpDir, peer);
        peer.leader = leader;
        peers.put(
                observerId,
                new QuorumPeer.QuorumServer(
                        observerId,
                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                        new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                        QuorumPeer.LearnerType.OBSERVER));
        // The tests run serially, so a tiny tick is safe; it makes the expected timeouts
        // (reported as InterruptedException) happen quickly.
        peer.tickTime = 1;
    }

    /** Shuts the leader down after each test. */
    @AfterEach
    public void tearDown() {
        // setUp may have failed before the leader was created; avoid piling an NPE on top of it.
        if (leader != null) {
            leader.shutdown("end of test");
        }
    }

    /**
     * An observer asking for the epoch must not be added to {@code connectingFollowers}, and the
     * new epoch must only be chosen once a second participant has asked.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testGetEpochToPropose() throws Exception {
        long lastAcceptedEpoch = 5;
        peer.setAcceptedEpoch(lastAcceptedEpoch);

        assertEquals(0, leader.connectingFollowers.size(), "Unexpected vote in connectingFollowers");
        assertTrue(leader.waitingForNewEpoch);

        try {
            // Leader asks for epoch (mocking Leader.lead behavior).
            // First add to connectingFollowers.
            leader.getEpochToPropose(peer.getMyId(), lastAcceptedEpoch);
        } catch (InterruptedException ignored) {
            // Timing out here is fine: no quorum exists yet.
        }

        assertEquals(1, leader.connectingFollowers.size(), "Unexpected vote in connectingFollowers");
        assertEquals(lastAcceptedEpoch, peer.getAcceptedEpoch(), "Leader shouldn't set new epoch until quorum of participants is in connectingFollowers");
        assertTrue(leader.waitingForNewEpoch);

        try {
            // Observer asks for epoch (mocking LearnerHandler behavior).
            leader.getEpochToPropose(observerId, lastAcceptedEpoch);
        } catch (InterruptedException ignored) {
            // Timing out here is fine: the observer must not complete the quorum.
        }

        assertEquals(1, leader.connectingFollowers.size(), "Unexpected vote in connectingFollowers");
        assertEquals(lastAcceptedEpoch, peer.getAcceptedEpoch(), "Leader shouldn't set new epoch after observer asks for epoch");
        assertTrue(leader.waitingForNewEpoch);

        try {
            // Now participant asks for epoch (mocking LearnerHandler behavior).
            // Second add to connectingFollowers; triggers verifier.containsQuorum = true.
            leader.getEpochToPropose(participantId, lastAcceptedEpoch);
        } catch (Exception e) {
            // Keep the cause so a non-timeout failure is not misreported.
            fail("Timed out in getEpochToPropose", e);
        }

        assertEquals(2, leader.connectingFollowers.size(), "Unexpected vote in connectingFollowers");
        assertEquals(lastAcceptedEpoch + 1, peer.getAcceptedEpoch(), "Leader should record next epoch");
        assertFalse(leader.waitingForNewEpoch);
    }

    /**
     * An observer's epoch ack must not be added to {@code electingFollowers}, and the election
     * must only be marked finished once a second participant has acked.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testWaitForEpochAck() throws Exception {
        // State needed for waitForEpochAck to run (usually set in leader.lead(), which is not run here).
        leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());

        assertEquals(0, leader.electingFollowers.size(), "Unexpected vote in electingFollowers");
        assertFalse(leader.electionFinished);

        try {
            // Leader calls waitForEpochAck: first add to electingFollowers.
            leader.waitForEpochAck(peer.getMyId(), new StateSummary(0, 0));
        } catch (InterruptedException ignored) {
            // Timing out here is fine: no quorum exists yet.
        }

        assertEquals(1, leader.electingFollowers.size(), "Unexpected vote in electingFollowers");
        assertFalse(leader.electionFinished);

        try {
            // Observer calls waitForEpochAck: should fail verifier.containsQuorum.
            leader.waitForEpochAck(observerId, new StateSummary(0, 0));
        } catch (InterruptedException ignored) {
            // Timing out here is fine: the observer must not complete the quorum.
        }

        assertEquals(1, leader.electingFollowers.size(), "Unexpected vote in electingFollowers");
        assertFalse(leader.electionFinished);

        try {
            // Second add to electingFollowers, verifier.containsQuorum = true;
            // waitForEpochAck returns without exceptions.
            leader.waitForEpochAck(participantId, new StateSummary(0, 0));
            assertEquals(2, leader.electingFollowers.size(), "Unexpected vote in electingFollowers");
            assertTrue(leader.electionFinished);
        } catch (Exception e) {
            // Keep the cause so a non-timeout failure is not misreported.
            fail("Timed out in waitForEpochAck", e);
        }
    }

    /**
     * An observer's NEWLEADER ack must not be added to the proposal's ack set, and the quorum must
     * only be marked formed once a second participant has acked.
     *
     * @throws Exception on unexpected failure
     */
    @Test
    public void testWaitForNewLeaderAck() throws Exception {
        long zxid = leader.zk.getZxid();

        // State needed for waitForNewLeaderAck to run (usually set in leader.lead(), which is not
        // run here). The proposal's packet field is not accessible, so it is set reflectively.
        Field packetField = Leader.Proposal.class.getDeclaredField("packet");
        packetField.setAccessible(true);
        packetField.set(leader.newLeaderProposal, new QuorumPacket(0, zxid, null, null));
        leader.newLeaderProposal.addQuorumVerifier(peer.getQuorumVerifier());

        Set<Long> ackSet = leader.newLeaderProposal.qvAcksetPairs.get(0).getAckset();
        assertEquals(0, ackSet.size(), "Unexpected vote in ackSet");
        assertFalse(leader.quorumFormed);

        try {
            // Leader calls waitForNewLeaderAck: first add to ackSet.
            leader.waitForNewLeaderAck(peer.getMyId(), zxid);
        } catch (InterruptedException ignored) {
            // Timing out here is fine: no quorum exists yet.
        }

        assertEquals(1, ackSet.size(), "Unexpected vote in ackSet");
        assertFalse(leader.quorumFormed);

        try {
            // Observer calls waitForNewLeaderAck: should fail verifier.containsQuorum.
            leader.waitForNewLeaderAck(observerId, zxid);
        } catch (InterruptedException ignored) {
            // Timing out here is fine: the observer must not complete the quorum.
        }

        assertEquals(1, ackSet.size(), "Unexpected vote in ackSet");
        assertFalse(leader.quorumFormed);

        try {
            // Second add to ackSet, verifier.containsQuorum = true;
            // waitForNewLeaderAck returns without exceptions.
            leader.waitForNewLeaderAck(participantId, zxid);
            assertEquals(2, ackSet.size(), "Unexpected vote in ackSet");
            assertTrue(leader.quorumFormed);
        } catch (Exception e) {
            // Report the method actually under test, and keep the cause.
            fail("Timed out in waitForNewLeaderAck", e);
        }
    }
}