CgmesModelTripleStore.java
/**
* Copyright (c) 2017-2018, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
* SPDX-License-Identifier: MPL-2.0
*/
package com.powsybl.cgmes.model.triplestore;
import com.powsybl.cgmes.model.*;
import com.powsybl.commons.datasource.DataSource;
import com.powsybl.commons.report.ReportNode;
import com.powsybl.triplestore.api.*;
import org.apache.commons.lang3.EnumUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.PrintStream;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.powsybl.cgmes.model.CgmesNamespace.CGMES_EQ_3_OR_GREATER_PREFIX;
import static com.powsybl.cgmes.model.CgmesNamespace.CIM_100_EQ_PROFILE;
/**
* @author Luma Zamarre��o {@literal <zamarrenolm at aia.es>}
*/
public class CgmesModelTripleStore extends AbstractCgmesModel {
public CgmesModelTripleStore(String cimNamespace, TripleStore tripleStore) {
super();
this.cimNamespace = cimNamespace;
this.cimVersion = cimVersionFromCimNamespace(cimNamespace);
this.tripleStore = tripleStore;
tripleStore.defineQueryPrefix("cim", cimNamespace);
tripleStore.defineQueryPrefix("entsoe", CgmesNamespace.ENTSOE_NAMESPACE);
tripleStore.defineQueryPrefix("eu", CgmesNamespace.EU_NAMESPACE);
queryCatalog = queryCatalogFor(cimVersion);
Objects.requireNonNull(queryCatalog);
}
@Override
public void read(InputStream is, String baseName, String contextName, ReportNode reportNode) {
// Reset cached nodeBreaker value everytime we read new data
nodeBreaker = null;
tripleStore.read(is, baseName, contextName);
}
@Override
public void print(PrintStream out) {
tripleStore.print(out);
}
@Override
public void print(Consumer<String> liner) {
tripleStore.print(liner);
}
@Override
public void write(DataSource ds) {
try {
tripleStore.write(ds);
} catch (TripleStoreException x) {
throw new CgmesModelException(String.format("Writing. Triple store problem %s", ds), x);
}
}
@Override
public void write(DataSource ds, CgmesSubset subset) {
try {
tripleStore.write(ds, contextNameFor(subset));
} catch (TripleStoreException e) {
throw new CgmesModelException(String.format("Writing. Triple store problem %s", ds), e);
}
}
// Queries
private static boolean isEquipmentCore(String profile) {
return profile.contains("/EquipmentCore/") || profile.contains("/CIM/CoreEquipment");
}
private static boolean isEquipmentOperation(String profile) {
return profile.contains("/EquipmentOperation/") || profile.contains("/CIM/Operation");
}
@Override
public boolean hasEquipmentCore() {
PropertyBags r = namedQuery(MODEL_PROFILES);
if (r == null) {
return false;
}
for (PropertyBag m : r) {
String p = m.get(PROFILE);
if (p != null && isEquipmentCore(p)) {
if (LOG.isInfoEnabled()) {
LOG.info("Model contains Equipment Core data profile in model {}",
m.get(CgmesNames.FULL_MODEL));
}
return true;
}
}
return false;
}
@Override
public boolean hasBoundary() {
// The Model has boundary if we are able to find models
// that have EquipmentBoundary profile
// and models that have TopologyBoundary profile
boolean hasEquipmentBoundary = false;
boolean hasTopologyBoundary = false;
PropertyBags r = namedQuery(MODEL_PROFILES);
if (r == null) {
return false;
}
for (PropertyBag m : r) {
String p = m.get(PROFILE);
String mid = m.get(CgmesNames.FULL_MODEL);
if (p != null && p.contains("/EquipmentBoundary/")) {
LOG.info("Model contains EquipmentBoundary data in model {}", mid);
hasEquipmentBoundary = true;
}
if (p != null && p.contains("/TopologyBoundary/")) {
LOG.info("Model contains TopologyBoundary data in model {}", mid);
hasTopologyBoundary = true;
}
}
// If we do not have a query for model profiles we assume no boundary exist
return hasEquipmentBoundary && hasTopologyBoundary;
}
@Override
public boolean isNodeBreaker() {
if (nodeBreaker == null) {
nodeBreaker = computeIsNodeBreaker();
}
return nodeBreaker;
}
private boolean computeIsNodeBreaker() {
// Optimization hint: consider caching the results of the query for model
// profiles
PropertyBags r = namedQuery(MODEL_PROFILES);
if (r == null) {
return false;
}
if (allEqCgmes3OrGreater(r) && !connectivityNodes().isEmpty()) {
return true;
}
// Only consider is node breaker if all models that have profile
// EquipmentCore or EquipmentBoundary
// also have EquipmentOperation or EquipmentBoundaryOperation
Map<String, Boolean> modelHasOperationProfile = computeModelHasOperationProfile(r);
boolean consideredNodeBreaker = modelHasOperationProfile.values().stream().allMatch(Boolean::valueOf);
if (LOG.isInfoEnabled()) {
logNodeBreaker(consideredNodeBreaker, modelHasOperationProfile);
}
return consideredNodeBreaker;
}
private boolean allEqCgmes3OrGreater(PropertyBags modelProfiles) {
for (PropertyBag mp : modelProfiles) {
String p = mp.get(PROFILE);
if (p != null && isEquipmentCore(p) && !isEqCgmes3OrGreater(p)) {
return false;
}
}
return true;
}
private static boolean isEqCgmes3OrGreater(String profile) {
return profile.startsWith(CGMES_EQ_3_OR_GREATER_PREFIX) && profile.compareTo(CIM_100_EQ_PROFILE) >= 0;
}
private void logNodeBreaker(boolean consideredNodeBreaker, Map<String, Boolean> modelHasOperationProfile) {
if (consideredNodeBreaker) {
LOG.info(
"All FullModel objects have EquipmentOperation profile, so conversion will be considered node-breaker");
} else {
LOG.info(
"Following FullModel objects do not have EquipmentOperation profile, so conversion will not be considered node-breaker:");
modelHasOperationProfile.entrySet().forEach(meqop -> {
if (!meqop.getValue()) {
LOG.info(" {}", meqop.getKey());
}
});
}
}
private Map<String, Boolean> computeModelHasOperationProfile(PropertyBags modelProfiles) {
// A bus/branch model with a single instance file where its node/breaker boundary has been assembled
// Must not be considered as node-breaker
Map<String, Boolean> modelHasOperationProfile = new HashMap<>();
Map<String, Boolean> modelHasBoundaryOperationProfile = new HashMap<>();
for (PropertyBag mp : modelProfiles) {
String m = mp.get("FullModel");
String p = mp.get(PROFILE);
if (p != null) {
updateModelHasOperationProfile(modelHasOperationProfile, modelHasBoundaryOperationProfile, m, p);
}
}
modelHasBoundaryOperationProfile.forEach((m, v) -> modelHasOperationProfile.merge(m, v, (vm, vbd) -> vm && vbd));
return modelHasOperationProfile;
}
private void updateModelHasOperationProfile(Map<String, Boolean> modelHasOperationProfile, Map<String, Boolean> modelHasBoundaryOperationProfile, String model, String profile) {
if (isEquipmentCore(profile)) {
// Set to false only if we do not have a value already
modelHasOperationProfile.putIfAbsent(model, false);
}
if (isEquipmentOperation(profile)) {
modelHasOperationProfile.put(model, true);
LOG.info("Model {} is considered node-breaker", model);
}
if (profile.contains("/EquipmentBoundary/")) {
// Set to false only if we do not have a value already
modelHasBoundaryOperationProfile.putIfAbsent(model, false);
}
if (profile.contains("/EquipmentBoundaryOperation/")) {
modelHasBoundaryOperationProfile.put(model, true);
LOG.info("Model {} boundary is considered node-breaker", model);
}
}
/**
* Query the model description (the metadata information) for all profiles (EQ, TP, ...).
* @return Property bags (one bag per profile) with all the model description found.
*/
@Override
public PropertyBags fullModels() {
return namedQuery("fullModels");
}
@Override
public String modelId() {
String modelId = "unknown";
if (queryCatalog.containsKey("modelIds")) {
PropertyBags r = namedQuery("modelIds");
if (r != null && !r.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Candidates to model identifier:{}{}", System.lineSeparator(), r.tabulateLocals());
}
String v = r.get(0).get("FullModel");
if (v != null) {
modelId = v;
}
}
}
return modelId;
}
@Override
public ZonedDateTime scenarioTime() {
ZonedDateTime defaultScenarioTime = ZonedDateTime.now();
return queryDate("scenarioTime", defaultScenarioTime);
}
@Override
public ZonedDateTime created() {
ZonedDateTime defaultCreated = ZonedDateTime.now();
return queryDate("created", defaultCreated);
}
private ZonedDateTime queryDate(String propertyName, ZonedDateTime defaultValue) {
ZonedDateTime d = defaultValue;
if (queryCatalog.containsKey("modelDates")) {
PropertyBags r = namedQuery("modelDates");
if (r != null && !r.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Candidates to modelDates:{}{}", System.lineSeparator(), r.tabulateLocals());
}
String s = r.get(0).get(propertyName);
if (s != null && !s.isEmpty()) {
// Assume date time given as UTC if no explicit zone is specified
try {
d = parseDateTime(s);
} catch (DateTimeParseException e) {
LOG.error("Invalid date: {}. The date has been fixed to {}.", s, defaultValue);
return defaultValue;
}
}
}
}
return d;
}
/**
* Parse a date in ISO format. If the offset is not present at the end (ie. no "Z" nor "+xx:xx" or "+xxxx"), it is
* assumed that the date is given as UTC.
* @param dateAsString Date in ISO format
* @return the date as ZonedDateTime
*/
private ZonedDateTime parseDateTime(String dateAsString) {
// Definition of the parser according to the expected date format
DateTimeFormatter dateTimeFormatterLocalised = new DateTimeFormatterBuilder()
// Fixed mandatory pattern
.appendPattern("yyyy-MM-dd'T'HH:mm:ss")
// Between 0 and 9 decimals (9 is the maximum)
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
// Potentially a suffix for localisation (VV: zoneId, x: +HHmm, xx: +HHMM, xxx: +HH:MM)
.appendPattern("[VV][x][xx][xxx]")
.toFormatter();
// Parsing
TemporalAccessor dateParsed = dateTimeFormatterLocalised.parseBest(dateAsString, ZonedDateTime::from, LocalDateTime::from);
if (dateParsed instanceof ZonedDateTime zonedDateTime) {
return zonedDateTime;
} else {
return ZonedDateTime.of((LocalDateTime) dateParsed, ZoneOffset.UTC);
}
}
@Override
public String version() {
String version = "unknown";
PropertyBags r = namedQuery("version");
if (r != null && !r.isEmpty()) {
String v = r.get(0).get("version");
if (v != null) {
version = v;
}
}
return version;
}
@Override
public PropertyBags numObjectsByType() {
Objects.requireNonNull(cimNamespace);
return namedQuery("numObjectsByType", cimNamespace);
}
@Override
public PropertyBags allObjectsOfType(String type) {
Objects.requireNonNull(type);
return namedQuery("allObjectsOfType", type);
}
@Override
public PropertyBags boundaryNodes() {
return namedQuery("boundaryNodes");
}
@Override
public PropertyBags baseVoltages() {
return namedQuery("baseVoltages");
}
@Override
public PropertyBags countrySourcingActors(String countryName) {
return namedQuery("countrySourcingActors", countryName);
}
@Override
public PropertyBags sourcingActor(String sourcingActor) {
return namedQuery("sourcingActor", sourcingActor);
}
@Override
public PropertyBags substations() {
return namedQuery("substations");
}
@Override
public PropertyBags voltageLevels() {
return namedQuery("voltageLevels");
}
@Override
public PropertyBags terminals() {
return namedQuery("terminals");
}
@Override
public PropertyBags connectivityNodes() {
if (cachedNodes) {
return cachedConnectivityNodes;
}
return namedQuery("connectivityNodes");
}
@Override
public PropertyBags topologicalNodes() {
if (cachedNodes) {
return cachedTopologicalNodes;
}
return namedQuery("topologicalNodes");
}
@Override
public PropertyBags connectivityNodeContainers() {
return namedQuery("connectivityNodeContainers");
}
@Override
public PropertyBags operationalLimits() {
return namedQuery("operationalLimits");
}
@Override
public PropertyBags busBarSections() {
return namedQuery("busbarSections");
}
@Override
public PropertyBags switches() {
return namedQuery("switches");
}
@Override
public PropertyBags acLineSegments() {
return namedQuery("acLineSegments");
}
@Override
public PropertyBags equivalentBranches() {
return namedQuery("equivalentBranches");
}
@Override
public PropertyBags seriesCompensators() {
return namedQuery("seriesCompensators");
}
@Override
public PropertyBags transformers() {
return namedQuery("transformers");
}
@Override
public PropertyBags transformerEnds() {
return namedQuery("transformerEnds");
}
@Override
public PropertyBags ratioTapChangers() {
return namedQuery("ratioTapChangers");
}
@Override
public PropertyBags ratioTapChangerTablePoints() {
return namedQuery("ratioTapChangerTablePoints");
}
@Override
public PropertyBags phaseTapChangers() {
return namedQuery("phaseTapChangers");
}
@Override
public PropertyBags phaseTapChangerTablePoints() {
return namedQuery("phaseTapChangerTablePoints");
}
@Override
public PropertyBags regulatingControls() {
return namedQuery("regulatingControls");
}
@Override
public PropertyBags energyConsumers() {
return namedQuery("energyConsumers");
}
@Override
public PropertyBags energySources() {
return namedQuery("energySources");
}
@Override
public PropertyBags shuntCompensators() {
return namedQuery("shuntCompensators");
}
@Override
public PropertyBags equivalentShunts() {
return namedQuery("equivalentShunts");
}
@Override
public PropertyBags nonlinearShuntCompensatorPoints() {
return namedQuery("nonlinearShuntCompensatorPoints");
}
@Override
public PropertyBags staticVarCompensators() {
return namedQuery("staticVarCompensators");
}
@Override
public PropertyBags synchronousMachinesGenerators() {
return namedQuery("synchronousMachinesGenerators");
}
@Override
public PropertyBags synchronousMachinesCondensers() {
return namedQuery("synchronousMachinesCondensers");
}
@Override
public PropertyBags equivalentInjections() {
return namedQuery("equivalentInjections");
}
@Override
public PropertyBags externalNetworkInjections() {
return namedQuery("externalNetworkInjections");
}
@Override
public PropertyBags svInjections() {
return namedQuery("svInjections");
}
@Override
public PropertyBags asynchronousMachines() {
return namedQuery("asynchronousMachines");
}
@Override
public PropertyBags reactiveCapabilityCurveData() {
return namedQuery("reactiveCapabilityCurveData");
}
@Override
public PropertyBags controlAreas() {
return namedQuery("controlAreas");
}
@Override
public PropertyBags acDcConverters() {
return namedQuery("acDcConverters");
}
@Override
public PropertyBags dcLineSegments() {
return namedQuery("dcLineSegments");
}
@Override
public PropertyBags dcTerminals() {
return namedQuery("dcTerminals");
}
@Override
public PropertyBags tieFlows() {
return namedQuery("tieFlows");
}
@Override
public PropertyBags topologicalIslands() {
return namedQuery("topologicalIslands");
}
@Override
public PropertyBags graph() {
return namedQuery("graph");
}
@Override
public PropertyBags grounds() {
return namedQuery("grounds");
}
@Override
public PropertyBags modelProfiles() {
return namedQuery(MODEL_PROFILES);
}
public PropertyBags namedQuery(String name, String... params) {
String queryText = queryCatalog.get(name);
if (queryText == null) {
LOG.warn("Query [{}] not found in catalog", name);
return new PropertyBags();
}
// Optimization hint: Now we do the parameter injection by ourselves,
// to maintain independence of the triple store engine,
// instead of using native query parameters
queryText = injectParams(queryText, params);
final long t0 = System.currentTimeMillis();
PropertyBags r = query(queryText);
final long t1 = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("results query {}{}{}", name, System.lineSeparator(), r.tabulateLocals());
LOG.debug("dt query {} {} ms, result set size = {}", name, t1 - t0, r.size());
}
return r;
}
public void namedQueryUpdate(String name, String... params) {
String queryText = queryCatalog.get(name);
if (queryText == null) {
LOG.warn("Query [{}] not found in catalog", name);
}
queryText = injectParams(queryText, params);
update(queryText);
}
public String getCimNamespace() {
return cimNamespace;
}
public int getCimVersion() {
return cimVersion;
}
public PropertyBags query(String queryText) {
return tripleStore.query(queryText);
}
public void update(String queryText) {
tripleStore.update(queryText);
}
@Override
public TripleStore tripleStore() {
return tripleStore;
}
// Updates
public void update(
String queryName,
String context,
String baseName,
String subject,
String predicate,
String value,
boolean valueIsUri) {
Objects.requireNonNull(cimNamespace);
String baseUri = getBaseUri(baseName);
String value1 = valueIsUri ? baseUri.concat(value) : value;
if (value.contains("cim:")) {
value1 = cimNamespace.concat(value.substring(4));
}
namedQueryUpdate(
queryName,
context,
baseUri.concat(subject),
predicate,
value1,
String.valueOf(valueIsUri));
}
private String getBaseUri(String baseName) {
if (tripleStore.getImplementationName().equals("rdf4j")) {
return baseName.concat("/#");
} else {
return baseName.concat("#");
}
}
@Override
public void clear(CgmesSubset subset) {
// TODO Remove all contexts that are related to the profile of the subset
// For example for state variables:
// <md:Model.profile>http://entsoe.eu/CIM/StateVariables/4/1</md:Model.profile>
Set<String> contextNames = tripleStore.contextNames();
for (String contextName : contextNames) {
if (subset.isValidName(contextName)) {
tripleStore.clear(contextName);
}
}
}
@Override
public void add(CgmesSubset subset, String type, PropertyBags objects) {
String contextName = contextNameFor(subset);
try {
tripleStore.add(contextName, cimNamespace, type, objects);
} catch (TripleStoreException x) {
String msg = String.format("Adding objects of type %s to subset %s, context %s", type, subset, contextName);
throw new CgmesModelException(msg, x);
}
}
@Override
public void add(String context, String type, PropertyBags objects) {
String contextName = EnumUtils.isValidEnum(CgmesSubset.class, context)
? contextNameFor(CgmesSubset.valueOf(context))
: context;
try {
if (type.equals(CgmesNames.FULL_MODEL)) {
tripleStore.add(contextName, mdNamespace(), type, objects);
} else {
tripleStore.add(contextName, cimNamespace, type, objects);
}
} catch (TripleStoreException x) {
String msg = String.format("Adding objects of type %s to context %s", type, context);
throw new CgmesModelException(msg, x);
}
}
private String mdNamespace() {
// Return the first namespace for the prefix md
// If no namespace is found, return default
PrefixNamespace def = new PrefixNamespace("md", CgmesNamespace.MD_NAMESPACE);
return tripleStore.getNamespaces().stream().filter(ns -> ns.getPrefix().equals("md"))
.findFirst().orElse(def).getNamespace();
}
private static final Pattern CIM_NAMESPACE_VERSION_PATTERN_UNTIL_16 = Pattern.compile("^.*CIM-schema-cim(\\d+)#$");
private static final Pattern CIM_NAMESPACE_VERSION_PATTERN_FROM_100 = Pattern.compile("^.*/CIM(\\d+)#$");
private static int cimVersionFromCimNamespace(String cimNamespace) {
Matcher m = CIM_NAMESPACE_VERSION_PATTERN_UNTIL_16.matcher(cimNamespace);
if (m.matches()) {
return Integer.valueOf(m.group(1));
} else {
m = CIM_NAMESPACE_VERSION_PATTERN_FROM_100.matcher(cimNamespace);
if (m.matches()) {
return Integer.valueOf(m.group(1));
}
}
return -1;
}
private String contextNameFor(CgmesSubset subset) {
for (String context : tripleStore.contextNames()) {
if (subset.isValidName(context)) {
return context;
}
}
return modelId() + "_" + subset + ".xml";
}
private QueryCatalog queryCatalogFor(int cimVersion) {
QueryCatalog qc = null;
String resourceName = null;
if (cimVersion > 0) {
resourceName = String.format("CIM%d.sparql", cimVersion);
}
if (resourceName != null) {
qc = new QueryCatalog(resourceName);
}
return qc;
}
private String injectParams(String queryText, String... params) {
String injected = queryText;
// Avoid computing parameter reference for first parameters
int k = 0;
for (; k < Math.min(PARAMETER_REFERENCE.length, params.length); k++) {
injected = injected.replace(PARAMETER_REFERENCE[k], params[k]);
}
for (; k < params.length; k++) {
String paramRef = "{" + k + "}";
injected = injected.replace(paramRef, params[k]);
}
return injected;
}
private final String cimNamespace;
private final int cimVersion;
private final TripleStore tripleStore;
private final QueryCatalog queryCatalog;
private Boolean nodeBreaker = null;
private static final String MODEL_PROFILES = "modelProfiles";
private static final String PROFILE = "profile";
private static final Logger LOG = LoggerFactory.getLogger(CgmesModelTripleStore.class);
private static final String[] PARAMETER_REFERENCE = {"{0}", "{1}", "{2}", "{3}", "{4}", "{5}", "{6}", "{7}", "{8}", "{9}"};
}