TestMemoryWorkerCrash.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.memory;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import org.testng.annotations.Test;
import java.util.Map;
import static com.facebook.airlift.testing.Assertions.assertLessThan;
import static io.airlift.units.Duration.nanosSince;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@Test(singleThreaded = true)
public class TestMemoryWorkerCrash
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
Map<String, String> coordinatorProperties = ImmutableMap.of("scale-writers", "false", "redistribute-writes", "true");
return MemoryQueryRunner.createQueryRunner(coordinatorProperties, ImmutableMap.of());
}
@Test
public void tableAccessAfterWorkerCrash()
throws Exception
{
getQueryRunner().execute("CREATE TABLE test_nation as SELECT * FROM nation");
assertQuery("SELECT * FROM test_nation ORDER BY nationkey", "SELECT * FROM nation ORDER BY nationkey");
closeWorker();
assertQueryFails("SELECT * FROM test_nation ORDER BY nationkey", "No nodes available to run query");
getQueryRunner().execute("INSERT INTO test_nation SELECT * FROM tpch.tiny.nation");
assertQueryFails("SELECT * FROM test_nation ORDER BY nationkey", "No nodes available to run query");
getQueryRunner().execute("CREATE TABLE test_region as SELECT * FROM tpch.tiny.region");
assertQuery("SELECT * FROM test_region ORDER BY regionkey", "SELECT * FROM region ORDER BY regionkey");
}
private void closeWorker()
throws Exception
{
int nodeCount = getNodeCount();
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
TestingPrestoServer worker = queryRunner.getServers().stream()
.filter(server -> !server.isCoordinator())
.findAny()
.orElseThrow(() -> new IllegalStateException("No worker nodes"));
worker.close();
waitForNodes(nodeCount - 1);
}
private void waitForNodes(int numberOfNodes)
throws InterruptedException
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
long start = System.nanoTime();
while (queryRunner.getCoordinator().refreshNodes().getActiveNodes().size() < numberOfNodes) {
assertLessThan(nanosSince(start), new Duration(10, SECONDS));
MILLISECONDS.sleep(10);
}
}
}