TimeSeriesMappingConfigTableLoader.java
/*
* Copyright (c) 2020, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
* SPDX-License-Identifier: MPL-2.0
*/
package com.powsybl.metrix.mapping;
import com.google.common.collect.Iterables;
import com.google.common.collect.Range;
import com.powsybl.timeseries.*;
import com.powsybl.timeseries.ast.NodeCalc;
import com.powsybl.timeseries.ast.TimeSeriesNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static com.powsybl.metrix.mapping.TimeSeriesMapper.CONNECTED_VALUE;
import static com.powsybl.metrix.mapping.TimeSeriesMapper.DISCONNECTED_VALUE;
import static com.powsybl.metrix.mapping.timeseries.TimeSeriesStoreUtil.isNotVersioned;
/**
* @author Marianne Funfrock {@literal <marianne.funfrock at rte-france.com>}
*/
public class TimeSeriesMappingConfigTableLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesMappingConfigTableLoader.class);
private static final int MIN_NUMBER_OF_POINTS = 50;
protected final TimeSeriesMappingConfig config;
protected final ReadOnlyTimeSeriesStore store;
public TimeSeriesMappingConfigTableLoader(TimeSeriesMappingConfig config, ReadOnlyTimeSeriesStore store) {
this.config = Objects.requireNonNull(config);
this.store = Objects.requireNonNull(store);
}
public TimeSeriesTable load(int version, Set<String> requiredTimeSeries, Range<Integer> pointRange) {
Set<String> usedTimeSeriesNames = StreamSupport.stream(findUsedTimeSeriesNames().spliterator(), false).collect(Collectors.toSet());
usedTimeSeriesNames.addAll(requiredTimeSeries);
ReadOnlyTimeSeriesStore storeWithPlannedOutages = buildStoreWithPlannedOutages(store, version, config.getTimeSeriesToPlannedOutagesMapping());
return loadToTable(version, storeWithPlannedOutages, pointRange, usedTimeSeriesNames);
}
public TimeSeriesTable loadToTable(int version, ReadOnlyTimeSeriesStore store, Range<Integer> pointRange, Iterable<String> usedTimeSeriesNames) {
Set<String> timeSeriesNamesToLoad = findTimeSeriesNamesToLoad(usedTimeSeriesNames);
TimeSeriesIndex index = checkIndexUnicity(store, timeSeriesNamesToLoad);
checkValues(store, new TreeSet<>(Set.of(version)), timeSeriesNamesToLoad);
TimeSeriesTable table = new TimeSeriesTable(version, version, index);
// load time series series
List<DoubleTimeSeries> loadedTimeSeries = Collections.emptyList();
if (!timeSeriesNamesToLoad.isEmpty()) {
List<DoubleTimeSeries> timeSeriesList = store.getDoubleTimeSeries(timeSeriesNamesToLoad, version);
int nbPointsToCompute = pointRange.upperEndpoint() - pointRange.lowerEndpoint() + 1;
if (index.getPointCount() != nbPointsToCompute) {
// to avoid loading all values
int nbPointsToLoad = Math.max(pointRange.upperEndpoint() + 1, Math.min(index.getPointCount(), MIN_NUMBER_OF_POINTS));
try {
List<List<DoubleTimeSeries>> split = TimeSeries.split(timeSeriesList, nbPointsToLoad);
loadedTimeSeries = split.get(0);
} catch (RuntimeException e) {
LOGGER.warn("Failed to split timeSeries with {} pointsToLoad and {} pointsToCompute (reason : {}). Will take the whole time series", nbPointsToLoad, nbPointsToCompute, e.getMessage());
loadedTimeSeries = store.getDoubleTimeSeries(timeSeriesNamesToLoad, version);
}
} else {
loadedTimeSeries = store.getDoubleTimeSeries(timeSeriesNamesToLoad, version);
}
}
List<DoubleTimeSeries> timeSeriesToAddToTable = new ArrayList<>(loadedTimeSeries);
ReadOnlyTimeSeriesStore storeCache = new ReadOnlyTimeSeriesStoreCache(loadedTimeSeries);
TimeSeriesNameResolver resolver = new FromStoreTimeSeriesNameResolver(storeCache, version);
// add calculated time series
for (String mappedTimeSeriesName : usedTimeSeriesNames) {
NodeCalc nodeCalc = config.timeSeriesNodes.get(mappedTimeSeriesName);
if (nodeCalc != null) {
CalculatedTimeSeries timeSeries = new CalculatedTimeSeries(mappedTimeSeriesName, nodeCalc);
timeSeries.setTimeSeriesNameResolver(resolver);
timeSeriesToAddToTable.add(timeSeries);
}
}
table.load(version, timeSeriesToAddToTable);
return table;
}
/**
* <p>Deduce from planned outages string time series giving a list of comma separated disconnected equipments
* corresponding double time series for each disconnected equipment</p>
* <p>Example for 1 planned outages time series 'planned_outages_ts' with 4 steps:
* <ul>
* <li>disconnected ids:
* <ul><li>step1 : id1</li>
* <li>step2 : id2</li>
* <li>step3 : id1, id2</li>
* <li>step4 : none</li></ul></li>
* <li>returned store contains 2 double time series:
* <ul><li>'planned_outages'_id1 with values: 'DISCONNECTED_VALUE', 'CONNECTED_VALUE', 'DISCONNECTED_VALUE', 'CONNECTED_VALUE'</li>
* <li>'planned_outages'_id2 with values: 'CONNECTED_VALUE', 'DISCONNECTED_VALUE', 'DISCONNECTED_VALUE', 'CONNECTED_VALUE'</li></ul></li>
* </ul>
*</p>
* @param timeSeriesName planned outages time series name
* @param timeSeriesValues planned outages time series values
* @param disconnectedIds planned outages disconnected ids set
* @param index index of double time series to create
* @return double time series for each disconnected equipment
*/
public static List<DoubleTimeSeries> computeDisconnectedEquipmentTimeSeries(String timeSeriesName, String[] timeSeriesValues, Set<String> disconnectedIds, TimeSeriesIndex index) {
List<DoubleTimeSeries> doubleTimeSeries = new ArrayList<>();
int nbPoints = index.getPointCount();
for (String id : disconnectedIds) {
double[] values = new double[nbPoints];
Arrays.fill(values, CONNECTED_VALUE);
for (int i = 0; i < nbPoints; i++) {
if (timeSeriesValues[i] == null) {
continue;
}
String[] ids = timeSeriesValues[i].split(",");
if (Arrays.asList(ids).contains(id)) {
values[i] = DISCONNECTED_VALUE;
}
}
DoubleTimeSeries doubleTs = new StoredDoubleTimeSeries(
new TimeSeriesMetadata(plannedOutagesEquipmentTsName(timeSeriesName, id), TimeSeriesDataType.DOUBLE, index),
new UncompressedDoubleDataChunk(0, values).tryToCompress());
doubleTimeSeries.add(doubleTs);
}
return doubleTimeSeries;
}
public static String plannedOutagesEquipmentTsName(String tsName, String id) {
return String.format("%s_%s", tsName, id);
}
/**
* Check if store contains already disconnected equipment time series deduced from planned outages string time series
* <ul>
* <li>if so, returns store</li>
* <li>if not, build the store containing disconnected equipment time series</li>
*</ul>
* @param store store containing the planned outages time series
* @param version version to compute
* @param timeSeriesToPlannedOutagesMapping map of planned outages time series giving the set of disconnected equipment ids
* @return depending on the check, store or aggregator of store and disconnected equipment time series store
*/
public static ReadOnlyTimeSeriesStore buildStoreWithPlannedOutages(ReadOnlyTimeSeriesStore store, int version, Map<String, Set<String>> timeSeriesToPlannedOutagesMapping) {
if (timeSeriesToPlannedOutagesMapping.isEmpty()) {
return store;
}
// Check if store already contains equipment outages time series
Set<Boolean> timeSeriesExist = timeSeriesToPlannedOutagesMapping.keySet().stream()
.flatMap(key -> timeSeriesToPlannedOutagesMapping.get(key).stream()
.map(value -> new AbstractMap.SimpleEntry<>(key, value)))
.map(e -> store.timeSeriesExists(plannedOutagesEquipmentTsName(e.getKey(), e.getValue())))
.collect(Collectors.toSet());
if (timeSeriesExist.size() == 1 && timeSeriesExist.contains(true)) {
return store;
}
ReadOnlyTimeSeriesStore plannedOutagesStore = buildPlannedOutagesStore(store, version, timeSeriesToPlannedOutagesMapping);
return new ReadOnlyTimeSeriesStoreAggregator(store, plannedOutagesStore);
}
/**
* <p>Deduce from all planned outages time series corresponding time series of the given version for each disconnected equipment</p>
* @param store store containing the planned outages time series
* @param version version to compute
* @param timeSeriesToPlannedOutagesMapping map of planned outages time series giving the set of disconnected equipment ids
* @return store containing double time series of each disconnected equipment
*/
public static ReadOnlyTimeSeriesStore buildPlannedOutagesStore(ReadOnlyTimeSeriesStore store, int version, Map<String, Set<String>> timeSeriesToPlannedOutagesMapping) {
List<DoubleTimeSeries> doubleTimeSeries = new ArrayList<>();
// Build equipment planned outages time series
TimeSeriesIndex index = checkIndexUnicity(store, timeSeriesToPlannedOutagesMapping.keySet());
for (Map.Entry<String, Set<String>> entry : timeSeriesToPlannedOutagesMapping.entrySet()) {
String timeSeriesName = entry.getKey();
Set<String> disconnectedIds = entry.getValue();
StringTimeSeries plannedOutagesTimeSeries = store.getStringTimeSeries(timeSeriesName, version).orElseThrow(() -> new TimeSeriesException("Invalid planned outages time series name " + timeSeriesName));
List<DoubleTimeSeries> disconnectedEquipmentTimeSeries = computeDisconnectedEquipmentTimeSeries(timeSeriesName, plannedOutagesTimeSeries.toArray(), disconnectedIds, index);
doubleTimeSeries.addAll(disconnectedEquipmentTimeSeries);
}
return new ReadOnlyTimeSeriesStoreCache(doubleTimeSeries);
}
public Set<String> findTimeSeriesNamesToLoad() {
return findTimeSeriesNamesToLoad(findUsedTimeSeriesNames());
}
public Iterable<String> findUsedTimeSeriesNames() {
return Iterables.concat(config.mappedTimeSeriesNames,
config.timeSeriesToEquipmentMap.keySet(),
config.timeSeriesToPlannedOutagesMapping.keySet(),
config.distributionKeys.values().stream()
.filter(distributionKey -> distributionKey instanceof TimeSeriesDistributionKey)
.map(distributionKey -> ((TimeSeriesDistributionKey) distributionKey).getTimeSeriesName())
.collect(Collectors.toSet()));
}
public Set<String> findTimeSeriesNamesToLoad(Iterable<String> usedTimeSeriesNames) {
Set<String> timeSeriesNamesToLoad = new HashSet<>();
// load data of each mapped time series and for each of the equipment time series
for (String timeSeriesName : usedTimeSeriesNames) {
NodeCalc nodeCalc = config.timeSeriesNodes.get(timeSeriesName);
if (nodeCalc != null) {
// it is a calculated time series
// find stored time series used in this calculated time series
timeSeriesNamesToLoad.addAll(TimeSeriesNames.list(nodeCalc));
} else {
// it is a stored time series
timeSeriesNamesToLoad.add(timeSeriesName);
}
}
return timeSeriesNamesToLoad;
}
public TimeSeriesIndex checkIndexUnicity() {
return checkIndexUnicity(store, findTimeSeriesNamesToLoad());
}
public static TimeSeriesIndex checkIndexUnicity(ReadOnlyTimeSeriesStore store, Set<String> timeSeriesNamesToLoad) {
Set<TimeSeriesIndex> indexes = timeSeriesNamesToLoad.isEmpty() ? Collections.emptySet()
: store.getTimeSeriesMetadata(timeSeriesNamesToLoad)
.stream()
.map(TimeSeriesMetadata::getIndex)
.filter(index -> !(index instanceof InfiniteTimeSeriesIndex))
.collect(Collectors.toSet());
if (indexes.isEmpty()) {
return InfiniteTimeSeriesIndex.INSTANCE;
} else if (indexes.size() > 1) {
throw new TimeSeriesMappingException("Time series involved in the mapping must have the same index: "
+ indexes);
}
return indexes.iterator().next();
}
public void checkValues(Set<Integer> versions) {
checkValues(store, versions, findTimeSeriesNamesToLoad());
}
public static void checkValues(ReadOnlyTimeSeriesStore store, Set<Integer> versions, Set<String> timeSeriesNamesToLoad) {
timeSeriesNamesToLoad.forEach(timeSeriesName -> {
Set<Integer> existingVersions = store.getTimeSeriesDataVersions(timeSeriesName);
if (!isNotVersioned(existingVersions) && !existingVersions.isEmpty() && !existingVersions.containsAll(versions)) {
Set<Integer> undefinedVersions = new HashSet<>(versions);
undefinedVersions.removeAll(existingVersions);
throw new TimeSeriesMappingException("The time series store does not contain values for ts " + timeSeriesName + " and version(s) " + undefinedVersions);
}
});
}
}