IndexedCsvReader.java
package de.siegmar.fastcsv.reader;
import static de.siegmar.fastcsv.util.Util.containsDupe;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import de.siegmar.fastcsv.util.Preconditions;
import de.siegmar.fastcsv.util.Util;
/**
* CSV reader implementation for indexed based access.
* <p>
* If no prebuilt index passed in (via {@link IndexedCsvReaderBuilder#index(CsvIndex)}) the constructor will initiate
* indexing the file.
* This process is optimized on performance and low memory usage ��� no CSV data is stored in memory.
* The current status can be monitored via {@link IndexedCsvReaderBuilder#statusListener(StatusListener)}.
* <p>
* This class is thread-safe.
* <p>
* Example use:
* {@snippet :
* try (IndexedCsvReader<CsvRecord> csv = IndexedCsvReader.builder().ofCsvRecord(file)) {
* CsvIndex index = csv.getIndex();
* int lastPage = index.getPageCount() - 1;
* List<CsvRecord> csvRecords = csv.readPage(lastPage);
* }
*}
*
* @param <T> the type of the CSV record.
*/
@SuppressWarnings({"checkstyle:ClassFanOutComplexity", "checkstyle:ClassDataAbstractionCoupling"})
public final class IndexedCsvReader<T> implements Closeable {
private final Path file;
private final Charset charset;
private final char fieldSeparator;
private final char quoteCharacter;
private final CommentStrategy commentStrategy;
private final char commentCharacter;
private final boolean acceptCharsAfterQuotes;
private final int pageSize;
private final RandomAccessFile raf;
private final CsvCallbackHandler<T> csvRecordHandler;
private final CsvParser csvParser;
private final CsvIndex csvIndex;
@SuppressWarnings("checkstyle:ParameterNumber")
IndexedCsvReader(final Path file, final Charset defaultCharset,
final char fieldSeparator, final char quoteCharacter,
final CommentStrategy commentStrategy, final char commentCharacter,
final boolean acceptCharsAfterQuotes, final int pageSize,
final CsvCallbackHandler<T> csvRecordHandler, final CsvIndex csvIndex,
final StatusListener statusListener)
throws IOException {
Preconditions.checkArgument(!containsDupe(fieldSeparator, quoteCharacter, commentCharacter),
"Control characters must differ"
+ " (fieldSeparator=%s, quoteCharacter=%s, commentCharacter=%s)",
fieldSeparator, quoteCharacter, commentCharacter);
this.file = file;
this.fieldSeparator = fieldSeparator;
this.quoteCharacter = quoteCharacter;
this.commentStrategy = commentStrategy;
this.commentCharacter = commentCharacter;
this.acceptCharsAfterQuotes = acceptCharsAfterQuotes;
this.pageSize = pageSize;
this.csvRecordHandler = csvRecordHandler;
// Detect potential BOM and use the detected charset
final Optional<BomHeader> optionalBomHeader = detectBom(file, statusListener);
final int bomHeaderLength;
if (optionalBomHeader.isEmpty()) {
charset = defaultCharset;
bomHeaderLength = 0;
} else {
final var bomHeader = optionalBomHeader.get();
charset = bomHeader.getCharset();
bomHeaderLength = bomHeader.getLength();
}
if (csvIndex != null) {
this.csvIndex = validatePrebuiltIndex(file, bomHeaderLength,
(byte) fieldSeparator, (byte) quoteCharacter, commentStrategy, (byte) commentCharacter,
csvIndex);
} else {
this.csvIndex = buildIndex(bomHeaderLength, statusListener);
}
raf = new RandomAccessFile(file.toFile(), "r");
csvParser = new CsvParser(fieldSeparator, quoteCharacter, commentStrategy, commentCharacter,
acceptCharsAfterQuotes, csvRecordHandler,
new InputStreamReader(new RandomAccessFileInputStream(raf), charset));
}
private static Optional<BomHeader> detectBom(final Path file, final StatusListener statusListener)
throws IOException {
try {
return BomUtil.detectCharset(file);
} catch (final IOException e) {
statusListener.onError(e);
throw e;
}
}
private static CsvIndex validatePrebuiltIndex(final Path file, final int bomHeaderLength, final byte fieldSeparator,
final byte quoteCharacter, final CommentStrategy commentStrategy,
final byte commentCharacter, final CsvIndex csvIndex)
throws IOException {
final var expectedSignature = new StringJoiner(", ")
.add("bomHeaderLength=" + bomHeaderLength)
.add("fileSize=" + Files.size(file))
.add("fieldSeparator=" + fieldSeparator)
.add("quoteCharacter=" + quoteCharacter)
.add("commentStrategy=" + commentStrategy)
.add("commentCharacter=" + commentCharacter)
.toString();
final var actualSignature = new StringJoiner(", ")
.add("bomHeaderLength=" + csvIndex.getBomHeaderLength())
.add("fileSize=" + csvIndex.getFileSize())
.add("fieldSeparator=" + csvIndex.getFieldSeparator())
.add("quoteCharacter=" + csvIndex.getQuoteCharacter())
.add("commentStrategy=" + csvIndex.getCommentStrategy())
.add("commentCharacter=" + csvIndex.getCommentCharacter())
.toString();
Preconditions.checkArgument(expectedSignature.equals(actualSignature),
"Index does not match! Expected: %s; Actual: %s",
expectedSignature, actualSignature);
return csvIndex;
}
@SuppressWarnings({"checkstyle:IllegalCatch", "PMD.AvoidCatchingThrowable"})
private CsvIndex buildIndex(final int bomHeaderLength, final StatusListener statusListener) throws IOException {
final var listener = new ScannerListener(statusListener);
try (var channel = Files.newByteChannel(file, StandardOpenOption.READ)) {
statusListener.onInit(channel.size());
new CsvScanner(channel,
bomHeaderLength,
(byte) fieldSeparator,
(byte) quoteCharacter,
commentStrategy,
(byte) commentCharacter,
listener
).scan();
final var idx = new CsvIndex(bomHeaderLength, channel.size(), (byte) fieldSeparator, (byte) quoteCharacter,
commentStrategy, (byte) commentCharacter,
listener.recordCounter.get(), listener.pageOffsets);
statusListener.onComplete();
return idx;
} catch (final Throwable t) {
statusListener.onError(t);
throw t;
}
}
/**
* Constructs a {@link IndexedCsvReaderBuilder} to configure and build instances of
* this class.
*
* @return a new {@link IndexedCsvReaderBuilder} instance.
*/
public static IndexedCsvReaderBuilder builder() {
return new IndexedCsvReaderBuilder();
}
/**
* Get the index used for accessing the CSV file.
* That index is either a freshly built index or the index that has been
* passed via {@link IndexedCsvReaderBuilder#index(CsvIndex)}.
*
* @return the index that is used for accessing the CSV file.
*/
public CsvIndex getIndex() {
return csvIndex;
}
/**
* Reads a page of records.
*
* @param page the page to read (0-based).
* @return a page of records, never {@code null}.
* @throws IOException if an I/O error occurs.
* @throws IllegalArgumentException if {@code page} is < 0
* @throws IndexOutOfBoundsException if the file does not contain the specified page
*/
public List<T> readPage(final int page) throws IOException {
Preconditions.checkArgument(page >= 0, "page must be >= 0");
return readPage(csvIndex.getPage(page));
}
@SuppressWarnings("PMD.AssignmentInOperand")
private List<T> readPage(final CsvIndex.CsvPage page) throws IOException {
final List<T> ret = new ArrayList<>(pageSize);
synchronized (raf) {
raf.seek(page.getOffset());
csvParser.reset(page.getStartingLineNumber() - 1);
for (int i = 0; i < pageSize && csvParser.parse(); i++) {
final RecordWrapper<T> rec = csvRecordHandler.buildRecord();
if (rec != null) {
ret.add(rec.getWrappedRecord());
}
}
return ret;
}
}
@Override
public void close() throws IOException {
csvParser.close();
}
@Override
public String toString() {
return new StringJoiner(", ", IndexedCsvReader.class.getSimpleName() + "[", "]")
.add("file=" + file)
.add("charset=" + charset)
.add("fieldSeparator=" + fieldSeparator)
.add("quoteCharacter=" + quoteCharacter)
.add("commentStrategy=" + commentStrategy)
.add("commentCharacter=" + commentCharacter)
.add("acceptCharsAfterQuotes=" + acceptCharsAfterQuotes)
.add("pageSize=" + pageSize)
.add("index=" + csvIndex)
.toString();
}
/**
* This builder is used to create configured instances of {@link IndexedCsvReader}. The default
* configuration of this class adheres with RFC 4180.
* <p>
* The line delimiter (line-feed, carriage-return or the combination of both) is detected
* automatically and thus not configurable.
*/
@SuppressWarnings({"checkstyle:HiddenField", "PMD.AvoidFieldNameMatchingMethodName"})
public static final class IndexedCsvReaderBuilder {
private static final int MAX_BASE_ASCII = 127;
private static final int DEFAULT_PAGE_SIZE = 100;
private static final int MIN_PAGE_SIZE = 1;
private char fieldSeparator = ',';
private char quoteCharacter = '"';
private CommentStrategy commentStrategy = CommentStrategy.NONE;
private char commentCharacter = '#';
private boolean acceptCharsAfterQuotes = true;
private StatusListener statusListener;
private int pageSize = DEFAULT_PAGE_SIZE;
private CsvIndex csvIndex;
private IndexedCsvReaderBuilder() {
}
/**
* Sets the {@code fieldSeparator} used when reading CSV data.
*
* @param fieldSeparator the field separator character (default: {@code ,} - comma).
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder fieldSeparator(final char fieldSeparator) {
checkControlCharacter(fieldSeparator);
this.fieldSeparator = fieldSeparator;
return this;
}
/**
* Sets the {@code quoteCharacter} used when reading CSV data.
*
* @param quoteCharacter the character used to enclose fields
* (default: {@code "} - double quotes).
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder quoteCharacter(final char quoteCharacter) {
checkControlCharacter(quoteCharacter);
this.quoteCharacter = quoteCharacter;
return this;
}
/**
* Sets the strategy that defines how (and if) commented lines should be handled
* (default: {@link CommentStrategy#NONE} as comments are not defined in RFC 4180).
*
* @param commentStrategy the strategy for handling comments.
* @return This updated object, allowing additional method calls to be chained together.
* @throws IllegalArgumentException if {@link CommentStrategy#SKIP} is passed, as this is not supported
* @see #commentCharacter(char)
*/
public IndexedCsvReaderBuilder commentStrategy(final CommentStrategy commentStrategy) {
Preconditions.checkArgument(commentStrategy != CommentStrategy.SKIP,
"CommentStrategy SKIP is not supported in IndexedCsvReader");
this.commentStrategy = commentStrategy;
return this;
}
/**
* Sets the {@code commentCharacter} used to comment lines.
*
* @param commentCharacter the character used to comment lines (default: {@code #} - hash)
* @return This updated object, allowing additional method calls to be chained together.
* @see #commentStrategy(CommentStrategy)
*/
public IndexedCsvReaderBuilder commentCharacter(final char commentCharacter) {
checkControlCharacter(commentCharacter);
this.commentCharacter = commentCharacter;
return this;
}
/**
* Specifies whether the presence of characters between a closing quote and a field separator or
* the end of a line should be treated as an error or not.
* <p>
* Example:
* <pre>
* {@code "a"b,"c"}
* </pre>
* <p>
* If this is set to {@code true}, the value {@code ab} will be returned for the first field.
* <p>
* If this is set to {@code false}, a {@link CsvParseException} will be thrown.
*
* @param acceptCharsAfterQuotes allow characters after quotes (default: {@code true}).
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder acceptCharsAfterQuotes(final boolean acceptCharsAfterQuotes) {
this.acceptCharsAfterQuotes = acceptCharsAfterQuotes;
return this;
}
/**
* Sets the {@code statusListener} to listen for indexer status updates.
*
* @param statusListener the status listener.
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder statusListener(final StatusListener statusListener) {
this.statusListener = statusListener;
return this;
}
/**
* Sets a prebuilt index that should be used for accessing the file.
*
* @param csvIndex a prebuilt index
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder index(final CsvIndex csvIndex) {
this.csvIndex = csvIndex;
return this;
}
/**
* Sets the {@code pageSize} for pages returned by {@link IndexedCsvReader#readPage(int)}
* (default: {@value DEFAULT_PAGE_SIZE}).
*
* @param pageSize the maximum size of pages.
* @return This updated object, allowing additional method calls to be chained together.
*/
public IndexedCsvReaderBuilder pageSize(final int pageSize) {
Preconditions.checkArgument(pageSize >= MIN_PAGE_SIZE,
"pageSize must be >= %d", MIN_PAGE_SIZE);
this.pageSize = pageSize;
return this;
}
/*
* Characters from 0 to 127 are base ASCII and collision-free with UTF-8.
* Characters from 128 to 255 need to be represented as a multibyte string in UTF-8.
* Multibyte handling of control characters is currently not supported by the byte-oriented CSV indexer
* of IndexedCsvReader.
*/
private static void checkControlCharacter(final char controlChar) {
Preconditions.checkArgument(!Util.isNewline(controlChar),
"A newline character must not be used as control character");
Preconditions.checkArgument(controlChar <= MAX_BASE_ASCII,
"Multibyte control characters are not supported in IndexedCsvReader: '%s' (value: %d)",
controlChar, (int) controlChar);
}
/**
* Constructs a new {@link IndexedCsvReader} of {@link CsvRecord} for the specified path using UTF-8
* as the character set.
* <p>
* Convenience method for {@link #build(CsvCallbackHandler, Path, Charset)} with
* {@link CsvRecordHandler} as the callback handler and
* {@link StandardCharsets#UTF_8} as the charset.
*
* @param file the file to read data from.
* @return a new IndexedCsvReader - never {@code null}. Remember to close it!
* @throws IOException if an I/O error occurs.
* @throws NullPointerException if file or charset is {@code null}
*/
public IndexedCsvReader<CsvRecord> ofCsvRecord(final Path file) throws IOException {
return build(new CsvRecordHandler(), file, StandardCharsets.UTF_8);
}
/**
* Constructs a new {@link IndexedCsvReader} of {@link CsvRecord} for the specified arguments.
* <p>
* Convenience method for {@link #build(CsvCallbackHandler, Path, Charset)} with
* {@link CsvRecordHandler} as the callback handler.
*
* @param file the file to read data from.
* @param charset the character set to use.
* @return a new IndexedCsvReader - never {@code null}. Remember to close it!
* @throws IOException if an I/O error occurs.
* @throws NullPointerException if file or charset is {@code null}
*/
public IndexedCsvReader<CsvRecord> ofCsvRecord(final Path file, final Charset charset) throws IOException {
return build(new CsvRecordHandler(), file, charset);
}
/**
* Constructs a new {@link IndexedCsvReader} for the specified callback handler and path using UTF-8
* as the character set.
* <p>
* Convenience method for {@link #build(CsvCallbackHandler, Path, Charset)} with {@link StandardCharsets#UTF_8}
* as charset.
*
* @param <T> the type of the CSV record.
* @param callbackHandler the callback handler to use.
* @param file the file to read data from.
* @return a new IndexedCsvReader - never {@code null}. Remember to close it!
* @throws IOException if an I/O error occurs.
* @throws NullPointerException if callbackHandler, file or charset is {@code null}
*/
public <T> IndexedCsvReader<T> build(final CsvCallbackHandler<T> callbackHandler, final Path file)
throws IOException {
return build(callbackHandler, file, StandardCharsets.UTF_8);
}
/**
* Constructs a new {@link IndexedCsvReader} for the specified arguments.
*
* @param <T> the type of the CSV record.
* @param callbackHandler the callback handler to use.
* @param file the file to read data from.
* @param charset the character set to use.
* @return a new IndexedCsvReader - never {@code null}. Remember to close it!
* @throws IOException if an I/O error occurs.
* @throws NullPointerException if callbackHandler, file or charset is {@code null}
*/
public <T> IndexedCsvReader<T> build(final CsvCallbackHandler<T> callbackHandler,
final Path file, final Charset charset) throws IOException {
Objects.requireNonNull(callbackHandler, "callbackHandler must not be null");
Objects.requireNonNull(file, "file must not be null");
Objects.requireNonNull(charset, "charset must not be null");
final var sl = statusListener != null ? statusListener
: new StatusListener() { };
return new IndexedCsvReader<>(file, charset, fieldSeparator, quoteCharacter, commentStrategy,
commentCharacter, acceptCharsAfterQuotes, pageSize, callbackHandler, csvIndex, sl);
}
}
private final class ScannerListener implements CsvScanner.CsvListener {
private final StatusListener statusListener;
private final List<CsvIndex.CsvPage> pageOffsets = new ArrayList<>();
private final AtomicLong recordCounter = new AtomicLong();
private long startingLineNumber = 1;
private ScannerListener(final StatusListener statusListener) {
this.statusListener = statusListener;
}
@Override
public void onReadBytes(final int readCnt) {
statusListener.onReadBytes(readCnt);
}
@Override
public void startOffset(final long offset) {
if (recordCounter.getAndIncrement() % pageSize == 0) {
pageOffsets.add(new CsvIndex.CsvPage(offset, startingLineNumber));
}
}
@Override
public void onReadRecord() {
startingLineNumber++;
statusListener.onReadRecord();
}
@Override
public void additionalLine() {
startingLineNumber++;
}
}
}