BrokerMetricsGroup.java

/*
 * Copyright 2021 Red Hat, Inc. and/or its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.dashbuilder.dataprovider.kafka.metrics.group;

import java.util.List;

import org.dashbuilder.dataprovider.kafka.mbean.ObjectNamePrototype;
import org.dashbuilder.dataprovider.kafka.metrics.KafkaMetricCollector;
import org.dashbuilder.dataprovider.kafka.model.KafkaMetricsRequest;

import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.KAFKA_CONTROLLER_DOMAIN;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.KAFKA_NETWORK_DOMAIN;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.KAFKA_SERVER_DOMAIN;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.PER_TIME_ATTRS;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.REQUEST_METRICS;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanDefinitions.TIME_MS_ATTRS;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanNameFactory.withName;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanNameFactory.withProduceDelayedAndFetchDelayedOperation;
import static org.dashbuilder.dataprovider.kafka.mbean.MBeanNameFactory.withProduceFetchConsumerAndFetchFollowerRequest;
import static org.dashbuilder.dataprovider.kafka.mbean.ObjectNamePrototype.withDomainAndType;
import static org.dashbuilder.dataprovider.kafka.mbean.ObjectNamePrototype.withDomainTypeAndName;
import static org.dashbuilder.dataprovider.kafka.metrics.MBeanMetricCollector.metricCollector;
import static org.dashbuilder.dataprovider.kafka.metrics.group.MetricsCollectorGroup.merge;
import static org.dashbuilder.dataprovider.kafka.metrics.group.MetricsCollectorGroup.mergeAttrs;

/**
 * Group of metrics for requests targeting Kafka broker
 *
 */
class BrokerMetricsGroup implements MetricsCollectorGroup {

    // types prototypes
    private static final ObjectNamePrototype KAFKA_CONTROLLER_TYPE = withDomainAndType(KAFKA_CONTROLLER_DOMAIN, "KafkaController");
    private static final ObjectNamePrototype CONTROLLER_STATS_TYPE = withDomainAndType(KAFKA_CONTROLLER_DOMAIN, "ControllerStats");
    private static final ObjectNamePrototype KAFKA_REQUEST_HANDLER_POOL_TYPE = withDomainAndType(KAFKA_SERVER_DOMAIN, "KafkaRequestHandlerPool");
    private static final ObjectNamePrototype REQUEST_CHANNEL_TYPE = withDomainAndType(KAFKA_NETWORK_DOMAIN, "RequestChannel");
    private static final ObjectNamePrototype SOCKET_SERVER_TYPE = withDomainAndType(KAFKA_NETWORK_DOMAIN, "SocketServer");
    private static final ObjectNamePrototype REPLICA_MANAGER = withDomainAndType(KAFKA_SERVER_DOMAIN, "ReplicaManager");
    private static final ObjectNamePrototype REPLICA_FETCHER_MANAGER = withDomainAndType(KAFKA_SERVER_DOMAIN, "ReplicaFetcherManager");
    private static final ObjectNamePrototype BROKER_TOPIC_METRICS = withDomainAndType(KAFKA_SERVER_DOMAIN, "BrokerTopicMetrics");

    // types with name prototypes
    private static final ObjectNamePrototype REQUEST_METRICS_TOTAL_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "TotalTimeMs");
    private static final ObjectNamePrototype REQUEST_METRICS_LOCAL_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "LocalTimeMs");
    private static final ObjectNamePrototype REQUEST_METRICS_REMOTE_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "RemoteTimeMs");
    private static final ObjectNamePrototype REQUEST_METRICS_RESPONSE_QUEUE_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "ResponseQueueTimeMs");
    private static final ObjectNamePrototype REQUEST_METRICS_RESPONSE_SEND_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "ResponseSendTimeMs");
    private static final ObjectNamePrototype REQUEST_METRICS_REQUEST_QUEUE_TIME_MS = withDomainTypeAndName(KAFKA_NETWORK_DOMAIN, REQUEST_METRICS, "RequestQueueTimeMs");
    private static final ObjectNamePrototype DELAYED_OPERATION_PURGATORY_PURGATORY_SIZE = withDomainTypeAndName(KAFKA_SERVER_DOMAIN, "DelayedOperationPurgatory", "PurgatorySize");

    private static final List<KafkaMetricCollector> COLLECTORS;

    static {

        KafkaMetricCollector[] collectors = {
                                              metricCollector(withName(KAFKA_CONTROLLER_TYPE, "GlobalPartitionCount")),
                                              metricCollector(withName(KAFKA_CONTROLLER_TYPE, "ActiveControllerCount")),
                                              metricCollector(withName(KAFKA_CONTROLLER_TYPE, "OfflinePartitionsCount")),

                                              metricCollector(withName(CONTROLLER_STATS_TYPE, "LeaderElectionRateAndTimeMs"),
                                                              mergeAttrs(TIME_MS_ATTRS, PER_TIME_ATTRS)),
                                              metricCollector(withName(CONTROLLER_STATS_TYPE, "UncleanLeaderElectionsPerSec"), PER_TIME_ATTRS),

                                              metricCollector(withName(REPLICA_MANAGER, "UnderReplicatedPartitions")),
                                              metricCollector(withName(REPLICA_MANAGER, "UnderMinIsrPartitionCount")),
                                              metricCollector(withName(REPLICA_MANAGER, "ReassigningPartitions")),
                                              metricCollector(withName(REPLICA_MANAGER, "IsrShrinksPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(REPLICA_MANAGER, "IsrExpandsPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(REPLICA_MANAGER, "PartitionCount")),
                                              metricCollector(withName(REPLICA_MANAGER, "LeaderCount")),

                                              metricCollector(withName(SOCKET_SERVER_TYPE, "NetworkProcessorAvgIdlePercent")),

                                              metricCollector(withName(BROKER_TOPIC_METRICS, "BytesInPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "BytesOutPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "TotalProduceRequestsPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "TotalFetchRequestsPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "FailedProduceRequestsPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "ReassignmentBytesInPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "ReassignmentBytesOutPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "MessagesInPerSec"), PER_TIME_ATTRS),
                                              metricCollector(withName(BROKER_TOPIC_METRICS, "FailedFetchRequestsPerSec"), PER_TIME_ATTRS),

                                              metricCollector(withName(REQUEST_CHANNEL_TYPE, "RequestQueueSize")),
                                              metricCollector(withName(REQUEST_CHANNEL_TYPE, "ResponseQueueSize")),

                                              metricCollector(withName(KAFKA_REQUEST_HANDLER_POOL_TYPE, "RequestHandlerAvgIdlePercent"), PER_TIME_ATTRS),
                                              metricCollector(REPLICA_FETCHER_MANAGER.copy().name("MaxLag").clientId("Replica").build()),

        };

        COLLECTORS = merge(collectors,
                           withProduceDelayedAndFetchDelayedOperation(DELAYED_OPERATION_PURGATORY_PURGATORY_SIZE),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_TOTAL_TIME_MS),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_REQUEST_QUEUE_TIME_MS),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_LOCAL_TIME_MS),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_REMOTE_TIME_MS),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_RESPONSE_QUEUE_TIME_MS),
                           withProduceFetchConsumerAndFetchFollowerRequest(REQUEST_METRICS_RESPONSE_SEND_TIME_MS));
    }

    @Override
    public List<KafkaMetricCollector> getMetricsCollectors(KafkaMetricsRequest request) {
        return COLLECTORS;
    }

}