BinaryQueryResultParser.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.query.resultio.binary;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.BNODE_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.DATATYPE_LITERAL_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.EMPTY_ROW_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.ERROR_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.FORMAT_VERSION;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.LANG_LITERAL_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.MAGIC_NUMBER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.MALFORMED_QUERY_ERROR;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.NAMESPACE_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.NULL_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.PLAIN_LITERAL_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.QNAME_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.QUERY_EVALUATION_ERROR;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.REPEAT_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.TABLE_END_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.TRIPLE_RECORD_MARKER;
import static org.eclipse.rdf4j.query.resultio.binary.BinaryQueryResultConstants.URI_RECORD_MARKER;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.eclipse.rdf4j.common.io.IOUtil;
import org.eclipse.rdf4j.model.BNode;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.TupleQueryResultHandlerException;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import org.eclipse.rdf4j.query.resultio.AbstractTupleQueryResultParser;
import org.eclipse.rdf4j.query.resultio.QueryResultParseException;
import org.eclipse.rdf4j.query.resultio.TupleQueryResultFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reader for the binary tuple result format. The format is explained in {@link BinaryQueryResultConstants}.
*/
public class BinaryQueryResultParser extends AbstractTupleQueryResultParser {
/*-----------*
* Variables *
*-----------*/
private DataInputStream in;
private int formatVersion;
private final CharsetDecoder charsetDecoder = StandardCharsets.UTF_8.newDecoder();
private static final Logger logger = LoggerFactory.getLogger(BinaryQueryResultParser.class);
private String[] namespaceArray = new String[32];
private static final int INVALID_CONTENT_LIMIT = 8 * 1024;
/*--------------*
* Constructors *
*--------------*/
/**
* Creates a new parser for the binary query result format that will use an instance of {@link SimpleValueFactory}
* to create Value objects.
*/
public BinaryQueryResultParser() {
super();
}
/**
* Creates a new parser for the binary query result format that will use the supplied ValueFactory to create Value
* objects.
*/
public BinaryQueryResultParser(ValueFactory valueFactory) {
super(valueFactory);
}
/*---------*
* Methods *
*---------*/
@Override
public final TupleQueryResultFormat getTupleQueryResultFormat() {
return TupleQueryResultFormat.BINARY;
}
@Override
public synchronized void parse(InputStream in)
throws IOException, QueryResultParseException, TupleQueryResultHandlerException {
if (in == null) {
throw new IllegalArgumentException("Input stream can not be 'null'");
}
this.in = new DataInputStream(in);
// Check magic number
byte[] magicNumber = IOUtil.readBytes(in, MAGIC_NUMBER.length);
if (!Arrays.equals(magicNumber, MAGIC_NUMBER)) {
throw new QueryResultParseException("File does not contain a binary RDF table result");
}
// Check format version (parser is backward-compatible with version 1 and
// version 2)
formatVersion = this.in.readInt();
if (formatVersion > FORMAT_VERSION && formatVersion < 1) {
throw new QueryResultParseException("Incompatible format version: " + formatVersion);
}
if (formatVersion == 2) {
// read format version 2 FLAG byte (ordered and distinct flags) and
// ignore them
this.in.readByte();
}
// Read column headers
int columnCount = this.in.readInt();
if (columnCount < 0) {
throw new QueryResultParseException("Illegal column count specified: " + columnCount);
}
List<String> columnHeaders = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
columnHeaders.add(readString());
}
columnHeaders = Collections.unmodifiableList(columnHeaders);
if (handler != null) {
handler.startQueryResult(columnHeaders);
}
// Read value tuples
List<Value> currentTuple = new ArrayList<>(columnCount);
List<Value> previousTuple = Collections.nCopies(columnCount, (Value) null);
int recordTypeMarker = this.in.readByte();
while (recordTypeMarker != TABLE_END_RECORD_MARKER) {
if (recordTypeMarker == ERROR_RECORD_MARKER) {
processError();
} else if (recordTypeMarker == NAMESPACE_RECORD_MARKER) {
processNamespace();
} else if (recordTypeMarker == EMPTY_ROW_RECORD_MARKER) {
if (handler != null) {
handler.handleSolution(EmptyBindingSet.getInstance());
}
} else {
Value value = null;
switch (recordTypeMarker) {
case NULL_RECORD_MARKER:
break; // do nothing
case REPEAT_RECORD_MARKER:
value = previousTuple.get(currentTuple.size());
break;
case QNAME_RECORD_MARKER:
value = readQName();
break;
case URI_RECORD_MARKER:
value = readURI();
break;
case BNODE_RECORD_MARKER:
value = readBnode();
break;
case PLAIN_LITERAL_RECORD_MARKER:
case LANG_LITERAL_RECORD_MARKER:
case DATATYPE_LITERAL_RECORD_MARKER:
value = readLiteral(recordTypeMarker);
break;
case TRIPLE_RECORD_MARKER:
value = readTriple();
break;
default:
logger.error(extractInvalidContentAsString(recordTypeMarker));
throw new QueryResultParseException("Could not parse the query result.");
}
currentTuple.add(value);
if (currentTuple.size() == columnCount) {
previousTuple = Collections.unmodifiableList(currentTuple);
currentTuple = new ArrayList<>(columnCount);
if (handler != null) {
handler.handleSolution(new ListBindingSet(columnHeaders, previousTuple));
}
}
}
recordTypeMarker = this.in.readByte();
}
if (handler != null) {
handler.endQueryResult();
}
}
private void processError() throws IOException, QueryResultParseException {
byte errTypeFlag = in.readByte();
QueryErrorType errType;
if (errTypeFlag == MALFORMED_QUERY_ERROR) {
errType = QueryErrorType.MALFORMED_QUERY_ERROR;
} else if (errTypeFlag == QUERY_EVALUATION_ERROR) {
errType = QueryErrorType.QUERY_EVALUATION_ERROR;
} else {
throw new QueryResultParseException("Unknown error type: " + errTypeFlag);
}
String msg = readString();
// FIXME: is this the right thing to do upon encountering an error?
throw new QueryResultParseException(errType + ": " + msg);
}
private void processNamespace() throws IOException {
int namespaceID = in.readInt();
String namespace = readString();
if (namespaceID >= namespaceArray.length) {
int newSize = Math.max(namespaceID, namespaceArray.length * 2);
String[] newArray = new String[newSize];
System.arraycopy(namespaceArray, 0, newArray, 0, namespaceArray.length);
namespaceArray = newArray;
}
namespaceArray[namespaceID] = namespace;
}
private IRI readQName() throws IOException {
int nsID = in.readInt();
String localName = readString();
return valueFactory.createIRI(namespaceArray[nsID], localName);
}
private IRI readURI() throws IOException {
String uri = readString();
return valueFactory.createIRI(uri);
}
private BNode readBnode() throws IOException {
String bnodeID = readString();
return valueFactory.createBNode(bnodeID);
}
private Literal readLiteral(int recordTypeMarker) throws IOException, QueryResultParseException {
String label = readString();
if (recordTypeMarker == DATATYPE_LITERAL_RECORD_MARKER) {
IRI datatype;
int dtTypeMarker = in.readByte();
switch (dtTypeMarker) {
case QNAME_RECORD_MARKER:
datatype = readQName();
break;
case URI_RECORD_MARKER:
datatype = readURI();
break;
default:
throw new QueryResultParseException("Illegal record type marker for literal's datatype");
}
return valueFactory.createLiteral(label, datatype);
} else if (recordTypeMarker == LANG_LITERAL_RECORD_MARKER) {
String language = readString();
return valueFactory.createLiteral(label, language);
} else {
return valueFactory.createLiteral(label);
}
}
private String readString() throws IOException {
if (formatVersion == 1) {
return readStringV1();
} else {
return readStringV2();
}
}
/**
* Used when trying to parse some invalid content. Reads the remaining bytes as string in order to provide more
* user-friendly error message. Sets the max limit of the returned string, if its length > INVALID_CONTENT_LIMIT,
* the returned string is trimmed and "..." is appended in order to prevent displaying too long result.
*/
private String extractInvalidContentAsString(int recordTypeMarker) throws IOException {
byte[] remainingBytes = new byte[INVALID_CONTENT_LIMIT];
IOUtil.readBytes(in, remainingBytes);
ByteBuffer byteBuf = ByteBuffer.wrap(remainingBytes);
CharBuffer charBuf = charsetDecoder.decode(byteBuf);
String remainingSymbols = charBuf.toString();
String result = remainingSymbols;
if (remainingSymbols.contains("Exception")) {
result = remainingSymbols.substring(0, remainingSymbols.lastIndexOf(')') + 1);
}
return Character.toString(recordTypeMarker) + result + "...";
}
/**
* Reads a string from the version 1 format, i.e. in Java's {@link DataInput#modified-utf-8 Modified UTF-8}.
*/
private String readStringV1() throws IOException {
return in.readUTF();
}
/**
* Reads a string from the version 2 format. Strings are encoded as UTF-8 and are preceeded by a 32-bit integer
* (high byte first) specifying the length of the encoded string.
*/
private String readStringV2() throws IOException {
int stringLength = in.readInt();
byte[] encodedString = IOUtil.readBytes(in, stringLength);
if (encodedString.length != stringLength) {
throw new EOFException("Attempted to read " + stringLength + " bytes but no more than "
+ encodedString.length + " were available");
}
ByteBuffer byteBuf = ByteBuffer.wrap(encodedString);
CharBuffer charBuf = charsetDecoder.decode(byteBuf);
return charBuf.toString();
}
private Triple readTriple() throws IOException {
Value subject = readDirectValue();
if (!(subject instanceof Resource)) {
throw new IOException("Unexpected value type: " + subject);
}
Value predicate = readDirectValue();
if (!(predicate instanceof IRI)) {
throw new IOException("Unexpected value type: " + predicate);
}
Value object = readDirectValue();
return valueFactory.createTriple((Resource) subject, (IRI) predicate, object);
}
private Value readDirectValue() throws IOException {
int recordTypeMarker = this.in.readByte();
switch (recordTypeMarker) {
case NAMESPACE_RECORD_MARKER:
processNamespace();
return readDirectValue();
case QNAME_RECORD_MARKER:
return readQName();
case URI_RECORD_MARKER:
return readURI();
case BNODE_RECORD_MARKER:
return readBnode();
case PLAIN_LITERAL_RECORD_MARKER:
case LANG_LITERAL_RECORD_MARKER:
case DATATYPE_LITERAL_RECORD_MARKER:
return readLiteral(recordTypeMarker);
case TRIPLE_RECORD_MARKER:
return readTriple();
default:
throw new IOException("Unexpected record type: " + recordTypeMarker);
}
}
}