ElasticsearchHelper.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
/**
 * Package-private helper for reading the hits of an Elasticsearch query page by page through the scroll API.
 */
class ElasticsearchHelper {

	/**
	 * Runs {@code queryBuilder} against {@code index} and returns an iteration over the matching hits.
	 * <p>
	 * The search is executed eagerly when this method is called, sorted ascending on
	 * {@link FieldSortBuilder#DOC_FIELD_NAME} and fetched in pages of 1000 hits. Further pages are requested lazily
	 * from {@code hasNext()} / {@code next()} once the current page has been consumed. The scroll is cleared as soon
	 * as the results are exhausted, or when the iteration is closed, whichever comes first.
	 *
	 * @param queryBuilder  query selecting the hits to iterate over
	 * @param client        client used for the search, scroll and clear-scroll requests
	 * @param index         name of the index to search
	 * @param scrollTimeout scroll keep-alive in milliseconds; sent again with every page request
	 * @return an iteration over the hits; callers should close it so that an unfinished scroll is cleared
	 */
	static CloseableIteration<SearchHit> getScrollingIterator(QueryBuilder queryBuilder,
			Client client, String index, int scrollTimeout) {

		return new CloseableIteration<>() {

			// Number of hits requested per page.
			private final int pageSize = 1000;

			// Hits of the page currently being consumed.
			private Iterator<SearchHit> items;

			// Id of the open scroll; null once the scroll has been cleared.
			private String scrollId;

			// Number of hits contained in the most recently loaded page.
			private int lastPageSize;

			// Look-ahead element handed out by the next call to next(); null if none has been fetched yet.
			private SearchHit next;

			// True once the scroll is exhausted or the iteration has been closed.
			private boolean empty;

			{
				SearchResponse firstPage = client.prepareSearch(index)
						.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
						.setScroll(scrollTimeout + "ms")
						.setQuery(queryBuilder)
						.setSize(pageSize)
						.get();
				acceptPage(firstPage);
			}

			/** Makes {@code page} the current page and remembers its size and scroll id. */
			private void acceptPage(SearchResponse page) {
				SearchHit[] hits = page.getHits().getHits();
				items = Arrays.asList(hits).iterator();
				lastPageSize = hits.length;
				scrollId = page.getScrollId();
			}

			/** Fills {@code next} with the following hit, loading another page if needed; no-op when done. */
			private void calculateNext() {
				if (next != null || empty) {
					return;
				}

				if (!items.hasNext()) {
					if (lastPageSize < pageSize - 2) {
						// The previous page held fewer hits than requested, so there is nothing more to fetch.
						// NOTE(review): the margin of 2 below the page size is inherited; its rationale is not
						// visible in this file - confirm before tightening it.
						releaseScroll();
						return;
					}

					SearchResponse page = client.prepareSearchScroll(scrollId)
							.setScroll(scrollTimeout + "ms")
							.execute()
							.actionGet();
					acceptPage(page);

					if (!items.hasNext()) {
						releaseScroll();
						return;
					}
				}

				next = items.next();
			}

			/** Clears the scroll on the server and marks this iteration as exhausted. */
			private void releaseScroll() {
				ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
				clearScrollRequest.addScrollId(scrollId);
				// The result of the clear-scroll call is intentionally not inspected.
				client.clearScroll(clearScrollRequest);
				scrollId = null;
				empty = true;
			}

			@Override
			public boolean hasNext() {
				calculateNext();
				return next != null;
			}

			/**
			 * @throws NoSuchElementException if there are no more hits or the iteration has been closed
			 */
			@Override
			public SearchHit next() {
				calculateNext();
				if (next == null) {
					throw new NoSuchElementException();
				}

				SearchHit result = next;
				next = null;
				return result;
			}

			@Override
			public void remove() {
				throw new UnsupportedOperationException();
			}

			/** Clears the scroll if it is still open and discards any prefetched hit; safe to call repeatedly. */
			@Override
			public void close() {
				// Drop the look-ahead so hasNext() reports false once the iteration is closed.
				next = null;
				if (scrollId != null) {
					releaseScroll();
				}
			}
		};
	}
}