TestNativeExecutionSystemConfig.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark.execution.property;
import com.facebook.airlift.configuration.testing.ConfigAssertions;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
public class TestNativeExecutionSystemConfig
{
@Test
public void testNativeExecutionVeloxConfig()
{
// Test defaults
assertRecordedDefaults(ConfigAssertions.recordDefaults(NativeExecutionVeloxConfig.class)
.setCodegenEnabled(false)
.setSpillEnabled(true)
.setAggregationSpillEnabled(true)
.setJoinSpillEnabled(true)
.setOrderBySpillEnabled(true)
.setMaxSpillBytes(500L << 30));
// Test explicit property mapping. Also makes sure properties returned by getAllProperties() covers full property list.
NativeExecutionVeloxConfig expected = new NativeExecutionVeloxConfig()
.setCodegenEnabled(true)
.setSpillEnabled(false)
.setAggregationSpillEnabled(false)
.setJoinSpillEnabled(false)
.setOrderBySpillEnabled(false)
.setMaxSpillBytes(1L);
Map<String, String> properties = expected.getAllProperties();
assertFullMapping(properties, expected);
}
@Test
public void testNativeExecutionSystemConfig()
{
// Test defaults
assertRecordedDefaults(ConfigAssertions.recordDefaults(NativeExecutionSystemConfig.class)
.setEnableSerializedPageChecksum(true)
.setEnableVeloxExpressionLogging(false)
.setEnableVeloxTaskLogging(true)
.setHttpServerReusePort(true)
.setHttpServerBindToNodeInternalAddressOnlyEnabled(true)
.setHttpServerPort(7777)
.setHttpServerNumIoThreadsHwMultiplier(1.0)
.setHttpsServerPort(7778)
.setEnableHttpsCommunication(false)
.setHttpsCiphers("AES128-SHA,AES128-SHA256,AES256-GCM-SHA384")
.setHttpsCertPath("")
.setHttpsKeyPath("")
.setExchangeHttpClientNumIoThreadsHwMultiplier(1.0)
.setAsyncDataCacheEnabled(false)
.setAsyncCacheSsdGb(0)
.setConnectorNumIoThreadsHwMultiplier(0.0)
.setShutdownOnsetSec(10)
.setSystemMemoryGb(10)
.setQueryMemoryGb(new DataSize(8, DataSize.Unit.GIGABYTE))
.setUseMmapAllocator(true)
.setMemoryArbitratorKind("SHARED")
.setMemoryArbitratorCapacityGb(8)
.setMemoryArbitratorReservedCapacityGb(0)
.setMemoryPoolInitCapacity(8L << 30)
.setMemoryPoolReservedCapacity(0)
.setMemoryPoolTransferCapacity(2L << 30)
.setMemoryReclaimWaitMs(300_000)
.setSpillerSpillPath("")
.setConcurrentLifespansPerTask(5)
.setMaxDriversPerTask(15)
.setOldTaskCleanupMs(false)
.setPrestoVersion("dummy.presto.version")
.setShuffleName("local")
.setRegisterTestFunctions(false)
.setEnableHttpServerAccessLog(true)
.setCoreOnAllocationFailureEnabled(false));
// Test explicit property mapping. Also makes sure properties returned by getAllProperties() covers full property list.
NativeExecutionSystemConfig expected = new NativeExecutionSystemConfig()
.setConcurrentLifespansPerTask(15)
.setEnableSerializedPageChecksum(false)
.setEnableVeloxExpressionLogging(true)
.setEnableVeloxTaskLogging(false)
.setHttpServerReusePort(false)
.setHttpServerBindToNodeInternalAddressOnlyEnabled(false)
.setHttpServerPort(8080)
.setHttpServerNumIoThreadsHwMultiplier(3.0)
.setHttpsServerPort(8081)
.setEnableHttpsCommunication(true)
.setHttpsCiphers("AES128-SHA")
.setHttpsCertPath("/tmp/non_existent.cert")
.setHttpsKeyPath("/tmp/non_existent.key")
.setExchangeHttpClientNumIoThreadsHwMultiplier(0.5)
.setAsyncDataCacheEnabled(true)
.setAsyncCacheSsdGb(1000)
.setConnectorNumIoThreadsHwMultiplier(1.0)
.setPrestoVersion("presto-version")
.setShutdownOnsetSec(30)
.setSystemMemoryGb(40)
.setQueryMemoryGb(new DataSize(20, DataSize.Unit.GIGABYTE))
.setUseMmapAllocator(false)
.setMemoryArbitratorKind("")
.setMemoryArbitratorCapacityGb(10)
.setMemoryArbitratorReservedCapacityGb(8)
.setMemoryPoolInitCapacity(7L << 30)
.setMemoryPoolReservedCapacity(6L << 30)
.setMemoryPoolTransferCapacity(1L << 30)
.setMemoryReclaimWaitMs(123123123)
.setSpillerSpillPath("dummy.spill.path")
.setMaxDriversPerTask(30)
.setOldTaskCleanupMs(true)
.setShuffleName("custom")
.setRegisterTestFunctions(true)
.setEnableHttpServerAccessLog(false)
.setCoreOnAllocationFailureEnabled(true);
Map<String, String> properties = expected.getAllProperties();
assertFullMapping(properties, expected);
}
@Test
public void testNativeExecutionNodeConfig()
{
// Test defaults
assertRecordedDefaults(ConfigAssertions.recordDefaults(NativeExecutionNodeConfig.class)
.setNodeEnvironment("spark-velox")
.setNodeLocation("/dummy/location")
.setNodeInternalAddress("127.0.0.1")
.setNodeId(0)
.setNodeMemoryGb(10));
// Test explicit property mapping. Also makes sure properties returned by getAllProperties() covers full property list.
NativeExecutionNodeConfig expected = new NativeExecutionNodeConfig()
.setNodeEnvironment("next-gen-spark")
.setNodeId(1)
.setNodeLocation("/extra/dummy/location")
.setNodeInternalAddress("1.1.1.1")
.setNodeMemoryGb(40);
Map<String, String> properties = expected.getAllProperties();
assertFullMapping(properties, expected);
}
@Test
public void testNativeExecutionConnectorConfig()
{
// Test defaults
assertRecordedDefaults(ConfigAssertions.recordDefaults(NativeExecutionConnectorConfig.class)
.setCacheEnabled(false)
.setMaxCacheSize(new DataSize(0, DataSize.Unit.MEGABYTE))
.setConnectorName("hive"));
// Test explicit property mapping. Also makes sure properties returned by getAllProperties() covers full property list.
NativeExecutionConnectorConfig expected = new NativeExecutionConnectorConfig()
.setConnectorName("custom")
.setMaxCacheSize(new DataSize(32, DataSize.Unit.MEGABYTE))
.setCacheEnabled(true);
Map<String, String> properties = new java.util.HashMap<>(expected.getAllProperties());
// Since the cache.max-cache-size requires to be size without the unit which to be compatible with the C++,
// here we convert the size from Long type (in string format) back to DataSize for comparison
properties.put("cache.max-cache-size", String.valueOf(new DataSize(Double.parseDouble(properties.get("cache.max-cache-size")), DataSize.Unit.MEGABYTE)));
assertFullMapping(properties, expected);
}
@Test
public void testFilePropertiesPopulator()
{
PrestoSparkWorkerProperty workerProperty = new PrestoSparkWorkerProperty(new NativeExecutionConnectorConfig(), new NativeExecutionNodeConfig(), new NativeExecutionSystemConfig(), new NativeExecutionVeloxConfig());
testPropertiesPopulate(workerProperty);
}
private void testPropertiesPopulate(PrestoSparkWorkerProperty workerProperty)
{
Path directory = null;
try {
directory = Files.createTempDirectory("presto");
Path configPropertiesPath = Paths.get(directory.toString(), "config.properties");
Path nodePropertiesPath = Paths.get(directory.toString(), "node.properties");
Path connectorPropertiesPath = Paths.get(directory.toString(), "catalog/hive.properties");
workerProperty.populateAllProperties(configPropertiesPath, nodePropertiesPath, connectorPropertiesPath);
verifyProperties(workerProperty.getSystemConfig().getAllProperties(), readPropertiesFromDisk(configPropertiesPath));
verifyProperties(workerProperty.getNodeConfig().getAllProperties(), readPropertiesFromDisk(nodePropertiesPath));
verifyProperties(workerProperty.getConnectorConfig().getAllProperties(), readPropertiesFromDisk(connectorPropertiesPath));
}
catch (Exception exception) {
exception.printStackTrace();
fail();
}
}
private Properties readPropertiesFromDisk(Path path)
{
Properties properties = new Properties();
File file = new File(path.toString());
try {
FileReader fileReader = new FileReader(file);
properties.load(fileReader);
fileReader.close();
return properties;
}
catch (IOException e) {
e.printStackTrace();
fail();
}
return null;
}
private void verifyProperties(Map<String, String> expected, Properties actual)
{
for (Map.Entry<String, String> entry : expected.entrySet()) {
assertEquals(entry.getValue(), actual.get(entry.getKey()));
}
for (Map.Entry<Object, Object> entry : actual.entrySet()) {
assertEquals((String) entry.getValue(), expected.get((String) entry.getKey()));
}
}
}