TxnLogDigestTest.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.server.metric.SimpleCounter;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.quorum.QuorumPeerMainTest;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TxnLogDigestTest extends ClientBase {
private static final Logger LOG =
LoggerFactory.getLogger(TxnLogDigestTest.class);
private ZooKeeper zk;
private ZooKeeperServer server;
@BeforeEach
public void setUp() throws Exception {
System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true");
super.setUp();
server = serverFactory.getZooKeeperServer();
zk = createClient();
}
@AfterEach
public void tearDown() throws Exception {
System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
// server will be closed in super.tearDown
super.tearDown();
if (zk != null) {
zk.close();
}
MockedFileTxnLog.reset();
}
@Override
public void setupCustomizedEnv() {
ZooKeeperServer.setDigestEnabled(true);
ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
}
@Override
public void cleanUpCustomizedEnv() {
ZooKeeperServer.setDigestEnabled(false);
ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
}
@BeforeAll
public static void applyMockUps() {
new MockedFileTxnLog();
}
/**
* Check that the digest stored in the txn matches the digest calculated
* from DataTree.
*/
@Test
public void digestFromTxnLogsMatchesTree() throws Exception {
// reset the mismatch metrics
SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
digestMistachesCount.reset();
// trigger some write ops
performOperations(createClient(), "/digestFromTxnLogsMatchesTree");
// make sure there is no digest mismatch
assertEquals(0, digestMistachesCount.get());
// verify that the digest is wrote to disk with txn
TxnDigest lastDigest = getLastTxnLogDigest();
assertNotNull(lastDigest);
assertEquals(server.getZKDatabase().getDataTree().getTreeDigest(),
lastDigest.getTreeDigest());
}
/**
* Test the compatible when enable/disable digest:
*
* * check that txns which were written with digest can be read when
* digest is disabled
* * check that txns which were written without digest can be read
* when digest is enabled.
*/
@Test
public void checkTxnCompatibleWithAndWithoutDigest() throws Exception {
// 1. start server with digest disabled
restartServerWithDigestFlag(false);
// trigger some write ops
Map<String, String> expectedNodes = performOperations(createClient(), "/p1");
// reset the mismatch metrics
SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
digestMistachesCount.reset();
// 2. restart server with digest enabled
restartServerWithDigestFlag(true);
// make sure the data wrote when digest was disabled can be
// successfully read
checkNodes(expectedNodes);
Map<String, String> expectedNodes1 = performOperations(createClient(), "/p2");
// make sure there is no digest mismatch
assertEquals(0, digestMistachesCount.get());
// 3. disable the digest again and make sure everything is fine
restartServerWithDigestFlag(false);
checkNodes(expectedNodes);
checkNodes(expectedNodes1);
}
/**
* Simulate the scenario where txn is missing, and make sure the
* digest code can catch this issue.
*/
@Test
public void testTxnMissing() throws Exception {
// updated MockedFileTxnLog to skip append txn on specific txn
MockedFileTxnLog.skipAppendZxid = 3;
// trigger some write operations
performOperations(createClient(), "/testTxnMissing");
// restart server to load the corrupted txn file
SimpleCounter digestMistachesCount = (SimpleCounter) ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT;
digestMistachesCount.reset();
restartServerWithDigestFlag(true);
// check that digest mismatch is reported
assertThat("mismtach should be reported", digestMistachesCount.get(), greaterThan(0L));
// restart server with digest disabled
digestMistachesCount.reset();
restartServerWithDigestFlag(false);
// check that no digest mismatch is reported
assertEquals(0, digestMistachesCount.get());
}
private void restartServerWithDigestFlag(boolean digestEnabled)
throws Exception {
stopServer();
QuorumPeerMainTest.waitForOne(zk, States.CONNECTING);
ZooKeeperServer.setDigestEnabled(digestEnabled);
ZooKeeperServer.setSerializeLastProcessedZxidEnabled(digestEnabled);
startServer();
QuorumPeerMainTest.waitForOne(zk, States.CONNECTED);
}
private TxnDigest getLastTxnLogDigest() throws IOException {
TxnIterator itr = new FileTxnLog(new File(tmpDir, "version-2")).read(1);
TxnDigest lastDigest = null;
while (itr.next()) {
lastDigest = itr.getDigest();
}
return lastDigest;
}
public static void create(ZooKeeper client, String path, CreateMode mode)
throws Exception {
client.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
}
/**
* Helper method to trigger various write ops inside ZK.
*/
public static Map<String, String> performOperations(
ZooKeeper client, String prefix) throws Exception {
Map<String, String> nodes = new HashMap<>();
String path = prefix;
create(client, path, CreateMode.PERSISTENT);
nodes.put(path, path);
path = prefix + "/child1";
create(client, path, CreateMode.PERSISTENT);
nodes.put(path, path);
path = prefix + "/child2";
create(client, path, CreateMode.PERSISTENT);
client.delete(prefix + "/child2", -1);
path = prefix + "/child1/leaf";
create(client, path, CreateMode.PERSISTENT);
String updatedData = "updated data";
client.setData(path, updatedData.getBytes(), -1);
nodes.put(path, updatedData);
List<Op> subTxns = new ArrayList<>();
for (int i = 0; i < 3; i++) {
path = prefix + "/m" + i;
subTxns.add(Op.create(path, path.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
nodes.put(path, path);
}
client.multi(subTxns);
client.close();
return nodes;
}
private void checkNodes(Map<String, String> expectedNodes) throws Exception {
ZooKeeper client = createClient();
try {
for (Map.Entry<String, String> entry: expectedNodes.entrySet()) {
assertEquals(entry.getValue(),
new String(client.getData(entry.getKey(), false, null)));
}
} finally {
client.close();
}
}
public static final class MockedFileTxnLog extends MockUp<FileTxnLog> {
static long skipAppendZxid = -1;
@Mock
public synchronized boolean append(Invocation invocation, Request request) throws IOException {
TxnHeader hdr = request.getHdr();
if (hdr != null && hdr.getZxid() == skipAppendZxid) {
LOG.info("skipping txn {}", skipAppendZxid);
return true;
}
return invocation.proceed(request);
}
public static void reset() {
skipAppendZxid = -1;
}
};
}