// ITestBlobDataValidation.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_CHECK_BLOCK_MD5;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_STORE_BLOB_MD5;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.ResponseReceivedEvent;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageEvent;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockSearchMode;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.core.Base64;

/**
 * Test that we do proper data integrity validation with MD5 checks as
 * configured.
 *
 * <p>Two settings are exercised: {@code KEY_STORE_BLOB_MD5} (blob-level MD5
 * stored with the blob and validated on read) and {@code KEY_CHECK_BLOCK_MD5}
 * (Content-MD5 header present on block uploads and range downloads).
 */
public class ITestBlobDataValidation extends AbstractWasbTestWithTimeout {
  /** Payload written by every test in this class. */
  private static final byte[] ORIGINAL_CONTENT = {5, 15};

  /** Payload used to overwrite the blob behind the file system's back. */
  private static final byte[] TAMPERED_CONTENT = {6, 45};

  private AzureBlobStorageTestAccount testAccount;

  /**
   * Releases the test account created by the test, if any, and resets the
   * field to whatever the cleanup utility returns.
   */
  @AfterEach
  public void tearDown() throws Exception {
    testAccount = AzureTestUtils.cleanupTestAccount(testAccount);
  }

  /**
   * Test that by default we don't store the blob-level MD5.
   */
  @Test
  public void testBlobMd5StoreOffByDefault() throws Exception {
    testAccount = AzureBlobStorageTestAccount.create();
    testStoreBlobMd5(false);
  }

  /**
   * Test that we get blob-level MD5 storage and validation if we specify that
   * in the configuration.
   */
  @Test
  public void testStoreBlobMd5() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(KEY_STORE_BLOB_MD5, true);
    testAccount = AzureBlobStorageTestAccount.create(conf);
    testStoreBlobMd5(true);
  }

  /**
   * Trims a suffix/prefix from the given string. For example if
   * s is given as "/xy" and toTrim is "/", this method returns "xy".
   *
   * @param s the string to trim
   * @param toTrim the text to remove once from the start and once from the end
   * @return {@code s} without the leading and trailing {@code toTrim}
   */
  private static String trim(String s, String toTrim) {
    return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim),
        toTrim);
  }

  /**
   * Writes {@link #ORIGINAL_CONTENT} to the given path, always closing the
   * stream even if the write fails.
   *
   * @param fs file system to write through
   * @param path file to create
   * @throws IOException if creating, writing or closing the file fails
   */
  private static void writeTestFile(NativeAzureFileSystem fs, Path path)
      throws IOException {
    try (OutputStream outStream = fs.create(path)) {
      outStream.write(ORIGINAL_CONTENT);
    }
  }

  /**
   * Reads the stream until {@code read} stops returning a positive count,
   * discarding the data. The caller remains responsible for closing it.
   *
   * @param inStream stream to consume
   * @throws IOException if a read fails
   */
  private static void drain(InputStream inStream) throws IOException {
    byte[] inBuf = new byte[100];
    while (inStream.read(inBuf) > 0) {
      // Nothing to do with the data; we only care that the read happens.
    }
  }

  /**
   * Writes a file, verifies whether the blob-level MD5 property was stored,
   * then replaces the blob content directly through the storage SDK and
   * reads the file back through the file system.
   *
   * @param expectMd5Stored whether the blob-level MD5 is expected to be
   *     stored; if so, the read back must fail with an invalid-MD5 error
   */
  private void testStoreBlobMd5(boolean expectMd5Stored) throws Exception {
    assumeNotNull(testAccount);
    // Write a test file.
    NativeAzureFileSystem fs = testAccount.getFileSystem();
    Path testFilePath = AzureTestUtils.pathForTests(fs, methodName.getMethodName());
    String testFileKey = trim(testFilePath.toUri().getPath(), "/");
    writeTestFile(fs, testFilePath);

    // Check that we stored/didn't store the MD5 field as configured.
    CloudBlockBlob blob = testAccount.getBlobReference(testFileKey);
    blob.downloadAttributes();
    String obtainedMd5 = blob.getProperties().getContentMD5();
    if (expectMd5Stored) {
      assertNotNull(obtainedMd5);
    } else {
      assertNull(obtainedMd5, "Expected no MD5, found: " + obtainedMd5);
    }

    // Mess with the content so it doesn't match the MD5.
    String newBlockId = Base64.encode(new byte[] { 55, 44, 33, 22 });
    blob.uploadBlock(newBlockId,
        new ByteArrayInputStream(TAMPERED_CONTENT), TAMPERED_CONTENT.length);
    blob.commitBlockList(Arrays.asList(
        new BlockEntry(newBlockId, BlockSearchMode.UNCOMMITTED)));

    // Now read back the content. If we stored the MD5 for the blob content
    // we should get a data corruption error.
    // The stream is opened outside the try so that a failure to open is
    // never mistaken for the expected corruption error.
    InputStream opened = fs.open(testFilePath);
    try (InputStream inStream = opened) {
      drain(inStream);
    } catch (IOException ex) {
      if (!expectMd5Stored) {
        throw ex;
      }
      Throwable cause = ex.getCause();
      if (!(cause instanceof StorageException)) {
        // Not the failure we are looking for: surface the original
        // exception instead of masking it with a ClassCastException.
        throw ex;
      }
      assertEquals(StorageErrorCodeStrings.INVALID_MD5,
          ((StorageException) cause).getErrorCode(),
          "Unexpected cause: " + cause);
      return;
    }
    if (expectMd5Stored) {
      fail("Should've thrown because of data corruption.");
    }
  }

  /**
   * Test that by default we check block-level MD5.
   */
  @Test
  public void testCheckBlockMd5() throws Exception {
    testAccount = AzureBlobStorageTestAccount.create();
    testCheckBlockMd5(true);
  }

  /**
   * Test that we don't check block-level MD5 if we specify that in the
   * configuration.
   */
  @Test
  public void testDontCheckBlockMd5() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(KEY_CHECK_BLOCK_MD5, false);
    testAccount = AzureBlobStorageTestAccount.create(conf);
    testCheckBlockMd5(false);
  }

  /**
   * Connection inspector to check that MD5 fields for content is set/not set as
   * expected.
   *
   * <p>For ranged GET responses the response header is inspected; for
   * put-block requests the request header is inspected. Other requests are
   * ignored.
   */
  private static class ContentMD5Checker extends
      StorageEvent<ResponseReceivedEvent> {
    private final boolean expectMd5;

    /**
     * @param expectMd5 whether a Content-MD5 header is expected on the
     *     inspected requests/responses
     */
    public ContentMD5Checker(boolean expectMd5) {
      this.expectMd5 = expectMd5;
    }

    @Override
    public void eventOccurred(ResponseReceivedEvent eventArg) {
      HttpURLConnection connection = (HttpURLConnection) eventArg
          .getConnectionObject();
      if (isGetRange(connection)) {
        checkObtainedMd5(connection
            .getHeaderField(Constants.HeaderConstants.CONTENT_MD5));
      } else if (isPutBlock(connection)) {
        checkObtainedMd5(connection
            .getRequestProperty(Constants.HeaderConstants.CONTENT_MD5));
      }
    }

    /** Asserts presence or absence of the MD5 value as configured. */
    private void checkObtainedMd5(String obtainedMd5) {
      if (expectMd5) {
        assertNotNull(obtainedMd5);
      } else {
        assertNull(obtainedMd5, "Expected no MD5, found: " + obtainedMd5);
      }
    }

    /** A PUT whose query string mentions {@code blockid}. */
    private static boolean isPutBlock(HttpURLConnection connection) {
      String query = connection.getURL().getQuery();
      return "PUT".equals(connection.getRequestMethod())
          && query != null
          && query.contains("blockid");
    }

    /** A GET whose response carries the storage range header. */
    private static boolean isGetRange(HttpURLConnection connection) {
      return "GET".equals(connection.getRequestMethod())
          && connection.getHeaderField(
              Constants.HeaderConstants.STORAGE_RANGE_HEADER) != null;
    }
  }

  /**
   * Installs a {@link ContentMD5Checker} on every operation context, then
   * writes and reads back a small file so the checker sees both a block
   * upload and a ranged download.
   *
   * @param expectMd5Checked whether block-level MD5 headers are expected
   */
  private void testCheckBlockMd5(final boolean expectMd5Checked)
      throws Exception {
    assumeNotNull(testAccount);
    Path testFilePath = new Path("/testFile");
    NativeAzureFileSystem fs = testAccount.getFileSystem();

    // Add a hook to check that for GET/PUT requests we set/don't set
    // the block-level MD5 field as configured. I tried to do clever
    // testing by also messing with the raw data to see if we actually
    // validate the data as expected, but the HttpURLConnection wasn't
    // pluggable enough for me to do that.
    fs.getStore().addTestHookToOperationContext(
        new TestHookOperationContext() {
          @Override
          public OperationContext modifyOperationContext(
              OperationContext original) {
            original.getResponseReceivedEventHandler().addListener(
                new ContentMD5Checker(expectMd5Checked));
            return original;
          }
        });

    writeTestFile(fs, testFilePath);

    // try-with-resources so the stream is released even when an assertion
    // in the response hook aborts the read.
    try (InputStream inStream = fs.open(testFilePath)) {
      drain(inStream);
    }
  }
}