TestCustomSplitConversionUtils.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.util;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeBootstrapBaseFileSplit;
import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.testng.Assert.assertEquals;
/**
 * Round-trip tests for {@link CustomSplitConversionUtils}.
 *
 * <p>Each test builds a Hudi-specific {@link FileSplit}, extracts its custom split info into a
 * string map, recreates the split from that map plus the plain base split, and checks that the
 * recreated split reports the same values as the one it was derived from.
 *
 * <p>All assertions use TestNG's {@code assertEquals(actual, expected)} argument order so that
 * failure messages label the two values correctly.
 */
public class TestCustomSplitConversionUtils
{
    private static final String BASE_PATH = "/test/table/";
    private static final Path FILE_PATH = new Path(BASE_PATH, "test.parquet");
    private static final long SPLIT_START_POS = 0L;
    private static final long SPLIT_LENGTH = 100L;
    private static final String[] SPLIT_HOSTS = new String[] {"host1", "host2"};

    /**
     * Round-trips a {@link HoodieRealtimeFileSplit} that carries three delta log files.
     */
    @Test
    public void testHudiRealtimeSplitConverterRoundTrip()
            throws IOException
    {
        List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
        List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
        String expectedMaxCommitTime = "max_commit_time";
        FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
        FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, false, Option.empty());

        // Test conversion of HudiSplit -> customSplitInfo
        Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

        // Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
        HoodieRealtimeFileSplit recreatedSplit = (HoodieRealtimeFileSplit) CustomSplitConversionUtils.recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

        assertBaseSplitPreserved(recreatedSplit);
        assertEquals(recreatedSplit.getBasePath(), BASE_PATH);
        assertEquals(recreatedSplit.getDeltaLogPaths(), deltaLogPaths);
        assertEquals(recreatedSplit.getMaxCommitTime(), expectedMaxCommitTime);
    }

    /**
     * Round-trips a {@link HoodieRealtimeFileSplit} that has no delta log files.
     */
    @Test
    public void testHudiRealtimeSplitConverterNoLogRoundTrip()
            throws IOException
    {
        List<String> deltaLogPaths = ImmutableList.of();
        List<HoodieLogFile> deltaLogFiles = ImmutableList.of();
        String expectedMaxCommitTime = "max_commit_time";
        FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
        FileSplit hudiSplit = new HoodieRealtimeFileSplit(baseSplit, BASE_PATH, deltaLogFiles, expectedMaxCommitTime, false, Option.empty());

        // Test conversion of HudiSplit -> customSplitInfo
        Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

        // Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
        HoodieRealtimeFileSplit recreatedSplit = (HoodieRealtimeFileSplit) CustomSplitConversionUtils.recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

        assertBaseSplitPreserved(recreatedSplit);
        assertEquals(recreatedSplit.getBasePath(), BASE_PATH);
        assertEquals(recreatedSplit.getDeltaLogPaths(), deltaLogPaths);
        assertEquals(recreatedSplit.getMaxCommitTime(), expectedMaxCommitTime);
    }

    /**
     * Round-trips a {@link BootstrapBaseFileSplit} and checks the bootstrap source split's
     * path, start and length on the recreated split.
     */
    @Test
    public void testHudiBootstrapBaseFileSplitConverter()
            throws IOException
    {
        Path bootstrapSourceFilePath = new Path("/test/source/test.parquet");
        long bootstrapSourceSplitStartPos = 0L;
        long bootstrapSourceSplitLength = 200L;
        FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
        FileSplit bootstrapSourceSplit = new FileSplit(bootstrapSourceFilePath, bootstrapSourceSplitStartPos, bootstrapSourceSplitLength,
                new String[0]);
        FileSplit hudiSplit = new BootstrapBaseFileSplit(baseSplit, bootstrapSourceSplit);

        // Test conversion of HudiSplit -> customSplitInfo
        Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

        // Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
        BootstrapBaseFileSplit recreatedSplit = (BootstrapBaseFileSplit) CustomSplitConversionUtils
                .recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

        assertBaseSplitPreserved(recreatedSplit);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getPath(), bootstrapSourceFilePath);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getStart(), bootstrapSourceSplitStartPos);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getLength(), bootstrapSourceSplitLength);
    }

    /**
     * Round-trips a {@link HoodieRealtimeBootstrapBaseFileSplit}, checking both the realtime
     * values (base path, delta log paths, max commit time) and the bootstrap source split.
     */
    @Test
    public void testHudiRealtimeBootstrapBaseFileSplitConverter()
            throws IOException
    {
        List<String> deltaLogPaths = Arrays.asList("test1", "test2", "test3");
        List<HoodieLogFile> deltaLogFiles = deltaLogPaths.stream().map(p -> new HoodieLogFile(new Path(p))).collect(Collectors.toList());
        String maxCommitTime = "max_commit_time";
        Path bootstrapSourceFilePath = new Path("/test/source/test.parquet");
        long bootstrapSourceSplitStartPos = 0L;
        long bootstrapSourceSplitLength = 200L;
        FileSplit baseSplit = new FileSplit(FILE_PATH, SPLIT_START_POS, SPLIT_LENGTH, SPLIT_HOSTS);
        FileSplit bootstrapSourceSplit = new FileSplit(bootstrapSourceFilePath, bootstrapSourceSplitStartPos, bootstrapSourceSplitLength,
                new String[0]);
        FileSplit hudiSplit = new HoodieRealtimeBootstrapBaseFileSplit(baseSplit, BASE_PATH, deltaLogFiles, maxCommitTime,
                bootstrapSourceSplit, false, Option.empty());

        // Test conversion of HudiSplit -> customSplitInfo
        Map<String, String> customSplitInfo = CustomSplitConversionUtils.extractCustomSplitInfo(hudiSplit);

        // Test conversion of (customSplitInfo + baseSplit) -> HudiSplit
        HoodieRealtimeBootstrapBaseFileSplit recreatedSplit = (HoodieRealtimeBootstrapBaseFileSplit) CustomSplitConversionUtils
                .recreateSplitWithCustomInfo(baseSplit, customSplitInfo);

        assertBaseSplitPreserved(recreatedSplit);
        assertEquals(recreatedSplit.getBasePath(), BASE_PATH);
        assertEquals(recreatedSplit.getDeltaLogPaths(), deltaLogPaths);
        assertEquals(recreatedSplit.getMaxCommitTime(), maxCommitTime);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getPath(), bootstrapSourceFilePath);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getStart(), bootstrapSourceSplitStartPos);
        assertEquals(recreatedSplit.getBootstrapFileSplit().getLength(), bootstrapSourceSplitLength);
    }

    /**
     * Asserts that the recreated split reports the path, start, length and locations that
     * every test in this class uses to build its base split.
     *
     * @param recreatedSplit the split returned by
     *        {@code CustomSplitConversionUtils.recreateSplitWithCustomInfo}
     * @throws IOException declared because the original tests, which call
     *         {@code getLocations()} directly, declare it
     */
    private static void assertBaseSplitPreserved(FileSplit recreatedSplit)
            throws IOException
    {
        assertEquals(recreatedSplit.getPath(), FILE_PATH);
        assertEquals(recreatedSplit.getStart(), SPLIT_START_POS);
        assertEquals(recreatedSplit.getLength(), SPLIT_LENGTH);
        assertEquals(recreatedSplit.getLocations(), SPLIT_HOSTS);
    }
}