CommitProcessorConcurrencyTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommitProcessorConcurrencyTest extends ZKTestCase {
/** Logger of this test class; the tests use it to report requests found in an unexpected state. */
protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
/** Receives every request that the processor under test forwards to its next processor. */
BlockingQueue<Request> processedRequests;
/** Commit processor under test; a new instance is created in setUp(). */
MockCommitProcessor processor;
/** Pool size most tests pass to MockCommitProcessor.initThreads(). */
int defaultSizeOfThreadPool = 16;
/**
 * Creates a fresh result queue and a fresh commit processor, and sets the
 * read/commit batch sizes on CommitProcessor before every test.
 */
@BeforeEach
public void setUp() throws Exception {
    this.processedRequests = new LinkedBlockingQueue<>();
    this.processor = new MockCommitProcessor();
    // Set explicitly for each test: some tests change the commit batch size
    // through the same class-level setter.
    CommitProcessor.setMaxReadBatchSize(-1);
    CommitProcessor.setMaxCommitBatchSize(1);
}
/** Shuts down the commit processor that setUp() created for the finished test. */
@AfterEach
public void tearDown() throws Exception {
    this.processor.shutdown();
}
// This queue is infinite if we use "poll" to get requests, but returns a
// finite size when asked.
class MockRequestsQueue extends LinkedBlockingQueue<Request> {
private static final long serialVersionUID = 1L;
int readReqId = 0;
// Always have a request to return.
public Request poll() {
readReqId++;
try {
return newRequest(new GetDataRequest("/", false), OpCode.getData, readReqId % 50, readReqId);
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
// Fixed queue size.
public int size() {
return 42;
}
}
/**
 * CommitProcessor wired for the tests: its next processor only records each
 * request in processedRequests, and its listener fails the running test as
 * soon as it is notified that the processor is stopping.
 */
class MockCommitProcessor extends CommitProcessor {

    MockCommitProcessor() {
        super(new RequestProcessor() {
            @Override
            public void processRequest(Request request) throws RequestProcessorException {
                processedRequests.offer(request);
            }

            @Override
            public void shutdown() {
                // Nothing to release: this sink only records requests.
            }
        }, "0", false, new ZooKeeperServerListener() {
            @Override
            public void notifyStopping(String threadName, int errorCode) {
                fail("Commit processor crashed " + errorCode);
            }
        });
    }

    /**
     * Clears the stopped flag and installs a new worker pool.
     *
     * @param poolSize size handed to the new WorkerService
     */
    public void initThreads(int poolSize) {
        this.stopped = false;
        this.workerPool = new WorkerService("CommitProcWork", poolSize, true);
    }
}
/**
 * Builds a Request whose payload is the binary serialization of a record.
 *
 * @param rec record to serialize under the tag "request"
 * @param type operation code stored in the request
 * @param sessionId session the request is attributed to
 * @param xid client transaction id of the request
 * @return a request with no connection and an empty auth-info list
 * @throws IOException if serializing {@code rec} fails
 */
private Request newRequest(Record rec, int type, int sessionId, int xid) throws IOException {
    ByteArrayOutputStream serialized = new ByteArrayOutputStream();
    rec.serialize(BinaryOutputArchive.getArchive(serialized), "request");
    ByteBuffer payload = ByteBuffer.wrap(serialized.toByteArray());
    return new Request(null, sessionId, xid, type, RequestRecord.fromBytes(payload), new ArrayList<Id>());
}
/**
 * Queues a read followed by an already committed update of the same
 * session, then verifies that both are processed in session order: the
 * read first, the write second.
 */
@Test
public void committedAndUncommittedOfTheSameSessionRaceTest() throws Exception {
    final String znode = "/testCvsUCRace";
    Request read = newRequest(new GetDataRequest(znode, false), OpCode.getData, 0x0, 0);
    Request write = newRequest(new SetDataRequest(znode, new byte[16], -1), OpCode.setData, 0x0, 1);

    // The commit for the write is already present before the processor runs.
    processor.committedRequests.add(write);
    processor.queuedRequests.add(read);
    processor.queuedRequests.add(write);
    processor.queuedWriteRequests.add(write);

    // A pool of size one is used for this ordering check.
    processor.initThreads(1);
    processor.stoppedMainLoop = true;
    processor.run();

    // Head of the result queue must be the read ...
    assertTrue(
        processedRequests.peek() != null && processedRequests.peek().equals(read),
        "Request was not processed " + read + " instead " + processedRequests.peek());
    processedRequests.poll();
    // ... and the write must follow directly behind it.
    assertTrue(
        processedRequests.peek() != null && processedRequests.peek().equals(write),
        "Request was not processed " + write + " instead " + processedRequests.peek());
}
/**
 * Builds the following queue of requests: R1_1, W1_2, R1_3,
 * R2_1, R2_2, W2_3, R2_4, R3_1, R3_2, R3_3, W3_4, R3_5, ... , W5_6, R5_7,
 * i.e., 5 sessions, each with a different number of read requests followed
 * by a single write and then a single read. No write is committed in this
 * test. It verifies that every read queued ahead of its session's write is
 * processed, and that neither the uncommitted writes nor the reads queued
 * behind them are processed.
 */
@Test
public void processAsMuchUncommittedRequestsAsPossibleTest() throws Exception {
    final String znode = "/testAsMuchAsPossible";
    List<Request> expectedProcessed = new LinkedList<>();
    Set<Request> expectedBlocked = new HashSet<>();

    for (int session = 1; session <= 5; ++session) {
        // Session N opens with N reads that are queued ahead of its write.
        for (int xid = 1; xid <= session; ++xid) {
            Request leadingRead = newRequest(new GetDataRequest(znode, false), OpCode.getData, session, xid);
            expectedProcessed.add(leadingRead);
            processor.queuedRequests.add(leadingRead);
        }

        CreateRequest create = new CreateRequest(
            znode,
            new byte[0],
            Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL.toFlag());
        Request uncommittedWrite = newRequest(create, OpCode.create, session, session + 1);
        Request trailingRead = newRequest(new GetDataRequest(znode, false), OpCode.getData, session, session + 2);

        // The write, and the read queued behind it, are expected to stay unprocessed.
        processor.queuedRequests.add(uncommittedWrite);
        processor.queuedWriteRequests.add(uncommittedWrite);
        processor.queuedRequests.add(trailingRead);
        expectedBlocked.add(uncommittedWrite);
        expectedBlocked.add(trailingRead);
    }

    processor.initThreads(defaultSizeOfThreadPool);
    processor.stoppedMainLoop = true;
    processor.run();
    // Leave time for the worker pool before inspecting the results.
    Thread.sleep(1000);

    // Whatever remains after removing the processed requests was missed.
    expectedProcessed.removeAll(processedRequests);
    for (Request missed : expectedProcessed) {
        LOG.error("Did not process {}", missed);
    }
    assertTrue(expectedProcessed.isEmpty(), "Not all requests were processed");
    // removeAll returns true only if a blocked request shows up among the processed ones.
    assertFalse(expectedBlocked.removeAll(processedRequests), "Processed a wrong request");
}
/**
 * Queues a write followed by several reads of the same session and
 * verifies that: 1. the write is not processed until its commit arrives;
 * 2. once the write is processed, all the reads are processed as well;
 * 3. the write is processed ahead of the reads queued behind it.
 */
@Test
public void processAllFollowingUncommittedAfterFirstCommitTest() throws Exception {
    final String znode = "/testUncommittedFollowingCommitted";
    Set<Request> expectedPending = new HashSet<>();
    Set<Request> readsBehindWrite = new HashSet<>();

    CreateRequest create = new CreateRequest(
        znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag());
    Request write = newRequest(create, OpCode.create, 0x1, 1);
    processor.queuedRequests.add(write);
    processor.queuedWriteRequests.add(write);
    expectedPending.add(write);

    for (int xid = 2; xid <= 5; ++xid) {
        Request read = newRequest(new GetDataRequest(znode, false), OpCode.getData, 0x1, xid);
        processor.queuedRequests.add(read);
        expectedPending.add(read);
        readsBehindWrite.add(read);
    }

    // First pass: no commit is available yet.
    processor.initThreads(defaultSizeOfThreadPool);
    processor.stoppedMainLoop = true;
    processor.run();

    assertTrue(processedRequests.isEmpty(), "Processed without waiting for commit");
    assertTrue(processor.queuedRequests.isEmpty(), "Did not handled all of queuedRequests' requests");
    assertFalse(processor.queuedWriteRequests.isEmpty(), "Removed from blockedQueuedRequests before commit");

    // Everything queued for the session must now sit in its pending queue.
    expectedPending.removeAll(processor.pendingRequests.get(write.sessionId));
    for (Request missing : expectedPending) {
        LOG.error("Should be in pending {}", missing);
    }
    assertTrue(expectedPending.isEmpty(), "Not all requests moved to pending from queuedRequests");

    // Second pass: the commit for the write has arrived.
    processor.committedRequests.add(write);
    processor.stoppedMainLoop = true;
    processor.run();
    processor.initThreads(defaultSizeOfThreadPool);
    Thread.sleep(500);

    assertTrue(processedRequests.peek() == write, "Did not process committed request");
    assertTrue(processedRequests.containsAll(readsBehindWrite), "Did not process following read request");
    assertTrue(processor.committedRequests.isEmpty(), "Did not process committed request");
    assertTrue(processor.pendingRequests.isEmpty(), "Did not process committed request");
    assertTrue(processor.queuedWriteRequests.isEmpty(), "Did not remove from blockedQueuedRequests");
}
/**
* In the following test, we add a write request followed by several read
* requests of the same session. We do this for 2 sessions. For the
* second session, we queue up another write after the reads, and
* we verify several things - 1. The writes are not processed until
* the commits arrive. 2. Only 2 writes are processed, with maxCommitBatchSize
* of 3, due to the blocking reads. 3. Once the writes are processed,
* all the read requests are processed as well. 4. All read requests are
* executed after the write, before any other write for that session,
* along with new reads. 5. Then we add another read for session 1, and
* another write and commit for session 2. 6. Only the old write and the read
* are processed, leaving the commit in the queue. 7. The last write is executed
* in the last iteration, and all lists are empty.
*/
@Test
public void processAllWritesMaxBatchSize() throws Exception {
final String path = "/processAllWritesMaxBatchSize";
HashSet<Request> shouldBeProcessedAfterPending = new HashSet<>();
// Session 1: a write (xid 1) at the head of the queue.
Request writeReq = newRequest(
new CreateRequest(
path + "_1",
new byte[0],
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create,
0x1,
1);
processor.queuedRequests.add(writeReq);
processor.queuedWriteRequests.add(writeReq);
// Session 2: a write (xid 1) queued right behind it.
Request writeReq2 = newRequest(
new CreateRequest(
path + "_2",
new byte[0],
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create,
0x2,
1);
processor.queuedRequests.add(writeReq2);
processor.queuedWriteRequests.add(writeReq2);
// Four reads (xid 2..5) for each session, interleaved session 1 / session 2.
for (int readReqId = 2; readReqId <= 5; ++readReqId) {
Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, readReqId);
Request readReq2 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x2, readReqId);
processor.queuedRequests.add(readReq);
shouldBeProcessedAfterPending.add(readReq);
processor.queuedRequests.add(readReq2);
shouldBeProcessedAfterPending.add(readReq2);
}
// Session 2: a second write (xid 6) queued behind that session's reads.
Request writeReq3 = newRequest(
new CreateRequest(
path + "_3",
new byte[0],
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create,
0x2,
6);
processor.queuedRequests.add(writeReq3);
processor.queuedWriteRequests.add(writeReq3);
// Phase 1: run without any commit. Nothing may be processed; both sessions
// must end up in pendingRequests and queuedRequests must be drained.
processor.initThreads(defaultSizeOfThreadPool);
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(2);
processor.run();
assertTrue(processedRequests.isEmpty(), "Processed without waiting for commit");
assertTrue(processor.queuedRequests.isEmpty(), "Did not handled all of queuedRequests' requests");
assertTrue(!processor.queuedWriteRequests.isEmpty(), "Removed from blockedQueuedRequests before commit");
assertTrue(processor.pendingRequests.containsKey(writeReq.sessionId), "Missing session 1 in pending queue");
assertTrue(processor.pendingRequests.containsKey(writeReq2.sessionId), "Missing session 2 in pending queue");
// Phase 2: commits for all three writes arrive and the batch size is raised to 3.
// The assertions below expect write 1 at the head of the results and all reads
// processed, while write 3 is still committed-but-pending.
processor.committedRequests.add(writeReq);
processor.committedRequests.add(writeReq2);
processor.committedRequests.add(writeReq3);
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(3);
processor.run();
processor.initThreads(defaultSizeOfThreadPool);
// Leave time for the worker pool before inspecting the results.
Thread.sleep(500);
assertTrue(processedRequests.peek() == writeReq, "Did not process committed request");
assertTrue(processedRequests.containsAll(shouldBeProcessedAfterPending), "Did not process following read request");
assertTrue(!processor.committedRequests.isEmpty(), "Processed committed request");
assertTrue(processor.committedRequests.peek() == writeReq3, "Removed commit for write req 3");
assertTrue(!processor.pendingRequests.isEmpty(), "Processed committed request");
assertTrue(processor.pendingRequests.containsKey(writeReq3.sessionId), "Missing session 2 in pending queue");
assertTrue(processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3,
"Missing write 3 in pending queue");
assertTrue(!processor.queuedWriteRequests.isEmpty(),
"Removed from blockedQueuedRequests");
assertTrue(processor.queuedWriteRequests.peek() == writeReq3,
"Removed write req 3 from blockedQueuedRequests");
// Phase 3: queue a new read for session 1 plus a new, already committed write
// (write 4) for session 2. The assertions below expect the new read to be
// processed and pendingRequests to be empty, while a commit is still waiting
// and write 4 is the head of queuedWriteRequests.
Request readReq3 = newRequest(new GetDataRequest(path, false), OpCode.getData, 0x1, 7);
processor.queuedRequests.add(readReq3);
shouldBeProcessedAfterPending.add(readReq3);
Request writeReq4 = newRequest(
new CreateRequest(
path + "_4",
new byte[0],
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create,
0x2,
7);
processor.queuedRequests.add(writeReq4);
processor.queuedWriteRequests.add(writeReq4);
processor.committedRequests.add(writeReq4);
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(3);
processor.run();
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
assertTrue(processedRequests.peek() == writeReq, "Did not process committed request");
assertTrue(processedRequests.containsAll(shouldBeProcessedAfterPending), "Did not process following read request");
assertTrue(!processor.committedRequests.isEmpty(), "Processed unexpected committed request");
assertTrue(processor.pendingRequests.isEmpty(), "Unexpected pending request");
assertTrue(!processor.queuedWriteRequests.isEmpty(), "Removed from blockedQueuedRequests");
assertTrue(processor.queuedWriteRequests.peek() == writeReq4,
"Removed write req 4 from blockedQueuedRequests");
// Phase 4: one more run with no new input; afterwards the committed, pending
// and queued-write collections must all be empty.
processor.stoppedMainLoop = true;
CommitProcessor.setMaxCommitBatchSize(3);
processor.run();
processor.initThreads(defaultSizeOfThreadPool);
Thread.sleep(500);
assertTrue(processedRequests.peek() == writeReq, "Did not process committed request");
assertTrue(processedRequests.containsAll(shouldBeProcessedAfterPending), "Did not process following read request");
assertTrue(processor.committedRequests.isEmpty(), "Did not process committed request");
assertTrue(processor.pendingRequests.isEmpty(), "Did not process committed request");
assertTrue(processor.queuedWriteRequests.isEmpty(), "Did not remove from blockedQueuedRequests");
}
/**
 * Verifies that committed requests are processed even when queuedRequests
 * never becomes empty. Ten committed requests are added while
 * queuedRequests is replaced by a queue that always has another read to
 * return; the test then checks that all ten commits were processed.
 */
@Test
@Timeout(value = 1)
public void noStarvationOfNonLocalCommittedRequestsTest() throws Exception {
    final String znode = "/noStarvationOfCommittedRequests";
    processor.queuedRequests = new MockRequestsQueue();

    Set<Request> expectedCommits = new HashSet<>();
    for (int i = 0; i < 10; i++) {
        CreateRequest create = new CreateRequest(
            znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag());
        // Session 51 lies outside the 0..49 range of session ids that MockRequestsQueue produces.
        Request commit = newRequest(create, OpCode.create, 51, i + 1);
        processor.committedRequests.add(commit);
        expectedCommits.add(commit);
    }

    // One run per committed request.
    for (int pass = 0; pass < 10; pass++) {
        processor.initThreads(defaultSizeOfThreadPool);
        processor.stoppedMainLoop = true;
        processor.run();
    }

    assertTrue(processedRequests.containsAll(expectedCommits), "commit request was not processed");
}
/**
 * Verifies that committed writes do not starve reads. The commit processor
 * is populated with, in order: 1 committed local update, 1 read request,
 * 100 committed non-local updates, and 50 read requests. After the first
 * call to processor.run only the first write may be processed; after the
 * second call all reads must be processed along with the second write,
 * and none of the remaining committed writes.
 */
@Test
public void noStarvationOfReadRequestsTest() throws Exception {
    final String znode = "/noStarvationOfReadRequests";

    // +1 committed request (also head of queuedRequests)
    Request localCommit = newRequest(
        new CreateRequest(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
        OpCode.create, 0x3, 1);
    processor.queuedRequests.add(localCommit);
    processor.queuedWriteRequests.add(localCommit);
    processor.committedRequests.add(localCommit);

    Set<Request> reads = new HashSet<>();

    // +1 read request to queuedRequests
    Request firstRead = newRequest(new GetDataRequest(znode, false), OpCode.getData, 0x1, 0);
    reads.add(firstRead);
    processor.queuedRequests.add(firstRead);

    // +1 non-local commit
    Request secondCommit = newRequest(
        new CreateRequest(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
        OpCode.create, 0x99, 2);
    processor.committedRequests.add(secondCommit);

    // +99 non-local committed requests that must still be waiting at the end
    Set<Request> stillWaitingCommits = new HashSet<>();
    for (int xid = 3; xid < 102; ++xid) {
        Request commit = newRequest(
            new CreateRequest(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
            OpCode.create, 0x8, xid);
        processor.committedRequests.add(commit);
        stillWaitingCommits.add(commit);
    }

    // +50 read requests to queuedRequests
    for (int xid = 1; xid <= 50; ++xid) {
        Request read = newRequest(new GetDataRequest(znode, false), OpCode.getData, 0x5, xid);
        reads.add(read);
        processor.queuedRequests.add(read);
    }

    processor.initThreads(defaultSizeOfThreadPool);
    processor.stoppedMainLoop = true;

    // First run: only the local write, no read yet.
    processor.run();
    assertTrue(processedRequests.contains(localCommit), "Did not process the first write request");
    for (Request read : reads) {
        assertFalse(processedRequests.contains(read), "Processed read request");
    }

    // Second run: every read plus the second committed write, nothing more.
    processor.run();
    assertTrue(processedRequests.containsAll(reads), "did not processed all reads");
    assertTrue(processedRequests.contains(secondCommit), "Did not process the second write request");
    for (Request commit : stillWaitingCommits) {
        assertFalse(processedRequests.contains(commit), "Processed additional committed request");
    }
}
/**
 * Verifies that the processor copes with a commit for a request it has
 * never seen on a session that was just established. This can happen when
 * a session is just established and a request is waiting to be committed
 * in the session queue, but the commit that arrives belongs to a request
 * from the previous connection of that session.
 */
@Test
@Timeout(value = 5)
public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
    final String znode = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
    final int readCount = 10;
    final int session = 0x123456;
    final int firstCXid = 0x100;
    int nextCxid = firstCXid;
    processor.stoppedMainLoop = true;
    HashSet<Request> localRequests = new HashSet<>();

    // Queue the blocking write; it takes the first cxid.
    CreateRequest create = new CreateRequest(
        znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag());
    Request blockingWrite = newRequest(create, OpCode.create, session, nextCxid++);
    processor.queuedRequests.add(blockingWrite);
    processor.queuedWriteRequests.add(blockingWrite);
    localRequests.add(blockingWrite);

    // Queue the reads behind it, using the cxids that follow.
    while (nextCxid <= readCount + firstCXid) {
        Request read = newRequest(new GetDataRequest(znode, false), OpCode.getData, session, nextCxid);
        processor.queuedRequests.add(read);
        localRequests.add(read);
        ++nextCxid;
    }

    // Run once.
    assertTrue(processor.queuedRequests.containsAll(localRequests));
    processor.initThreads(defaultSizeOfThreadPool);
    processor.run();
    Thread.sleep(1000);

    // The processor must be waiting for the commit.
    assertTrue(processedRequests.isEmpty());

    // Add a commit that belongs to the same session but has a smaller cxid,
    // i.e., the commit of an update from a previous connection of this session.
    Request preSessionCommit = newRequest(
        new CreateRequest(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
        OpCode.create, session, firstCXid - 2);
    processor.committedRequests.add(preSessionCommit);
    processor.committedRequests.add(blockingWrite);
    processor.run();
    Thread.sleep(1000);

    // The old commit must have been processed ahead of the newer requests.
    assertTrue(processedRequests.peek() == preSessionCommit);

    processor.run();
    Thread.sleep(1000);

    // All local requests must have been handled by now.
    assertTrue(processedRequests.containsAll(localRequests));
}
/**
 * Verifies that the processor copes with a commit whose cxid is higher
 * than the one it is waiting for. This can happen when a session
 * connection is lost while a request is waiting to be committed in the
 * session queue: since the session has moved, new requests can reach the
 * leader out of order, so commits can also arrive "out of order" w.r.t.
 * cxid. Requests must be committed in the order received from the leader,
 * i.e., the processor keeps waiting for the relevant commit.
 */
@Test
@Timeout(value = 5)
public void noCrashOnOutofOrderCommittedRequestTest() throws Exception {
    final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest";
    final int sessionid = 0x123456;
    final int lastCXid = 0x100;
    final int numberofReads = 10;
    // The write below takes lastCXid, so the reads start one past it. Starting
    // at lastCXid would give the first read the write's cxid and queue
    // numberofReads + 1 reads.
    int readReqId = lastCXid + 1;
    processor.stoppedMainLoop = true;
    HashSet<Request> localRequests = new HashSet<>();

    // Queue the blocking write request to queuedRequests.
    Request orphanCommittedReq = newRequest(
        new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
        OpCode.create, sessionid, lastCXid);
    processor.queuedRequests.add(orphanCommittedReq);
    processor.queuedWriteRequests.add(orphanCommittedReq);
    localRequests.add(orphanCommittedReq);

    // Queue exactly numberofReads read requests to queuedRequests.
    for (; readReqId <= numberofReads + lastCXid; ++readReqId) {
        Request readReq = newRequest(new GetDataRequest(path, false), OpCode.getData, sessionid, readReqId);
        processor.queuedRequests.add(readReq);
        localRequests.add(readReq);
    }

    // Run once.
    processor.initThreads(defaultSizeOfThreadPool);
    processor.run();
    Thread.sleep(1000);

    // The processor must be waiting for the commit.
    assertTrue(processedRequests.isEmpty());

    // Add a commit that belongs to the same session but has a larger cxid,
    // i.e., the commit of an update from the next connection of this session.
    Request otherSessionCommittedReq = newRequest(
        new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
        OpCode.create, sessionid, lastCXid + 10);
    processor.committedRequests.add(otherSessionCommittedReq);
    processor.committedRequests.add(orphanCommittedReq);
    processor.run();
    Thread.sleep(1000);

    // Only the out-of-order commit may have been processed so far; the local
    // write and the reads behind it must still be waiting.
    assertTrue(processedRequests.size() == 1);
    assertTrue(processedRequests.contains(otherSessionCommittedReq));

    processor.run();
    Thread.sleep(1000);

    // All local requests must have been handled by now.
    assertTrue(processedRequests.containsAll(localRequests));
}
}