TracingContext.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.utils;

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
 * The TracingContext class correlates Store requests using unique
 * identifiers and resources common to requests (e.g. filesystem, stream).
 *
 * Implementing a new Hadoop FileSystem API method:
 * Create a TracingContext instance in the method of the outer layer of the
 * ABFS driver (AzureBlobFileSystem/AbfsInputStream/AbfsOutputStream), to be
 * passed through the ABFS layers down to AbfsRestOperation.
 *
 * Add new operation codes to {@link FSOperationType}.
 *
 * PrimaryRequestId can be enabled for individual Hadoop APIs that invoke
 * multiple Store calls.
 *
 * Testing:
 * Pass an instance of TracingHeaderValidator to registerListener() of ABFS
 * filesystem/stream class before calling the API in tests.
 */

public class TracingContext {
  private final String clientCorrelationID;  // passed over config by client
  private final String fileSystemID;  // GUID for fileSystem instance
  private String clientRequestId = EMPTY_STRING;  // GUID per http request
  //Optional, non-empty for methods that trigger two or more Store calls
  private String primaryRequestId;
  private String streamID;  // appears per stream instance (read/write ops)
  private int retryCount;  // retry number as recorded by AbfsRestOperation
  private FSOperationType opType;  // two-lettered code representing Hadoop op
  private final TracingHeaderFormat format;  // header ID display options
  private Listener listener = null;  // null except when testing
  //final concatenated ID list set into x-ms-client-request-id header
  private String header = EMPTY_STRING;
  private String ingressHandler = EMPTY_STRING;
  private String position = EMPTY_STRING;
  // Metrics payload supplied at construction; never null.
  private String metricResults = EMPTY_STRING;
  // Value set into the X_MS_FECLIENT_METRICS header; recomputed on every
  // constructHeader() call (never accumulated across retries).
  private String metricHeader = EMPTY_STRING;

  /**
   * If {@link #primaryRequestId} is null, this field shall be set equal
   * to the last part of the {@link #clientRequestId}'s UUID
   * in {@link #constructHeader(AbfsHttpOperation, String, String)} only on the
   * first API call for an operation. Subsequent retries for that operation
   * will not change this field. In case {@link  #primaryRequestId} is non-null,
   * this field shall not be set.
   */
  private String primaryRequestIdForRetry;

  // Appended to the ALL_ID_FORMAT header when non-null.
  private Integer operatedBlobCount = null;

  // NOTE(review): logger is keyed on AbfsClient rather than TracingContext;
  // kept as-is so existing log configuration for AbfsClient still applies.
  private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
  public static final int MAX_CLIENT_CORRELATION_ID_LENGTH = 72;
  public static final String CLIENT_CORRELATION_ID_PATTERN = "[a-zA-Z0-9-]*";

  /**
   * Initialize TracingContext
   * @param clientCorrelationID Provided over config by client
   * @param fileSystemID Unique guid for AzureBlobFileSystem instance
   * @param opType Code indicating the high-level Hadoop operation that
   *                    triggered the current Store request
   * @param tracingHeaderFormat Format of IDs to be printed in header and logs
   * @param listener Holds instance of TracingHeaderValidator during testing,
   *                null otherwise
   */
  public TracingContext(String clientCorrelationID, String fileSystemID,
      FSOperationType opType, TracingHeaderFormat tracingHeaderFormat,
      Listener listener) {
    this.fileSystemID = fileSystemID;
    this.opType = opType;
    this.clientCorrelationID = clientCorrelationID;
    streamID = EMPTY_STRING;
    retryCount = 0;
    primaryRequestId = EMPTY_STRING;
    format = tracingHeaderFormat;
    this.listener = listener;
  }

  /**
   * Initialize TracingContext, optionally generating a primary request ID.
   * @param clientCorrelationID Provided over config by client
   * @param fileSystemID Unique guid for AzureBlobFileSystem instance
   * @param opType Code indicating the high-level Hadoop operation
   * @param needsPrimaryReqId if true, a random UUID is used as the primary
   *                          request ID; otherwise it stays empty
   * @param tracingHeaderFormat Format of IDs to be printed in header and logs
   * @param listener Holds instance of TracingHeaderValidator during testing,
   *                null otherwise; notified of the primary request ID
   */
  public TracingContext(String clientCorrelationID, String fileSystemID,
      FSOperationType opType, boolean needsPrimaryReqId,
      TracingHeaderFormat tracingHeaderFormat, Listener listener) {
    this(clientCorrelationID, fileSystemID, opType, tracingHeaderFormat,
        listener);
    primaryRequestId = needsPrimaryReqId ? UUID.randomUUID().toString() : EMPTY_STRING;
    if (listener != null) {
      listener.updatePrimaryRequestID(primaryRequestId);
    }
  }

  /**
   * Initialize TracingContext carrying a metrics payload.
   * @param clientCorrelationID Provided over config by client
   * @param fileSystemID Unique guid for AzureBlobFileSystem instance
   * @param opType Code indicating the high-level Hadoop operation
   * @param needsPrimaryReqId if true, a random UUID is used as the primary
   *                          request ID; otherwise it stays empty
   * @param tracingHeaderFormat Format of IDs to be printed in header and logs
   * @param listener Holds instance of TracingHeaderValidator during testing,
   *                null otherwise
   * @param metricResults payload sent in the X_MS_FECLIENT_METRICS header when
   *                      non-blank; null is treated as empty
   */
  public TracingContext(String clientCorrelationID, String fileSystemID,
      FSOperationType opType, boolean needsPrimaryReqId,
      TracingHeaderFormat tracingHeaderFormat, Listener listener, String metricResults) {
    this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId, tracingHeaderFormat,
        listener);
    // Normalise null so constructHeader() can safely inspect the value.
    this.metricResults = metricResults == null ? EMPTY_STRING : metricResults;
  }


  /**
   * Copy constructor. Copies the identifiers, operation, format, position,
   * ingress handler, operated blob count and metrics payload; resets the
   * retry count to 0 and clones the listener (if any) via getClone().
   * @param originalTracingContext context to copy from
   */
  public TracingContext(TracingContext originalTracingContext) {
    this.fileSystemID = originalTracingContext.fileSystemID;
    this.streamID = originalTracingContext.streamID;
    this.clientCorrelationID = originalTracingContext.clientCorrelationID;
    this.opType = originalTracingContext.opType;
    this.retryCount = 0;
    this.primaryRequestId = originalTracingContext.primaryRequestId;
    this.format = originalTracingContext.format;
    this.position = originalTracingContext.getPosition();
    this.ingressHandler = originalTracingContext.getIngressHandler();
    this.operatedBlobCount = originalTracingContext.operatedBlobCount;
    if (originalTracingContext.listener != null) {
      this.listener = originalTracingContext.listener.getClone();
    }
    this.metricResults = originalTracingContext.metricResults;
  }

  /**
   * Validate a client-supplied correlation ID.
   * @param clientCorrelationID value provided over config; may be null
   * @return the input if it is at most {@link #MAX_CLIENT_CORRELATION_ID_LENGTH}
   * characters and matches {@link #CLIENT_CORRELATION_ID_PATTERN};
   * otherwise (including null) an empty string
   */
  public static String validateClientCorrelationID(String clientCorrelationID) {
    if (clientCorrelationID == null
        || clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH
        || !clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN)) {
      LOG.debug(
          "Invalid config provided; correlation id not included in header.");
      return EMPTY_STRING;
    }
    return clientCorrelationID;
  }

  /**
   * Generate a new random primary request ID and notify the listener, if any.
   */
  public void setPrimaryRequestID() {
    primaryRequestId = UUID.randomUUID().toString();
    if (listener != null) {
      listener.updatePrimaryRequestID(primaryRequestId);
    }
  }

  /**
   * Set the stream ID included in the ALL_ID_FORMAT header.
   * @param stream stream identifier
   */
  public void setStreamID(String stream) {
    streamID = stream;
  }

  /**
   * Set the operation code included in the ALL_ID_FORMAT header.
   * @param operation operation code
   */
  public void setOperation(FSOperationType operation) {
    this.opType = operation;
  }

  /**
   * @return current retry count
   */
  public int getRetryCount() {
    return retryCount;
  }

  /**
   * Set the retry count included in the ALL_ID_FORMAT header.
   * @param retryCount retry number of the current request
   */
  public void setRetryCount(int retryCount) {
    this.retryCount = retryCount;
  }

  /**
   * Replace the listener (used for header validation in tests).
   * @param listener new listener, may be null
   */
  public void setListener(Listener listener) {
    this.listener = listener;
  }

  /**
   * Concatenate all identifiers separated by (:) into a string and set into
   * X_MS_CLIENT_REQUEST_ID header of the http operation. If a non-blank
   * metrics payload was supplied, it is also set into the
   * X_MS_FECLIENT_METRICS header (exactly once, also on retries).
   * @param httpOperation AbfsHttpOperation instance to set header into
   *                      connection
   * @param previousFailure Failure seen before this API trigger on same operation
   * from AbfsClient.
   * @param retryPolicyAbbreviation Retry policy used to get retry interval before this
   * API trigger on same operation from AbfsClient
   */
  public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure, String retryPolicyAbbreviation) {
    clientRequestId = UUID.randomUUID().toString();
    switch (format) {
    case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty
      header =
          clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":"
              + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
              + ":" + opType + ":" + retryCount;
      header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation);
      if (!(ingressHandler.equals(EMPTY_STRING))) {
        header += ":" + ingressHandler;
      }
      if (!(position.equals(EMPTY_STRING))) {
        header += ":" + position;
      }
      if (operatedBlobCount != null) {
        header += (":" + operatedBlobCount);
      }
      header += (":" + httpOperation.getTracingContextSuffix());
      break;
    case TWO_ID_FORMAT:
      header = clientCorrelationID + ":" + clientRequestId;
      break;
    default:
      //case SINGLE_ID_FORMAT
      header = clientRequestId;
    }
    // Assign rather than append: the same context is reused for retries, and
    // appending would duplicate the metrics payload on every retry.
    metricHeader = metricResults.trim().isEmpty() ? EMPTY_STRING : metricResults;
    if (listener != null) { //for testing
      listener.callTracingHeaderValidator(header, format);
    }
    httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
    if (!metricHeader.equals(EMPTY_STRING)) {
      httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader);
    }
    /*
    * In case the primaryRequestId is an empty-string and if it is the first try to
    * API call (previousFailure shall be null), maintain the last part of clientRequestId's
    * UUID in primaryRequestIdForRetry. This field shall be used as primaryRequestId part
    * of the x-ms-client-request-id header in case of retry of the same API-request.
    */
    if (primaryRequestId.isEmpty() && previousFailure == null) {
      String[] clientRequestIdParts = clientRequestId.split("-");
      primaryRequestIdForRetry = clientRequestIdParts[
          clientRequestIdParts.length - 1];
    }
  }

  /**
   * Provide value to be used as primaryRequestId part of x-ms-client-request-id header.
   * @param isRetry define if it's for a retry case.
   * @return {@link #primaryRequestIdForRetry}:If the {@link #primaryRequestId}
   * is an empty-string, it's a retry iteration and a first-attempt value was
   * recorded. {@link #primaryRequestId} for other cases.
   */
  private String getPrimaryRequestIdForHeader(final boolean isRetry) {
    if (!primaryRequestId.isEmpty() || !isRetry) {
      return primaryRequestId;
    }
    // If no first attempt was recorded (e.g. retry count set externally),
    // fall back to the empty primaryRequestId instead of emitting "null".
    return primaryRequestIdForRetry != null ? primaryRequestIdForRetry : primaryRequestId;
  }

  /**
   * Append the previous failure reason (and, for connection timeouts, the
   * retry policy abbreviation) to the header, separated by underscores.
   * @param header header built so far
   * @param previousFailure failure reason of the previous attempt, or null
   * @param retryPolicyAbbreviation retry policy abbreviation, may be null
   * @return header with failure reasons appended, or unchanged if
   * previousFailure is null
   */
  private String addFailureReasons(final String header,
      final String previousFailure, String retryPolicyAbbreviation) {
    if (previousFailure == null) {
      return header;
    }
    if (CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure) && retryPolicyAbbreviation != null) {
      return String.format("%s_%s_%s", header, previousFailure, retryPolicyAbbreviation);
    }
    return String.format("%s_%s", header, previousFailure);
  }

  /**
   * Set the blob count appended to the ALL_ID_FORMAT header.
   * @param count number of blobs operated on; null omits it from the header
   */
  public void setOperatedBlobCount(Integer count) {
    operatedBlobCount = count;
  }

  /**
   * @return the operation code of this context
   */
  public FSOperationType getOpType() {
    return opType;
  }

  /**
   * Return header representing the request associated with the tracingContext
   * @return Header string set into X_MS_CLIENT_REQUEST_ID
   */
  public String getHeader() {
    return header;
  }

  /**
   * Gets the ingress handler.
   *
   * @return the ingress handler as a String.
   */
  public String getIngressHandler() {
    return ingressHandler;
  }

  /**
   * Gets the position.
   *
   * @return the position as a String.
   */
  public String getPosition() {
    return position;
  }

  /**
   * Sets the ingress handler and notifies the listener, if any.
   *
   * @param ingressHandler the ingress handler to set, must not be null.
   */
  public void setIngressHandler(final String ingressHandler) {
    this.ingressHandler = ingressHandler;
    if (listener != null) {
      listener.updateIngressHandler(ingressHandler);
    }
  }

  /**
   * Sets the position and notifies the listener, if any.
   *
   * @param position the position to set, must not be null.
   */
  public void setPosition(final String position) {
    this.position = position;
    if (listener != null) {
      listener.updatePosition(position);
    }
  }
}