TestSlowPeerTracker.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.util.FakeTimer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link SlowPeerTracker}.
* Set a timeout for every test case.
*/
@Timeout(300)
public class TestSlowPeerTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestSlowPeerTracker.class);
private Configuration conf;
private SlowPeerTracker tracker;
private FakeTimer timer;
private long reportValidityMs;
private static final ObjectReader READER =
new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
@BeforeEach
public void setup() {
conf = new HdfsConfiguration();
timer = new FakeTimer();
tracker = new SlowPeerTracker(conf, timer);
reportValidityMs = tracker.getReportValidityMs();
}
/**
* Edge case, there are no reports to retrieve.
*/
@Test
public void testEmptyReports() {
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
}
@Test
public void testReportsAreRetrieved() {
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.2));
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 2.1));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.22));
assertThat(tracker.getReportsForAllDataNodes().size()).isEqualTo(2);
assertThat(tracker.getReportsForNode("node2").size()).isEqualTo(1);
assertThat(tracker.getReportsForNode("node3").size()).isEqualTo(2);
assertThat(tracker.getReportsForNode("node1").size()).isEqualTo(0);
}
/**
* Test that when all reports are expired, we get back nothing.
*/
@Test
public void testAllReportsAreExpired() {
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 0.123));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 0.2334));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
// No reports should expire after 1ms.
timer.advance(1);
assertThat(tracker.getReportsForAllDataNodes().size()).isEqualTo(3);
// All reports should expire after REPORT_VALIDITY_MS.
timer.advance(reportValidityMs);
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("node1").isEmpty());
assertTrue(tracker.getReportsForNode("node2").isEmpty());
assertTrue(tracker.getReportsForNode("node3").isEmpty());
}
/**
* Test the case when a subset of reports has expired.
* Ensure that we only get back non-expired reports.
*/
@Test
public void testSomeReportsAreExpired() {
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.222));
timer.advance(reportValidityMs);
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.20));
assertThat(tracker.getReportsForAllDataNodes().size()).isEqualTo(1);
assertThat(tracker.getReportsForNode("node3").size()).isEqualTo(1);
assertEquals(1, tracker.getReportsForNode("node3").stream()
.filter(e -> e.getReportingNode().equals("node4")).count());
}
/**
* Test the case when an expired report is replaced by a valid one.
*/
@Test
public void testReplacement() {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 2.1);
tracker.addReport("node2", "node1", outlierMetrics1);
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size()).isEqualTo(0);
// This should replace the expired report with a newer valid one.
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 0.001);
tracker.addReport("node2", "node1", outlierMetrics2);
assertThat(tracker.getReportsForAllDataNodes().size()).isEqualTo(1);
assertThat(tracker.getReportsForNode("node2").size()).isEqualTo(1);
}
@Test
public void testGetJson() throws IOException {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
tracker.addReport("node1", "node2", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
tracker.addReport("node2", "node3", outlierMetrics2);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
tracker.addReport("node2", "node1", outlierMetrics3);
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
tracker.addReport("node4", "node1", outlierMetrics4);
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// And ensure its contents are what we expect.
assertThat(reports.size()).isEqualTo(3);
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node4"));
assertFalse(isNodeInReports(reports, "node3"));
}
@Test
public void testGetJsonSizeIsLimited() throws IOException {
tracker.addReport("node1", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.634));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 2.3566));
tracker.addReport("node2", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 3.869));
tracker.addReport("node2", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 4.1356));
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.73057));
tracker.addReport("node3", "node5", new OutlierMetrics(0.0, 0.0, 0.0, 2.4956730));
tracker.addReport("node4", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 3.29847));
tracker.addReport("node5", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 4.13444));
tracker.addReport("node5", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 5.10845));
tracker.addReport("node6", "node8", new OutlierMetrics(0.0, 0.0, 0.0, 2.37464));
tracker.addReport("node6", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 1.29475656));
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that node4 is not in the list since it was
// tagged by just one peer and we already have 5 other nodes.
assertFalse(isNodeInReports(reports, "node4"));
// Remaining nodes should be in the list.
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node3"));
assertTrue(isNodeInReports(reports, "node5"));
assertTrue(isNodeInReports(reports, "node6"));
assertEquals(1,
reports.stream().filter(e -> e.getSlowNode().equals("node1")
&& e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first()
.getReportedLatency().equals(1.634)
&& e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(2.3566))
.count());
assertEquals(1,
reports.stream().filter(e -> e.getSlowNode().equals("node2")
&& e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first()
.getReportedLatency().equals(3.869)
&& e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(4.1356))
.count());
assertEquals(1, reports.stream().filter(e -> e.getSlowNode().equals("node3")
&& e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first()
.getReportedLatency().equals(1.73057)
&& e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(2.4956730))
.count());
assertEquals(1, reports.stream().filter(e -> e.getSlowNode().equals("node6")
&& e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first()
.getReportedLatency().equals(1.29475656)
&& e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(2.37464))
.count());
}
@Test
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.295673);
tracker.addReport("node" + i, "reporter1", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.38560);
tracker.addReport("node" + i, "reporter2", outlierMetrics2);
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
OutlierMetrics outlierMetrics = new OutlierMetrics(0.0, 0.0, 0.0, 3.4957);
tracker.addReport("node" + i, "reporter1", outlierMetrics);
}
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that only the first 5 nodes with two reports each were
// included in the JSON.
for (int i = 0; i < 5; ++i) {
assertTrue(isNodeInReports(reports, "node" + i));
}
}
private boolean isNodeInReports(
Set<SlowPeerJsonReport> reports, String node) {
for (SlowPeerJsonReport report : reports) {
if (report.getSlowNode().equalsIgnoreCase(node)) {
return true;
}
}
return false;
}
private Set<SlowPeerJsonReport> getAndDeserializeJson()
throws IOException {
final String json = tracker.getJson();
LOG.info("Got JSON: {}", json);
return READER.readValue(json);
}
}