LearnerMetricsTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import java.util.HashMap;
import java.util.Map;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class LearnerMetricsTest extends QuorumPeerTestBase {

    private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
    private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
    private ZooKeeper zk_client;
    private static boolean bakAsyncSending;

    @BeforeAll
    public static void saveAsyncSendingFlag() {
        bakAsyncSending = Learner.getAsyncSending();
    }

    @AfterAll
    public static void resetAsyncSendingFlag() {
        Learner.setAsyncSending(bakAsyncSending);
    }

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLearnerMetricsTest(boolean asyncSending) throws Exception {
        Learner.setAsyncSending(asyncSending);
        ServerMetrics.getMetrics().resetAll();
        ClientBase.setupTestEnv();

        final String path = "/zk-testLeanerMetrics";
        final byte[] data = new byte[512];
        final int[] clientPorts = new int[SERVER_COUNT];
        StringBuilder sb = new StringBuilder();
        int observer = 0;
        clientPorts[observer] = PortAssignment.unique();
        sb.append("server." + observer + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer\n");
        for (int i = 1; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
        }

        // start the three participants
        String quorumCfgSection = sb.toString();
        for (int i = 1; i < SERVER_COUNT; i++) {
            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], quorumCfgSection);
            mt[i].start();
        }

        // start the observer
        Map<String, String> observerConfig = new HashMap<>();
        observerConfig.put("peerType", "observer");
        mt[observer] = new QuorumPeerTestBase.MainThread(observer, clientPorts[observer], quorumCfgSection, observerConfig);
        mt[observer].start();

        // connect to the observer node and wait for CONNECTED state
        // (this way we make sure to wait until the leader election finished and the observer node joined as well)
        zk_client = new ZooKeeper("127.0.0.1:" + clientPorts[observer], ClientBase.CONNECTION_TIMEOUT, this);
        waitForOne(zk_client, ZooKeeper.States.CONNECTED);

        // creating a node
        zk_client.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


        // there are two proposals by now, one for the global client session creation, one for the create request

        // there are two followers, each received two PROPOSALs
        waitForMetric("learner_proposal_received_count", is(4L));
        waitForMetric("cnt_proposal_latency", is(4L));
        waitForMetric("min_proposal_latency", greaterThanOrEqualTo(0L));

        // the two ACKs are processed by the leader and by each of the two followers
        waitForMetric("cnt_proposal_ack_creation_latency", is(6L));
        waitForMetric("min_proposal_ack_creation_latency", greaterThanOrEqualTo(0L));

        // two COMMITs are received by each of the two followers, and two INFORMs are received by the single observer
        // (the INFORM message is also counted into the "commit_received" metrics)
        waitForMetric("learner_commit_received_count", is(6L));
        waitForMetric("cnt_commit_propagation_latency", is(6L));
        waitForMetric("min_commit_propagation_latency", greaterThanOrEqualTo(0L));
    }

    @AfterEach
    public void tearDown() throws Exception {
        zk_client.close();
        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i].shutdown();
        }
    }

}