TestNessieMultiBranching.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.nessie;
import com.facebook.presto.Session;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.testing.containers.NessieContainer;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.LogResponse;
import org.projectnessie.model.Reference;
import org.projectnessie.model.Tag;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.List;
import static com.facebook.presto.iceberg.CatalogType.NESSIE;
import static com.facebook.presto.iceberg.nessie.NessieTestUtil.nessieConnectorProperties;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Verifies that the Iceberg connector, when backed by a Nessie catalog, keeps
 * schemas and table data isolated per Nessie reference (branch, optionally
 * pinned to a commit hash).
 *
 * <p>The tests share one Nessie container and one API client, and every test
 * is followed by {@link #resetData()}, which is why the class is declared
 * single-threaded.
 */
@Test(singleThreaded = true)
public class TestNessieMultiBranching
        extends AbstractTestQueryFramework
{
    private static final String CATALOG = "iceberg";
    private static final String REFERENCE_NAME_PROPERTY = "nessie_reference_name";
    private static final String REFERENCE_HASH_PROPERTY = "nessie_reference_hash";

    private NessieContainer nessieContainer;
    private NessieApiV1 nessieApiV1;

    /**
     * Starts the Nessie container and builds the API client before delegating
     * to the framework's own initialization, because {@link #createQueryRunner()}
     * reads the container's REST URI.
     *
     * @throws Exception if the container, the client, or the framework setup fails
     */
    @BeforeClass
    @Override
    public void init()
            throws Exception
    {
        nessieContainer = NessieContainer.builder().build();
        nessieContainer.start();
        nessieApiV1 = HttpClientBuilder.builder().withUri(nessieContainer.getRestApiUri()).build(NessieApiV1.class);
        super.init();
    }

    /**
     * Releases the API client and stops the container.
     *
     * <p>{@code alwaysRun} makes sure the container is stopped even when
     * {@link #init()} failed part-way. The client is closed first, while its
     * server is still up, and the container is stopped in a {@code finally}
     * block so that a failure while closing the client cannot leak it.
     */
    @AfterClass(alwaysRun = true)
    public void tearDown()
    {
        try {
            if (nessieApiV1 != null) {
                nessieApiV1.close();
            }
        }
        finally {
            if (nessieContainer != null) {
                nessieContainer.stop();
            }
        }
    }

    /**
     * Deletes every tag and every branch except the server's default branch,
     * so that references created by one test are not visible to the next.
     *
     * @throws NessieNotFoundException if a reference disappears while being deleted
     * @throws NessieConflictException if a reference changed since it was listed
     */
    @AfterMethod
    public void resetData()
            throws NessieNotFoundException, NessieConflictException
    {
        String defaultBranchName = nessieApiV1.getDefaultBranch().getName();
        for (Reference reference : nessieApiV1.getAllReferences().get().getReferences()) {
            if (reference instanceof Branch) {
                // The default branch is kept; everything else is test residue.
                if (!reference.getName().equals(defaultBranchName)) {
                    nessieApiV1.deleteBranch().branch((Branch) reference).delete();
                }
            }
            else if (reference instanceof Tag) {
                nessieApiV1.deleteTag().tag((Tag) reference).delete();
            }
        }
    }

    /**
     * Builds an Iceberg query runner configured for the Nessie catalog and
     * pointed at the test container's REST endpoint.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.builder()
                .setCatalogType(NESSIE)
                .setExtraConnectorProperties(nessieConnectorProperties(nessieContainer.getRestApiUri()))
                .build()
                .getQueryRunner();
    }

    /** A session that points at a reference which was never created must be rejected. */
    @Test
    public void testWithUnknownBranch()
    {
        assertQueryFails(sessionOnRef("unknownRef"), "CREATE SCHEMA nessie_namespace", ".*Nessie ref 'unknownRef' does not exist.*");
    }

    /** A schema created on one branch must not be listed on a sibling branch. */
    @Test
    public void testNamespaceVisibility()
            throws NessieConflictException, NessieNotFoundException
    {
        Reference one = createBranch("branchOne");
        Reference two = createBranch("branchTwo");
        Session sessionOne = sessionOnRef(one.getName());
        Session sessionTwo = sessionOnRef(two.getName());

        assertQuerySucceeds(sessionOne, "CREATE SCHEMA ns_one");
        assertThat(computeActual(sessionOne, "SHOW SCHEMAS FROM iceberg LIKE 'ns_one'").getMaterializedRows()).hasSize(1);
        assertQuerySucceeds(sessionTwo, "CREATE SCHEMA ns_two");
        assertThat(computeActual(sessionTwo, "SHOW SCHEMAS FROM iceberg LIKE 'ns_two'").getMaterializedRows()).hasSize(1);

        // ns_two shouldn't be visible on branchOne
        assertThat(computeActual(sessionOne, "SHOW SCHEMAS FROM iceberg LIKE 'ns_two'").getMaterializedRows()).isEmpty();
        // ns_one shouldn't be visible on branchTwo
        assertThat(computeActual(sessionTwo, "SHOW SCHEMAS FROM iceberg LIKE 'ns_one'").getMaterializedRows()).isEmpty();
    }

    /**
     * Rows inserted on a branch must be visible only on that branch, and a
     * session pinned to an earlier commit hash must see the table as of that
     * commit.
     */
    @Test
    public void testTableDataVisibility()
            throws NessieConflictException, NessieNotFoundException
    {
        Reference ref = createBranch("tableDataVisibility");
        Session session = sessionOnRef(ref.getName());
        assertQuerySucceeds(session, "CREATE SCHEMA namespace_one");
        assertQuerySucceeds(session, "CREATE TABLE namespace_one.tbl (a int)");
        assertQuerySucceeds(session, "INSERT INTO namespace_one.tbl (a) VALUES (1)");
        assertQuerySucceeds(session, "INSERT INTO namespace_one.tbl (a) VALUES (2)");

        // Fork two branches from the populated one and let them diverge.
        Reference one = createBranch("branchOneWithTable", ref.getName());
        Reference two = createBranch("branchTwoWithTable", ref.getName());
        Session sessionOne = sessionOnRef(one.getName());
        Session sessionTwo = sessionOnRef(two.getName());
        assertQuerySucceeds(sessionOne, "INSERT INTO namespace_one.tbl (a) VALUES (3)");
        assertQuerySucceeds(sessionTwo, "INSERT INTO namespace_one.tbl (a) VALUES (5)");
        assertQuerySucceeds(sessionTwo, "INSERT INTO namespace_one.tbl (a) VALUES (6)");

        // tableDataVisibility branch should still have 2 entries
        assertTableContents(session, 1, 2);
        // there should be 3 entries on this branch
        assertTableContents(sessionOne, 1, 2, 3);
        // and 4 entries on this branch
        assertTableContents(sessionTwo, 1, 2, 5, 6);

        // retrieve the second to the last commit hash and query the table with that hash
        List<LogResponse.LogEntry> logEntries = nessieApiV1.getCommitLog().refName(two.getName()).get().getLogEntries();
        // Index 1 is read below, so a non-empty log is not enough: two entries are required.
        assertThat(logEntries.size()).isGreaterThanOrEqualTo(2);
        String hash = logEntries.get(1).getCommitMeta().getHash();
        Session sessionTwoAtHash = sessionOnRef(two.getName(), hash);

        // at this hash there were only 3 rows
        assertTableContents(sessionTwoAtHash, 1, 2, 5);
    }

    /**
     * Asserts that {@code namespace_one.tbl}, as seen by {@code session}, holds
     * exactly {@code expectedValues} (in any order), checking both the row
     * count reported by {@code count(*)} and the materialized rows.
     */
    private void assertTableContents(Session session, int... expectedValues)
    {
        assertThat(computeScalar(session, "SELECT count(*) FROM namespace_one.tbl")).isEqualTo((long) expectedValues.length);

        MaterializedResult rows = computeActual(session, "SELECT * FROM namespace_one.tbl");
        assertThat(rows.getMaterializedRows()).hasSize(expectedValues.length);

        MaterializedResult.Builder expected = resultBuilder(session, rows.getTypes());
        for (int value : expectedValues) {
            expected.row(value);
        }
        assertEqualsIgnoreOrder(rows.getMaterializedRows(), expected.build().getMaterializedRows());
    }

    /** Returns a copy of the default session that targets the given Nessie reference. */
    private Session sessionOnRef(String reference)
    {
        return Session.builder(getSession())
                .setCatalogSessionProperty(CATALOG, REFERENCE_NAME_PROPERTY, reference)
                .build();
    }

    /**
     * Returns a copy of the default session that targets the given Nessie
     * reference pinned to a specific commit hash.
     */
    private Session sessionOnRef(String reference, String hash)
    {
        return Session.builder(getSession())
                .setCatalogSessionProperty(CATALOG, REFERENCE_NAME_PROPERTY, reference)
                .setCatalogSessionProperty(CATALOG, REFERENCE_HASH_PROPERTY, hash)
                .build();
    }

    /**
     * Creates a branch off the server's default branch. The default branch is
     * looked up rather than hard-coded so this stays in step with
     * {@link #resetData()}, which also asks the server for it.
     */
    private Reference createBranch(String branchName)
            throws NessieConflictException, NessieNotFoundException
    {
        return createBranch(branchName, nessieApiV1.getDefaultBranch().getName());
    }

    /**
     * Creates {@code branchName} at the current hash of {@code sourceBranch}.
     *
     * @throws NessieNotFoundException if {@code sourceBranch} does not exist
     * @throws NessieConflictException if the branch cannot be created
     */
    private Reference createBranch(String branchName, String sourceBranch)
            throws NessieConflictException, NessieNotFoundException
    {
        Reference source = nessieApiV1.getReference().refName(sourceBranch).get();
        return nessieApiV1.createReference()
                .sourceRefName(source.getName())
                .reference(Branch.of(branchName, source.getHash()))
                .create();
    }
}