KafkaDataSetProvider.java
/*
 * Copyright 2021 Red Hat, Inc. and/or its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.dashbuilder.dataprovider.kafka;

import java.util.List;
import java.util.function.Consumer;
import org.dashbuilder.DataSetCore;
import org.dashbuilder.dataprovider.DataSetProvider;
import org.dashbuilder.dataprovider.DataSetProviderType;
import org.dashbuilder.dataprovider.StaticDataSetProvider;
import org.dashbuilder.dataprovider.kafka.model.KafkaMetric;
import org.dashbuilder.dataprovider.kafka.model.KafkaMetricsRequest;
import org.dashbuilder.dataset.ColumnType;
import org.dashbuilder.dataset.DataSet;
import org.dashbuilder.dataset.DataSetFactory;
import org.dashbuilder.dataset.DataSetLookup;
import org.dashbuilder.dataset.DataSetMetadata;
import org.dashbuilder.dataset.def.DataSetDef;
import org.dashbuilder.dataset.def.DataSetDefRegistry;
import org.dashbuilder.dataset.def.DataSetDefRegistryListener;
import org.dashbuilder.dataset.def.KafkaDataSetDef;
import org.dashbuilder.dataset.def.KafkaDataSetDef.MetricsTarget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
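
/**
 * {@link DataSetProvider} implementation that collects metrics from Kafka.
 * <p>
 * On every lookup it builds a {@link KafkaMetricsRequest} from the
 * {@link KafkaDataSetDef}, retrieves the matching {@link KafkaMetric} values
 * through {@link KafkaMetricsProvider} and registers the result with the
 * {@link StaticDataSetProvider}, which then resolves the actual lookup.
 */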
public class KafkaDataSetProvider implements DataSetProvider, DataSetDefRegistryListener {

    public static final String DOMAIN_COLUMN = "DOMAIN";
    public static final String TYPE_COLUMN = "TYPE";
    public static final String NAME_COLUMN = "NAME";
    public static final String ATTRIBUTE_COLUMN = "ATTRIBUTE";
    public static final String VALUE_COLUMN = "VALUE";

    protected StaticDataSetProvider staticDataSetProvider;
    protected Logger log = LoggerFactory.getLogger(KafkaDataSetProvider.class);

    private static KafkaDataSetProvider instance = null;

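    /**
     * Lazily creates the singleton instance, wiring the shared
     * {@link StaticDataSetProvider} from {@link DataSetCore} and registering
     * the instance as a {@link DataSetDefRegistryListener}.
     */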
    public static KafkaDataSetProvider get() {
        if (instance == null) {
            StaticDataSetProvider staticDataSetProvider = DataSetCore.get().getStaticDataSetProvider();
            DataSetDefRegistry dataSetDefRegistry = DataSetCore.get().getDataSetDefRegistry();
            instance = new KafkaDataSetProvider(staticDataSetProvider);
            dataSetDefRegistry.addListener(instance);
        }
        return instance;
    }

    public KafkaDataSetProvider() {
        // empty
    }

    public KafkaDataSetProvider(StaticDataSetProvider staticDataSetProvider) {
        this.staticDataSetProvider = staticDataSetProvider;
    }

    @SuppressWarnings("rawtypes")
    public DataSetProviderType getType() {
        return DataSetProviderType.KAFKA;
    }

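    /**
     * Metadata is obtained by performing a full lookup, since the metrics must
     * be fetched from Kafka before the resulting column types are known.
     */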
    public DataSetMetadata getDataSetMetadata(DataSetDef def) throws Exception {
        DataSet dataSet = lookupDataSet(def, null);
        if (dataSet == null) {
            return null;
        }
        return dataSet.getMetadata();
    }

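    /**
     * Fetches fresh metrics, converts them into a data set, registers it under
     * the definition UUID and then delegates the actual lookup to the
     * {@link StaticDataSetProvider}.
     */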
    public DataSet lookupDataSet(DataSetDef def, DataSetLookup lookup) throws Exception {
        KafkaMetricsRequest request = buildRequestFromDef(def);
        List<KafkaMetric> metrics = loadMetrics(request);
        DataSet dataSet = toDataSet(metrics);
        dataSet.setUUID(def.getUUID());
        dataSet.setDefinition(def);
        staticDataSetProvider.registerDataSet(dataSet);
        return staticDataSetProvider.lookupDataSet(def, lookup);
    }

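    /**
     * Retrieves the metrics for the given request, translating connection
     * failures and empty results into runtime exceptions with user-friendly
     * messages.
     */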
    List<KafkaMetric> loadMetrics(KafkaMetricsRequest request) {
        List<KafkaMetric> metrics;
        try {
            metrics = KafkaMetricsProvider.get().getMetrics(request);
        } catch (Exception e) {
            log.error("Error retrieving metrics from Kafka: {}", e.getMessage());
            log.debug("Error retrieving metrics from Kafka", e);
            throw new RuntimeException("Error connecting to Kafka. Check if the host/port is correct and the server is running. See logs for more details.");
        }
        if (metrics.isEmpty()) {
            throw new RuntimeException(noMetricsErrorMessage(request));
        }
        return metrics;
    }

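    /**
     * Builds the error message shown when no metrics are returned, listing the
     * request parameters (client id, node id, topic) that should be checked.
     */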
    String noMetricsErrorMessage(KafkaMetricsRequest request) {
        StringBuilder sb = new StringBuilder("No metrics were found. Check if ");
        Consumer<String> appendParamCheck = s -> sb.append(String.format(", %s is correct", s));
        sb.append("the " + request.getMetricsTarget().name() + " has available metrics");
        request.clientId().ifPresent(c -> appendParamCheck.accept("client id " + c));
        request.nodeId().ifPresent(c -> appendParamCheck.accept("node id " + c));
        request.topic().ifPresent(c -> appendParamCheck.accept("topic " + c));
        sb.append(" and the filter matches any metrics");
        return sb.toString();
    }

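    /**
     * Maps a {@link KafkaDataSetDef} to a {@link KafkaMetricsRequest},
     * validating the definition type and requiring a client id for non-broker
     * (producer or consumer) metric targets.
     */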
    private KafkaMetricsRequest buildRequestFromDef(DataSetDef def) {
        if (!(def instanceof KafkaDataSetDef)) {
            throw new IllegalArgumentException("Not a Kafka data set definition");
        }
        KafkaDataSetDef kafkaDef = (KafkaDataSetDef) def;
        if (kafkaDef.getTarget() != MetricsTarget.BROKER && kafkaDef.getClientId() == null) {
            throw new IllegalArgumentException("Client Id is required for producer or consumer metrics");
        }
        return KafkaMetricsRequest.Builder.newBuilder(kafkaDef.getHost(), kafkaDef.getPort())
                                          .target(kafkaDef.getTarget())
                                          .filter(kafkaDef.getFilter())
                                          .clientId(kafkaDef.getClientId())
                                          .nodeId(kafkaDef.getNodeId())
                                          .topic(kafkaDef.getTopic())
                                          .partition(kafkaDef.getPartition())
                                          .build();
    }

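    /**
     * Converts the metrics into a data set with one row per metric and the
     * columns DOMAIN, TYPE, NAME, ATTRIBUTE and VALUE, where the VALUE column
     * type is inferred from the metric values.
     */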
    DataSet toDataSet(List<KafkaMetric> metrics) {
        DataSet dataSet = DataSetFactory.newEmptyDataSet();
        dataSet.addColumn(DOMAIN_COLUMN, ColumnType.LABEL);
        dataSet.addColumn(TYPE_COLUMN, ColumnType.LABEL);
        dataSet.addColumn(NAME_COLUMN, ColumnType.LABEL);
        dataSet.addColumn(ATTRIBUTE_COLUMN, ColumnType.LABEL);
        dataSet.addColumn(VALUE_COLUMN, findValueColumnType(metrics));
        metrics.stream()
               .map(metric -> new Object[]{
                       metric.getDomain(),
                       metric.getType(),
                       metric.getName(),
                       metric.getAttribute(),
                       metric.getValue()})
               .forEach(row -> dataSet.addValuesAt(dataSet.getRowCount(), row));
        return dataSet;
    }

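    /**
     * Uses {@link ColumnType#NUMBER} only when every metric value is numeric,
     * otherwise falls back to {@link ColumnType#LABEL}.
     */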
    private ColumnType findValueColumnType(List<KafkaMetric> metrics) {
        if (metrics.stream().allMatch(m -> m.getValue() instanceof Number)) {
            return ColumnType.NUMBER;
        }
        return ColumnType.LABEL;
    }

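    /*
     * DataSetDefRegistryListener callbacks: whenever a definition becomes
     * stale, is modified or is removed, the cached static data set is dropped
     * so the next lookup fetches fresh metrics.
     */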
    @Override
    public void onDataSetDefStale(DataSetDef def) {
        staticDataSetProvider.removeDataSet(def.getUUID());
    }

    @Override
    public void onDataSetDefModified(DataSetDef oldDef, DataSetDef newDef) {
        staticDataSetProvider.removeDataSet(oldDef.getUUID());
    }

    @Override
    public void onDataSetDefRemoved(DataSetDef oldDef) {
        staticDataSetProvider.removeDataSet(oldDef.getUUID());
    }

    @Override
    public void onDataSetDefRegistered(DataSetDef newDef) {
        // empty
    }

    @Override
    public boolean isDataSetOutdated(DataSetDef def) {
        // Always consider the data set outdated so that the latest metrics are collected on every lookup.
        return true;
    }

}