TestAbfsPerfTracker.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test the latency tracker for ABFS.
*
*/
public final class TestAbfsPerfTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestAbfsPerfTracker.class);
private static ExecutorService executorService = null;
private static final int TEST_AGGREGATE_COUNT = 42;
private final String filesystemName = "bogusFilesystemName";
private final String accountName = "bogusAccountName";
private final URL url;
public TestAbfsPerfTracker() throws Exception {
this.url = new URL("http", "www.microsoft.com", "/bogusFile");
}
@BeforeEach
public void setUp() throws Exception {
executorService = Executors.newCachedThreadPool();
}
@AfterEach
public void tearDown() throws Exception {
executorService.shutdown();
}
@Test
public void verifyDisablingOfTracker() throws Exception {
// verify that disabling of the tracker works
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller",
"disablingCallee")) {
AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
tracker.registerResult(op).registerSuccess(true);
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForSingletonLatencyRecords() throws Exception {
// verify that tracking for singleton latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ s=0 e= ci=[^ ]* ri=[^ ]* ct=[^ ]* st=[^ ]* rt=[^ ]* bs=0 br=0 m=GET"
+ " u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyTrackingForAggregateLatencyRecords() throws Exception {
// verify that tracking of aggregate latency records works as expected
final int numTasks = 100;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
String latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();
List<Callable<Integer>> tasks = new ArrayList<>();
AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
return 0;
}
});
}
for (Future<Integer> fr: executorService.invokeAll(tasks)) {
fr.get();
}
for (int i = 0; i < numTasks; i++) {
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return non-null record").isNotNull();
assertThat(latencyDetails).describedAs("Latency record should be in the correct format")
.containsPattern("h=[^ ]* t=[^ ]* a=bogusFilesystemName c=bogusAccountName cr=oneOperationCaller"
+ " ce=oneOperationCallee r=Succeeded l=[0-9]+ ls=[0-9]+ lc=" + TEST_AGGREGATE_COUNT
+ " s=0 e= ci=[^ ]* ri=[^ ]* ct=[^ ]* st=[^ ]* rt=[^ ]* bs=0 br=0 m=GET u=http%3A%2F%2Fwww.microsoft.com%2FbogusFile");
}
latencyDetails = abfsPerfTracker.getClientLatency();
assertThat(latencyDetails).describedAs("AbfsPerfTracker should return no record").isNull();
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true)
.registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenDisabled() throws Exception {
// when latency tracker is disabled, we expect it to take time equivalent to checking a boolean value
final double maxLatencyWhenDisabledMs = 1000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startGet = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyGet = Duration.between(startGet, Instant.now()).toMillis();
LOG.debug("Spent {} ms in retrieving latency record.", latencyGet);
return latencyGet;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording singleton latencies should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "oneOperationCaller",
"oneOperationCallee")) {
tracker.registerResult(httpOperation).registerSuccess(true).
registerAggregates(startRecord, TEST_AGGREGATE_COUNT);
}
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for recording aggregate latencies is bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyGettingLatencyRecordsIsCheapWhenEnabled() throws Exception {
final double maxLatencyWhenDisabledMs = 5000;
final double minLatencyWhenDisabledMs = 0;
final long numTasks = 1000;
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
Instant startRecord = Instant.now();
abfsPerfTracker.getClientLatency();
long latencyRecord = Duration.between(startRecord, Instant.now()).toMillis();
LOG.debug("Spent {} ms in recording latency.", latencyRecord);
return latencyRecord;
});
}
for (Future<Long> fr: executorService.invokeAll(tasks)) {
aggregateLatency += fr.get();
}
double averageRecordLatency = aggregateLatency / numTasks;
assertThat(averageRecordLatency).describedAs("Average time for getting latency records should be bounded")
.isBetween(minLatencyWhenDisabledMs, maxLatencyWhenDisabledMs);
}
@Test
public void verifyNoExceptionOnInvalidInput() throws Exception {
Instant testInstant = Instant.now();
AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false);
AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true);
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
verifyNoException(abfsPerfTrackerDisabled);
verifyNoException(abfsPerfTrackerEnabled);
}
private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception {
Instant testInstant = Instant.now();
final AbfsJdkHttpOperation httpOperation = new AbfsJdkHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>(),
Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT), Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);
try (
AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker02 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker03 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker04 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker05 = new AbfsPerfInfo(abfsPerfTracker, null, null);
AbfsPerfInfo tracker06 = new AbfsPerfInfo(abfsPerfTracker, "test", null);
AbfsPerfInfo tracker07 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker08 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker09 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker10 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker11 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker12 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
AbfsPerfInfo tracker13 = new AbfsPerfInfo(abfsPerfTracker, "test", "test");
) {
tracker01.registerResult(null).registerSuccess(false);
tracker02.registerResult(null).registerSuccess(false);
tracker03.registerResult(null).registerSuccess(false);
tracker04.registerResult(httpOperation).registerSuccess(false);
tracker05.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker06.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker07.registerResult(null).registerSuccess(false).registerAggregates(null, 0);
tracker08.registerResult(httpOperation).registerSuccess(false).registerAggregates(null, 0);
tracker09.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), 0);
tracker10.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.now(), TEST_AGGREGATE_COUNT);
tracker11.registerResult(httpOperation).registerSuccess(false).registerAggregates(testInstant, TEST_AGGREGATE_COUNT);
tracker12.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MAX, TEST_AGGREGATE_COUNT);
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
}
}
/**
* Test helper method to create an AbfsPerfTracker instance.
* @param abfsConfig active test abfs config
* @return instance of AbfsPerfTracker
*/
public static AbfsPerfTracker getAPerfTrackerInstance(AbfsConfiguration abfsConfig) {
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
abfsConfig.getAccountName(), abfsConfig);
return tracker;
}
}