TimeSeriesTable.java
/**
* Copyright (c) 2018, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
* SPDX-License-Identifier: MPL-2.0
*/
package com.powsybl.timeseries;
import com.google.common.base.Stopwatch;
import gnu.trove.list.array.TIntArrayList;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.zip.GZIPOutputStream;
import static com.powsybl.timeseries.TimeSeries.writeInstantToMicroString;
import static com.powsybl.timeseries.TimeSeries.writeInstantToNanoString;
/**
* Utility class to load time series into a table and then:
*
* <ul>
* <li>Get direct access to all values</li>
* <li>Compute statistics like mean, standard deviation, and Pearson product-moment correlation coefficient</li>
* <li>Convert to CSV</li>
* </ul>
*
* Some design considerations and limitations:
* <ul>
* <li>Number of versions loadable in the table has to be specified at creation</li>
* <li>Versions have to be contiguous</li>
* <li>Once first batch of time series has been loaded, new time series cannot be added but data of existing ones can be updated</li>
* <li>Concurrent load (i.e. multi-threaded) of data is supported (using same time series list)</li>
* <li>Concurrency between data loading and other operations (CSV writing, statistics computation) is NOT supported</li>
* </ul>
*
* @author Geoffroy Jamgotchian {@literal <geoffroy.jamgotchian at rte-france.com>}
*/
public class TimeSeriesTable {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesTable.class);
/**
 * Immutable pairing of two time series names with the correlation coefficient computed between them.
 */
public static class Correlation {

    private final String timeSeriesName1;

    private final String timeSeriesName2;

    private final double coefficient;

    /**
     * @param timeSeriesName1 name of the first time series, must not be null
     * @param timeSeriesName2 name of the second time series, must not be null
     * @param coefficient correlation coefficient between the two series
     * @throws NullPointerException if one of the names is null
     */
    public Correlation(String timeSeriesName1, String timeSeriesName2, double coefficient) {
        Objects.requireNonNull(timeSeriesName1);
        Objects.requireNonNull(timeSeriesName2);
        this.timeSeriesName1 = timeSeriesName1;
        this.timeSeriesName2 = timeSeriesName2;
        this.coefficient = coefficient;
    }

    /** @return name of the first time series */
    public String getTimeSeriesName1() {
        return timeSeriesName1;
    }

    /** @return name of the second time series */
    public String getTimeSeriesName2() {
        return timeSeriesName2;
    }

    /** @return the coefficient given at construction */
    public double getCoefficient() {
        return coefficient;
    }

    @Override
    public String toString() {
        return new StringBuilder("Correlation(timeSeriesName1=")
                .append(timeSeriesName1)
                .append(", timeSeriesName2=")
                .append(timeSeriesName2)
                .append(", coefficient=")
                .append(coefficient)
                .append(")")
                .toString();
    }
}
/**
 * Two-way lookup between time series names and their integer index, backed by a {@code BiList}.
 */
private static class TimeSeriesNameMap {

    private final BiList<String> names = new BiList<>();

    /**
     * Registers a name.
     *
     * @return the value returned by {@code BiList.add} (used by the table as the index of the name)
     */
    int add(String name) {
        return names.add(name);
    }

    /**
     * @return the name stored at {@code index}
     * @throws IllegalArgumentException if {@code index} is outside {@code [0, size())}
     */
    String getName(int index) {
        boolean inRange = index >= 0 && index < names.size();
        if (!inRange) {
            throw new IllegalArgumentException("Time series at index " + index + " not found");
        }
        return names.get(index);
    }

    /**
     * @return the index of {@code name}
     * @throws NullPointerException if {@code name} is null
     * @throws IllegalArgumentException if the name is not registered
     */
    int getIndex(String name) {
        Objects.requireNonNull(name);
        int position = names.indexOf(name);
        if (position != -1) {
            return position;
        }
        throw new IllegalArgumentException("Time series '" + name + "' not found");
    }

    /** @return number of registered names */
    int size() {
        return names.size();
    }

    /** Removes every registered name. */
    void clear() {
        names.clear();
    }
}
// Inclusive bounds of the version range held by the table; fixed at construction.
private final int fromVersion;
private final int toVersion;
// Metadata of every registered series (double series are appended first, then string series).
// Stays null until the first load initializes the table.
private List<TimeSeriesMetadata> timeSeriesMetadata;
// Time index shared by all series of the table; its point count sizes the buffers.
private final TimeSeriesIndex tableIndex;
// Factory handed to the double and string buffers to allocate their ByteBuffers.
private final IntFunction<ByteBuffer> byteBufferAllocator;
private final TIntArrayList timeSeriesIndexDoubleOrString = new TIntArrayList(); // global index to typed index
// Name <-> typed index lookups, one per data type.
private final TimeSeriesNameMap doubleTimeSeriesNames = new TimeSeriesNameMap();
private final TimeSeriesNameMap stringTimeSeriesNames = new TimeSeriesNameMap();
// Value storage; allocated by initTable, null before that.
private BigDoubleBuffer doubleBuffer;
private BigStringBuffer stringBuffer;
// Serializes table initialization.
private final Lock initLock = new ReentrantLock();
// statistics
// One slot per (version, double series); NaN marks a slot that has to be (re)computed.
private double[] means;
private double[] stdDevs;
// Held while statistics slots are invalidated, refreshed or read.
private final Lock statsLock = new ReentrantLock();
/**
 * Creates a table whose buffers are allocated with {@link ByteBuffer#allocateDirect(int)}.
 *
 * @param fromVersion first version (inclusive) the table can hold
 * @param toVersion last version (inclusive) the table can hold
 * @param tableIndex time index shared by all the series of the table
 */
public TimeSeriesTable(int fromVersion, int toVersion, TimeSeriesIndex tableIndex) {
    this(fromVersion, toVersion, tableIndex, ByteBuffer::allocateDirect);
}
/**
 * Creates a table for the version range {@code [fromVersion, toVersion]}.
 * Nothing is allocated here: buffers are created on the first load.
 *
 * @param fromVersion first version (inclusive), validated by {@code TimeSeriesVersions.check}
 * @param toVersion last version (inclusive), validated by {@code TimeSeriesVersions.check}; may equal {@code fromVersion}
 * @param tableIndex time index shared by all the series of the table, must not be null
 * @param byteBufferAllocator factory used later to allocate the backing byte buffers, must not be null
 * @throws TimeSeriesException if {@code toVersion} is lower than {@code fromVersion}
 * @throws NullPointerException if {@code tableIndex} or {@code byteBufferAllocator} is null
 */
public TimeSeriesTable(int fromVersion, int toVersion, TimeSeriesIndex tableIndex, IntFunction<ByteBuffer> byteBufferAllocator) {
    TimeSeriesVersions.check(fromVersion);
    TimeSeriesVersions.check(toVersion);
    if (fromVersion > toVersion) {
        throw new TimeSeriesException("toVersion (" + toVersion + ") is expected to be greater than fromVersion (" + fromVersion + ")");
    }
    this.fromVersion = fromVersion;
    this.toVersion = toVersion;
    Objects.requireNonNull(tableIndex);
    this.tableIndex = tableIndex;
    Objects.requireNonNull(byteBufferAllocator);
    this.byteBufferAllocator = byteBufferAllocator;
}
/**
 * Factory for a table whose buffers are allocated with {@link ByteBuffer#allocateDirect(int)}.
 *
 * @param fromVersion first version (inclusive)
 * @param toVersion last version (inclusive)
 * @param tableIndex time index shared by all the series of the table
 * @return a new, empty table
 */
public static TimeSeriesTable createDirectMem(int fromVersion, int toVersion, TimeSeriesIndex tableIndex) {
    return new TimeSeriesTable(fromVersion, toVersion, tableIndex, ByteBuffer::allocateDirect);
}
/**
 * Factory for a table whose buffers are allocated with {@link ByteBuffer#allocate(int)}.
 *
 * @param fromVersion first version (inclusive)
 * @param toVersion last version (inclusive)
 * @param tableIndex time index shared by all the series of the table
 * @return a new, empty table
 */
public static TimeSeriesTable createMem(int fromVersion, int toVersion, TimeSeriesIndex tableIndex) {
    IntFunction<ByteBuffer> heapAllocator = ByteBuffer::allocate;
    return new TimeSeriesTable(fromVersion, toVersion, tableIndex, heapAllocator);
}
/**
 * Fixes the layout of the table from the first batch of time series and allocates all buffers.
 * Runs under {@code initLock}; if the table is already initialized the call is a no-op.
 * Series are registered sorted by name, double series first and string series after them.
 * On any exception the partially built state is cleared and the exception is rethrown.
 *
 * @param doubleTimeSeries double time series of the first batch
 * @param stringTimeSeries string time series of the first batch
 */
private void initTable(List<DoubleTimeSeries> doubleTimeSeries, List<StringTimeSeries> stringTimeSeries) {
    initLock.lock();
    try {
        if (timeSeriesMetadata != null) {
            // a previous load already fixed the layout
            return;
        }
        timeSeriesMetadata = new ArrayList<>(doubleTimeSeries.size() + stringTimeSeries.size());

        // register double time series, ordered by name
        List<DoubleTimeSeries> sortedDoubles = doubleTimeSeries.stream()
                .sorted(Comparator.comparing(ts -> ts.getMetadata().getName()))
                .toList();
        for (DoubleTimeSeries sortedDouble : sortedDoubles) {
            timeSeriesMetadata.add(sortedDouble.getMetadata());
            int typedIndex = doubleTimeSeriesNames.add(sortedDouble.getMetadata().getName());
            timeSeriesIndexDoubleOrString.add(typedIndex);
        }

        // register string time series, ordered by name
        List<StringTimeSeries> sortedStrings = stringTimeSeries.stream()
                .sorted(Comparator.comparing(ts -> ts.getMetadata().getName()))
                .toList();
        for (StringTimeSeries sortedString : sortedStrings) {
            timeSeriesMetadata.add(sortedString.getMetadata());
            int typedIndex = stringTimeSeriesNames.add(sortedString.getMetadata().getName());
            timeSeriesIndexDoubleOrString.add(typedIndex);
        }

        if (tableIndex == null) {
            throw new TimeSeriesException("None of the time series have a finite index");
        }

        int versionCount = toVersion - fromVersion + 1;

        // one cell per (version, series, point), for each data type
        long doubleCellCount = (long) versionCount * doubleTimeSeriesNames.size() * tableIndex.getPointCount();
        doubleBuffer = createDoubleBuffer(byteBufferAllocator, doubleCellCount, Double.NaN);
        long stringCellCount = (long) versionCount * stringTimeSeriesNames.size() * tableIndex.getPointCount();
        stringBuffer = new BigStringBuffer(byteBufferAllocator, stringCellCount);

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Allocation of {} for time series table",
                    FileUtils.byteCountToDisplaySize(doubleBuffer.capacity() * Double.BYTES + stringBuffer.capacity() * Integer.BYTES));
        }

        // statistics slots start as NaN, meaning "not computed yet"
        int statisticsSlotCount = doubleTimeSeriesNames.size() * versionCount;
        means = new double[statisticsSlotCount];
        Arrays.fill(means, Double.NaN);
        stdDevs = new double[statisticsSlotCount];
        Arrays.fill(stdDevs, Double.NaN);
    } catch (Exception e) {
        LOGGER.error(e.toString(), e);
        // roll back so that a later load can retry the initialization
        timeSeriesMetadata = null;
        doubleTimeSeriesNames.clear();
        stringTimeSeriesNames.clear();
        timeSeriesIndexDoubleOrString.clear();
        doubleBuffer = null;
        stringBuffer = null;
        means = null;
        stdDevs = null;
        throw e;
    } finally {
        initLock.unlock();
    }
}
/**
 * @return the time index given at construction, shared by all the series of the table
 */
public TimeSeriesIndex getTableIndex() {
    return this.tableIndex;
}
/**
 * Creates a double buffer of {@code size} cells without writing any initial value.
 *
 * @param byteBufferAllocator factory passed to the {@code BigDoubleBuffer} constructor
 * @param size number of double cells
 * @return the new buffer
 */
private static BigDoubleBuffer createDoubleBuffer(IntFunction<ByteBuffer> byteBufferAllocator, long size) {
    BigDoubleBuffer buffer = new BigDoubleBuffer(byteBufferAllocator, size);
    return buffer;
}
/**
 * Creates a double buffer of {@code size} cells and writes {@code initialValue} into every cell.
 *
 * @param byteBufferAllocator factory passed to the {@code BigDoubleBuffer} constructor
 * @param size number of double cells
 * @param initialValue value stored in each cell
 * @return the filled buffer
 */
private static BigDoubleBuffer createDoubleBuffer(IntFunction<ByteBuffer> byteBufferAllocator, long size, double initialValue) {
    BigDoubleBuffer filled = createDoubleBuffer(byteBufferAllocator, size);
    long cell = 0;
    while (cell < size) {
        filled.put(cell, initialValue);
        cell++;
    }
    return filled;
}
/**
 * Computes the buffer position of the first point of a series for a version.
 * Cells are laid out series-major: all versions of a series are contiguous, and all points of a version are contiguous.
 *
 * @param version version of the data (not range-checked here)
 * @param timeSeriesNum typed (double or string) index of the series
 * @return offset of point 0 of that series and version
 */
private long getTimeSeriesOffset(int version, int timeSeriesNum) {
    long pointCount = tableIndex.getPointCount();
    long versionCount = toVersion - fromVersion + 1;
    long versionRank = version - fromVersion;
    long seriesStart = timeSeriesNum * pointCount * versionCount;
    return seriesStart + versionRank * pointCount;
}
/**
 * Computes the slot of a (version, double series) pair in the {@code means} and {@code stdDevs} arrays.
 * Slots are laid out version-major.
 *
 * @param version version of the data (not range-checked here)
 * @param timeSeriesNum index of the double time series
 * @return index into the statistics arrays
 */
private int getStatisticsIndex(int version, int timeSeriesNum) {
    int versionRank = version - fromVersion;
    return versionRank * doubleTimeSeriesNames.size() + timeSeriesNum;
}
/**
 * Validates that a version belongs to {@code [fromVersion, toVersion]}.
 *
 * @param version version to check
 * @throws IllegalArgumentException if the version is outside the range
 */
private void checkVersionIsInRange(int version) {
    boolean inRange = fromVersion <= version && version <= toVersion;
    if (!inRange) {
        throw new IllegalArgumentException("Version is out of range [" + fromVersion + ", " + toVersion + "]");
    }
}
/**
 * Validates a global time series number and translates it to its typed index.
 * Only the range is checked; the data type of the series is not.
 *
 * @param timeSeriesNum global index of the series
 * @return the typed (double or string) index registered for that series
 * @throws IllegalArgumentException if the number is negative or not lower than the number of registered series
 */
private int checkTimeSeriesNum(int timeSeriesNum) {
    int registeredCount = timeSeriesIndexDoubleOrString.size();
    if (timeSeriesNum < 0 || timeSeriesNum >= registeredCount) {
        throw new IllegalArgumentException("Time series number is out of range [0, " + (registeredCount - 1) + "]");
    }
    return timeSeriesIndexDoubleOrString.get(timeSeriesNum);
}
/**
 * Validates that a point number belongs to the table index.
 *
 * @param point point number to check
 * @throws IllegalArgumentException if the point is negative or not lower than the point count of the table index
 */
private void checkPoint(int point) {
    int pointCount = tableIndex.getPointCount();
    boolean inRange = point >= 0 && point < pointCount;
    if (!inRange) {
        throw new IllegalArgumentException("Point is out of range [0, " + (pointCount - 1) + "]");
    }
}
/**
 * Copies the values of a double time series into the table for the given version,
 * then marks the statistics of that (version, series) pair as stale.
 *
 * @param version version to fill
 * @param timeSeries series to copy; its name must already be registered in the table
 * @throws IllegalArgumentException if the series name is unknown to the table
 */
private void loadDouble(int version, DoubleTimeSeries timeSeries) {
    // the lookup fails if the series was not part of the first batch
    String name = timeSeries.getMetadata().getName();
    int doubleIndex = doubleTimeSeriesNames.getIndex(name);

    // copy data
    timeSeries.fillBuffer(doubleBuffer, getTimeSeriesOffset(version, doubleIndex));

    // cached mean / standard deviation no longer match the data
    invalidateStatistics(version, doubleIndex);
}
/**
 * Copies the values of a string time series into the table for the given version.
 *
 * @param version version to fill
 * @param timeSeries series to copy; its name must already be registered in the table
 * @throws IllegalArgumentException if the series name is unknown to the table
 */
private void loadString(int version, StringTimeSeries timeSeries) {
    // the lookup fails if the series was not part of the first batch
    String name = timeSeries.getMetadata().getName();
    int stringIndex = stringTimeSeriesNames.getIndex(name);

    // copy data
    timeSeries.fillBuffer(stringBuffer, getTimeSeriesOffset(version, stringIndex));
}
/**
 * Loads several lists of time series for a version, as if they were one concatenated list.
 *
 * @param version version to fill
 * @param timeSeries lists of time series, concatenated in argument order
 */
@SafeVarargs
public final void load(int version, List<? extends TimeSeries>... timeSeries) {
    List<TimeSeries> merged = Arrays.stream(timeSeries)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    load(version, merged);
}
/**
 * Loads a batch of time series for a version.
 * The first successful call fixes the set of series of the table; later calls can only
 * update data of series that are already registered. Each series is synchronized on the
 * table index before its values are copied.
 *
 * @param version version to fill, must be within the range of the table
 * @param timeSeriesList series to load, must be non-null, non-empty and without null elements
 * @throws IllegalArgumentException if the version is out of range or a series is unknown to an already initialized table
 * @throws TimeSeriesException if the list is empty
 * @throws IllegalStateException if a series is neither a double nor a string time series
 */
public void load(int version, List<TimeSeries> timeSeriesList) {
    checkVersionIsInRange(version);
    Objects.requireNonNull(timeSeriesList);
    if (timeSeriesList.isEmpty()) {
        throw new TimeSeriesException("Empty time series list");
    }
    Stopwatch stopWatch = Stopwatch.createStarted();

    // split the batch by data type
    List<DoubleTimeSeries> doubles = new ArrayList<>();
    List<StringTimeSeries> strings = new ArrayList<>();
    for (TimeSeries candidate : timeSeriesList) {
        Objects.requireNonNull(candidate);
        if (candidate instanceof DoubleTimeSeries asDouble) {
            doubles.add(asDouble);
            continue;
        }
        if (candidate instanceof StringTimeSeries asString) {
            strings.add(asString);
            continue;
        }
        throw new IllegalStateException("Unsupported time series type " + candidate.getClass());
    }

    // no-op when the table is already initialized
    initTable(doubles, strings);

    doubles.forEach(series -> {
        series.synchronize(tableIndex);
        loadDouble(version, series);
    });
    strings.forEach(series -> {
        series.synchronize(tableIndex);
        loadString(version, series);
    });

    LOGGER.info("{} time series (version={}) loaded in {} ms", timeSeriesList.size(), version,
            stopWatch.elapsed(TimeUnit.MILLISECONDS));
}
/**
 * Lists the names of the time series of the table, in global index order
 * (double series first, then string series, each group in registration order).
 *
 * @return a new list of names; empty when nothing has been loaded yet
 */
public List<String> getTimeSeriesNames() {
    // metadata stays null until the first load; report "no series" instead of failing with a NullPointerException,
    // in line with the CSV export which also accepts a table that has not been loaded
    if (timeSeriesMetadata == null) {
        return new ArrayList<>();
    }
    return timeSeriesMetadata.stream().map(TimeSeriesMetadata::getName).collect(Collectors.toList());
}
/**
 * Reads one value of the double buffer.
 * The series number is only range-checked: it is not verified that it designates a double time series.
 *
 * @param version version of the data
 * @param timeSeriesNum global index of the series
 * @param point point number in the table index
 * @return the stored value
 * @throws IllegalArgumentException if the version, the series number or the point is out of range
 */
public double getDoubleValue(int version, int timeSeriesNum, int point) {
    checkVersionIsInRange(version);
    int typedIndex = checkTimeSeriesNum(timeSeriesNum);
    checkPoint(point);
    long cell = getTimeSeriesOffset(version, typedIndex) + point;
    return doubleBuffer.get(cell);
}
/**
 * Reads one value of the string buffer.
 * The series number is only range-checked: it is not verified that it designates a string time series.
 *
 * @param version version of the data
 * @param timeSeriesNum global index of the series
 * @param point point number in the table index
 * @return the stored value
 * @throws IllegalArgumentException if the version, the series number or the point is out of range
 */
public String getStringValue(int version, int timeSeriesNum, int point) {
    checkVersionIsInRange(version);
    int typedIndex = checkTimeSeriesNum(timeSeriesNum);
    checkPoint(point);
    long cell = getTimeSeriesOffset(version, typedIndex) + point;
    return stringBuffer.getString(cell);
}
/**
 * Looks up the index of a double time series by name.
 *
 * @param timeSeriesName name of the series, must not be null
 * @return index of the series among the double time series
 * @throws IllegalArgumentException if no double time series has that name
 */
public int getDoubleTimeSeriesIndex(String timeSeriesName) {
    int doubleIndex = doubleTimeSeriesNames.getIndex(timeSeriesName);
    return doubleIndex;
}
/**
 * Looks up the index of a string time series by name.
 *
 * @param timeSeriesName name of the series, must not be null
 * @return index of the series among the string time series
 * @throws IllegalArgumentException if no string time series has that name
 */
public int getStringTimeSeriesIndex(String timeSeriesName) {
    int stringIndex = stringTimeSeriesNames.getIndex(timeSeriesName);
    return stringIndex;
}
/**
 * Marks the mean and standard deviation of a (version, double series) pair as not computed
 * by resetting both slots to NaN, under {@code statsLock}.
 *
 * @param version version of the data
 * @param timeSeriesNum index of the double time series
 */
private void invalidateStatistics(int version, int timeSeriesNum) {
    int slot = getStatisticsIndex(version, timeSeriesNum);
    statsLock.lock();
    try {
        stdDevs[slot] = Double.NaN;
        means[slot] = Double.NaN;
    } finally {
        statsLock.unlock();
    }
}
/**
 * Computes, if needed, the mean and the sample standard deviation of a double series for a version.
 * Nothing is done when both slots already hold a non-NaN value. NaN points are ignored;
 * the mean is 0 when there is no valid point and the standard deviation is 0 when there are fewer than two.
 * This method takes no lock itself.
 *
 * @param version version of the data
 * @param timeSeriesNum index of the double time series
 */
private void updateStatistics(int version, int timeSeriesNum) {
    int slot = getStatisticsIndex(version, timeSeriesNum);
    boolean upToDate = !Double.isNaN(means[slot]) && !Double.isNaN(stdDevs[slot]);
    if (upToDate) {
        return;
    }

    long firstCell = getTimeSeriesOffset(version, timeSeriesNum);

    // first pass: mean of the valid points
    double total = 0;
    int validCount = 0;
    for (int point = 0; point < tableIndex.getPointCount(); point++) {
        double value = doubleBuffer.get(firstCell + point);
        if (Double.isNaN(value)) {
            continue;
        }
        total += value;
        validCount++;
    }
    double mean = validCount > 0 ? total / validCount : 0;
    means[slot] = mean;

    // second pass: sum of squared deviations from the mean
    double squaredDeviations = 0;
    for (int point = 0; point < tableIndex.getPointCount(); point++) {
        double value = doubleBuffer.get(firstCell + point);
        if (Double.isNaN(value)) {
            continue;
        }
        squaredDeviations += (value - mean) * (value - mean);
    }
    if (validCount > 1) {
        stdDevs[slot] = Math.sqrt(squaredDeviations / (validCount - 1));
    } else {
        stdDevs[slot] = 0;
    }
}
/**
 * Refreshes the statistics of every double time series for a version.
 *
 * @param version version of the data
 */
private void updateStatistics(int version) {
    int doubleIndex = 0;
    while (doubleIndex < doubleTimeSeriesNames.size()) {
        updateStatistics(version, doubleIndex);
        doubleIndex++;
    }
}
/**
 * Returns a statistic of a double time series for a version, computing it first if it is stale.
 *
 * @param version version of the data
 * @param timeSeriesNum global index of the series; must designate a double time series
 * @param stats array to read the result from ({@code means} or {@code stdDevs})
 * @return the requested statistic
 * @throws IllegalArgumentException if the version or the series number is out of range,
 *         or if the series is not a double time series
 */
private double getStatistics(int version, int timeSeriesNum, double[] stats) {
    checkVersionIsInRange(version);
    int doubleTimeSeriesNum = checkTimeSeriesNum(timeSeriesNum);
    // statistics only exist for double series: without this check the typed index of a string series
    // would silently be used as the index of an unrelated double series
    if (timeSeriesMetadata.get(timeSeriesNum).getDataType() != TimeSeriesDataType.DOUBLE) {
        throw new IllegalArgumentException("Time series " + timeSeriesNum + " is not a double time series");
    }
    int statisticsIndex = getStatisticsIndex(version, doubleTimeSeriesNum);
    statsLock.lock();
    try {
        // refresh the very slot that is read below: both must be addressed with the typed index
        updateStatistics(version, doubleTimeSeriesNum);
        return stats[statisticsIndex];
    } finally {
        statsLock.unlock();
    }
}
/**
 * Returns the mean of the non-NaN values of a double time series for a version
 * (0 when the series has no valid value).
 *
 * @param version version of the data
 * @param timeSeriesNum global index of the series
 * @return the mean
 * @throws IllegalArgumentException if the version or the series number is out of range
 */
public double getMean(int version, int timeSeriesNum) {
    double mean = getStatistics(version, timeSeriesNum, means);
    return mean;
}
/**
 * Returns the sample standard deviation of the non-NaN values of a double time series for a version
 * (0 when the series has fewer than two valid values).
 *
 * @param version version of the data
 * @param timeSeriesNum global index of the series
 * @return the standard deviation
 * @throws IllegalArgumentException if the version or the series number is out of range
 */
public double getStdDev(int version, int timeSeriesNum) {
    double stdDev = getStatistics(version, timeSeriesNum, stdDevs);
    return stdDev;
}
/**
 * Finds the double time series most correlated to the given one, keeping at most 10 results.
 *
 * @param timeSeriesName name of the reference double time series
 * @param version version of the data
 * @return correlations sorted by decreasing absolute coefficient
 */
public List<Correlation> findMostCorrelatedTimeSeries(String timeSeriesName, int version) {
    final int defaultMaxSize = 10;
    return findMostCorrelatedTimeSeries(timeSeriesName, version, defaultMaxSize);
}
/**
 * Finds the double time series most correlated to the given one.
 * The coefficient stored in each result is the absolute value of the computed coefficient,
 * and the reference series itself is excluded from the results.
 *
 * @param timeSeriesName name of the reference double time series
 * @param version version of the data
 * @param maxSize maximum number of results
 * @return correlations sorted by decreasing absolute coefficient, at most {@code maxSize} of them
 * @throws IllegalArgumentException if the series is unknown or the version is out of range
 */
public List<Correlation> findMostCorrelatedTimeSeries(String timeSeriesName, int version, int maxSize) {
    double[] coefficients = computePpmcc(timeSeriesName, version);
    Comparator<Correlation> strongestFirst = Comparator.comparingDouble(Correlation::getCoefficient).reversed();
    return IntStream.range(0, coefficients.length)
            .mapToObj(other -> new Correlation(timeSeriesName, doubleTimeSeriesNames.getName(other), Math.abs(coefficients[other])))
            .filter(candidate -> !timeSeriesName.equals(candidate.getTimeSeriesName2()))
            .sorted(strongestFirst)
            .limit(maxSize)
            .collect(Collectors.toList());
}
/**
 * Fills the coefficients when the reference series is constant (zero standard deviation):
 * 1 for every other constant series, 0 for the others.
 * Statistics of the version are expected to be up to date.
 *
 * @param r output array, one coefficient per double time series
 * @param version version of the data
 */
private void computeConstantTimeSeriesPpmcc(double[] r, int version) {
    int doubleCount = doubleTimeSeriesNames.size();
    for (int other = 0; other < doubleCount; other++) {
        double otherStdDev = stdDevs[getStatisticsIndex(version, other)];
        // a constant series is considered correlated to the other constant series only
        if (otherStdDev == 0) {
            r[other] = 1;
        } else {
            r[other] = 0;
        }
    }
}
/**
 * Fills the coefficients when the reference series is not constant.
 * The coefficient is 1 for the reference series itself, 0 for constant series, and otherwise the sum over
 * all points of the product of the standardized values, divided by (point count - 1).
 * Statistics of the version are expected to be up to date.
 * NOTE(review): NaN points are not skipped here (unlike in the mean / standard deviation computation),
 * so a NaN value in either series makes the coefficient NaN.
 *
 * @param r output array, one coefficient per double time series
 * @param timeSeriesNum1 index of the reference double time series
 * @param statisticsIndex1 statistics slot of the reference series for this version
 * @param stdDev1 standard deviation of the reference series, non-zero
 * @param version version of the data
 */
private void computeVariableTimeSeriesPpmcc(double[] r, int timeSeriesNum1, int statisticsIndex1, double stdDev1,
                                            int version) {
    double mean1 = means[statisticsIndex1];
    long firstCell1 = getTimeSeriesOffset(version, timeSeriesNum1);
    for (int other = 0; other < doubleTimeSeriesNames.size(); other++) {
        if (other == timeSeriesNum1) {
            r[other] = 1;
            continue;
        }
        int otherSlot = getStatisticsIndex(version, other);
        double stdDev2 = stdDevs[otherSlot];
        if (stdDev2 == 0) {
            // constant series: no correlation with a variable one
            r[other] = 0;
            continue;
        }
        double mean2 = means[otherSlot];
        long firstCell2 = getTimeSeriesOffset(version, other);
        double accumulated = 0;
        for (int point = 0; point < tableIndex.getPointCount(); point++) {
            double value1 = doubleBuffer.get(firstCell1 + point);
            double value2 = doubleBuffer.get(firstCell2 + point);
            accumulated += (value1 - mean1) / stdDev1 * (value2 - mean2) / stdDev2;
        }
        r[other] = accumulated / (tableIndex.getPointCount() - 1);
    }
}
/**
 * Computes the Pearson product-moment correlation coefficient between a double time series
 * and every double time series of the table, for a version.
 * Statistics of the version are refreshed first; the whole computation runs under {@code statsLock}.
 *
 * @param timeSeriesName name of the reference double time series
 * @param version version of the data
 * @return one coefficient per double time series, indexed by double time series index
 * @throws IllegalArgumentException if the series is unknown or the version is out of range
 */
public double[] computePpmcc(String timeSeriesName, int version) {
    int referenceNum = doubleTimeSeriesNames.getIndex(timeSeriesName);
    checkVersionIsInRange(version);
    Stopwatch stopWatch = Stopwatch.createStarted();
    double[] coefficients = new double[doubleTimeSeriesNames.size()];
    statsLock.lock();
    try {
        updateStatistics(version);
        int referenceSlot = getStatisticsIndex(version, referenceNum);
        double referenceStdDev = stdDevs[referenceSlot];
        if (referenceStdDev != 0) {
            computeVariableTimeSeriesPpmcc(coefficients, referenceNum, referenceSlot, referenceStdDev, version);
        } else {
            // constant time series
            computeConstantTimeSeriesPpmcc(coefficients, version);
        }
    } finally {
        statsLock.unlock();
    }
    LOGGER.info("PPMCC computed in {} ms", stopWatch.elapsed(TimeUnit.MILLISECONDS));
    return coefficients;
}
/**
 * Opens a UTF-8 writer on a file, gzip-compressing the output when the file name ends with {@code .gz}.
 *
 * @param file file to write
 * @return a buffered writer the caller has to close
 * @throws IOException if the file cannot be opened or the gzip stream cannot be created
 */
private static BufferedWriter createWriter(Path file) throws IOException {
    if (!file.getFileName().toString().endsWith(".gz")) {
        return Files.newBufferedWriter(file, StandardCharsets.UTF_8);
    }
    OutputStream fileStream = Files.newOutputStream(file);
    try {
        return new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(fileStream), StandardCharsets.UTF_8));
    } catch (IOException | RuntimeException e) {
        // the GZIP stream constructor can fail; nobody else holds the file stream at this point,
        // so it has to be closed here or it would leak
        try {
            fileStream.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Writes the table as CSV to a file, using a default {@code TimeSeriesCsvConfig}.
 * The output is gzip-compressed when the file name ends with {@code .gz}.
 *
 * @param file destination file
 * @throws UncheckedIOException if an I/O error occurs
 */
public void writeCsv(Path file) {
    try (BufferedWriter csvWriter = createWriter(file)) {
        TimeSeriesCsvConfig defaultConfig = new TimeSeriesCsvConfig();
        writeCsv(csvWriter, defaultConfig);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Writes the table as CSV to a file.
 * The output is gzip-compressed when the file name ends with {@code .gz}.
 *
 * @param file destination file
 * @param timeSeriesCsvConfig CSV formatting options
 * @throws UncheckedIOException if an I/O error occurs
 */
public void writeCsv(Path file, TimeSeriesCsvConfig timeSeriesCsvConfig) {
    try (BufferedWriter csvWriter = createWriter(file)) {
        writeCsv(csvWriter, timeSeriesCsvConfig);
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Renders the table as a CSV string, using a default {@code TimeSeriesCsvConfig}.
 *
 * @return the CSV content
 * @throws UncheckedIOException if an I/O error occurs
 */
public String toCsvString() {
    try (StringWriter csvText = new StringWriter()) {
        TimeSeriesCsvConfig defaultConfig = new TimeSeriesCsvConfig();
        writeCsv(csvText, defaultConfig);
        return csvText.toString();
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Renders the table as a CSV string.
 *
 * @param timeSeriesCsvConfig CSV formatting options
 * @return the CSV content
 * @throws UncheckedIOException if an I/O error occurs
 */
public String toCsvString(TimeSeriesCsvConfig timeSeriesCsvConfig) {
    try (StringWriter csvText = new StringWriter()) {
        writeCsv(csvText, timeSeriesCsvConfig);
        return csvText.toString();
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
}
/**
 * Writes the CSV header line: {@code Time}, then {@code Version} when the configuration is versioned,
 * then one column per time series. A table that has not been loaded yet gets no series column.
 *
 * @param writer destination
 * @param timeSeriesCsvConfig CSV formatting options (separator, versioned flag)
 * @throws IOException if writing fails
 */
private void writeHeader(Writer writer, TimeSeriesCsvConfig timeSeriesCsvConfig) throws IOException {
    writer.write("Time");
    if (timeSeriesCsvConfig.versioned()) {
        writer.write(timeSeriesCsvConfig.separator());
        writer.write("Version");
    }
    // metadata is null as long as nothing has been loaded
    List<TimeSeriesMetadata> columns = timeSeriesMetadata == null ? List.of() : timeSeriesMetadata;
    for (TimeSeriesMetadata column : columns) {
        writer.write(timeSeriesCsvConfig.separator());
        writer.write(column.getName());
    }
    writer.write(System.lineSeparator());
}
/**
 * Scratch arrays holding up to {@code CACHE_SIZE} consecutive points of every series while writing CSV.
 * Values are stored point-major: all series of a point are contiguous.
 * Non-static because the array sizes come from the enclosing table.
 */
private class CsvCache {

    /** Maximum number of points held at once. */
    static final int CACHE_SIZE = 10;

    /** One cell per (cached point, double series). */
    final double[] doubleCache = new double[CACHE_SIZE * doubleTimeSeriesNames.size()];

    /** One cell per (cached point, string series). */
    final String[] stringCache = new String[CACHE_SIZE * stringTimeSeriesNames.size()];
}
/**
 * Copies {@code cachedPoints} consecutive points of every series, starting at {@code point},
 * from the table buffers into the CSV cache.
 *
 * @param point first point to copy
 * @param cache destination cache
 * @param cachedPoints number of points to copy
 * @param version version of the data
 * @throws IllegalStateException if a series has a data type other than DOUBLE or STRING
 */
private void fillCache(int point, CsvCache cache, int cachedPoints, int version) {
    for (int globalNum = 0; globalNum < timeSeriesMetadata.size(); globalNum++) {
        TimeSeriesDataType dataType = timeSeriesMetadata.get(globalNum).getDataType();
        int typedNum = timeSeriesIndexDoubleOrString.get(globalNum);
        long firstCell = getTimeSeriesOffset(version, typedNum) + point;
        if (dataType == TimeSeriesDataType.DOUBLE) {
            int stride = doubleTimeSeriesNames.size();
            for (int offset = 0; offset < cachedPoints; offset++) {
                cache.doubleCache[offset * stride + typedNum] = doubleBuffer.get(firstCell + offset);
            }
        } else if (dataType == TimeSeriesDataType.STRING) {
            int stride = stringTimeSeriesNames.size();
            for (int offset = 0; offset < cachedPoints; offset++) {
                cache.stringCache[offset * stride + typedNum] = stringBuffer.getString(firstCell + offset);
            }
        } else {
            throw new IllegalStateException("Unexpected data type " + dataType);
        }
    }
}
/**
 * Writes a double value; NaN is written as nothing (empty CSV cell).
 *
 * @param writer destination
 * @param value value to write
 * @throws IOException if writing fails
 */
private static void writeDouble(Writer writer, double value) throws IOException {
    if (Double.isNaN(value)) {
        return;
    }
    writer.write(String.valueOf(value));
}
/**
 * Writes a string value; null is written as nothing (empty CSV cell).
 *
 * @param writer destination
 * @param value value to write, may be null
 * @throws IOException if writing fails
 */
private static void writeString(Writer writer, String value) throws IOException {
    if (value == null) {
        return;
    }
    writer.write(value);
}
/**
 * Writes one CSV line per cached point: time, optional version, then the value of every series.
 *
 * @param writer destination
 * @param timeSeriesCsvConfig CSV formatting options
 * @param point table point corresponding to the first cached point
 * @param cache cache previously filled for these points
 * @param cachedPoints number of valid points in the cache
 * @param version version of the cached data
 * @throws IOException if writing fails
 * @throws IllegalStateException if a series has a data type other than DOUBLE or STRING
 */
private void dumpCache(Writer writer, TimeSeriesCsvConfig timeSeriesCsvConfig, int point, CsvCache cache, int cachedPoints, int version) throws IOException {
    for (int offset = 0; offset < cachedPoints; offset++) {
        writeTime(writer, timeSeriesCsvConfig, point, offset);
        if (timeSeriesCsvConfig.versioned()) {
            writer.write(timeSeriesCsvConfig.separator());
            writer.write(Integer.toString(version));
        }
        for (int globalNum = 0; globalNum < timeSeriesMetadata.size(); globalNum++) {
            TimeSeriesDataType dataType = timeSeriesMetadata.get(globalNum).getDataType();
            int typedNum = timeSeriesIndexDoubleOrString.get(globalNum);
            writer.write(timeSeriesCsvConfig.separator());
            if (dataType == TimeSeriesDataType.DOUBLE) {
                writeDouble(writer, cache.doubleCache[offset * doubleTimeSeriesNames.size() + typedNum]);
            } else if (dataType == TimeSeriesDataType.STRING) {
                writeString(writer, cache.stringCache[offset * stringTimeSeriesNames.size() + typedNum]);
            } else {
                throw new IllegalStateException("Unexpected data type " + dataType);
            }
        }
        writer.write(System.lineSeparator());
    }
}
/**
 * Writes the instant of point {@code point + cachedPoint} in the time format selected by the configuration.
 * The DATE_TIME format renders the instant in the system default time zone.
 *
 * @param writer destination
 * @param timeSeriesCsvConfig CSV formatting options (time format, date-time formatter)
 * @param point table point corresponding to the first cached point
 * @param cachedPoint offset of the point within the cache
 * @throws IOException if writing fails
 * @throws IllegalStateException if the time format is not one of the handled ones
 */
private void writeTime(Writer writer, TimeSeriesCsvConfig timeSeriesCsvConfig, int point, int cachedPoint) throws IOException {
    Instant instant = tableIndex.getInstantAt(point + cachedPoint);
    String formatted = switch (timeSeriesCsvConfig.timeFormat()) {
        case DATE_TIME -> ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()).format(timeSeriesCsvConfig.dateTimeFormatter());
        case FRACTIONS_OF_SECOND -> Double.toString(instant.getEpochSecond() + instant.getNano() / 1e9);
        case MILLIS -> Long.toString(instant.toEpochMilli());
        case MICROS -> writeInstantToMicroString(instant);
        case NANOS -> writeInstantToNanoString(instant);
        default -> throw new IllegalStateException("Unknown time format " + timeSeriesCsvConfig.timeFormat());
    };
    writer.write(formatted);
}
/**
 * Writes the table as CSV: a header line, then for each version one line per point of the table index.
 * A table that has not been loaded yet produces the header only. The writer is flushed but not closed.
 * An I/O error while writing the header or the rows is rethrown as {@link UncheckedIOException};
 * an I/O error from the final flush is thrown as is.
 *
 * @param writer destination, must not be null
 * @param timeSeriesCsvConfig CSV formatting options, must not be null
 * @throws IOException if the final flush fails
 * @throws UncheckedIOException if writing the header or the rows fails
 */
public void writeCsv(Writer writer, TimeSeriesCsvConfig timeSeriesCsvConfig) throws IOException {
    Objects.requireNonNull(writer);
    Objects.requireNonNull(timeSeriesCsvConfig);
    Stopwatch stopWatch = Stopwatch.createStarted();
    try {
        writeHeader(writer, timeSeriesCsvConfig);
        if (timeSeriesMetadata != null) {
            // values are read from the buffers in chunks of CACHE_SIZE points to limit cache misses
            CsvCache chunk = new CsvCache();
            for (int version = fromVersion; version <= toVersion; version++) {
                int firstPoint = 0;
                while (firstPoint < tableIndex.getPointCount()) {
                    int remaining = tableIndex.getPointCount() - firstPoint;
                    int chunkSize = Math.min(CsvCache.CACHE_SIZE, remaining);
                    // copy from the buffers to the chunk, then write the chunk to CSV
                    fillCache(firstPoint, chunk, chunkSize, version);
                    dumpCache(writer, timeSeriesCsvConfig, firstPoint, chunk, chunkSize, version);
                    firstPoint += CsvCache.CACHE_SIZE;
                }
            }
        }
    } catch (IOException ioFailure) {
        throw new UncheckedIOException(ioFailure);
    }
    LOGGER.info("Csv written in {} ms", stopWatch.elapsed(TimeUnit.MILLISECONDS));
    writer.flush();
}
}