AbstractTimeSeries.java
/**
* Copyright (c) 2017, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
* SPDX-License-Identifier: MPL-2.0
*/
package com.powsybl.timeseries;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.collect.Iterators;
import com.powsybl.commons.json.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Base implementation of a time series backed by a {@link TimeSeriesMetadata} and a list of data chunks.
 *
 * <p>Chunks may be stored in any order and may leave gaps; they are sorted by offset and validated against
 * the metadata index each time the series is streamed, iterated or split.
 *
 * @param <P> type of the points produced by the series
 * @param <C> type of the data chunks holding the values
 * @param <T> type of the time series created when the series is split
 *
 * @author Geoffroy Jamgotchian {@literal <geoffroy.jamgotchian at rte-france.com>}
 */
public abstract class AbstractTimeSeries<P extends AbstractPoint, C extends DataChunk<P, C>, T extends TimeSeries<P, T>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTimeSeries.class);

    protected final TimeSeriesMetadata metadata;

    protected final List<C> chunks;

    /**
     * Creates a time series from an array of chunks.
     *
     * <p>The chunks are copied into a growable list so that {@link #addChunk} keeps working afterwards
     * ({@link Arrays#asList} alone would yield a fixed-size list).
     *
     * @param metadata the time series metadata, must not be {@code null}
     * @param chunks the initial chunks, must not be {@code null}
     */
    @SafeVarargs
    protected AbstractTimeSeries(TimeSeriesMetadata metadata, C... chunks) {
        this(metadata, new ArrayList<>(Arrays.asList(chunks)));
    }

    /**
     * Creates a time series from a list of chunks. The list is stored as is (no defensive copy), so it
     * has to be modifiable if {@link #addChunk} is going to be called.
     *
     * @param metadata the time series metadata, must not be {@code null}
     * @param chunks the initial chunks, must not be {@code null}
     */
    protected AbstractTimeSeries(TimeSeriesMetadata metadata, List<C> chunks) {
        this.metadata = Objects.requireNonNull(metadata);
        this.chunks = Objects.requireNonNull(chunks);
    }

    /**
     * Synchronizes this time series on another index. Only the trivial case where the new index equals
     * the current one is supported.
     *
     * @param newIndex the index to synchronize on, must not be {@code null}
     * @throws UnsupportedOperationException if {@code newIndex} differs from the index of this series
     */
    public void synchronize(TimeSeriesIndex newIndex) {
        Objects.requireNonNull(newIndex);
        if (!metadata.getIndex().equals(newIndex)) {
            throw new UnsupportedOperationException("Not yet implemented");
        }
    }

    /**
     * Adds a chunk to this time series. No validation is done here; offsets and overlaps are checked
     * when the series is read.
     *
     * @param chunk the chunk to add, must not be {@code null}
     */
    public void addChunk(C chunk) {
        Objects.requireNonNull(chunk);
        chunks.add(chunk);
    }

    /**
     * @return the internal list of chunks, in insertion order (not a copy)
     */
    public List<C> getChunks() {
        return chunks;
    }

    /**
     * @return the metadata of this time series
     */
    public TimeSeriesMetadata getMetadata() {
        return metadata;
    }

    /**
     * Creates a chunk used to fill a range of the index that is not covered by any stored chunk.
     *
     * @param i offset of the first missing point
     * @param length number of missing points
     * @return the gap filling chunk
     */
    protected abstract C createGapFillingChunk(int i, int length);

    /**
     * @return a new list containing the chunks sorted by increasing offset
     */
    private List<C> getSortedChunks() {
        return chunks.stream()
                .sorted(Comparator.comparingInt(C::getOffset))
                .collect(Collectors.toList());
    }

    /**
     * Sorts the chunks by offset and checks that they fit in the index and do not overlap.
     *
     * @param fillGap if {@code true}, ranges of the index not covered by any chunk are filled with chunks
     *                created by {@link #createGapFillingChunk}; otherwise gaps are left as is
     * @return the sorted (and optionally gap filled) chunks
     * @throws TimeSeriesException if a chunk starts or ends outside of the index, or overlaps the previous one
     */
    private List<C> getCheckedChunks(boolean fillGap) {
        // sort chunks by offset
        List<C> sortedChunks = getSortedChunks();
        int pointCount = metadata.getIndex().getPointCount();
        // index of the first point not yet covered by a checked chunk
        int i = 0;
        List<C> checkedChunks = new ArrayList<>(sortedChunks.size());
        for (C chunk : sortedChunks) {
            int offset = chunk.getOffset();
            int end = offset + chunk.getLength();

            // check chunk offset is included in index range
            if (offset > pointCount - 1) {
                throw new TimeSeriesException("Chunk offset " + offset + " is out of index range [0, "
                        + (pointCount - 1) + "]");
            }

            // check chunk overlap
            if (offset < i) {
                throw new TimeSeriesException("Chunk at offset " + chunk.getOffset() + " overlap with previous one");
            }

            // check all values are included in index range; the end has to be computed from the chunk's own
            // offset and not from the end of the previous chunk, otherwise a chunk following a gap could
            // run past the end of the index without being detected
            if (end > pointCount) {
                throw new TimeSeriesException("Chunk value at " + (end - 1) + " is out of index range [0, "
                        + (pointCount - 1) + "]");
            }

            // fill the gap with the previous chunk, if any
            if (offset > i && fillGap) {
                checkedChunks.add(createGapFillingChunk(i, offset - i));
            }
            checkedChunks.add(chunk);
            i = end;
        }
        // fill the trailing gap up to the end of the index
        if (fillGap && i < pointCount) {
            checkedChunks.add(createGapFillingChunk(i, pointCount - i));
        }
        return checkedChunks;
    }

    /**
     * @return a stream over all the points of the index, gaps between chunks being filled
     * @throws TimeSeriesException if the chunks are inconsistent with the index
     */
    public Stream<P> stream() {
        return getCheckedChunks(true).stream().flatMap(chunk -> chunk.stream(metadata.getIndex()));
    }

    /**
     * @return an iterator over all the points of the index, gaps between chunks being filled
     * @throws TimeSeriesException if the chunks are inconsistent with the index
     */
    public Iterator<P> iterator() {
        return Iterators.concat(getCheckedChunks(true).stream()
                .map(c -> c.iterator(metadata.getIndex()))
                .collect(Collectors.toList())
                .iterator());
    }

    /**
     * Creates a time series, sharing the metadata of this one, from a single chunk.
     *
     * @param chunk the chunk of the new time series
     * @return the new time series
     */
    protected abstract T createTimeSeries(C chunk);

    /**
     * Adds a chunk to the output list, either as a new element or appended to the last output chunk.
     *
     * @param splitChunks the output chunks built so far
     * @param previousChunk the last element of {@code splitChunks}, only read when {@code usePreviousChunk} is set
     * @param chunk the chunk to add
     * @param usePreviousChunk whether {@code chunk} has to be appended to {@code previousChunk}
     */
    private void addOrMerge(List<C> splitChunks, C previousChunk, C chunk, boolean usePreviousChunk) {
        if (usePreviousChunk) {
            // replace the last (incomplete) output chunk by its concatenation with the new one
            splitChunks.set(splitChunks.size() - 1, previousChunk.append(chunk));
        } else {
            splitChunks.add(chunk);
        }
    }

    /**
     * Splits one chunk into pieces of at most {@code newChunkSize} points and adds them to the output list,
     * completing the last output chunk first when it is not full.
     *
     * @param chunkToSplit the chunk to split
     * @param splitChunks the output chunks built so far
     * @param newChunkSize the maximum size of the output chunks
     */
    private void split(C chunkToSplit, List<C> splitChunks, int newChunkSize) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Split chunk [{}, {}]", chunkToSplit.getOffset(), chunkToSplit.getOffset() + chunkToSplit.getLength() - 1);
        }
        boolean usePreviousChunk = false;
        C previousChunk = splitChunks.isEmpty() ? null : splitChunks.get(splitChunks.size() - 1);
        int previousChunkSize = 0;
        // We can complete the previous chunk if 1) it is incomplete and 2) the current offset is not a multiple of newChunkSize
        if (previousChunk != null && previousChunk.getLength() < newChunkSize && chunkToSplit.getOffset() % newChunkSize != 0) {
            usePreviousChunk = true;
            previousChunkSize = previousChunk.getLength();
        }
        // number of points that can still be put in the current output chunk
        int correctedChunkSize = newChunkSize - previousChunkSize;
        if (usePreviousChunk) {
            LOGGER.trace("Previous output chunk's size is {} ; {} elements can be added.", previousChunkSize, correctedChunkSize);
        } else {
            LOGGER.trace("The previous chunk was complete (or there was no previous chunk). Starting a new one.");
        }
        if (chunkToSplit.getLength() > correctedChunkSize) {
            splitChunk(chunkToSplit, previousChunk, splitChunks, newChunkSize, correctedChunkSize, usePreviousChunk);
        } else {
            LOGGER.trace(" Too small...");
            addOrMerge(splitChunks, previousChunk, chunkToSplit, usePreviousChunk);
        }
    }

    /**
     * Cuts a chunk that does not fit in the current output chunk: the first part is added to the output
     * (or merged into the previous output chunk) and the remainder is split recursively.
     *
     * @param chunkToSplit the chunk to cut
     * @param previousChunk the last output chunk, or {@code null} if there is none
     * @param splitChunks the output chunks built so far
     * @param newChunkSize the maximum size of the output chunks
     * @param correctedChunkSize the number of points that can still be put in the current output chunk
     * @param usePreviousChunk whether the first part has to be merged into {@code previousChunk}
     */
    private void splitChunk(C chunkToSplit, C previousChunk, List<C> splitChunks, int newChunkSize, int correctedChunkSize, boolean usePreviousChunk) {
        // compute lower intersection index with new chunk size: the smallest multiple of correctedChunkSize
        // strictly greater than the chunk offset (for a non negative offset)
        // NOTE(review): the rounding is done against correctedChunkSize, not newChunkSize; the recursion
        // converges on newChunkSize boundaries when the previous chunk starts on one - confirm this is intended
        int newChunkLowIndex = (int) Math.round(0.5f + (double) chunkToSplit.getOffset() / correctedChunkSize) * correctedChunkSize;
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(" At index {}", newChunkLowIndex);
        }
        DataChunk.Split<P, C> split = chunkToSplit.splitAt(newChunkLowIndex);
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace(" Adding chunk [{}, {}]", split.getChunk1().getOffset(), split.getChunk1().getOffset() + split.getChunk1().getLength() - 1);
        }
        addOrMerge(splitChunks, previousChunk, split.getChunk1(), usePreviousChunk);
        // the remainder may itself be larger than a chunk
        split(split.getChunk2(), splitChunks, newChunkSize);
    }

    /**
     * Splits this time series into several ones, each made of a single chunk of at most
     * {@code newChunkSize} points. Gaps between the stored chunks are not filled.
     *
     * @param newChunkSize the maximum number of points per resulting time series, must be strictly positive
     * @return the resulting time series, ordered by chunk offset
     * @throws IllegalArgumentException if {@code newChunkSize} is lower than 1
     * @throws TimeSeriesException if the chunks are inconsistent with the index
     */
    public List<T> split(int newChunkSize) {
        if (newChunkSize < 1) {
            // a zero or negative size would make the split index computation meaningless
            throw new IllegalArgumentException("Invalid chunk size: " + newChunkSize);
        }
        List<C> splitNewChunks = new ArrayList<>();
        for (C chunkToSplit : getCheckedChunks(false)) {
            split(chunkToSplit, splitNewChunks, newChunkSize);
        }
        return splitNewChunks.stream().map(this::createTimeSeries).collect(Collectors.toList());
    }

    /**
     * Writes this time series as a JSON object with a {@code metadata} field and a {@code chunks} field.
     *
     * @param generator the JSON generator to write to, must not be {@code null}
     * @throws UncheckedIOException if the generator fails to write
     */
    public void writeJson(JsonGenerator generator) {
        Objects.requireNonNull(generator);
        try {
            generator.writeStartObject();
            generator.writeFieldName("metadata");
            metadata.writeJson(generator);
            generator.writeFieldName("chunks");
            DataChunk.writeJson(generator, chunks);
            generator.writeEndObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * @return the JSON representation of this time series, as produced by {@link #writeJson}
     */
    public String toJson() {
        return JsonUtil.toJson(this::writeJson);
    }

    /**
     * Does nothing in this base implementation.
     *
     * @param ignored the resolver, unused
     */
    public void setTimeSeriesNameResolver(TimeSeriesNameResolver ignored) {
        // nothing to do
    }

    @Override
    public int hashCode() {
        return Objects.hash(metadata, chunks);
    }

    /**
     * Two time series are equal when both their metadata and their chunk lists are equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof AbstractTimeSeries<?, ?, ?> other) {
            return metadata.equals(other.metadata) && chunks.equals(other.chunks);
        }
        return false;
    }
}