// NotifyingRepositoryConnectionWrapper.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.repository.event.base;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.Update;
import org.eclipse.rdf4j.query.UpdateExecutionException;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.base.RepositoryConnectionWrapper;
import org.eclipse.rdf4j.repository.event.NotifyingRepositoryConnection;
import org.eclipse.rdf4j.repository.event.RepositoryConnectionListener;

/**
 * A {@link RepositoryConnectionWrapper} that reports the operations performed through it to registered
 * {@link RepositoryConnectionListener}s. Listeners are notified of changes after they have been applied to the
 * delegate connection.
 * <p>
 * Notification is only active while at least one listener is registered. When {@link #reportDeltas() delta
 * reporting} is enabled, the wrapper queries the delegate before modifying it so that only effective changes are
 * reported: statements that are already present are not reported as added, removals are reported per statement that
 * matched the removal pattern, and clearing the namespaces is reported as one namespace removal per prefix.
 *
 * @author James Leigh
 * @author Herko ter Horst
 */
public class NotifyingRepositoryConnectionWrapper extends RepositoryConnectionWrapper
		implements NotifyingRepositoryConnection {

	/*-----------*
	 * Variables *
	 *-----------*/

	/**
	 * <var>true</var> while at least one listener is registered; guards all notification code paths.
	 */
	private boolean activated = false;

	/**
	 * Whether only effective changes (deltas) are reported to the listeners.
	 */
	private boolean reportDeltas = false;

	/**
	 * The listeners that are notified of operations performed on this connection.
	 */
	private final Set<RepositoryConnectionListener> listeners = new CopyOnWriteArraySet<>();

	/*--------------*
	 * Constructors *
	 *--------------*/

	/**
	 * Creates a new wrapper around the supplied connection with delta reporting disabled.
	 *
	 * @param repository the repository that is passed on to the superclass constructor
	 * @param connection the delegate connection that is passed on to the superclass constructor
	 */
	public NotifyingRepositoryConnectionWrapper(Repository repository, RepositoryConnection connection) {
		super(repository, connection);
	}

	/**
	 * Creates a new wrapper around the supplied connection.
	 *
	 * @param repository   the repository that is passed on to the superclass constructor
	 * @param connection   the delegate connection that is passed on to the superclass constructor
	 * @param reportDeltas <var>true</var> to report only effective changes to the listeners
	 */
	public NotifyingRepositoryConnectionWrapper(Repository repository, RepositoryConnection connection,
			boolean reportDeltas) {
		this(repository, connection);
		setReportDeltas(reportDeltas);
	}

	/*---------*
	 * Methods *
	 *---------*/

	/**
	 * Indicates whether only effective changes (deltas) are reported to the listeners.
	 *
	 * @return <var>true</var> if delta reporting is enabled, <var>false</var> otherwise
	 */
	public boolean reportDeltas() {
		return reportDeltas;
	}

	/**
	 * Enables or disables delta reporting.
	 *
	 * @param reportDeltas <var>true</var> to report only effective changes to the listeners
	 */
	public void setReportDeltas(boolean reportDeltas) {
		this.reportDeltas = reportDeltas;
	}

	/**
	 * Registers a <var>RepositoryConnectionListener</var> that will receive notifications of operations that are
	 * performed on this connection.
	 *
	 * @param listener the listener to register
	 */
	@Override
	public void addRepositoryConnectionListener(RepositoryConnectionListener listener) {
		listeners.add(listener);
		activated = true;
	}

	/**
	 * Removes a registered <var>RepositoryConnectionListener</var> from this connection. Notification is switched
	 * off again once the last listener has been removed.
	 *
	 * @param listener the listener to remove
	 */
	@Override
	public void removeRepositoryConnectionListener(RepositoryConnectionListener listener) {
		listeners.remove(listener);
		activated = !listeners.isEmpty();
	}

	/**
	 * Reports whether add operations may bypass this wrapper.
	 *
	 * @return <var>true</var> as long as no listener is registered
	 */
	@Override
	protected boolean isDelegatingAdd() {
		// NOTE(review): presumably a 'false' result makes the superclass route its add operations through
		// addWithoutCommit(...) so that they get reported - confirm against RepositoryConnectionWrapper.
		return !activated;
	}

	/**
	 * Reports whether remove operations may bypass this wrapper.
	 *
	 * @return <var>true</var> as long as no listener is registered
	 */
	@Override
	protected boolean isDelegatingRemove() {
		// NOTE(review): presumably a 'false' result makes the superclass route its remove operations through
		// removeWithoutCommit(...) so that they get reported - confirm against RepositoryConnectionWrapper.
		return !activated;
	}

	/**
	 * Adds the statement to the delegate and afterwards reports the addition to the listeners. With delta reporting
	 * enabled, the addition is only reported if the delegate did not contain the statement yet.
	 *
	 * @param subject   the subject of the statement to add
	 * @param predicate the predicate of the statement to add
	 * @param object    the object of the statement to add
	 * @param contexts  the contexts to add the statement to
	 * @throws RepositoryException if the delegate fails to check for or add the statement
	 */
	@Override
	public void addWithoutCommit(Resource subject, IRI predicate, Value object, Resource... contexts)
			throws RepositoryException {
		boolean reportEvent = activated;

		if (reportEvent && reportDeltas()) {
			// Only report if the statement is not present yet; this has to be checked before the add
			reportEvent = !getDelegate().hasStatement(subject, predicate, object, false, contexts);
		}

		getDelegate().add(subject, predicate, object, contexts);

		if (reportEvent) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.add(getDelegate(), subject, predicate, object, contexts);
			}
		}
	}

	/**
	 * Clears the supplied contexts of the delegate. With delta reporting enabled this is executed and reported as
	 * the removal of every matching statement; otherwise a single clear event is reported.
	 *
	 * @param contexts the contexts to clear
	 * @throws RepositoryException if the delegate fails to clear the contexts
	 */
	@Override
	public void clear(Resource... contexts) throws RepositoryException {
		if (activated && reportDeltas()) {
			// A wildcard removal reports each removed statement individually
			removeWithoutCommit(null, null, null, contexts);
		} else if (activated) {
			getDelegate().clear(contexts);
			for (RepositoryConnectionListener listener : listeners) {
				listener.clear(getDelegate(), contexts);
			}
		} else {
			getDelegate().clear(contexts);
		}
	}

	/**
	 * Closes this connection and afterwards reports the close to the listeners.
	 *
	 * @throws RepositoryException if closing the connection fails
	 */
	@Override
	public void close() throws RepositoryException {
		// Close through the superclass instead of closing the delegate directly, so that the close handling of
		// the wrapper hierarchy is not bypassed. Per the RDF4J RepositoryConnectionWrapper contract the
		// superclass also closes the delegate.
		super.close();

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.close(getDelegate());
			}
		}
	}

	/**
	 * Commits the delegate and afterwards reports the commit to the listeners.
	 *
	 * @throws RepositoryException if the delegate fails to commit
	 */
	@Override
	public void commit() throws RepositoryException {
		getDelegate().commit();

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.commit(getDelegate());
			}
		}
	}

	/**
	 * Removes the statements matching the supplied pattern from the delegate and afterwards reports the removal to
	 * the listeners. With delta reporting enabled, one removal event is reported per statement that matched the
	 * pattern before the removal; otherwise a single event carrying the pattern itself is reported.
	 *
	 * @param subj the subject to match, passed on to the delegate as is
	 * @param pred the predicate to match, passed on to the delegate as is
	 * @param obj  the object to match, passed on to the delegate as is
	 * @param ctx  the contexts to remove the statements from
	 * @throws RepositoryException if the delegate fails to read or remove the statements
	 */
	@Override
	public void removeWithoutCommit(Resource subj, IRI pred, Value obj, Resource... ctx) throws RepositoryException {
		if (activated && reportDeltas()) {

			// Snapshot the matching statements first: they can no longer be read once they are removed
			List<Statement> list;
			try (Stream<Statement> stream = getDelegate().getStatements(subj, pred, obj, false, ctx).stream()) {
				list = stream.collect(Collectors.toList());
			}

			getDelegate().remove(subj, pred, obj, ctx);
			for (RepositoryConnectionListener listener : listeners) {
				for (Statement stmt : list) {
					Resource s = stmt.getSubject();
					IRI p = stmt.getPredicate();
					Value o = stmt.getObject();
					Resource c = stmt.getContext();
					listener.remove(getDelegate(), s, p, o, c);
				}
			}
		} else if (activated) {
			getDelegate().remove(subj, pred, obj, ctx);
			for (RepositoryConnectionListener listener : listeners) {
				listener.remove(getDelegate(), subj, pred, obj, ctx);
			}
		} else {
			getDelegate().remove(subj, pred, obj, ctx);
		}
	}

	/**
	 * Removes the namespace declaration for the supplied prefix from the delegate and afterwards reports the removal
	 * to the listeners.
	 *
	 * @param prefix the namespace prefix to remove
	 * @throws RepositoryException if the delegate fails to remove the namespace
	 */
	@Override
	public void removeNamespace(String prefix) throws RepositoryException {
		getDelegate().removeNamespace(prefix);

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.removeNamespace(getDelegate(), prefix);
			}
		}
	}

	/**
	 * Removes all namespace declarations from the delegate. With delta reporting enabled, one namespace removal is
	 * reported per prefix that was declared before the call; otherwise a single clear-namespaces event is reported.
	 *
	 * @throws RepositoryException if the delegate fails to read or clear the namespaces
	 */
	@Override
	public void clearNamespaces() throws RepositoryException {
		if (activated && reportDeltas()) {

			// Snapshot the declared prefixes first: they can no longer be read once they are cleared
			List<String> prefixes;
			try (Stream<Namespace> stream = getDelegate().getNamespaces().stream()) {
				prefixes = stream.map(Namespace::getPrefix).collect(Collectors.toList());
			}

			getDelegate().clearNamespaces();
			// removeNamespace(...) notifies the listeners per prefix; it also re-issues the removal on the
			// delegate, which presumably has no effect after the clear above.
			for (String p : prefixes) {
				removeNamespace(p);
			}
		} else if (activated) {
			getDelegate().clearNamespaces();
			for (RepositoryConnectionListener listener : listeners) {
				listener.clearNamespaces(getDelegate());
			}
		} else {
			getDelegate().clearNamespaces();
		}
	}

	/**
	 * Begins a transaction on the delegate and afterwards reports this to the listeners.
	 *
	 * @throws RepositoryException if the delegate fails to begin a transaction
	 */
	@Override
	public void begin() throws RepositoryException {
		getDelegate().begin();

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.begin(getDelegate());
			}
		}
	}

	/**
	 * Rolls back the delegate and afterwards reports the rollback to the listeners.
	 *
	 * @throws RepositoryException if the delegate fails to roll back
	 */
	@Override
	public void rollback() throws RepositoryException {
		getDelegate().rollback();

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.rollback(getDelegate());
			}
		}
	}

	/**
	 * Changes the auto-commit mode of the delegate. Listeners are only notified if the requested mode differs from
	 * the state before the call; when auto-commit is switched on, a commit event is reported in addition.
	 *
	 * @param autoCommit the auto-commit mode to set on the delegate
	 * @throws RepositoryException if the delegate fails to change its auto-commit mode
	 */
	@Override
	@Deprecated
	public void setAutoCommit(boolean autoCommit) throws RepositoryException {
		// "No active transaction" is treated as the connection having been in auto-commit mode
		boolean wasAutoCommit = !isActive();
		getDelegate().setAutoCommit(autoCommit);

		if (activated && wasAutoCommit != autoCommit) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.setAutoCommit(getDelegate(), autoCommit);
			}
			if (autoCommit) {
				// NOTE(review): presumably switching auto-commit on commits the running transaction, hence
				// the extra commit event - confirm against the delegate's setAutoCommit contract.
				for (RepositoryConnectionListener listener : listeners) {
					listener.commit(getDelegate());
				}
			}
		}
	}

	/**
	 * Sets the namespace declaration on the delegate and afterwards reports it to the listeners.
	 *
	 * @param prefix the namespace prefix
	 * @param name   the namespace name the prefix maps to
	 * @throws RepositoryException if the delegate fails to set the namespace
	 */
	@Override
	public void setNamespace(String prefix, String name) throws RepositoryException {
		getDelegate().setNamespace(prefix, name);

		if (activated) {
			for (RepositoryConnectionListener listener : listeners) {
				listener.setNamespace(getDelegate(), prefix, name);
			}
		}
	}

	/**
	 * Prepares an update on the delegate. While a listener is registered, the returned {@link Update} is a
	 * forwarding wrapper that reports each execution to the listeners after it has run; otherwise the delegate's
	 * own <var>Update</var> is returned unchanged.
	 *
	 * @param ql      the query language of the update
	 * @param update  the update string
	 * @param baseURI the base URI passed on to the delegate
	 * @return the prepared update
	 * @throws MalformedQueryException if the delegate rejects the update string
	 * @throws RepositoryException     if the delegate fails to prepare the update
	 */
	@Override
	public Update prepareUpdate(final QueryLanguage ql, final String update, final String baseURI)
			throws MalformedQueryException, RepositoryException {
		if (activated) {
			return new Update() {

				private final RepositoryConnection conn = getDelegate();

				private final Update delegate = conn.prepareUpdate(ql, update, baseURI);

				@Override
				public void execute() throws UpdateExecutionException {
					delegate.execute();
					// Re-check at execution time: listeners may have been removed since the update was prepared
					if (activated) {
						for (RepositoryConnectionListener listener : listeners) {
							listener.execute(conn, ql, update, baseURI, delegate);
						}
					}
				}

				@Override
				public void setBinding(String name, Value value) {
					delegate.setBinding(name, value);
				}

				@Override
				public void removeBinding(String name) {
					delegate.removeBinding(name);
				}

				@Override
				public void clearBindings() {
					delegate.clearBindings();
				}

				@Override
				public BindingSet getBindings() {
					return delegate.getBindings();
				}

				@Override
				public void setDataset(Dataset dataset) {
					delegate.setDataset(dataset);
				}

				@Override
				public Dataset getDataset() {
					return delegate.getDataset();
				}

				@Override
				public void setIncludeInferred(boolean includeInferred) {
					delegate.setIncludeInferred(includeInferred);
				}

				@Override
				public boolean getIncludeInferred() {
					return delegate.getIncludeInferred();
				}

				@Override
				public void setMaxExecutionTime(int maxExecutionTimeSeconds) {
					delegate.setMaxExecutionTime(maxExecutionTimeSeconds);
				}

				@Override
				public int getMaxExecutionTime() {
					return delegate.getMaxExecutionTime();
				}
			};
		} else {
			return getDelegate().prepareUpdate(ql, update, baseURI);
		}
	}
}