KafkaMetricsProvider.java
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dashbuilder.dataprovider.kafka;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import org.dashbuilder.dataprovider.kafka.mbean.MBeanServerConnectionProvider;
import org.dashbuilder.dataprovider.kafka.metrics.KafkaMetricCollector;
import org.dashbuilder.dataprovider.kafka.metrics.group.MetricsCollectorGroupFactory;
import org.dashbuilder.dataprovider.kafka.model.KafkaMetric;
import org.dashbuilder.dataprovider.kafka.model.KafkaMetricsRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provides the metrics for a given Kafka metrics request.
*
*/
public class KafkaMetricsProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMetricsProvider.class);
private static KafkaMetricsProvider instance;
MetricsCollectorGroupFactory metricsCollectorGroupFactory;
static {
MetricsCollectorGroupFactory metricsCollectorGroupFactory = MetricsCollectorGroupFactory.get();
instance = new KafkaMetricsProvider(metricsCollectorGroupFactory);
}
KafkaMetricsProvider(MetricsCollectorGroupFactory metricsCollectorGroupFactory) {
this.metricsCollectorGroupFactory = metricsCollectorGroupFactory;
}
public static KafkaMetricsProvider get() {
return instance;
}
public List<KafkaMetric> getMetrics(KafkaMetricsRequest request) {
List<KafkaMetricCollector> extractors = collectorsFor(request);
JMXConnector connector = MBeanServerConnectionProvider.newConnection(request);
try {
MBeanServerConnection mbsc = connector.getMBeanServerConnection();
return extractMetrics(mbsc, extractors);
} catch (Exception e) {
LOGGER.warn("Error reading metrics for request {}", request);
LOGGER.debug("Error reading metrics for request", e);
return Collections.emptyList();
} finally {
try {
connector.close();
} catch (IOException e) {
LOGGER.warn("Error closing JMX connector");
LOGGER.debug("Error closing JMX Connector", e);
}
}
}
List<KafkaMetricCollector> collectorsFor(KafkaMetricsRequest request) {
List<KafkaMetricCollector> collectors = metricsCollectorGroupFactory.forTarget(request.getMetricsTarget())
.getMetricsCollectors(request);
return request.filter()
.map(f -> filtering(collectors, f))
.orElse(collectors);
}
List<KafkaMetricCollector> filtering(List<KafkaMetricCollector> collectors, String filter) {
return filter.trim().isEmpty() ? collectors : collectors.stream()
.filter(c -> c.getName().toLowerCase().contains(filter.toLowerCase()))
.collect(Collectors.toList());
}
private List<KafkaMetric> extractMetrics(MBeanServerConnection mbsc, List<KafkaMetricCollector> extractors) {
return extractors.stream()
.flatMap(e -> e.collect(mbsc).stream())
.collect(Collectors.toList());
}
}