ITestS3APutIfMatchAndIfNoneMatch.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.s3.model.S3Exception;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;

import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeConditionalCreateEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assumptions.assumeThat;

/**
 * Integration tests with conditional overwrites.
 * This test class verifies the behavior of "If-Match" and "If-None-Match"
 * conditions while writing files.
 */
public class ITestS3APutIfMatchAndIfNoneMatch extends AbstractS3ATestBase {

  private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;

  private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255);
  private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');

  private BlockOutputStreamStatistics statistics;

  @Override
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();

    S3ATestUtils.disableFilesystemCaching(conf);
    removeBaseAndBucketOverrides(
            conf,
            FS_S3A_CREATE_PERFORMANCE,
            FS_S3A_PERFORMANCE_FLAGS,
            MULTIPART_SIZE,
            MIN_MULTIPART_THRESHOLD,
            UPLOAD_PART_COUNT_LIMIT
    );
    conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
    conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
    conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
    return conf;
  }

  @Override
  public void setup() throws Exception {
    super.setup();
    Configuration conf = getConfiguration();
    assumeConditionalCreateEnabled(conf);
  }

  /**
   * Asserts that an S3Exception has the expected HTTP status code.
   *
   * @param code Expected HTTP status code.
   * @param ex   Exception to validate.
   */
  private static void assertS3ExceptionStatusCode(int code, Exception ex) {
    S3Exception s3Exception = (S3Exception) ex.getCause();

    if (s3Exception.statusCode() != code) {
      throw new AssertionError("Expected status code " + code + " from " + ex, ex);
    }
  }

  /**
   * Asserts that the FSDataOutputStream has the conditional create capability enabled.
   *
   * @param stream The output stream to check.
   */
  private static void assertHasCapabilityConditionalCreate(FSDataOutputStream stream) {
    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE))
            .as("Conditional create capability should be enabled")
            .isTrue();
  }

  /**
   * Asserts that the FSDataOutputStream has the ETag-based conditional create capability enabled.
   *
   * @param stream The output stream to check.
   */
  private static void assertHasCapabilityEtagWrite(FSDataOutputStream stream) {
    Assertions.assertThat(stream.hasCapability(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG))
            .as("ETag-based conditional create capability should be enabled")
            .isTrue();
  }

  protected String getBlockOutputBufferName() {
    return FAST_UPLOAD_BUFFER_ARRAY;
  }

  /**
   * Creates a file with specified flags and writes data to it.
   *
   * @param fs              The FileSystem instance.
   * @param path            Path of the file to create.
   * @param data            Byte data to write into the file.
   * @param ifNoneMatchFlag If true, enforces conditional creation.
   * @param etag            The ETag for conditional writes.
   * @param forceMultipart  If true, forces multipart upload.
   * @return The FileStatus of the created file.
   * @throws Exception If an error occurs during file creation.
   */
  private static FileStatus createFileWithFlags(
          FileSystem fs,
          Path path,
          byte[] data,
          boolean ifNoneMatchFlag,
          String etag,
          boolean forceMultipart) throws Exception {
    try (FSDataOutputStream stream = getStreamWithFlags(fs, path, ifNoneMatchFlag, etag,
            forceMultipart)) {
      if (ifNoneMatchFlag) {
        assertHasCapabilityConditionalCreate(stream);
      }
      if (etag != null) {
        assertHasCapabilityEtagWrite(stream);
      }
      if (data != null && data.length > 0) {
        stream.write(data);
      }
    }
    return fs.getFileStatus(path);
  }

  /**
   * Overloaded method to create a file without forcing multipart upload.
   *
   * @param fs              The FileSystem instance.
   * @param path            Path of the file to create.
   * @param data            Byte data to write into the file.
   * @param ifNoneMatchFlag If true, enforces conditional creation.
   * @param etag            The ETag for conditional writes.
   * @return The FileStatus of the created file.
   * @throws Exception If an error occurs during file creation.
   */
  private static FileStatus createFileWithFlags(
          FileSystem fs,
          Path path,
          byte[] data,
          boolean ifNoneMatchFlag,
          String etag) throws Exception {
    return createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
  }

  /**
   * Opens a file for writing with specific conditional write flags.
   *
   * @param fs              The FileSystem instance.
   * @param path            Path of the file to open.
   * @param ifNoneMatchFlag If true, enables conditional overwrites.
   * @param etag            The ETag for conditional writes.
   * @param forceMultipart  If true, forces multipart upload.
   * @return The FSDataOutputStream for writing.
   * @throws Exception If an error occurs while opening the file.
   */
  private static FSDataOutputStream getStreamWithFlags(
          FileSystem fs,
          Path path,
          boolean ifNoneMatchFlag,
          String etag,
          boolean forceMultipart) throws Exception {
    FSDataOutputStreamBuilder builder = fs.createFile(path);
    if (ifNoneMatchFlag) {
      builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, true);
    }
    if (etag != null) {
      builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
    }
    if (forceMultipart) {
      builder.opt(FS_S3A_CREATE_MULTIPART, true);
    }
    return builder.create().build();
  }

  /**
   * Opens a file for writing with specific conditional write flags and without forcing multipart upload.
   *
   * @param fs              The FileSystem instance.
   * @param path            Path of the file to open.
   * @param ifNoneMatchFlag If true, enables conditional overwrites.
   * @param etag            The ETag for conditional writes.
   * @return The FSDataOutputStream for writing.
   * @throws Exception If an error occurs while opening the file.
   */
  private static FSDataOutputStream getStreamWithFlags(
          FileSystem fs,
          Path path,
          boolean ifNoneMatchFlag,
          String etag) throws Exception {
    return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false);
  }

  /**
   * Reads the content of a file as a string.
   *
   * @param fs   The FileSystem instance.
   * @param path The file path to read.
   * @return The content of the file as a string.
   * @throws Throwable If an error occurs while reading the file.
   */
  private static String readFileContent(FileSystem fs, Path path) throws Throwable {
    try (FSDataInputStream inputStream = fs.open(path)) {
      return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
    }
  }

  /**
   * Updates the statistics of the output stream.
   *
   * @param stream The FSDataOutputStream whose statistics should be updated.
   */
  private void updateStatistics(FSDataOutputStream stream) {
    statistics = S3ATestUtils.getOutputStreamStatistics(stream);
  }

  /**
   * Retrieves the ETag of a file.
   *
   * @param fs   The FileSystem instance.
   * @param path The path of the file.
   * @return The ETag associated with the file.
   * @throws IOException If an error occurs while fetching the file status.
   */
  private static String getEtag(FileSystem fs, Path path) throws IOException {
    String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
    return etag;
  }

  @Test
  public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
    describe("generate conflict on overwrites");
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();
    fs.mkdirs(testFile.getParent());

    // create a file over an empty path: all good
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);

    // attempted overwrite fails
    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);

    // second attempt also fails
    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);

    // Delete file and verify an overwrite works again
    fs.delete(testFile, false);
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
  }

  @Test
  public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    // Skip if multipart upload not supported
    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
            .as("Skipping as multipart upload not supported")
            .isTrue();

    createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);

    RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);

    RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
  }

  @Test
  public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    // Skip test if multipart uploads are not supported
    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
            .as("Skipping as multipart upload not supported")
            .isTrue();

    // Create a file with multipart upload but do not close the stream
    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
    assertHasCapabilityConditionalCreate(stream);
    stream.write(MULTIPART_FILE_BYTES);

    // create and close another small file in parallel
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);

    // Closing the first stream should throw RemoteFileChangedException
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    // Skip test if multipart uploads are not supported
    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
            .as("Skipping as multipart upload not supported")
            .isTrue();

    // Create a file with multipart upload but do not close the stream
    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
    assertHasCapabilityConditionalCreate(stream);
    stream.write(MULTIPART_FILE_BYTES);

    // create and close another multipart file in parallel
    createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);

    // Closing the first stream should throw RemoteFileChangedException
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();
    fs.mkdirs(testFile.getParent());

    // create a non-empty file
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);

    // overwrite with zero-byte file (no write)
    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
    assertHasCapabilityConditionalCreate(stream);

    // close the stream, should throw RemoteFileChangedException
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();
    fs.mkdirs(testFile.getParent());

    // create an empty file (no write)
    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
    assertHasCapabilityConditionalCreate(stream);
    stream.close();

    // overwrite with non-empty file, should throw RemoteFileChangedException
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();
    fs.mkdirs(testFile.getParent());

    // create an empty file (no write)
    FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, null);
    assertHasCapabilityConditionalCreate(stream1);
    stream1.close();

    // overwrite with another empty file, should throw RemoteFileChangedException
    FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null);
    assertHasCapabilityConditionalCreate(stream2);
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfMatchOverwriteWithCorrectEtag() throws Throwable {
    FileSystem fs = getFileSystem();
    Path path = methodPath();
    fs.mkdirs(path.getParent());

    // Create a file
    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);

    // Retrieve the etag from the created file
    String etag = getEtag(fs, path);
    Assertions.assertThat(etag)
            .as("ETag should not be null after file creation")
            .isNotNull();

    String updatedFileContent = "Updated content";
    byte[] updatedData = updatedFileContent.getBytes(StandardCharsets.UTF_8);

    // overwrite file with etag
    createFileWithFlags(fs, path, updatedData, false, etag);

    // read file and verify overwritten content
    String fileContent = readFileContent(fs, path);
    Assertions.assertThat(fileContent)
            .as("File content should be correctly updated after overwriting with the correct ETag")
            .isEqualTo(updatedFileContent);
  }

  @Test
  public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable {
    FileSystem fs = getFileSystem();
    Path path = methodPath();
    fs.mkdirs(path.getParent());

    // Create a file
    createFileWithFlags(fs, path, SMALL_FILE_BYTES, true, null);

    // Retrieve the etag from the created file
    String etag = getEtag(fs, path);
    Assertions.assertThat(etag)
            .as("ETag should not be null after file creation")
            .isNotNull();

    // Overwrite the file. Will update the etag, making the previously fetched etag outdated.
    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);

    // overwrite file with outdated etag. Should throw RemoteFileChangedException
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Test
  public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable {
    FileSystem fs = getFileSystem();
    Path path = methodPath();
    fs.mkdirs(path.getParent());

    // Create a file
    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);

    // Retrieve the etag from the created file
    String etag = getEtag(fs, path);
    Assertions.assertThat(etag)
            .as("ETag should not be null after file creation")
            .isNotNull();

    // delete the file
    fs.delete(path);

    // overwrite file with etag. Should throw FileNotFoundException
    FileNotFoundException exception = intercept(FileNotFoundException.class,
            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag));
    assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception);
  }

  @Test
  public void testIfMatchOverwriteFileWithEmptyEtag() throws Throwable {
    FileSystem fs = getFileSystem();
    Path path = methodPath();
    fs.mkdirs(path.getParent());

    // Create a file
    createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);

    // overwrite file with empty etag. Should throw IllegalArgumentException
    intercept(IllegalArgumentException.class,
            () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, ""));
  }

  @Test
  public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    // Skip test if multipart uploads are not supported
    assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
            .as("Skipping as multipart upload not supported")
            .isTrue();

    // Create a file and retrieve its etag
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
    String etag = getEtag(fs, testFile);
    Assertions.assertThat(etag)
            .as("ETag should not be null after file creation")
            .isNotNull();

    // Start two multipart uploads with the same etag
    FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, false, etag, true);
    assertHasCapabilityEtagWrite(stream1);

    FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, false, etag, true);
    assertHasCapabilityEtagWrite(stream2);

    // Write data to both streams
    stream1.write(MULTIPART_FILE_BYTES);
    stream2.write(MULTIPART_FILE_BYTES);

    // Close the first stream successfully. Will update the etag
    stream1.close();

    // Close second stream, should fail due to etag mismatch
    RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
    assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
  }

  @Disabled("conditional_write statistics not yet fully implemented")
  @Test
  public void testConditionalWriteStatisticsWithoutIfNoneMatch() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    // write without an If-None-Match
    // conditional_write, conditional_write_statistics should remain 0
    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, false, null, false);
    updateStatistics(stream);
    stream.write(SMALL_FILE_BYTES);
    stream.close();
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);

    // write with overwrite = true
    // conditional_write, conditional_write_statistics should remain 0
    try (FSDataOutputStream outputStream = fs.create(testFile, true)) {
      outputStream.write(SMALL_FILE_BYTES);
      updateStatistics(outputStream);
    }
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);

    // write in path where file already exists with overwrite = false
    // conditional_write, conditional_write_statistics should remain 0
    try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
      outputStream.write(SMALL_FILE_BYTES);
      updateStatistics(outputStream);
    } catch (FileAlreadyExistsException e) {}
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);

    // delete the file
    fs.delete(testFile, false);

    // write in path where file doesn't exist with overwrite = false
    // conditional_write, conditional_write_statistics should remain 0
    try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
      outputStream.write(SMALL_FILE_BYTES);
      updateStatistics(outputStream);
    }
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);
  }

  @Disabled("conditional_write statistics not yet fully implemented")
  @Test
  public void testConditionalWriteStatisticsWithIfNoneMatch() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();

    FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, false);
    updateStatistics(stream);
    stream.write(SMALL_FILE_BYTES);
    stream.close();

    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 0);

    intercept(RemoteFileChangedException.class,
            () -> {
              // try again with If-None-Match. should fail
              FSDataOutputStream s = getStreamWithFlags(fs, testFile, true, null, false);
              updateStatistics(s);
              s.write(SMALL_FILE_BYTES);
              s.close();
              return "Second write using If-None-Match should have failed due to existing file." + s;
            }
    );

    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 1);
    verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
  }

  /**
   * Tests that a conditional create operation is triggered when the performance flag is enabled
   * and the overwrite option is set to false.
   */
  @Test
  public void testConditionalCreateWhenPerformanceFlagEnabledAndOverwriteDisabled() throws Throwable {
    FileSystem fs = getFileSystem();
    Path testFile = methodPath();
    fs.mkdirs(testFile.getParent());

    // Create a file
    createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);

    // Attempt to override the file without overwrite and performance flag.
    // Should throw RemoteFileChangedException (due to conditional write operation)
    intercept(RemoteFileChangedException.class, () -> {
      FSDataOutputStreamBuilder cf = fs.createFile(testFile);
      cf.overwrite(false);
      cf.must(FS_S3A_CREATE_PERFORMANCE, true);
      try (FSDataOutputStream stream = cf.build()) {
        assertHasCapabilityConditionalCreate(stream);
        stream.write(SMALL_FILE_BYTES);
        updateStatistics(stream);
      }
    });

    // TODO: uncomment when statistics are getting initialised
    // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE.getSymbol(), 0);
    // verifyStatisticCounterValue(statistics.getIOStatistics(), Statistic.CONDITIONAL_CREATE_FAILED.getSymbol(), 1);
  }
}