ITestAbfsManifestStoreOperations.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.commit;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_PRESERVED_IN_RENAME;
import static org.apache.hadoop.fs.azurebfs.commit.AbfsCommitTestHelper.prepareTestConfiguration;
import static org.junit.Assume.assumeTrue;
/**
* Test {@link AbfsManifestStoreOperations}.
* As this looks at etag handling through FS operations, it's actually testing how etags work
* in ABFS (preservation across renames) and in the client (are they consistent
* in LIST and HEAD calls).
*
* Skipped when tested against wasb-compatible stores.
*/
public class ITestAbfsManifestStoreOperations extends AbstractManifestCommitterTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsManifestStoreOperations.class);
private final ABFSContractTestBinding binding;
public ITestAbfsManifestStoreOperations() throws Exception {
binding = new ABFSContractTestBinding();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
// skip tests on non-HNS stores
assumeTrue("Resilient rename not available",
getFileSystem().hasPathCapability(getContract().getTestPath(),
ETAGS_PRESERVED_IN_RENAME));
}
@Override
protected Configuration createConfiguration() {
return enableManifestCommitter(prepareTestConfiguration(binding));
}
@Override
protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
/**
* basic consistency across operations, as well as being non-empty.
*/
@Test
public void testEtagConsistencyAcrossListAndHead() throws Throwable {
describe("Etag values must be non-empty and consistent across LIST and HEAD Calls.");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
ContractTestUtils.touch(fs, path);
final ManifestStoreOperations operations = createManifestStoreOperations();
Assertions.assertThat(operations)
.describedAs("Store operations class loaded via Configuration")
.isInstanceOf(AbfsManifestStoreOperations.class);
final FileStatus st = operations.getFileStatus(path);
final String etag = operations.getEtag(st);
Assertions.assertThat(etag)
.describedAs("Etag of %s", st)
.isNotBlank();
LOG.info("etag of empty file is \"{}\"", etag);
final FileStatus[] statuses = fs.listStatus(path);
Assertions.assertThat(statuses)
.describedAs("List(%s)", path)
.hasSize(1);
final FileStatus lsStatus = statuses[0];
Assertions.assertThat(operations.getEtag(lsStatus))
.describedAs("etag of list status (%s) compared to HEAD value of %s", lsStatus, st)
.isEqualTo(etag);
}
@Test
public void testEtagsOfDifferentDataDifferent() throws Throwable {
describe("Verify that two different blocks of data written have different tags");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
Path src = new Path(path, "src");
ContractTestUtils.createFile(fs, src, true,
"data1234".getBytes(StandardCharsets.UTF_8));
final ManifestStoreOperations operations = createManifestStoreOperations();
final FileStatus srcStatus = operations.getFileStatus(src);
final String srcTag = operations.getEtag(srcStatus);
LOG.info("etag of file 1 is \"{}\"", srcTag);
// now overwrite with data of same length
// (ensure that path or length aren't used exclusively as tag)
ContractTestUtils.createFile(fs, src, true,
"1234data".getBytes(StandardCharsets.UTF_8));
// validate
final String tag2 = operations.getEtag(operations.getFileStatus(src));
LOG.info("etag of file 2 is \"{}\"", tag2);
Assertions.assertThat(tag2)
.describedAs("etag of updated file")
.isNotEqualTo(srcTag);
}
@Test
public void testEtagConsistencyAcrossRename() throws Throwable {
describe("Verify that when a file is renamed, the etag remains unchanged");
final Path path = methodPath();
final FileSystem fs = getFileSystem();
Path src = new Path(path, "src");
Path dest = new Path(path, "dest");
ContractTestUtils.createFile(fs, src, true,
"sample data".getBytes(StandardCharsets.UTF_8));
final ManifestStoreOperations operations = createManifestStoreOperations();
final FileStatus srcStatus = operations.getFileStatus(src);
final String srcTag = operations.getEtag(srcStatus);
LOG.info("etag of short file is \"{}\"", srcTag);
Assertions.assertThat(srcTag)
.describedAs("Etag of %s", srcStatus)
.isNotBlank();
// rename
operations.commitFile(new FileEntry(src, dest, 0, srcTag));
// validate
FileStatus destStatus = operations.getFileStatus(dest);
final String destTag = operations.getEtag(destStatus);
Assertions.assertThat(destTag)
.describedAs("etag of list status (%s) compared to HEAD value of %s", destStatus, srcStatus)
.isEqualTo(srcTag);
}
}