HiveZeroRowFileCreator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.io.DataOutput;
import com.facebook.presto.common.io.DataSink;
import com.facebook.presto.hive.datasink.DataSinkFactory;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.Slices;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.mapred.JobConf;

import javax.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.common.io.DataOutput.createDataOutput;
import static com.facebook.presto.hive.HiveCompressionCodec.NONE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static com.facebook.presto.hive.HiveWriteUtils.initializeSerializer;
import static com.facebook.presto.hive.pagefile.PageFileWriterFactory.createEmptyPageFile;
import static com.facebook.presto.hive.util.ConfigurationUtils.configureCompression;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.String.format;
import static java.nio.file.Files.deleteIfExists;
import static java.nio.file.Files.readAllBytes;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
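
/**
 * Creates zero-row files for Hive writes: a single, schema-valid empty file is
 * rendered once in the table's storage format and then copied in parallel to
 * every requested destination path (typically one file per bucket or partition
 * that received no rows).
 */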
public class HiveZeroRowFileCreator
        implements ZeroRowFileCreator
{
    private static final Logger log = Logger.get(HiveZeroRowFileCreator.class);

    private final HdfsEnvironment hdfsEnvironment;
    private final DataSinkFactory dataSinkFactory;
    private final ListeningExecutorService executor;

    @Inject
    public HiveZeroRowFileCreator(
            HdfsEnvironment hdfsEnvironment,
            DataSinkFactory dataSinkFactory,
            @ForZeroRowFileCreator ListeningExecutorService zeroRowFileCreatorExecutor)
    {
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.dataSinkFactory = requireNonNull(dataSinkFactory, "dataSinkFactory is null");
        this.executor = requireNonNull(zeroRowFileCreatorExecutor, "zeroRowFileCreatorExecutor is null");
    }
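
    // Renders the zero-row file content once, then fans the byte-for-byte copy
    // out to each destination on the zero-row-file-creator executor.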
    @Override
    public void createFiles(ConnectorSession session, HdfsContext hdfsContext, Path destinationDirectory, List<String> fileNames, StorageFormat format, HiveCompressionCodec compressionCodec, Properties schema)
    {
        if (fileNames.isEmpty()) {
            return;
        }

        // The content is identical for every destination, so generate it only once
        byte[] fileContent = generateZeroRowFile(session, hdfsContext, schema, format.getSerDe(), format.getOutputFormat(), compressionCodec);

        List<ListenableFuture<?>> commitFutures = new ArrayList<>();
        for (String fileName : fileNames) {
            commitFutures.add(executor.submit(() -> createFile(hdfsContext, new Path(destinationDirectory, fileName), fileContent, session)));
        }

        // Fail fast: if any copy fails, cancel the copies still in flight
        ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
        try {
            getFutureValue(listenableFutureAggregate, PrestoException.class);
        }
        catch (RuntimeException e) {
            listenableFutureAggregate.cancel(true);
            throw e;
        }
    }
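
    // Writes an empty file in the requested format to the local temporary
    // directory, reads its bytes back, and deletes the temporary file; the
    // resulting byte array can then be uploaded to any number of destinations
    // without re-running the serializer.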
    private byte[] generateZeroRowFile(
            ConnectorSession session,
            HdfsContext hdfsContext,
            Properties properties,
            String serDe,
            String outputFormatName,
            HiveCompressionCodec compressionCodec)
    {
        String tmpDirectoryPath = System.getProperty("java.io.tmpdir");
        String tmpFileName = format("presto-hive-zero-row-file-creator-%s-%s", session.getQueryId(), randomUUID());
        java.nio.file.Path tmpFilePath = Paths.get(tmpDirectoryPath, tmpFileName);
        try {
            Path target = new Path(format("file://%s/%s", tmpDirectoryPath, tmpFileName));

            // https://github.com/prestodb/presto/issues/14401: the JSON format reader does not
            // fetch compression from the source system, so only JSON files are written with the
            // requested codec; all other formats are written uncompressed
            JobConf conf = configureCompression(
                    hdfsEnvironment.getConfiguration(hdfsContext, target),
                    outputFormatName.equals(HiveStorageFormat.JSON.getOutputFormat()) ? compressionCodec : NONE);

            if (outputFormatName.equals(HiveStorageFormat.PAGEFILE.getOutputFormat())) {
                createEmptyPageFile(dataSinkFactory, session, target.getFileSystem(conf), target);
                return readAllBytes(tmpFilePath);
            }

            // Some serializers, such as Avro, set a property in the schema
            initializeSerializer(conf, properties, serDe);

            // Not a try-with-resources because RecordWriter is not Closeable
            RecordWriter recordWriter = HiveWriteUtils.createRecordWriter(
                    target,
                    conf,
                    properties,
                    outputFormatName,
                    session);
            recordWriter.close(false);
            return readAllBytes(tmpFilePath);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        finally {
            try {
                deleteIfExists(tmpFilePath);
            }
            catch (IOException e) {
                log.error(e, "Error deleting temporary file: %s", tmpFilePath);
            }
        }
    }
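
    // Uploads the pre-rendered zero-row file bytes to the destination path
    // through the connector's DataSink abstraction.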
    private void createFile(HdfsContext hdfsContext, Path path, byte[] content, ConnectorSession session)
    {
        try {
            FileSystem fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
            try (DataSink dataSink = dataSinkFactory.createDataSink(session, fs, path)) {
                DataOutput dataOutput = createDataOutput(Slices.wrappedBuffer(content));
                dataSink.write(ImmutableList.of(dataOutput));
            }
        }
        catch (IOException e) {
            throw new PrestoException(HIVE_WRITER_CLOSE_ERROR, "Error writing zero-row file to Hive", e);
        }
    }
}