TracingHeaderValidator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.utils;

import org.assertj.core.api.Assertions;

import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT;

/**
 * Used to validate correlation identifiers provided during testing against
 * values that get associated with a request through its TracingContext
 * instance.
 *
 * <p>The tracing header is split on {@code ':'} and its fields are checked by
 * position. How many fields are checked depends on the
 * {@link TracingHeaderFormat} passed to
 * {@link #callTracingHeaderValidator(String, TracingHeaderFormat)}.
 */
public class TracingHeaderValidator implements Listener {
  private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$";
  // Correlation IDs made of anything else are expected to show up as empty.
  private static final String VALID_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*";
  private static final String HEADER_FIELD_SEPARATOR = ":";
  private static final char RETRY_DETAIL_SEPARATOR = '_';

  // Positions of the fields this validator inspects in the split header.
  private static final int VERSION_INDEX = 0;
  private static final int CLIENT_CORRELATION_ID_INDEX = 1;
  private static final int CLIENT_REQUEST_ID_INDEX = 2;
  private static final int FILE_SYSTEM_ID_INDEX = 3;
  private static final int PRIMARY_REQUEST_ID_INDEX = 4;
  private static final int STREAM_ID_INDEX = 5;
  private static final int OPERATION_INDEX = 6;
  private static final int RETRY_INDEX = 7;
  private static final int OPERATED_BLOB_COUNT_INDEX = 10;

  // Number of fields expected when the header uses TWO_ID_FORMAT.
  private static final int TWO_ID_FORMAT_FIELD_COUNT = 3;

  private final String clientCorrelationId;
  private final String fileSystemId;
  private final boolean needsPrimaryRequestId;
  private final int retryNum;
  private final String streamID;

  private String primaryRequestId = EMPTY_STRING;
  private FSOperationType operation;
  private TracingHeaderFormat format;

  // The next three values are recorded and carried over by getClone(), but no
  // validation in this class reads them.
  private String ingressHandler = null;
  private String position = String.valueOf(0);
  private ReadType readType = ReadType.UNKNOWN_READ;

  // null means "do not validate the operated blob count".
  private Integer operatedBlobCount = null;

  /**
   * Creates a validator that does not check the stream ID.
   *
   * @param clientCorrelationId correlation ID the header is compared against
   * @param fileSystemId expected filesystem ID
   * @param operation expected operation type
   * @param needsPrimaryRequestId whether a non-empty primary request ID is
   *                              required (not enforced for READ)
   * @param retryNum expected retry count
   */
  public TracingHeaderValidator(String clientCorrelationId, String fileSystemId,
      FSOperationType operation, boolean needsPrimaryRequestId, int retryNum) {
    this(clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId,
        retryNum, EMPTY_STRING);
  }

  /**
   * Creates a validator that also checks the stream ID when it is non-empty.
   *
   * @param clientCorrelationId correlation ID the header is compared against
   * @param fileSystemId expected filesystem ID
   * @param operation expected operation type
   * @param needsPrimaryRequestId whether a non-empty primary request ID is
   *                              required (not enforced for READ)
   * @param retryNum expected retry count
   * @param streamID expected stream ID; an empty value disables the check
   */
  public TracingHeaderValidator(String clientCorrelationId, String fileSystemId,
      FSOperationType operation, boolean needsPrimaryRequestId, int retryNum,
      String streamID) {
    this.clientCorrelationId = clientCorrelationId;
    this.fileSystemId = fileSystemId;
    this.operation = operation;
    this.needsPrimaryRequestId = needsPrimaryRequestId;
    this.retryNum = retryNum;
    this.streamID = streamID;
  }

  /**
   * Records the format in use and validates the given header against it.
   *
   * @param tracingContextHeader header value to validate
   * @param format format the header is expected to follow
   */
  @Override
  public void callTracingHeaderValidator(String tracingContextHeader,
      TracingHeaderFormat format) {
    this.format = format;
    validateTracingHeader(tracingContextHeader);
  }

  /**
   * Creates a new validator carrying the same expectations as this one. The
   * format is not copied; it is set on every validation call.
   *
   * @return a copy of this validator
   */
  @Override
  public TracingHeaderValidator getClone() {
    TracingHeaderValidator copy = new TracingHeaderValidator(
        clientCorrelationId, fileSystemId, operation, needsPrimaryRequestId,
        retryNum, streamID);
    copy.primaryRequestId = primaryRequestId;
    copy.ingressHandler = ingressHandler;
    copy.position = position;
    copy.readType = readType;
    copy.operatedBlobCount = operatedBlobCount;
    return copy;
  }

  /**
   * Splits the header and runs every check applicable to the current format.
   *
   * @param tracingContextHeader header value to validate
   */
  private void validateTracingHeader(String tracingContextHeader) {
    // Fail with a readable message rather than an NPE from split().
    Assertions.assertThat(tracingContextHeader)
        .describedAs("Tracing header should be present")
        .isNotNull();

    String[] idList = tracingContextHeader.split(HEADER_FIELD_SEPARATOR,
        SPLIT_NO_LIMIT);
    validateBasicFormat(idList);
    if (format != TracingHeaderFormat.ALL_ID_FORMAT) {
      return;
    }

    // Validate Operated Blob Count
    if (operatedBlobCount != null) {
      Assertions.assertThat(
              Integer.parseInt(idList[OPERATED_BLOB_COUNT_INDEX]))
          .describedAs("OperatedBlobCount is incorrect")
          .isEqualTo(operatedBlobCount);
    }

    // Validate Primary Request ID; skipped when either side is empty.
    String actualPrimaryRequestId = idList[PRIMARY_REQUEST_ID_INDEX];
    if (!primaryRequestId.isEmpty() && !actualPrimaryRequestId.isEmpty()) {
      Assertions.assertThat(actualPrimaryRequestId)
          .describedAs("PrimaryReqID should be common for these requests")
          .isEqualTo(primaryRequestId);
    }

    // Validate Stream ID
    if (!streamID.isEmpty()) {
      Assertions.assertThat(idList[STREAM_ID_INDEX])
          .describedAs("Stream id should be common for these requests")
          .isEqualTo(streamID);
    }
  }

  /**
   * Validates the field count and the fields shared by the header formats.
   *
   * @param idList header fields, in header order
   */
  private void validateBasicFormat(String[] idList) {
    boolean allIds = format == TracingHeaderFormat.ALL_ID_FORMAT;
    if (!allIds && format != TracingHeaderFormat.TWO_ID_FORMAT) {
      // The version check is deliberately not applied here: this branch
      // expects the only field to be a GUID, and one field cannot be both
      // the version string and a GUID.
      // NOTE(review): if single-ID headers are emitted with a version prefix,
      // this branch needs to expect two fields instead - confirm against
      // the code that builds the header.
      validateSingleIdFormat(idList);
      return;
    }

    // Validate Version and Number of fields in the header
    Assertions.assertThat(idList[VERSION_INDEX])
        .describedAs("Version should be present")
        .isEqualTo(TracingHeaderVersion.getCurrentVersion().toString());
    int expectedSize = allIds
        ? TracingHeaderVersion.getCurrentVersion().getFieldCount()
        : TWO_ID_FORMAT_FIELD_COUNT;
    Assertions.assertThat(idList)
        .describedAs("header should have " + expectedSize + " elements")
        .hasSize(expectedSize);

    // Validate Client Correlation ID
    if (clientCorrelationId.matches(VALID_CORRELATION_ID_PATTERN)) {
      Assertions.assertThat(idList[CLIENT_CORRELATION_ID_INDEX])
          .describedAs("Correlation ID should match config")
          .isEqualTo(clientCorrelationId);
    } else {
      Assertions.assertThat(idList[CLIENT_CORRELATION_ID_INDEX])
          .describedAs("Invalid config should be replaced with empty string")
          .isEmpty();
    }

    // Validate Client Request ID
    Assertions.assertThat(idList[CLIENT_REQUEST_ID_INDEX])
        .describedAs("Client request ID is a guid")
        .matches(GUID_PATTERN);

    if (allIds) {
      validateAllIdFields(idList);
    }
  }

  /**
   * Validates a header that is expected to hold only the client request ID.
   *
   * @param idList header fields, in header order
   */
  private void validateSingleIdFormat(String[] idList) {
    Assertions.assertThat(idList).describedAs("header should have 1 element")
        .hasSize(1);
    Assertions.assertThat(idList[0])
        .describedAs("Client request ID is a guid").matches(GUID_PATTERN);
  }

  /**
   * Validates the fields checked only for ALL_ID_FORMAT: filesystem ID,
   * presence of the primary request ID, operation type and retry count.
   *
   * @param idList header fields, in header order
   */
  private void validateAllIdFields(String[] idList) {
    // Validate FileSystem ID
    Assertions.assertThat(idList[FILE_SYSTEM_ID_INDEX])
        .describedAs("Filesystem ID incorrect")
        .isEqualTo(fileSystemId);

    // Validate Primary Request ID; presence is not enforced for READ.
    if (needsPrimaryRequestId && operation != FSOperationType.READ) {
      Assertions.assertThat(idList[PRIMARY_REQUEST_ID_INDEX])
          .describedAs("should have primaryReqId")
          .isNotEmpty();
    }

    // Validate Operation Type
    Assertions.assertThat(idList[OPERATION_INDEX])
        .describedAs("Operation name incorrect")
        .isEqualTo(operation.toString());

    // Validate Retry Header
    Assertions.assertThat(parseRetryCount(idList[RETRY_INDEX]))
        .describedAs("Retry was required due to issue on server side")
        .isEqualTo(retryNum);
  }

  /**
   * Extracts the retry count from the retry field, ignoring anything from
   * the first underscore onwards.
   *
   * @param retryHeader raw retry field from the header
   * @return the retry count
   * @throws NumberFormatException if the count part is not an integer
   */
  private static int parseRetryCount(String retryHeader) {
    int separatorAt = retryHeader.indexOf(RETRY_DETAIL_SEPARATOR);
    String count = separatorAt < 0
        ? retryHeader
        : retryHeader.substring(0, separatorAt);
    return Integer.parseInt(count);
  }

  /**
   * Sets the operation type the header is expected to carry.
   * @param operation expected operation type
   */
  @Override
  public void setOperation(FSOperationType operation) {
    this.operation = operation;
  }

  /**
   * Sets the primary request ID the header is compared against. An empty
   * value disables the comparison.
   * @param primaryRequestId expected primary request ID
   */
  @Override
  public void updatePrimaryRequestID(String primaryRequestId) {
    this.primaryRequestId = primaryRequestId;
  }

  /**
   * Records the ingress handler. The value is copied by {@link #getClone()}
   * but is not checked by this validator.
   * @param ingressHandler ingress handler value to record
   */
  @Override
  public void updateIngressHandler(String ingressHandler) {
    this.ingressHandler = ingressHandler;
  }

  /**
   * Records the position. The value is copied by {@link #getClone()} but is
   * not checked by this validator.
   * @param position position value to record
   */
  @Override
  public void updatePosition(String position) {
    this.position = position;
  }

  /**
   * Records the read type. The value is copied by {@link #getClone()} but is
   * not checked by this validator.
   * @param readType read type to record
   */
  @Override
  public void updateReadType(ReadType readType) {
    this.readType = readType;
  }

  /**
   * Sets the value of the number of blobs operated on.
   * @param operatedBlobCount number of blobs operated on; {@code null}
   *                          disables the check
   */
  public void setOperatedBlobCount(Integer operatedBlobCount) {
    this.operatedBlobCount = operatedBlobCount;
  }
}