PrepRequestProcessorMetricsTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrepRequestProcessorMetricsTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorMetricsTest.class);

    ZooKeeperServer zks;
    RequestProcessor nextProcessor;

    @BeforeEach
    public void setup() {
        System.setProperty(ZooKeeperServer.SKIP_ACL, "true");
        zks = spy(new ZooKeeperServer());
        zks.sessionTracker = mock(SessionTracker.class);

        ZKDatabase db = mock(ZKDatabase.class);
        when(zks.getZKDatabase()).thenReturn(db);

        DataNode node = new DataNode(new byte[1], null, mock(StatPersisted.class));
        when(db.getNode(anyString())).thenReturn(node);

        DataTree dataTree = mock(DataTree.class);
        when(db.getDataTree()).thenReturn(dataTree);

        Set<String> ephemerals = new HashSet<>();
        ephemerals.add("/crystalmountain");
        ephemerals.add("/stevenspass");
        when(db.getEphemerals(anyLong())).thenReturn(ephemerals);

        nextProcessor = mock(RequestProcessor.class);
        ServerMetrics.getMetrics().resetAll();
    }

    @AfterEach
    public void tearDown() throws Exception {
        System.clearProperty(ZooKeeperServer.SKIP_ACL);
    }

    private Request createRequest(Record record, int opCode) throws IOException {
        return new Request(null, 1L, 0, opCode, RequestRecord.fromRecord(record), null);
    }

    private Request createRequest(String path, int opCode) throws IOException {
        Record record;
        switch (opCode) {
        case ZooDefs.OpCode.setData:
            record = new SetDataRequest(path, new byte[0], -1);
            break;
        case ZooDefs.OpCode.delete:
            record = new DeleteRequest(path, -1);
            break;
        default:
            record = new DeleteRequest(path, -1);
            break;
        }

        return createRequest(record, opCode);
    }

    private Request createRequest(long sessionId, int opCode) {
        return new Request(null, sessionId, 0, opCode, null, null);
    }

    @Test
    public void testPrepRequestProcessorMetrics() throws Exception {
        CountDownLatch threeRequests = new CountDownLatch(3);
        doAnswer(invocationOnMock -> {
            threeRequests.countDown();
            return null;
        }).when(nextProcessor).processRequest(any(Request.class));

        PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(zks, nextProcessor);

        //setData will generate one change
        prepRequestProcessor.processRequest(createRequest("/foo", ZooDefs.OpCode.setData));
        //delete will generate two changes, one for itself, one for its parent
        prepRequestProcessor.processRequest(createRequest("/foo/bar", ZooDefs.OpCode.delete));
        //mocking two ephemeral nodes exists for this session so two changes
        prepRequestProcessor.processRequest(createRequest(2, ZooDefs.OpCode.closeSession));

        Map<String, Object> values = MetricsUtils.currentServerMetrics();
        assertEquals(3L, values.get("prep_processor_request_queued"));

        // the sleep is just to make sure the requests will stay in the queue for some time
        Thread.sleep(20);
        prepRequestProcessor.start();

        threeRequests.await(500, TimeUnit.MILLISECONDS);

        values = MetricsUtils.currentServerMetrics();
        assertEquals(3L, values.get("max_prep_processor_queue_size"));

        assertThat((long) values.get("min_prep_processor_queue_time_ms"), greaterThan(20L));
        assertEquals(3L, values.get("cnt_prep_processor_queue_time_ms"));

        assertEquals(3L, values.get("cnt_prep_process_time"));
        assertThat((long) values.get("max_prep_process_time"), greaterThan(0L));

        assertEquals(1L, values.get("cnt_close_session_prep_time"));
        assertThat((long) values.get("max_close_session_prep_time"), greaterThanOrEqualTo(0L));

        // With digest feature, we have two more OUTSTANDING_CHANGES_QUEUED than w/o digest
        // The expected should 5 in open source until we upstream the digest feature
        assertEquals(7L, values.get("outstanding_changes_queued"));
    }

    private class SimpleWatcher implements Watcher {

        CountDownLatch created;
        public SimpleWatcher(CountDownLatch latch) {
            this.created = latch;
        }
        @Override
        public void process(WatchedEvent e) {
            created.countDown();
        }

    }

    @Test
    public void testOutstandingChangesRemoved() throws Exception {
        // this metric is currently recorded in FinalRequestProcessor but it is tightly related to the Prep metrics
        QuorumUtil util = new QuorumUtil(1);
        util.startAll();

        ServerMetrics.getMetrics().resetAll();

        ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
        zk.create("/test", new byte[50], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        CountDownLatch created = new CountDownLatch(1);
        zk.exists("/test", new SimpleWatcher(created));
        created.await(200, TimeUnit.MILLISECONDS);

        Map<String, Object> values = MetricsUtils.currentServerMetrics();
        assertThat((long) values.get("outstanding_changes_removed"), greaterThan(0L));

        util.shutdownAll();
    }

}