// RemoteRepositoryTest.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.performance;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.eclipse.rdf4j.federated.util.FedXUtil;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.repository.http.HTTPRepository;
/**
 * Stand-alone probe that measures how long a remote HTTP repository needs to answer a batch of simple "outgoing
 * statements" queries issued concurrently from a fixed-size thread pool.
 * <p>
 * The program first collects up to {@link #MAX_INSTANCES} resources of a given {@code rdf:type} and then, for at most
 * {@link #N_QUERIES} of them, evaluates {@code SELECT * WHERE { <instance> ?p ?o }} and reports the total wall-clock
 * duration.
 */
public class RemoteRepositoryTest {

    /** Upper bound on the number of instances fetched from the repository. */
    private static final int MAX_INSTANCES = 4000;

    /** Upper bound on the number of queries submitted to the thread pool. */
    private static final int N_QUERIES = 4000;

    /** Number of worker threads evaluating queries in parallel. */
    private static final int N_THREADS = 30;

    /** Server URL used when none is passed on the command line. */
    private static final String DEFAULT_SERVER_URL = "http://10.212.10.29:8081/openrdf-sesame";

    /** Repository id used when none is passed on the command line. */
    private static final String DEFAULT_REPOSITORY_ID = "drugbank";

    /**
     * Runs the measurement.
     *
     * @param args optional; {@code args[0]} overrides the server URL and {@code args[1]} overrides the repository
     *             id. When absent, the built-in defaults are used.
     * @throws Exception if the repository cannot be initialized, the instances cannot be retrieved, or waiting for
     *                   the submitted tasks fails
     */
    public static void main(String[] args) throws Exception {
        String serverUrl = args != null && args.length > 0 ? args[0] : DEFAULT_SERVER_URL;
        String repositoryId = args != null && args.length > 1 ? args[1] : DEFAULT_REPOSITORY_ID;

        Repository repo = new HTTPRepository(serverUrl, repositoryId);
        repo.init();
        try {
            System.out.println("Retrieving instances...");
            List<IRI> instances;
            // The connection is only needed for the lookup; close it as soon as the result is materialized.
            try (RepositoryConnection conn = repo.getConnection()) {
                instances = retrieveInstances(conn,
                        FedXUtil.iri("http://www4.wiwiss.fu-berlin.de/drugbank/resource/drugbank/drugs"));
            }
            System.out.println("Retrieved " + instances.size() + " instances");

            System.out.println(
                    "Performing queries to retrieve outgoing statements for " + N_QUERIES + " instances.");

            ExecutorService executor = Executors.newFixedThreadPool(N_THREADS);
            try {
                List<Future<?>> tasks = new ArrayList<>();
                long start = System.currentTimeMillis();

                int nQueries = Math.min(N_QUERIES, instances.size());
                for (final IRI instance : instances.subList(0, nQueries)) {
                    // Each task opens its own connection instead of sharing one across worker threads;
                    // the RDF4J documentation advises against sharing a RepositoryConnection between threads.
                    tasks.add(executor.submit(() -> {
                        try (RepositoryConnection taskConn = repo.getConnection()) {
                            runQuery(taskConn, instance);
                        } catch (Exception e) {
                            System.err.println("Error while performing query evaluation for instance "
                                    + instance.stringValue() + ": " + e.getMessage());
                        }
                    }));
                }

                // wait for all tasks being finished
                for (Future<?> task : tasks) {
                    task.get();
                }
                System.out.println(
                        "Done evaluating queries. Duration " + (System.currentTimeMillis() - start) + "ms");
            } finally {
                // On the normal path every task has completed; on failure this cancels what is still pending
                // so the non-daemon pool threads cannot keep the JVM alive.
                executor.shutdownNow();
            }
        } finally {
            repo.shutDown();
        }
        System.out.println("Done.");
    }

    /**
     * Collects the IRIs of resources that are declared to be of the given type.
     *
     * @param conn connection used to read the {@code rdf:type} statements
     * @param type the class whose instances are requested
     * @return at most {@link #MAX_INSTANCES} subject IRIs of statements matching {@code (?s, rdf:type, type)};
     *         subjects that are not IRIs (e.g. blank nodes) are skipped
     */
    private static List<IRI> retrieveInstances(RepositoryConnection conn, IRI type) {
        try (RepositoryResult<Statement> qres = conn.getStatements(null, RDF.TYPE, type, false)) {
            return qres
                    .stream()
                    // The instance is the subject; the object of every matching statement is 'type' itself.
                    .map(Statement::getSubject)
                    .filter(IRI.class::isInstance)
                    .map(IRI.class::cast)
                    .limit(MAX_INSTANCES)
                    .collect(Collectors.toList());
        }
    }

    /**
     * Evaluates {@code SELECT * WHERE { <instance> ?p ?o }} and fully consumes the result.
     *
     * @param conn     connection on which the query is prepared and evaluated
     * @param instance the resource whose outgoing statements are requested
     * @return the number of solutions returned by the query
     */
    private static long runQuery(RepositoryConnection conn, IRI instance) {
        TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
                "SELECT * WHERE { <" + instance.stringValue() + "> ?p ?o }");
        try (TupleQueryResult res = query.evaluate()) {
            return res.stream().count();
        }
    }
}