FileIO.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.memory;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.eclipse.rdf4j.common.io.IOUtil;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.BNode;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.util.Literals;
import org.eclipse.rdf4j.rio.helpers.RDFStarUtil;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.base.SailDataset;
import org.eclipse.rdf4j.sail.base.SailSink;
import org.eclipse.rdf4j.sail.memory.model.MemIRI;
import org.eclipse.rdf4j.sail.memory.model.MemResource;
import org.eclipse.rdf4j.sail.memory.model.MemValue;
import org.eclipse.rdf4j.sail.memory.model.MemValueFactory;
/**
 * Functionality to read and write MemoryStore to/from a file.
 * <p>
 * File layout: the four-byte magic number {@code BMSF}, a single byte holding the format version, and then a
 * GZIP-compressed stream of records. Every record starts with a one-byte record type marker and the stream is
 * terminated by {@link #EOF_MARKER}.
 * <p>
 * {@link #read(File, SailSink, SailSink)} and {@link #write(SailDataset, SailDataset, File, File)} are synchronized
 * on this instance. {@link #writeStatement(CloseableIteration, int, int, DataOutputStream)} is public but not
 * synchronized, and it uses the same per-instance charset encoder as the other write methods.
 *
 * @author Arjohn Kampman
 */
class FileIO {

    /*-----------*
     * Constants *
     *-----------*/

    /**
     * Magic number for Binary Memory Store Files
     */
    private static final byte[] MAGIC_NUMBER = new byte[] { 'B', 'M', 'S', 'F' };

    /**
     * The version number of the current format.
     */
    // Version 1: initial version
    // Version 2: don't use read/writeUTF() to remove 64k limit on strings,
    // removed dummy "up-to-date status" boolean for namespace records
    // Version 3: introduced RDF-star triple record type
    private static final int BMSF_VERSION = 3;

    /* RECORD TYPES (first byte of every top-level record, see read()) */
    public static final int NAMESPACE_MARKER = 1;

    public static final int EXPL_TRIPLE_MARKER = 2;

    public static final int EXPL_QUAD_MARKER = 3;

    public static final int INF_TRIPLE_MARKER = 4;

    public static final int INF_QUAD_MARKER = 5;

    /* VALUE TYPES (first byte of every serialized value, see readValue()/writeValue()) */
    public static final int URI_MARKER = 6;

    public static final int BNODE_MARKER = 7;

    // Accepted by readValue() but never emitted by writeValue(), which writes every non-language literal with
    // DATATYPE_LITERAL_MARKER.
    public static final int PLAIN_LITERAL_MARKER = 8;

    public static final int LANG_LITERAL_MARKER = 9;

    public static final int DATATYPE_LITERAL_MARKER = 10;

    public static final int RDFSTAR_TRIPLE_MARKER = 11;

    /** Marks the end of the record stream. */
    public static final int EOF_MARKER = 127;

    /*-----------*
     * Variables *
     *-----------*/

    /** Factory used to create the values that are read back from a data file. */
    private final MemValueFactory vf;

    // Charset encoders/decoders are stateful and not safe for concurrent use; one of each is kept per instance.
    private final CharsetEncoder charsetEncoder = StandardCharsets.UTF_8.newEncoder();

    private final CharsetDecoder charsetDecoder = StandardCharsets.UTF_8.newDecoder();

    /** Format version of the file currently being read; assigned by {@link #read(File, SailSink, SailSink)}. */
    private int formatVersion;

    /*--------------*
     * Constructors *
     *--------------*/

    /**
     * Creates a new FileIO.
     *
     * @param vf the value factory used to create values while reading a data file
     */
    public FileIO(MemValueFactory vf) {
        this.vf = vf;
    }

    /*---------*
     * Methods *
     *---------*/

    /**
     * Writes the namespaces and statements to {@code syncFile} and then renames that file to {@code dataFile}.
     *
     * @param explicit dataset supplying the namespaces and the explicit statements
     * @param inferred dataset supplying the inferred statements
     * @param syncFile file the data is written to first
     * @param dataFile final location of the data
     * @throws IOException   if writing fails or if {@code syncFile} could not be renamed to {@code dataFile}
     * @throws SailException if reading from one of the datasets fails
     */
    public synchronized void write(SailDataset explicit, SailDataset inferred, File syncFile, File dataFile)
            throws IOException, SailException {
        write(explicit, inferred, syncFile);

        // prefer atomic renameTo operations
        boolean renamed = syncFile.renameTo(dataFile);

        if (!renamed && syncFile.exists() && dataFile.exists()) {
            // tolerate renameTo that does not work if destination exists; the result of delete() is not
            // checked here, the outcome is decided by the rename that follows
            dataFile.delete();
            renamed = syncFile.renameTo(dataFile);
        }

        if (!renamed) {
            String path = syncFile.getAbsolutePath();
            String name = dataFile.getName();
            throw new IOException("Could not rename " + path + " to " + name);
        }
    }

    /**
     * Writes the header followed by the GZIP-compressed namespace and statement records to {@code dataFile}.
     */
    private void write(SailDataset explicit, SailDataset inferred, File dataFile) throws IOException, SailException {
        try (OutputStream out = Files.newOutputStream(dataFile.toPath())) {
            // Write header (not compressed)
            out.write(MAGIC_NUMBER);
            out.write(BMSF_VERSION);
            out.flush();

            // The rest of the data is GZIP-compressed. The buffer in front of the compressor collects the many
            // one-byte markers and four-byte lengths so that they are not handed to the deflater one at a time.
            try (DataOutputStream dataOut = new DataOutputStream(
                    new BufferedOutputStream(new GZIPOutputStream(out)))) {
                writeNamespaces(explicit, dataOut);
                writeStatements(explicit, inferred, dataOut);
                dataOut.writeByte(EOF_MARKER);
            }
        }
    }

    /**
     * Reads a data file and feeds its contents into the supplied sinks.
     *
     * @param dataFile the file to read
     * @param explicit receives the namespaces and the explicit statements
     * @param inferred receives the inferred statements
     * @throws IOException   if the file does not start with the magic number, has an unsupported format version,
     *                       contains an unknown marker or cannot be read
     * @throws SailException if one of the sinks rejects the data
     */
    public synchronized void read(File dataFile, SailSink explicit, SailSink inferred)
            throws IOException, SailException {
        try (InputStream in = Files.newInputStream(dataFile.toPath())) {
            byte[] magicNumber = IOUtil.readBytes(in, MAGIC_NUMBER.length);
            if (!Arrays.equals(magicNumber, MAGIC_NUMBER)) {
                throw new IOException("File is not a binary MemoryStore file");
            }

            // read() yields -1 at end of file, which is rejected by the lower bound below
            formatVersion = in.read();
            if (formatVersion > BMSF_VERSION || formatVersion < 1) {
                throw new IOException("Incompatible format version: " + formatVersion);
            }

            // The rest of the data is GZIP-compressed; buffered so that the small reads do not each go
            // through the inflater
            try (DataInputStream dataIn = new DataInputStream(
                    new BufferedInputStream(new GZIPInputStream(in)))) {
                int recordTypeMarker;
                while ((recordTypeMarker = dataIn.readByte()) != EOF_MARKER) {
                    switch (recordTypeMarker) {
                    case NAMESPACE_MARKER:
                        readNamespace(dataIn, explicit);
                        break;
                    case EXPL_TRIPLE_MARKER:
                        readStatement(false, true, dataIn, explicit, inferred);
                        break;
                    case EXPL_QUAD_MARKER:
                        readStatement(true, true, dataIn, explicit, inferred);
                        break;
                    case INF_TRIPLE_MARKER:
                        readStatement(false, false, dataIn, explicit, inferred);
                        break;
                    case INF_QUAD_MARKER:
                        readStatement(true, false, dataIn, explicit, inferred);
                        break;
                    default:
                        throw new IOException("Invalid record type marker: " + recordTypeMarker);
                    }
                }
            }
        }
    }

    /**
     * Writes one namespace record (marker, prefix, name) for every namespace in {@code store}.
     */
    private void writeNamespaces(SailDataset store, DataOutputStream dataOut) throws IOException, SailException {
        try (CloseableIteration<? extends Namespace> iter = store.getNamespaces()) {
            while (iter.hasNext()) {
                Namespace ns = iter.next();

                dataOut.writeByte(NAMESPACE_MARKER);
                writeString(ns.getPrefix(), dataOut);
                writeString(ns.getName(), dataOut);
            }
        }
    }

    /**
     * Reads the body of a namespace record (the marker has already been consumed) and registers the namespace
     * with {@code store}.
     */
    private void readNamespace(DataInputStream dataIn, SailSink store) throws IOException, SailException {
        String prefix = readString(dataIn);
        String name = readString(dataIn);

        if (formatVersion <= 1) {
            // the up-to-date status is no longer relevant, but version 1 files still contain it
            dataIn.readBoolean();
        }

        store.setNamespace(prefix, name);
    }

    /**
     * Writes all statements of {@code explicit} followed by all statements of {@code inferred}.
     */
    private void writeStatements(final SailDataset explicit, SailDataset inferred, DataOutputStream dataOut)
            throws IOException, SailException {
        // write explicit only statements
        writeStatement(explicit.getStatements(null, null, null), EXPL_TRIPLE_MARKER, EXPL_QUAD_MARKER, dataOut);

        // write inferred only statements
        writeStatement(inferred.getStatements(null, null, null), INF_TRIPLE_MARKER, INF_QUAD_MARKER, dataOut);
    }

    /**
     * Writes one record per statement and closes the iteration when done.
     *
     * @param stIter       the statements to write; closed by this method
     * @param tripleMarker record type marker used for statements without a context
     * @param quadMarker   record type marker used for statements with a context
     * @param dataOut      the stream to write to
     * @throws IOException   if writing fails
     * @throws SailException if iterating over the statements fails
     */
    public void writeStatement(CloseableIteration<? extends Statement> stIter, int tripleMarker,
            int quadMarker, DataOutputStream dataOut) throws IOException, SailException {
        try (stIter) {
            while (stIter.hasNext()) {
                Statement st = stIter.next();
                Resource context = st.getContext();

                dataOut.writeByte(context == null ? tripleMarker : quadMarker);

                writeValue(st.getSubject(), dataOut);
                writeValue(st.getPredicate(), dataOut);
                writeValue(st.getObject(), dataOut);
                if (context != null) {
                    writeValue(context, dataOut);
                }
            }
        }
    }

    /**
     * Reads the body of a statement record (the marker has already been consumed) and approves the statement
     * on the explicit or the inferred sink.
     *
     * @param hasContext whether a context value follows the object
     * @param isExplicit whether the statement goes to {@code explicit} ({@code true}) or {@code inferred}
     * @throws ClassCastException if a value read from the stream does not have the type required by its
     *                            position in the statement
     */
    private void readStatement(boolean hasContext, boolean isExplicit, DataInputStream dataIn, SailSink explicit,
            SailSink inferred) throws IOException, ClassCastException, SailException {
        MemResource memSubj = (MemResource) readValue(dataIn);
        MemIRI memPred = (MemIRI) readValue(dataIn);
        MemValue memObj = (MemValue) readValue(dataIn);
        MemResource memContext = null;
        if (hasContext) {
            memContext = (MemResource) readValue(dataIn);
        }

        SailSink target = isExplicit ? explicit : inferred;
        target.approve(memSubj, memPred, memObj, memContext);
    }

    /**
     * Writes a value as a value type marker followed by its type-specific payload.
     *
     * @throws IllegalArgumentException if the value is neither an IRI, a blank node, a literal nor a triple
     */
    private void writeValue(Value value, DataOutputStream dataOut) throws IOException {
        if (value.isIRI()) {
            dataOut.writeByte(URI_MARKER);
            writeString(((IRI) value).stringValue(), dataOut);
        } else if (value.isBNode()) {
            dataOut.writeByte(BNODE_MARKER);
            writeString(((BNode) value).getID(), dataOut);
        } else if (value.isLiteral()) {
            Literal lit = (Literal) value;

            String label = lit.getLabel();
            IRI datatype = lit.getDatatype();

            if (Literals.isLanguageLiteral(lit)) {
                dataOut.writeByte(LANG_LITERAL_MARKER);
                writeString(label, dataOut);
                writeString(lit.getLanguage().get(), dataOut);
            } else {
                dataOut.writeByte(DATATYPE_LITERAL_MARKER);
                writeString(label, dataOut);
                writeValue(datatype, dataOut);
            }
        } else if (value.isTriple()) {
            // triples are stored as the value produced by RDFStarUtil.toRDFEncodedValue
            dataOut.writeByte(RDFSTAR_TRIPLE_MARKER);
            writeValue(RDFStarUtil.toRDFEncodedValue(value), dataOut);
        } else {
            throw new IllegalArgumentException("unexpected value type: " + value.getClass());
        }
    }

    /**
     * Reads a value type marker and the payload belonging to it, and creates the value through the value
     * factory.
     *
     * @throws IOException        if the marker is unknown or the stream cannot be read
     * @throws ClassCastException if a nested value (datatype, encoded triple) does not have the expected type
     */
    private Value readValue(DataInputStream dataIn) throws IOException, ClassCastException {
        int valueTypeMarker = dataIn.readByte();

        switch (valueTypeMarker) {
        case URI_MARKER: {
            String uriString = readString(dataIn);
            return vf.createIRI(uriString);
        }
        case BNODE_MARKER: {
            String bnodeID = readString(dataIn);
            return vf.createBNode(bnodeID);
        }
        case PLAIN_LITERAL_MARKER: {
            String label = readString(dataIn);
            return vf.createLiteral(label);
        }
        case LANG_LITERAL_MARKER: {
            String label = readString(dataIn);
            String language = readString(dataIn);
            return vf.createLiteral(label, language);
        }
        case DATATYPE_LITERAL_MARKER: {
            String label = readString(dataIn);
            IRI datatype = (IRI) readValue(dataIn);
            return vf.createLiteral(label, datatype);
        }
        case RDFSTAR_TRIPLE_MARKER: {
            IRI rdfStarEncodedTriple = (IRI) readValue(dataIn);
            Triple triple = (Triple) RDFStarUtil.fromRDFEncodedValue(rdfStarEncodedTriple, vf);
            return vf.getOrCreateMemTriple(triple);
        }
        default:
            throw new IOException("Invalid value type marker: " + valueTypeMarker);
        }
    }

    /**
     * Writes a string as a 32-bit length (number of encoded bytes, high byte first) followed by its UTF-8
     * encoding.
     */
    private void writeString(String s, DataOutputStream dataOut) throws IOException {
        ByteBuffer byteBuf = charsetEncoder.encode(CharBuffer.wrap(s));
        int length = byteBuf.remaining();
        dataOut.writeInt(length);
        // address the backing array through arrayOffset()/position() instead of assuming it starts at index 0
        dataOut.write(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.position(), length);
    }

    /**
     * Reads a string in the encoding that belongs to the format version of the file being read.
     */
    private String readString(DataInputStream dataIn) throws IOException {
        if (formatVersion == 1) {
            return readStringV1(dataIn);
        } else {
            return readStringV2(dataIn);
        }
    }

    /**
     * Reads a string from the version 1 format, i.e. in Java's modified UTF-8 encoding as read by
     * {@link DataInputStream#readUTF()}.
     */
    private String readStringV1(DataInputStream dataIn) throws IOException {
        return dataIn.readUTF();
    }

    /**
     * Reads a string from the version 2 format. Strings are encoded as UTF-8 and are preceded by a 32-bit integer
     * (high byte first) specifying the length of the encoded string.
     *
     * @throws IOException  if the stored length is negative or the bytes are not valid UTF-8
     * @throws EOFException if the stream ends before the announced number of bytes has been read
     */
    private String readStringV2(DataInputStream dataIn) throws IOException {
        int stringLength = dataIn.readInt();
        if (stringLength < 0) {
            // a negative length can only come from a corrupted file; report it as an I/O problem instead of
            // letting it surface as an unchecked exception from the array allocation
            throw new IOException("Invalid string length: " + stringLength);
        }

        byte[] encodedString = IOUtil.readBytes(dataIn, stringLength);

        if (encodedString.length != stringLength) {
            throw new EOFException("Attempted to read " + stringLength + " bytes but no more than "
                    + encodedString.length + " were available");
        }

        ByteBuffer byteBuf = ByteBuffer.wrap(encodedString);
        CharBuffer charBuf = charsetDecoder.decode(byteBuf);

        return charBuf.toString();
    }
}