TestSlowPeerTracker.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Set;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link SlowPeerTracker}.
*/
public class TestSlowPeerTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestSlowPeerTracker.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
private Configuration conf;
private SlowPeerTracker tracker;
private FakeTimer timer;
private long reportValidityMs;
private static final ObjectReader READER =
new ObjectMapper().readerFor(new TypeReference<Set<SlowPeerJsonReport>>() {});
@Before
public void setup() {
conf = new HdfsConfiguration();
timer = new FakeTimer();
tracker = new SlowPeerTracker(conf, timer);
reportValidityMs = tracker.getReportValidityMs();
}
/**
* Edge case, there are no reports to retrieve.
*/
@Test
public void testEmptyReports() {
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty());
}
@Test
public void testReportsAreRetrieved() {
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.2));
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 2.1));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.22));
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(2));
assertThat(tracker.getReportsForNode("node1").size(), is(0));
}
/**
* Test that when all reports are expired, we get back nothing.
*/
@Test
public void testAllReportsAreExpired() {
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 0.123));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 0.2334));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
// No reports should expire after 1ms.
timer.advance(1);
assertThat(tracker.getReportsForAllDataNodes().size(), is(3));
// All reports should expire after REPORT_VALIDITY_MS.
timer.advance(reportValidityMs);
assertTrue(tracker.getReportsForAllDataNodes().isEmpty());
assertTrue(tracker.getReportsForNode("node1").isEmpty());
assertTrue(tracker.getReportsForNode("node2").isEmpty());
assertTrue(tracker.getReportsForNode("node3").isEmpty());
}
/**
* Test the case when a subset of reports has expired.
* Ensure that we only get back non-expired reports.
*/
@Test
public void testSomeReportsAreExpired() {
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.222));
timer.advance(reportValidityMs);
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.20));
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(1));
assertEquals(1, tracker.getReportsForNode("node3").stream()
.filter(e -> e.getReportingNode().equals("node4")).count());
}
/**
* Test the case when an expired report is replaced by a valid one.
*/
@Test
public void testReplacement() {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 2.1);
tracker.addReport("node2", "node1", outlierMetrics1);
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
// This should replace the expired report with a newer valid one.
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 0.001);
tracker.addReport("node2", "node1", outlierMetrics2);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
}
@Test
public void testGetJson() throws IOException {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
tracker.addReport("node1", "node2", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
tracker.addReport("node2", "node3", outlierMetrics2);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
tracker.addReport("node2", "node1", outlierMetrics3);
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
tracker.addReport("node4", "node1", outlierMetrics4);
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// And ensure its contents are what we expect.
assertThat(reports.size(), is(3));
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node4"));
assertFalse(isNodeInReports(reports, "node3"));
}
@Test
public void testGetJsonSizeIsLimited() throws IOException {
tracker.addReport("node1", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.634));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 2.3566));
tracker.addReport("node2", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 3.869));
tracker.addReport("node2", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 4.1356));
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.73057));
tracker.addReport("node3", "node5", new OutlierMetrics(0.0, 0.0, 0.0, 2.4956730));
tracker.addReport("node4", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 3.29847));
tracker.addReport("node5", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 4.13444));
tracker.addReport("node5", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 5.10845));
tracker.addReport("node6", "node8", new OutlierMetrics(0.0, 0.0, 0.0, 2.37464));
tracker.addReport("node6", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 1.29475656));
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that node4 is not in the list since it was
// tagged by just one peer and we already have 5 other nodes.
assertFalse(isNodeInReports(reports, "node4"));
// Remaining nodes should be in the list.
assertTrue(isNodeInReports(reports, "node1"));
assertTrue(isNodeInReports(reports, "node2"));
assertTrue(isNodeInReports(reports, "node3"));
assertTrue(isNodeInReports(reports, "node5"));
assertTrue(isNodeInReports(reports, "node6"));
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node1") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.634)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(2.3566))
.count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node2") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(3.869)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency().equals(4.1356))
.count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node3") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency().equals(1.73057)
&& e.getSlowPeerLatencyWithReportingNodes().last().getReportedLatency()
.equals(2.4956730)).count());
assertEquals(1, reports.stream().filter(
e -> e.getSlowNode().equals("node6") && e.getSlowPeerLatencyWithReportingNodes().size() == 2
&& e.getSlowPeerLatencyWithReportingNodes().first().getReportedLatency()
.equals(1.29475656) && e.getSlowPeerLatencyWithReportingNodes().last()
.getReportedLatency().equals(2.37464)).count());
}
@Test
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.295673);
tracker.addReport("node" + i, "reporter1", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.38560);
tracker.addReport("node" + i, "reporter2", outlierMetrics2);
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
OutlierMetrics outlierMetrics = new OutlierMetrics(0.0, 0.0, 0.0, 3.4957);
tracker.addReport("node" + i, "reporter1", outlierMetrics);
}
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
// Ensure that only the first 5 nodes with two reports each were
// included in the JSON.
for (int i = 0; i < 5; ++i) {
assertTrue(isNodeInReports(reports, "node" + i));
}
}
private boolean isNodeInReports(
Set<SlowPeerJsonReport> reports, String node) {
for (SlowPeerJsonReport report : reports) {
if (report.getSlowNode().equalsIgnoreCase(node)) {
return true;
}
}
return false;
}
private Set<SlowPeerJsonReport> getAndDeserializeJson()
throws IOException {
final String json = tracker.getJson();
LOG.info("Got JSON: {}", json);
return READER.readValue(json);
}
}