// MagicOutputStream.java
/*
* ByteDance Volcengine EMR, Copyright 2022.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.tosfs.commit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
 * An {@link ObjectOutputStream} for files written under a magic directory.
 *
 * <p>The destination object key is derived from the magic path by {@link #toDestKey(Path)}.
 * When the superclass invokes {@link #finishUpload(String, String, List)}, this stream builds a
 * {@link Pending} record describing the upload (bucket, upload id, total length, destination key,
 * creation timestamp and parts) and persists its serialized form to a sibling of the magic path
 * whose name is the magic file name followed by {@code CommitUtils.PENDING_SUFFIX}.
 */
public class MagicOutputStream extends ObjectOutputStream {
  private final FileSystem fs;
  private final Path pendingPath;
  // True only when this stream created the storage itself and therefore has to close it.
  private final boolean closeStorage;

  /**
   * Creates a stream that builds its own {@link ObjectStorage} from the scheme and host of the
   * magic path. The storage created here is closed by {@link #close()}.
   *
   * @param fs         the file system used to persist the pending file.
   * @param threadPool the executor handed to the superclass.
   * @param conf       the configuration used to create the storage and handed to the superclass.
   * @param magic      the magic path to write.
   */
  public MagicOutputStream(FileSystem fs, ExecutorService threadPool, Configuration conf,
      Path magic) {
    this(fs,
        ObjectStorageFactory.create(magic.toUri().getScheme(), magic.toUri().getHost(), conf),
        threadPool,
        conf,
        magic,
        true);
  }

  /**
   * Creates a stream on top of a caller supplied {@link ObjectStorage}. The storage is NOT closed
   * by {@link #close()}; it stays under the caller's control.
   *
   * @param fs         the file system used to persist the pending file.
   * @param storage    the object storage handed to the superclass.
   * @param threadPool the executor handed to the superclass.
   * @param conf       the configuration handed to the superclass.
   * @param magic      the magic path to write.
   */
  public MagicOutputStream(FileSystem fs, ObjectStorage storage, ExecutorService threadPool,
      Configuration conf, Path magic) {
    this(fs, storage, threadPool, conf, magic, false);
  }

  // Shared constructor: the only place where the storage ownership flag is decided.
  private MagicOutputStream(FileSystem fs, ObjectStorage storage, ExecutorService threadPool,
      Configuration conf, Path magic, boolean closeStorage) {
    super(storage, threadPool, conf, magic, false);
    this.fs = fs;
    this.pendingPath = createPendingPath(magic);
    this.closeStorage = closeStorage;
  }

  /**
   * Maps a magic path to the key of its destination object.
   *
   * <p>The key of the magic path is split on {@code /} into {@code <parent>}, the
   * {@code CommitUtils.MAGIC} element and {@code <children>}. If {@code <children>} contains a
   * {@code CommitUtils.BASE} element, everything after the first such element is appended to
   * {@code <parent>}; otherwise only the last element of {@code <children>} (the file name) is
   * appended. The resulting elements are joined with {@code /}.
   *
   * @param magicPath the magic path to translate.
   * @return the destination object key.
   * @throws IllegalArgumentException if the path is not magic, if nothing follows the magic
   *                                  element, or if nothing follows the base element.
   */
  static String toDestKey(Path magicPath) {
    Preconditions.checkArgument(isMagic(magicPath), "Destination path is not magic %s", magicPath);

    String magicKey = ObjectUtils.pathToKey(magicPath);
    List<String> splits = Lists.newArrayList(magicKey.split("/"));

    // Break the full splits list into three collections: <parentSplits>, __magic, <childrenSplits>
    int magicIndex = splits.indexOf(CommitUtils.MAGIC);
    Preconditions.checkArgument(magicIndex >= 0, "Cannot locate %s in path %s", CommitUtils.MAGIC,
        magicPath);
    List<String> parentSplits = splits.subList(0, magicIndex);
    List<String> childrenSplits = splits.subList(magicIndex + 1, splits.size());
    Preconditions.checkArgument(!childrenSplits.isEmpty(),
        "No path found under %s for path %s", CommitUtils.MAGIC, magicPath);

    // Generate the destination splits which will be joined into the destination object key.
    List<String> destSplits = Lists.newArrayList(parentSplits);

    // A single lookup tells us both whether the base marker exists and where it is.
    int baseIndex = childrenSplits.indexOf(CommitUtils.BASE);
    if (baseIndex >= 0) {
      // Break the <childrenSplits> into three collections: <baseParentSplits>, __base,
      // <baseChildrenSplits>, and add all <baseChildrenSplits> into the destination splits.
      List<String> baseChildrenSplits =
          childrenSplits.subList(baseIndex + 1, childrenSplits.size());
      Preconditions.checkArgument(!baseChildrenSplits.isEmpty(),
          "No path found under %s for magic path %s", CommitUtils.BASE, magicPath);
      destSplits.addAll(baseChildrenSplits);
    } else {
      // Just add the last element of the <childrenSplits> into the destination splits.
      destSplits.add(childrenSplits.get(childrenSplits.size() - 1));
    }

    return StringUtils.join(destSplits, "/");
  }

  /**
   * Resolves the destination key for the given magic path via {@link #toDestKey(Path)}.
   *
   * @param magicPath the magic path being written.
   * @return the destination object key.
   */
  @Override
  protected String createDestKey(Path magicPath) {
    return toDestKey(magicPath);
  }

  /**
   * Builds a {@link Pending} record for the upload and persists its serialized form to the
   * pending path. The recorded length is the sum of the sizes of all parts and the created
   * timestamp is the current wall-clock time in milliseconds.
   *
   * @param destKey  the destination object key.
   * @param uploadId the id of the upload.
   * @param parts    the uploaded parts.
   * @throws IOException if the pending data cannot be persisted.
   */
  @Override
  protected void finishUpload(String destKey, String uploadId, List<Part> parts)
      throws IOException {
    long totalLength = parts.stream().mapToLong(Part::size).sum();
    Pending pending = Pending.builder()
        .setBucket(storage().bucket().name())
        .setUploadId(uploadId)
        .setLength(totalLength)
        .setDestKey(destKey)
        .setCreatedTimestamp(System.currentTimeMillis())
        .addParts(parts)
        .build();

    persist(pendingPath, pending.serialize());
  }

  /**
   * Closes the stream through the superclass and then, when the storage was created by this
   * stream, closes the storage as well.
   *
   * @throws IOException if closing the stream or the storage fails.
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      super.close();
    } finally {
      // Release the storage we own even when super.close() fails, otherwise it would leak.
      if (closeStorage) {
        storage().close();
      }
    }
  }

  /**
   * Writes the given bytes to the given path using {@code CommitUtils.save} on the file system
   * supplied at construction time. Subclasses may override this to change how data is stored.
   *
   * @param p    the path to write.
   * @param data the bytes to write.
   * @throws IOException if saving fails.
   */
  protected void persist(Path p, byte[] data) throws IOException {
    CommitUtils.save(fs, p, data);
  }

  /**
   * @return the object key of the pending file that belongs to this stream.
   */
  public String pendingKey() {
    return ObjectUtils.pathToKey(pendingPath);
  }

  // The pending file lives next to the magic file: same parent, file name + PENDING_SUFFIX.
  private static Path createPendingPath(Path magic) {
    return new Path(magic.getParent(),
        String.format("%s%s", magic.getName(), CommitUtils.PENDING_SUFFIX));
  }

  // .pending and .pendingset files are not typical magic files.
  private static boolean isInternalFile(Path p) {
    String location = p.toString();
    return location.endsWith(CommitUtils.PENDINGSET_SUFFIX)
        || location.endsWith(CommitUtils.PENDING_SUFFIX);
  }

  /**
   * Tells whether the path is a magic path: one of the non-empty elements of its URI path equals
   * {@code CommitUtils.MAGIC} and the path does not end with the pending or pendingset suffix.
   *
   * @param p the path to check, must not be null.
   * @return true if the path is a magic path.
   * @throws NullPointerException if the path is null.
   */
  public static boolean isMagic(Path p) {
    Preconditions.checkNotNull(p, "path cannot be null.");

    String path = p.toUri().getPath();
    // Splitting on "/" yields empty elements for leading or repeated slashes; drop them.
    List<String> splits = Arrays.stream(path.split("/"))
        .filter(StringUtils::isNotEmpty)
        .collect(Collectors.toList());

    return splits.contains(CommitUtils.MAGIC) && !isInternalFile(p);
  }
}