TracingHeaderValidator.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
/**
* Used to validate correlation identifiers provided during testing against
* values that get associated with a request through its TracingContext instance
*/
public class TracingHeaderValidator implements Listener {
private String clientCorrelationId;
private String fileSystemId;
private String primaryRequestId = EMPTY_STRING;
private boolean needsPrimaryRequestId;
private String streamID = "";
private FSOperationType operation;
private int retryNum;
private TracingHeaderFormat format;
private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$";
private String ingressHandler = null;
private String position = null;
private Integer operatedBlobCount = null;
@Override
public void callTracingHeaderValidator(String tracingContextHeader,
TracingHeaderFormat format) {
this.format = format;
validateTracingHeader(tracingContextHeader);
}
@Override
public TracingHeaderValidator getClone() {
TracingHeaderValidator tracingHeaderValidator = new TracingHeaderValidator(
clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId,
retryNum, streamID);
tracingHeaderValidator.primaryRequestId = primaryRequestId;
tracingHeaderValidator.ingressHandler = ingressHandler;
tracingHeaderValidator.position = position;
tracingHeaderValidator.operatedBlobCount = operatedBlobCount;
return tracingHeaderValidator;
}
public TracingHeaderValidator(String clientCorrelationId, String fileSystemId,
FSOperationType operation, boolean needsPrimaryRequestId, int retryNum) {
this.clientCorrelationId = clientCorrelationId;
this.fileSystemId = fileSystemId;
this.operation = operation;
this.retryNum = retryNum;
this.needsPrimaryRequestId = needsPrimaryRequestId;
}
public TracingHeaderValidator(String clientCorrelationId, String fileSystemId,
FSOperationType operation, boolean needsPrimaryRequestId, int retryNum,
String streamID) {
this(clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId,
retryNum);
this.streamID = streamID;
}
private void validateTracingHeader(String tracingContextHeader) {
String[] idList = tracingContextHeader.split(":");
validateBasicFormat(idList);
if (format != TracingHeaderFormat.ALL_ID_FORMAT) {
return;
}
if (idList.length >= 8) {
if (operatedBlobCount != null) {
Assertions.assertThat(Integer.parseInt(idList[7]))
.describedAs("OperatedBlobCount is incorrect")
.isEqualTo(operatedBlobCount);
}
}
if (!primaryRequestId.isEmpty() && !idList[3].isEmpty()) {
Assertions.assertThat(idList[3])
.describedAs("PrimaryReqID should be common for these requests")
.isEqualTo(primaryRequestId);
}
if (!streamID.isEmpty()) {
Assertions.assertThat(idList[4])
.describedAs("Stream id should be common for these requests")
.isEqualTo(streamID);
}
}
private void validateBasicFormat(String[] idList) {
if (format == TracingHeaderFormat.ALL_ID_FORMAT) {
int expectedSize = 8;
if (operatedBlobCount != null) {
expectedSize += 1;
}
if (ingressHandler != null) {
expectedSize += 2;
}
Assertions.assertThat(idList)
.describedAs("header should have " + expectedSize + " elements")
.hasSize(expectedSize);
} else if (format == TracingHeaderFormat.TWO_ID_FORMAT) {
Assertions.assertThat(idList)
.describedAs("header should have 2 elements").hasSize(2);
} else {
Assertions.assertThat(idList).describedAs("header should have 1 element")
.hasSize(1);
Assertions.assertThat(idList[0])
.describedAs("Client request ID is a guid").matches(GUID_PATTERN);
return;
}
if (clientCorrelationId.matches("[a-zA-Z0-9-]*")) {
Assertions.assertThat(idList[0])
.describedAs("Correlation ID should match config")
.isEqualTo(clientCorrelationId);
} else {
Assertions.assertThat(idList[0])
.describedAs("Invalid config should be replaced with empty string")
.isEmpty();
}
Assertions.assertThat(idList[1]).describedAs("Client request ID is a guid")
.matches(GUID_PATTERN);
if (format != TracingHeaderFormat.ALL_ID_FORMAT) {
return;
}
Assertions.assertThat(idList[2]).describedAs("Filesystem ID incorrect")
.isEqualTo(fileSystemId);
if (needsPrimaryRequestId && !operation
.equals(FSOperationType.READ)) {
Assertions.assertThat(idList[3]).describedAs("should have primaryReqId")
.isNotEmpty();
}
Assertions.assertThat(idList[5]).describedAs("Operation name incorrect")
.isEqualTo(operation.toString());
if (idList[6].contains("_")) {
idList[6] = idList[6].split("_")[0];
}
int retryCount = Integer.parseInt(idList[6]);
Assertions.assertThat(retryCount)
.describedAs("Retry was required due to issue on server side")
.isEqualTo(retryNum);
}
/**
* Sets the value of expected Hadoop operation
* @param operation Hadoop operation code (String of two characters)
*/
@Override
public void setOperation(FSOperationType operation) {
this.operation = operation;
}
@Override
public void updatePrimaryRequestID(String primaryRequestId) {
this.primaryRequestId = primaryRequestId;
}
@Override
public void updateIngressHandler(String ingressHandler) {
this.ingressHandler = ingressHandler;
}
@Override
public void updatePosition(String position) {
this.position = position;
}
/**
* Sets the value of the number of blobs operated on
* @param operatedBlobCount number of blobs operated on
*/
public void setOperatedBlobCount(Integer operatedBlobCount) {
this.operatedBlobCount = operatedBlobCount;
}
}