ElasticsearchIndexTest.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearch;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.lucene.LuceneSail;
import org.eclipse.rdf4j.sail.lucene.SearchDocument;
import org.eclipse.rdf4j.sail.lucene.SearchFields;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ClusterScope(numDataNodes = 1)
public class ElasticsearchIndexTest extends ESIntegTestCase {
// Shared value factory used to build every IRI, literal and statement below.
private static final ValueFactory vf = SimpleValueFactory.getInstance();
// Named graphs (contexts) for the context-aware statements further down.
// CONTEXT_3 is declared but not referenced anywhere else in this file.
public static final IRI CONTEXT_1 = vf.createIRI("urn:context1");
public static final IRI CONTEXT_2 = vf.createIRI("urn:context2");
public static final IRI CONTEXT_3 = vf.createIRI("urn:context3");
// create some objects that we will use throughout this test
// Two subjects and two predicates ...
IRI subject = vf.createIRI("urn:subj");
IRI subject2 = vf.createIRI("urn:subj2");
IRI predicate1 = vf.createIRI("urn:pred1");
IRI predicate2 = vf.createIRI("urn:pred2");
// ... and five plain literals used as statement objects.
Literal object1 = vf.createLiteral("object1");
Literal object2 = vf.createLiteral("object2");
Literal object3 = vf.createLiteral("cats");
Literal object4 = vf.createLiteral("dogs");
Literal object5 = vf.createLiteral("chicken");
// Context-less statements. Naming: statement<S><N>, where S selects the subject
// (1 = subject, 2 = subject2) and N is the running number of the statement for that subject.
Statement statement11 = vf.createStatement(subject, predicate1, object1);
Statement statement12 = vf.createStatement(subject, predicate2, object2);
Statement statement21 = vf.createStatement(subject2, predicate1, object3);
Statement statement22 = vf.createStatement(subject2, predicate2, object4);
Statement statement23 = vf.createStatement(subject2, predicate2, object5);
// Same triples placed in a context. Naming: statementContext<S><N><C>, i.e. the triple of
// statement<S><N> in CONTEXT_<C>.
Statement statementContext111 = vf.createStatement(subject, predicate1, object1, CONTEXT_1);
Statement statementContext121 = vf.createStatement(subject, predicate2, object2, CONTEXT_1);
Statement statementContext211 = vf.createStatement(subject2, predicate1, object3, CONTEXT_1);
Statement statementContext222 = vf.createStatement(subject2, predicate2, object4, CONTEXT_2);
Statement statementContext232 = vf.createStatement(subject2, predicate2, object5, CONTEXT_2);
// Transport client of the internal test cluster; assigned in setUp().
TransportClient client;
// Index under test; created and initialized in setUp(), shut down in tearDown().
ElasticsearchIndex index;
/**
 * Starts the base test fixture, obtains the transport client of the internal cluster and initializes a new
 * {@link ElasticsearchIndex} that points at that cluster and uses a fresh test index name.
 *
 * @throws Exception if the base set-up or the index initialization fails
 */
@Before
@Override
public void setUp() throws Exception {
    super.setUp();
    client = (TransportClient) internalCluster().transportClient();

    // connection details are taken from the running test cluster
    String firstTransportAddress = client.transportAddresses().get(0).toString();
    String clusterNameKey = ElasticsearchIndex.ELASTICSEARCH_KEY_PREFIX + "cluster.name";

    Properties indexConfig = new Properties();
    indexConfig.put(ElasticsearchIndex.TRANSPORT_KEY, firstTransportAddress);
    indexConfig.put(clusterNameKey, client.settings().get("cluster.name"));
    indexConfig.put(ElasticsearchIndex.INDEX_NAME_KEY, ElasticsearchTestUtils.getNextTestIndexName());
    indexConfig.put(ElasticsearchIndex.WAIT_FOR_STATUS_KEY, "green");
    indexConfig.put(ElasticsearchIndex.WAIT_FOR_NODES_KEY, ">=1");

    ElasticsearchIndex freshIndex = new ElasticsearchIndex();
    index = freshIndex;
    freshIndex.initialize(indexConfig);
}
/**
 * Supplies the plugin classes for the transport client of this test.
 *
 * @return a collection containing only {@link ReindexPlugin}
 */
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
    Collection<Class<? extends Plugin>> clientPlugins = List.of(ReindexPlugin.class);
    return clientPlugins;
}
/**
 * Supplies the plugin classes for the nodes of the test cluster.
 *
 * @return a collection containing only {@link ReindexPlugin}
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    Collection<Class<? extends Plugin>> pluginsPerNode = List.of(ReindexPlugin.class);
    return pluginsPerNode;
}
/**
 * Shuts down the index under test, tears down the base fixture and switches lock tracking back off.
 * <p>
 * The clean-up steps are nested in {@code finally} blocks so that each one runs even when an earlier one throws.
 * In particular, lock tracking (enabled by the context tests) is always disabled again and cannot leak into
 * subsequent tests.
 *
 * @throws Exception if shutting down the index or the base tear-down fails
 */
@After
@Override
public void tearDown() throws Exception {
    try {
        // index is still null when setUp() failed before assigning it
        if (index != null) {
            index.shutDown();
        }
    } finally {
        try {
            super.tearDown();
        } finally {
            org.eclipse.rdf4j.common.concurrent.locks.Properties.setLockTrackingEnabled(false);
        }
    }
}
/**
 * Walks one subject through its full life cycle in the index: adding a first statement creates a document, adding
 * a second statement extends that same document, both literals are searchable, removing one statement drops only
 * its field, and removing the last statement removes the document.
 *
 * @throws IOException if an index operation fails
 */
@Test
public void testAddStatement() throws IOException {
    String predicate1Field = ElasticsearchIndex.toPropertyFieldName(SearchFields.getPropertyField(predicate1));
    String predicate2Field = ElasticsearchIndex.toPropertyFieldName(SearchFields.getPropertyField(predicate2));

    // add a statement to an index
    index.begin();
    index.addStatement(statement11);
    index.commit();

    // check that it arrived properly
    assertEquals(1, countIndexedDocuments());
    Map<String, Object> fields = fetchSoleSourceForSubject(subject);
    assertEquals(subject.toString(), fields.get(SearchFields.URI_FIELD_NAME));
    assertEquals(object1.getLabel(), fields.get(predicate1Field));

    // add another statement
    index.begin();
    index.addStatement(statement12);
    index.commit();

    // See if everything remains consistent. Each check issues a fresh search request
    // so that the update is visible.
    assertEquals(1, countIndexedDocuments()); // #docs should *not* have increased
    fields = fetchSoleSourceForSubject(subject);
    assertEquals(subject.toString(), fields.get(SearchFields.URI_FIELD_NAME));
    assertEquals(object1.getLabel(), fields.get(predicate1Field));
    assertEquals(object2.getLabel(), fields.get(predicate2Field));

    // see if we can query for these literals
    assertEquals(1, countIndexedDocumentsMatching(object1.getLabel()));
    assertEquals(1, countIndexedDocumentsMatching(object2.getLabel()));

    // remove the first statement
    index.begin();
    index.removeStatement(statement11);
    index.commit();

    // check that that statement is actually removed and that the other still exists
    assertEquals(1, countIndexedDocuments());
    fields = fetchSoleSourceForSubject(subject);
    assertEquals(subject.toString(), fields.get(SearchFields.URI_FIELD_NAME));
    // The source map is keyed by the property field name (as in the checks above), not by the raw
    // predicate IRI; looking up the raw IRI would make this assertion pass unconditionally.
    assertNull(fields.get(predicate1Field));
    assertEquals(object2.getLabel(), fields.get(predicate2Field));

    // remove the other statement
    index.begin();
    index.removeStatement(statement12);
    index.commit();

    // check that there are no documents left (i.e. the last Document was
    // removed completely, rather than its remaining triple removed)
    assertEquals(0, countIndexedDocuments());
}

/** Returns the total hit count of an unfiltered search over the index under test. */
private long countIndexedDocuments() {
    return client.prepareSearch(index.getIndexName())
            .setTypes(index.getTypes())
            .get()
            .getHits()
            .getTotalHits().value;
}

/** Returns the total hit count of a query-string search for {@code queryString} over the index under test. */
private long countIndexedDocumentsMatching(String queryString) {
    return client.prepareSearch(index.getIndexName())
            .setTypes(index.getTypes())
            .setSource(new SearchSourceBuilder().size(0).query(QueryBuilders.queryStringQuery(queryString)))
            .get()
            .getHits()
            .getTotalHits().value;
}

/**
 * Searches for documents whose URI field equals {@code resource}, asserts that exactly one hit is returned and
 * fetches the source of that document.
 */
private Map<String, Object> fetchSoleSourceForSubject(IRI resource) {
    SearchHits hits = client.prepareSearch(index.getIndexName())
            .setTypes(index.getTypes())
            .setQuery(QueryBuilders.termQuery(SearchFields.URI_FIELD_NAME, resource.toString()))
            .execute()
            .actionGet()
            .getHits();
    Iterator<SearchHit> docs = hits.iterator();
    assertTrue(docs.hasNext());
    SearchHit doc = docs.next();
    assertFalse(docs.hasNext());
    return client.prepareGet(doc.getIndex(), doc.getType(), doc.getId()).execute().actionGet().getSource();
}
/**
 * Adds several statements for two subjects in one transaction via {@code addRemoveStatements}, verifies the
 * resulting documents, then swaps one statement of the second subject for another and verifies that document
 * again.
 *
 * @throws Exception if an index operation fails
 */
@Test
public void testAddMultiple() throws Exception {
    // first batch: two statements per subject, nothing to remove
    HashSet<Statement> toAdd = new HashSet<>(List.of(statement11, statement12, statement21, statement22));
    HashSet<Statement> toRemove = new HashSet<>();
    index.begin();
    index.addRemoveStatements(toAdd, toRemove);
    index.commit();

    // the statements involve two subjects, so two documents are expected
    long documentCount = client.prepareSearch(index.getIndexName())
            .setTypes(index.getTypes())
            .get()
            .getHits()
            .getTotalHits().value;
    assertEquals(2, documentCount);

    // document of the first subject
    SearchDocument firstSubjectDoc = index.getDocuments(subject).iterator().next();
    assertEquals(subject.toString(), firstSubjectDoc.getResource());
    assertStatement(statement11, firstSubjectDoc);
    assertStatement(statement12, firstSubjectDoc);

    // document of the second subject
    SearchDocument secondSubjectDoc = index.getDocuments(subject2).iterator().next();
    assertEquals(subject2.toString(), secondSubjectDoc.getResource());
    assertStatement(statement21, secondSubjectDoc);
    assertStatement(statement22, secondSubjectDoc);

    // check if the text field stores all added string values
    HashSet<String> texts = new HashSet<>(List.of("cats", "dogs"));
    // FIXME
    // assertTexts(texts, secondSubjectDoc);

    // second batch: replace statement22 by statement23
    toAdd.clear();
    toRemove.clear();
    toAdd.add(statement23);
    toRemove.add(statement22);
    index.begin();
    index.addRemoveStatements(toAdd, toRemove);
    index.commit();

    // re-read the second subject's document and verify the swap
    SearchDocument updatedSecondSubjectDoc = index.getDocuments(subject2).iterator().next();
    assertEquals(subject2.toString(), updatedSecondSubjectDoc.getResource());
    assertStatement(statement21, updatedSecondSubjectDoc);
    assertStatement(statement23, updatedSecondSubjectDoc);
    assertNoStatement(statement22, updatedSecondSubjectDoc);

    // check if the text field stores all added and no deleted string values
    texts.remove("dogs");
    texts.add("chicken");
    // FIXME
    // assertTexts(texts, updatedSecondSubjectDoc);
    // TODO: check deletion of the rest
}
/**
 * Adds statements in two contexts through a repository, clears {@code CONTEXT_1} and checks that only the
 * statements of {@code CONTEXT_2} remain indexed.
 * <p>
 * Contexts are tested through a sail because the triples have to be retrieved from the sail.
 *
 * @throws Exception if a repository or index operation fails
 */
@Test
public void testContexts() throws Exception {
    MemoryStore baseStore = new MemoryStore();
    // enable lock tracking
    org.eclipse.rdf4j.common.concurrent.locks.Properties.setLockTrackingEnabled(true);
    LuceneSail luceneSail = new LuceneSail();
    luceneSail.setBaseSail(baseStore);
    luceneSail.setLuceneIndex(index);
    // the repository wraps the LuceneSail; all updates below go through it
    SailRepository repository = new SailRepository(luceneSail);

    List<Statement> inContext1 = List.of(statementContext111, statementContext121, statementContext211);
    List<Statement> inContext2 = List.of(statementContext222, statementContext232);

    try (SailRepositoryConnection connection = repository.getConnection()) {
        // add every statement to its own context
        connection.begin();
        for (Statement st : inContext1) {
            connection.add(st, st.getContext());
        }
        for (Statement st : inContext2) {
            connection.add(st, st.getContext());
        }
        connection.commit();

        // all of them must be indexed now
        for (Statement st : inContext1) {
            assertStatement(st);
        }
        for (Statement st : inContext2) {
            assertStatement(st);
        }

        // delete context 1
        connection.begin();
        connection.clear(CONTEXT_1);
        connection.commit();

        // context 1 is gone, context 2 is untouched
        for (Statement st : inContext1) {
            assertNoStatement(st);
        }
        for (Statement st : inContext2) {
            assertStatement(st);
        }
    } finally {
        // close repo
        repository.shutDown();
    }
}
/**
 * Adds statements in two contexts through a repository, clears {@code CONTEXT_2} and checks that only the
 * statements of {@code CONTEXT_1} remain indexed.
 * <p>
 * Contexts are tested through a sail because the triples have to be retrieved from the sail.
 *
 * @throws Exception if a repository or index operation fails
 */
@Test
public void testContextsRemoveContext2() throws Exception {
    MemoryStore baseStore = new MemoryStore();
    // enable lock tracking
    org.eclipse.rdf4j.common.concurrent.locks.Properties.setLockTrackingEnabled(true);
    LuceneSail luceneSail = new LuceneSail();
    luceneSail.setBaseSail(baseStore);
    luceneSail.setLuceneIndex(index);
    // the repository wraps the LuceneSail; all updates below go through it
    SailRepository repository = new SailRepository(luceneSail);

    List<Statement> inContext1 = List.of(statementContext111, statementContext121, statementContext211);
    List<Statement> inContext2 = List.of(statementContext222, statementContext232);

    try (SailRepositoryConnection connection = repository.getConnection()) {
        // add every statement to its own context
        connection.begin();
        for (Statement st : inContext1) {
            connection.add(st, st.getContext());
        }
        for (Statement st : inContext2) {
            connection.add(st, st.getContext());
        }
        connection.commit();

        // all of them must be indexed now
        for (Statement st : inContext1) {
            assertStatement(st);
        }
        for (Statement st : inContext2) {
            assertStatement(st);
        }

        // delete context 2
        connection.begin();
        connection.clear(CONTEXT_2);
        connection.commit();

        // context 1 is untouched, context 2 is gone
        for (Statement st : inContext1) {
            assertStatement(st);
        }
        for (Statement st : inContext2) {
            assertNoStatement(st);
        }
    } finally {
        // close repo
        repository.shutDown();
    }
}
/**
 * Checks which literals {@code index.accept} lets through: plain literals and an {@code xsd:string} literal are
 * expected to be accepted, an {@code xsd:float} literal is expected to be rejected.
 */
@Test
public void testRejectedDatatypes() {
    IRI xsdString = vf.createIRI("http://www.w3.org/2001/XMLSchema#string");
    IRI xsdFloat = vf.createIRI("http://www.w3.org/2001/XMLSchema#float");

    Literal plainText = vf.createLiteral("hi there");
    assertTrue("Is the first literal accepted?", index.accept(plainText));

    Literal typedText = vf.createLiteral("hi there, too", xsdString);
    assertTrue("Is the second literal accepted?", index.accept(typedText));

    Literal plainNumberText = vf.createLiteral("1.0");
    assertTrue("Is the third literal accepted?", index.accept(plainNumberText));

    Literal typedFloat = vf.createLiteral("1.0", xsdFloat);
    assertFalse("Is the fourth literal accepted?", index.accept(typedFloat));
}
/**
 * Looks up the document for the statement's subject and context in the index and asserts that it exists and
 * contains the statement.
 *
 * @param statement the statement expected to be indexed
 * @throws Exception if the document lookup fails
 */
private void assertStatement(Statement statement) throws Exception {
    SearchDocument indexed = index.getDocument(statement.getSubject(), statement.getContext());
    assertNotNull("Missing document " + statement.getSubject(), indexed);
    assertStatement(statement, indexed);
}
/**
 * Looks up the document for the statement's subject and context in the index and asserts that the statement is
 * not part of it. A missing document counts as success.
 *
 * @param statement the statement expected to be absent from the index
 * @throws Exception if the document lookup fails
 */
private void assertNoStatement(Statement statement) throws Exception {
    SearchDocument indexed = index.getDocument(statement.getSubject(), statement.getContext());
    if (indexed != null) {
        assertNoStatement(statement, indexed);
    }
}
/**
 * Asserts that the given document has a property for the statement's predicate and that one of its values equals
 * the label of the statement's (literal) object.
 *
 * @param statement the statement expected in the document
 * @param document  the document to inspect
 */
private void assertStatement(Statement statement, SearchDocument document) {
    List<String> values = document.getProperty(SearchFields.getPropertyField(statement.getPredicate()));
    assertNotNull("field " + statement.getPredicate() + " not found in document " + document, values);
    // the label is resolved per value, exactly as a manual loop over the values would do
    boolean labelPresent = values.stream()
            .anyMatch(value -> ((Literal) statement.getObject()).getLabel().equals(value));
    if (!labelPresent) {
        fail("Statement not found in document " + statement);
    }
}
/**
 * Asserts that none of the document's values for the statement's predicate equals the label of the statement's
 * (literal) object. A document without that property counts as success.
 *
 * @param statement the statement expected to be absent
 * @param document  the document to inspect
 */
private void assertNoStatement(Statement statement, SearchDocument document) {
    List<String> values = document.getProperty(SearchFields.getPropertyField(statement.getPredicate()));
    if (values != null) {
        // the label is resolved per value, exactly as a manual loop over the values would do
        boolean labelPresent = values.stream()
                .anyMatch(value -> ((Literal) statement.getObject()).getLabel().equals(value));
        if (labelPresent) {
            fail("Statement should not be found in document " + statement);
        }
    }
}
/*
* private void assertTexts(Set<String> texts, Document document) { Set<String> toFind = new HashSet<String>(texts);
* Set<String> found = new HashSet<String>(); for(Field field : document.getFields(LuceneIndex.TEXT_FIELD_NAME)) {
* // is the field value expected and not yet been found? if(toFind.remove(field.stringValue())) { // add it to the
* found set // (it was already remove from the toFind list in the if clause) found.add(field.stringValue()); } else
* { assertEquals( "Was the text value '" + field.stringValue() + "' expected to exist?", false, true); } }
* for(String notFound : toFind) { assertEquals("Was the expected text value '" + notFound + "' found?", true,
* false); } }
*/
}