GroupIterator.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.collection.factory.api.BindingSetEntry;
import org.eclipse.rdf4j.collection.factory.api.BindingSetKey;
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.base.CoreDatatype;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MutableBindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.AggregateFunctionCall;
import org.eclipse.rdf4j.query.algebra.AggregateOperator;
import org.eclipse.rdf4j.query.algebra.Avg;
import org.eclipse.rdf4j.query.algebra.Count;
import org.eclipse.rdf4j.query.algebra.Group;
import org.eclipse.rdf4j.query.algebra.GroupConcat;
import org.eclipse.rdf4j.query.algebra.GroupElem;
import org.eclipse.rdf4j.query.algebra.MathExpr.MathOp;
import org.eclipse.rdf4j.query.algebra.Max;
import org.eclipse.rdf4j.query.algebra.Min;
import org.eclipse.rdf4j.query.algebra.Sample;
import org.eclipse.rdf4j.query.algebra.Sum;
import org.eclipse.rdf4j.query.algebra.UnaryValueOperator;
import org.eclipse.rdf4j.query.algebra.ValueExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryValueEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.util.MathUtil;
import org.eclipse.rdf4j.query.algebra.evaluation.util.ValueComparator;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.query.parser.sparql.aggregate.AggregateCollector;
import org.eclipse.rdf4j.query.parser.sparql.aggregate.AggregateFunction;
import org.eclipse.rdf4j.query.parser.sparql.aggregate.CustomAggregateFunctionRegistry;

/**
 * @author David Huynh
 * @author Arjohn Kampman
 * @author Jeen Broekstra
 * @author James Leigh
 * @author Jerven Bolleman
 * @author Tomas Kovachev
 */
public class GroupIterator extends AbstractCloseableIteratorIteration<BindingSet> {

	/*--------*
	 * Fields *
	 *--------*/

	// Strategy used to precompile the group's argument and the aggregate argument expressions.
	private final EvaluationStrategy strategy;

	// Bindings the group argument is evaluated against; also the base of each solution when non-empty.
	private final BindingSet parentBindings;

	// The Group algebra node being evaluated.
	private final Group group;

	// Supplies binding set factories and per-variable getters/setters.
	private final QueryEvaluationContext context;

	// Precompiled evaluation step for the group's argument.
	private final QueryEvaluationStep arguments;

	// The iteration of the arguments, stored while building entries for allowing premature closing
	private volatile CloseableIteration<BindingSet> argumentsIter;

	// Value factory handed to the aggregate collectors.
	private final ValueFactory vf;

	// Factory for the maps and sets used while grouping; closed in handleClose().
	private final CollectionFactory cf;

	/*--------------*
	 * Constructors *
	 *--------------*/

	/**
	 * Creates a group iterator that uses the shared {@link SimpleValueFactory} instance and a new
	 * {@link DefaultCollectionFactory}.
	 *
	 * @param strategy       the strategy used to precompile the group's argument and aggregate expressions
	 * @param group          the group node to evaluate
	 * @param parentBindings the bindings the group's argument is evaluated against
	 * @param context        the query evaluation context
	 * @throws QueryEvaluationException if precompiling the group's argument fails
	 */
	public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parentBindings,
			QueryEvaluationContext context) throws QueryEvaluationException {
		// Delegate straight to the full constructor rather than through the deprecated overload. The
		// threshold argument is ignored there, so 0 is passed exactly as before.
		this(strategy, group, parentBindings, 0, context, SimpleValueFactory.getInstance(),
				new DefaultCollectionFactory());
	}

	/**
	 * Creates a group iterator that uses the shared {@link SimpleValueFactory} instance and a new
	 * {@link DefaultCollectionFactory}.
	 *
	 * @param iterationCacheSyncThreshold ignored; it is only passed on to the full constructor, which does not use it
	 * @deprecated the {@code iterationCacheSyncThreshold} argument has no effect.
	 */
	@Deprecated
	public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parentBindings,
			long iterationCacheSyncThreshold, QueryEvaluationContext context) throws QueryEvaluationException {
		this(strategy, group, parentBindings, iterationCacheSyncThreshold, context, SimpleValueFactory.getInstance(),
				new DefaultCollectionFactory());
	}

	/**
	 * Creates a group iterator with an explicit value factory and collection factory, and precompiles the group's
	 * argument.
	 *
	 * @param strategy                    the strategy used to precompile the group's argument and aggregate
	 *                                    expressions
	 * @param group                       the group node to evaluate
	 * @param parentBindings              the bindings the group's argument is evaluated against
	 * @param iterationCacheSyncThreshold unused; a left over from earlier, this is now stored in the collection
	 *                                    factory
	 * @param context                     the query evaluation context
	 * @param vf                          the value factory handed to the aggregate collectors
	 * @param cf                          the collection factory providing the grouping map and distinct sets
	 * @throws QueryEvaluationException if precompiling the group's argument fails
	 */
	public GroupIterator(EvaluationStrategy strategy, Group group, BindingSet parentBindings,
			long iterationCacheSyncThreshold, QueryEvaluationContext context, ValueFactory vf, CollectionFactory cf)
			throws QueryEvaluationException {
		this.vf = vf;
		this.cf = cf;
		this.context = context;
		this.parentBindings = parentBindings;
		this.group = group;
		this.strategy = strategy;
		// precompile once here so that getIterator() only has to evaluate the step
		this.arguments = this.strategy.precompile(this.group.getArg(), this.context);
	}

	/*---------*
	 * Methods *
	 *---------*/

	/**
	 * Closes the collection factory and, if grouping is still in progress, the iteration over the group's argument.
	 * The argument iteration is closed even when closing the collection factory fails.
	 */
	@Override
	public void handleClose() throws QueryEvaluationException {
		try {
			cf.close();
		} finally {
			// read the volatile field once so that the null check and the close act on the same reference
			final CloseableIteration<BindingSet> pending = argumentsIter;
			if (pending != null) {
				pending.close();
			}
		}
	}

	/**
	 * Evaluates the group: builds one entry per group key, turns every entry into a solution carrying the group
	 * bindings and the aggregate results, and returns an iterator over the resulting set of solutions.
	 */
	@Override
	protected Iterator<BindingSet> getIterator() throws QueryEvaluationException {
		List<AggregatePredicateCollectorSupplier<?, ?>> aggregates = makeAggregates();

		// every solution starts out either empty or as a copy of the parent bindings
		Supplier<MutableBindingSet> newSolution = parentBindings.isEmpty()
				? context::createBindingSet
				: () -> context.createBindingSet(parentBindings);

		// collect a getter/setter pair for each group binding name the context can read
		List<Function<BindingSet, Value>> readers = new ArrayList<>();
		List<BiConsumer<Value, MutableBindingSet>> writers = new ArrayList<>();
		for (String bindingName : group.getGroupBindingNames()) {
			Function<BindingSet, Value> reader = context.getValue(bindingName);
			BiConsumer<Value, MutableBindingSet> writer = context.setBinding(bindingName);
			if (reader == null) {
				continue;
			}
			readers.add(reader);
			writers.add(writer);
		}

		BiConsumer<Entry, MutableBindingSet> bindAggregates = makeBindSolution(aggregates);
		Collection<Entry> groupedEntries = buildEntries(aggregates);
		Set<BindingSet> solutions = cf.createSetOfBindingSets(context::createBindingSet, context::hasBinding,
				context::getValue, context::setBinding);
		BiConsumer<BindingSet, MutableBindingSet> copyGroupBindings = makeSetValues(readers, writers);

		for (Entry groupedEntry : groupedEntries) {
			MutableBindingSet solution = newSolution.get();

			// entries created for the empty-input special case have no prototype
			BindingSet prototype = groupedEntry.getPrototype();
			if (prototype != null) {
				copyGroupBindings.accept(prototype, solution);
			}

			bindAggregates.accept(groupedEntry, solution);
			solutions.add(solution);
		}

		return solutions.iterator();
	}

	/**
	 * Composes the per-binding copy operations into one consumer, so that no loop or lookup is needed while
	 * evaluating.
	 *
	 * @param getValues   the methods to access values in a bindingset
	 * @param setBindings the methods to set values in a bindingset, index-aligned with {@code getValues}
	 * @return a BiConsumer that takes the prototype and sets parts into solution as required; a no-op when there is
	 *         nothing to copy
	 */
	private BiConsumer<BindingSet, MutableBindingSet> makeSetValues(List<Function<BindingSet, Value>> getValues,
			List<BiConsumer<Value, MutableBindingSet>> setBindings) {
		BiConsumer<BindingSet, MutableBindingSet> chain = null;
		for (int idx = 0; idx < getValues.size(); idx++) {
			BiConsumer<BindingSet, MutableBindingSet> step = makeSetAValue(getValues, setBindings, idx);
			chain = (chain == null) ? step : chain.andThen(step);
		}
		if (chain != null) {
			return chain;
		}
		return (prototype, solution) -> {
		};
	}

	/**
	 * Builds the consumer that copies the i-th binding from a prototype into a solution.
	 *
	 * @param getValues   the methods to access values in a bindingset
	 * @param setBindings the methods to set values in a bindingset
	 * @param i           the index of the getter/setter pair to use
	 * @return a consumer that copies the value when the prototype has one, and does nothing otherwise
	 */
	private BiConsumer<BindingSet, MutableBindingSet> makeSetAValue(List<Function<BindingSet, Value>> getValues,
			List<BiConsumer<Value, MutableBindingSet>> setBindings, int i) {
		final Function<BindingSet, Value> reader = getValues.get(i);
		final BiConsumer<Value, MutableBindingSet> writer = setBindings.get(i);
		return (prototype, solution) -> {
			Value current = reader.apply(prototype);
			if (current == null) {
				return;
			}
			// Potentially overwrites bindings from super
			writer.accept(current, solution);
		};
	}

	/**
	 * Builds one consumer that binds the final value of every aggregate into a solution.
	 *
	 * @param aggregates the aggregates of this group; the position of an aggregate is also the position of its
	 *                   collector in an {@link Entry}
	 * @return a consumer binding all aggregate results, or a no-op when there are no aggregates
	 */
	private BiConsumer<Entry, MutableBindingSet> makeBindSolution(
			List<AggregatePredicateCollectorSupplier<?, ?>> aggregates) {
		BiConsumer<Entry, MutableBindingSet> chain = null;
		int position = 0;
		for (AggregatePredicateCollectorSupplier<?, ?> aggregate : aggregates) {
			final int collectorIndex = position++;
			final BiConsumer<Value, MutableBindingSet> bindResult = context.setBinding(aggregate.name);
			BiConsumer<Entry, MutableBindingSet> step = (entry, solution) -> {
				try {
					Value result = entry.collectors.get(collectorIndex).getFinalValue();
					if (result != null) {
						// Potentially overwrites bindings from super
						bindResult.accept(result, solution);
					}
				} catch (ValueExprEvaluationException ignored) {
					// There was a type error when calculating the value of the aggregate. We
					// silently ignore the error, resulting in no result value being bound.
				}
			};
			chain = (chain == null) ? step : chain.andThen(step);
		}
		if (chain != null) {
			return chain;
		}
		return (e, bs) -> {
		};
	}

	/**
	 * Creates the aggregate descriptors for all group elements of this group.
	 *
	 * @return one descriptor per group element whose operator is supported, in group element order
	 */
	private List<AggregatePredicateCollectorSupplier<?, ?>> makeAggregates() {
		// Size the list by what is iterated below: the group elements, not the group binding names.
		List<AggregatePredicateCollectorSupplier<?, ?>> aggregates = new ArrayList<>(
				group.getGroupElements().size());
		for (GroupElem ge : group.getGroupElements()) {
			AggregatePredicateCollectorSupplier<?, ?> create = create(ge, vf);
			// create() returns null for operators it does not know; those are skipped
			if (create != null) {
				aggregates.add(create);
			}
		}
		return aggregates;
	}

	/**
	 * Evaluates the group's argument and folds every solution into the entry of its group.
	 *
	 * @param aggregates the aggregates to compute for each group
	 * @return the entries, one per distinct group key; for empty input the result of
	 *         {@link #emptySolutionSpecialCase(List)}
	 */
	private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
			throws QueryEvaluationException {
		// store the arguments' iterator so it can be closed while building entries
		this.argumentsIter = arguments.evaluate(parentBindings);
		try (var source = argumentsIter) {
			if (!source.hasNext()) {
				return emptySolutionSpecialCase(aggregates);
			}

			List<Function<BindingSet, Value>> keyReaders = group.getGroupBindingNames()
					.stream()
					.map(bindingName -> context.getValue(bindingName))
					.collect(Collectors.toList());

			// TODO: this is an in memory map with no backing into any disk form.
			// Fixing this requires separating the computation of the aggregates and their
			// distinct sets if needed from the intermediary values.
			Map<BindingSetKey, Entry> grouped = cf.createGroupByMap();

			// Make an optimized hash function valid during this query evaluation step.
			ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(keyReaders);

			while (!isClosed() && source.hasNext()) {
				BindingSet solution = source.next();
				// The binding set key will be constant
				BindingSetKey key = cf.createBindingSetKey(solution, keyReaders, hashMaker);

				Entry bucket = grouped.get(key);
				if (bucket == null) {
					// first solution of this group: it becomes the prototype, with fresh collectors and predicates
					List<AggregateCollector> collectors = makeCollectors(aggregates);
					List<Predicate<?>> predicates = new ArrayList<>(aggregates.size());
					for (int idx = 0; idx < aggregates.size(); idx++) {
						predicates.add(aggregates.get(idx).makePotentialDistinctTest.get());
					}
					bucket = new Entry(solution, collectors, predicates);
					grouped.put(key, bucket);
				}

				bucket.addSolution(solution, aggregates);
			}
			return grouped.values();
		} finally {
			this.argumentsIter = null;
		}
	}

	/**
	 * Produces the entries for a group whose argument yielded no solutions.
	 *
	 * @param aggregates the aggregates of this group
	 * @return an empty list when grouping explicitly; otherwise a single entry without prototype, which has been fed
	 *         one empty binding set when group elements are present
	 */
	private List<Entry> emptySolutionSpecialCase(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates) {
		if (!group.getGroupBindingNames().isEmpty()) {
			return Collections.emptyList();
		}

		// no solutions, but if we are not explicitly grouping and aggregates are
		// present, we still need to process them to produce a zero-result.
		if (group.getGroupElements().isEmpty()) {
			return List.of(new Entry(null, null, null));
		}

		List<AggregateCollector> collectors = makeCollectors(aggregates);
		List<Predicate<?>> predicates = new ArrayList<>(
				Collections.nCopies(aggregates.size(), ALWAYS_TRUE_BINDING_SET));
		Entry zeroResult = new Entry(null, collectors, predicates);
		zeroResult.addSolution(EmptyBindingSet.getInstance(), aggregates);
		return List.of(zeroResult);
	}

	/**
	 * Creates a fresh collector for each aggregate.
	 *
	 * @param aggregates the aggregates of this group
	 * @return the new collectors, index-aligned with {@code aggregates}
	 */
	private List<AggregateCollector> makeCollectors(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates) {
		final List<AggregateCollector> fresh = new ArrayList<>(aggregates.size());
		aggregates.forEach(aggregate -> fresh.add(aggregate.makeAggregateCollector.get()));
		return fresh;
	}

	/**
	 * The state of one group: the binding set that opened the group plus, per aggregate, its collector and its
	 * (potentially distinct-filtering) predicate.
	 */
	private static class Entry implements BindingSetEntry {

		private static final long serialVersionUID = 1L;
		private final BindingSet prototype;
		private final List<AggregateCollector> collectors;
		private final List<Predicate<?>> predicates;

		/**
		 * @param prototype  the binding set the group bindings are taken from, may be null
		 * @param collectors the collectors, index-aligned with the aggregates
		 * @param predicates the predicates, index-aligned with the aggregates
		 */
		public Entry(BindingSet prototype, List<AggregateCollector> collectors, List<Predicate<?>> predicates)
				throws QueryEvaluationException {
			this.predicates = predicates;
			this.collectors = collectors;
			this.prototype = prototype;
		}

		/**
		 * Feeds a solution to every aggregate, each with the predicate and collector at the same index.
		 */
		public void addSolution(BindingSet bs, List<AggregatePredicateCollectorSupplier<?, ?>> operators) {
			int position = 0;
			while (position < operators.size()) {
				operators.get(position).operate(bs, predicates.get(position), collectors.get(position));
				position++;
			}
		}

		public BindingSet getPrototype() {
			return this.prototype;
		}
	}

	/**
	 * Bundles an aggregate function with the name its result is bound to, and with the suppliers that create a fresh
	 * (potentially distinct-filtering) predicate and a fresh final value collector for each final binding set.
	 * <p>
	 * Making an aggregate function is quite a lot of work and we do not want to repeat that for each final binding.
	 *
	 * @param <T> the collector type the aggregate function accumulates into
	 * @param <D> the type of the items tested by the distinct predicate
	 */
	private static class AggregatePredicateCollectorSupplier<T extends AggregateCollector, D> {
		public final String name;
		private final AggregateFunction<T, D> agg;
		private final Supplier<Predicate<D>> makePotentialDistinctTest;
		private final Supplier<T> makeAggregateCollector;

		/**
		 * @param agg                       the aggregate function
		 * @param makePotentialDistinctTest creates the predicate that decides whether an item is aggregated
		 * @param makeAggregateCollector    creates the collector holding the aggregate's running state
		 * @param name                      the name the final value is bound to
		 */
		public AggregatePredicateCollectorSupplier(AggregateFunction<T, D> agg,
				Supplier<Predicate<D>> makePotentialDistinctTest, Supplier<T> makeAggregateCollector, String name) {
			this.agg = agg;
			this.makePotentialDistinctTest = makePotentialDistinctTest;
			this.makeAggregateCollector = makeAggregateCollector;
			this.name = name;
		}

		/**
		 * Lets the aggregate function process one solution with the given predicate and collector.
		 *
		 * @param bs        the solution to aggregate
		 * @param predicate the predicate stored for this aggregate
		 * @param t         the collector stored for this aggregate
		 */
		@SuppressWarnings("unchecked")
		private void operate(BindingSet bs, Predicate<?> predicate, Object t) {
			// Predicates and collectors are held in wildcard-typed lists and paired with their aggregate by
			// index, so the type parameters have to be restored with unchecked casts here.
			agg.processAggregate(bs, (Predicate<D>) predicate, (T) t);
		}
	}

	// Shared stateless predicates for aggregates that are not DISTINCT: every item is accepted.
	private static final Predicate<BindingSet> ALWAYS_TRUE_BINDING_SET = t -> true;
	private static final Predicate<Value> ALWAYS_TRUE_VALUE = t -> true;
	// Supplier form of ALWAYS_TRUE_VALUE; always hands out the same instance.
	private static final Supplier<Predicate<Value>> ALWAYS_TRUE_VALUE_SUPPLIER = () -> ALWAYS_TRUE_VALUE;

	/**
	 * Creates the aggregate descriptor for a group element.
	 *
	 * @param ge the group element whose operator is turned into an aggregate function
	 * @param vf the value factory handed to the collectors that create literals
	 * @return the descriptor, or null when the operator is of no known kind
	 * @throws QueryEvaluationException if a custom aggregate function is not registered
	 */
	private AggregatePredicateCollectorSupplier<?, ?> create(GroupElem ge, ValueFactory vf)
			throws QueryEvaluationException {
		final AggregateOperator operator = ge.getOperator();

		if (operator instanceof Count) {
			var countArg = ((Count) operator).getArg();
			if (countArg == null) {
				// COUNT(*): whole binding sets are counted, so distinctness is tested on binding sets
				Supplier<Predicate<BindingSet>> distinctTest = operator.isDistinct() ? DistinctBindingSets::new
						: () -> ALWAYS_TRUE_BINDING_SET;
				WildCardCountAggregate wildCardCount = new WildCardCountAggregate();
				return new AggregatePredicateCollectorSupplier<>(wildCardCount, distinctTest,
						() -> new CountCollector(vf), ge.getName());
			}
			CountAggregate count = new CountAggregate(new QueryStepEvaluator(strategy.precompile(countArg, context)));
			return new AggregatePredicateCollectorSupplier<>(count, distinctValuesTest(operator),
					() -> new CountCollector(vf), ge.getName());
		}

		if (operator instanceof Min) {
			MinAggregate min = new MinAggregate(precompileArg(operator), shouldValueComparisonBeStrict());
			return new AggregatePredicateCollectorSupplier<>(min, distinctValuesTest(operator), ValueCollector::new,
					ge.getName());
		}

		if (operator instanceof Max) {
			MaxAggregate max = new MaxAggregate(precompileArg(operator), shouldValueComparisonBeStrict());
			return new AggregatePredicateCollectorSupplier<>(max, distinctValuesTest(operator), ValueCollector::new,
					ge.getName());
		}

		if (operator instanceof Sum) {
			SumAggregate sum = new SumAggregate(precompileArg(operator));
			return new AggregatePredicateCollectorSupplier<>(sum, distinctValuesTest(operator),
					() -> new IntegerCollector(vf), ge.getName());
		}

		if (operator instanceof Avg) {
			AvgAggregate avg = new AvgAggregate(precompileArg(operator));
			return new AggregatePredicateCollectorSupplier<>(avg, distinctValuesTest(operator),
					() -> new AvgCollector(vf), ge.getName());
		}

		if (operator instanceof Sample) {
			SampleAggregate sample = new SampleAggregate(precompileArg(operator));
			return new AggregatePredicateCollectorSupplier<>(sample, distinctValuesTest(operator),
					SampleCollector::new, ge.getName());
		}

		if (operator instanceof GroupConcat) {
			ValueExpr separatorExpr = ((GroupConcat) operator).getSeparator();
			ConcatAggregate concat;
			if (separatorExpr == null) {
				concat = new ConcatAggregate(precompileArg(operator));
			} else {
				// the separator is evaluated once, against the parent bindings
				Value separatorValue = strategy.evaluate(separatorExpr, parentBindings);
				concat = new ConcatAggregate(precompileArg(operator), separatorValue.stringValue());
			}
			return new AggregatePredicateCollectorSupplier<>(concat, distinctValuesTest(operator),
					() -> new StringBuilderCollector(vf), ge.getName());
		}

		if (operator instanceof AggregateFunctionCall) {
			var call = (AggregateFunctionCall) operator;
			Supplier<Predicate<Value>> distinctTest = distinctValuesTest(operator);
			var factory = CustomAggregateFunctionRegistry.getInstance().get(call.getIRI());

			var function = factory.orElseThrow(
					() -> new QueryEvaluationException("Unknown aggregate function '" + call.getIRI() + "'"))
					.buildFunction(new QueryStepEvaluator(strategy.precompile(call.getArg(), context)));
			// factory is known to be present here, orElseThrow above would have thrown otherwise
			return new AggregatePredicateCollectorSupplier<>(function, distinctTest,
					() -> factory.get().getCollector(), ge.getName());
		}

		return null;
	}

	/**
	 * Selects the supplier of value predicates for an operator: a new distinct filter per group for DISTINCT
	 * aggregates, the shared always-true predicate otherwise.
	 */
	private Supplier<Predicate<Value>> distinctValuesTest(AggregateOperator operator) {
		if (operator.isDistinct()) {
			return DistinctValues::new;
		}
		return ALWAYS_TRUE_VALUE_SUPPLIER;
	}

	/**
	 * Precompiles the argument of an aggregate operator.
	 *
	 * @param operator the operator, which must be a {@link UnaryValueOperator}
	 * @return an evaluator for the operator's argument
	 */
	private QueryStepEvaluator precompileArg(AggregateOperator operator) {
		final UnaryValueOperator unary = (UnaryValueOperator) operator;
		final QueryValueEvaluationStep step = strategy.precompile(unary.getArg(), context);
		return new QueryStepEvaluator(step);
	}

	/**
	 * @return true when the strategy's query evaluation mode is {@link QueryEvaluationMode#STRICT}
	 */
	private boolean shouldValueComparisonBeStrict() {
		final QueryEvaluationMode mode = strategy.getQueryEvaluationMode();
		return QueryEvaluationMode.STRICT == mode;
	}

	/**
	 * Collector for the COUNT aggregates: a running count that is reported as an xsd:integer literal.
	 */
	private static class CountCollector implements AggregateCollector {
		// incremented directly by the count aggregates
		private long value;
		private final ValueFactory vf;

		/**
		 * @param vf the value factory used to create the final literal
		 */
		public CountCollector(ValueFactory vf) {
			this.vf = vf;
		}

		/**
		 * @return the count as an xsd:integer literal, created with the value factory given at construction
		 */
		@Override
		public Value getFinalValue() {
			// use the injected factory rather than the global SimpleValueFactory
			return vf.createLiteral(Long.toString(value), CoreDatatype.XSD.INTEGER);
		}
	}

	/**
	 * Collector holding a single value; the MIN and MAX aggregates assign {@code value} directly.
	 */
	private static class ValueCollector implements AggregateCollector {
		// the current result; stays null until an aggregate assigns a value
		private Value value;

		@Override
		public Value getFinalValue() {
			return value;
		}
	}

	/**
	 * Collector for the SUM aggregate: a running literal starting at xsd:integer zero, plus a type error that is
	 * recorded during aggregation and only thrown when the final value is requested.
	 */
	private static class IntegerCollector implements AggregateCollector {
		private ValueExprEvaluationException typeError;

		private Literal value;

		/**
		 * @param vf the value factory used to create the initial zero
		 */
		public IntegerCollector(ValueFactory vf) {
			value = vf.createLiteral("0", CoreDatatype.XSD.INTEGER);
		}

		public void setTypeError(ValueExprEvaluationException typeError) {
			this.typeError = typeError;
		}

		public boolean hasError() {
			return this.typeError != null;
		}

		@Override
		public Value getFinalValue() {
			if (hasError()) {
				// a type error occurred while processing the aggregate, throw it now.
				throw this.typeError;
			}
			return this.value;
		}
	}

	private class AvgCollector implements AggregateCollector {
		private final ValueFactory vf;
		private Literal sum;
		private long count;
		private ValueExprEvaluationException typeError;

		public AvgCollector(ValueFactory vf) {
			super();
			this.vf = vf;
			sum = vf.createLiteral("0", CoreDatatype.XSD.INTEGER);
		}

		public void setTypeError(ValueExprEvaluationException typeError) {
			this.typeError = typeError;
		}

		public boolean hasError() {
			return typeError != null;
		}

		@Override
		public Value getFinalValue() throws ValueExprEvaluationException {
			if (typeError != null) {
				// a type error occurred while processing the aggregate, throw it
				// now.
				throw typeError;
			}

			if (count == 0) {
				return SimpleValueFactory.getInstance().createLiteral("0", CoreDatatype.XSD.INTEGER);
			}

			Literal sizeLit = SimpleValueFactory.getInstance().createLiteral(count);
			return MathUtil.compute(sum, sizeLit, MathOp.DIVIDE);
		}
	}

	/**
	 * Predicate that accepts a value only the first time it is tested, tracked in a value set obtained from the
	 * collection factory.
	 */
	private class DistinctValues implements Predicate<Value> {
		private final Set<Value> seen = cf.createValueSet();

		@Override
		public boolean test(Value value) {
			// add() reports whether the value was new
			return seen.add(value);
		}
	}

	/**
	 * Predicate that accepts a binding set only the first time it is tested, tracked in a set obtained from the
	 * collection factory.
	 */
	private class DistinctBindingSets implements Predicate<BindingSet> {
		private final Set<BindingSet> seen = cf.createSet();

		@Override
		public boolean test(BindingSet value) {
			// add() reports whether the binding set was new
			return seen.add(value);
		}
	}

	/**
	 * COUNT over an expression: counts the solutions for which the expression yields a value that passes the
	 * (potentially distinct-filtering) predicate.
	 */
	private static class CountAggregate extends AggregateFunction<CountCollector, Value> {

		public CountAggregate(Function<BindingSet, Value> f) {
			super(f);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, CountCollector agv)
				throws QueryEvaluationException {
			final Value counted = evaluate(s);
			if (counted == null) {
				return;
			}
			if (distinctValue.test(counted)) {
				agv.value += 1;
			}
		}
	}

	/**
	 * COUNT(*): counts the non-empty binding sets that pass the (potentially distinct-filtering) predicate. There is
	 * no argument expression, hence no evaluation function.
	 */
	private static class WildCardCountAggregate extends AggregateFunction<CountCollector, BindingSet> {

		public WildCardCountAggregate() {
			super(null);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<BindingSet> distinctValue, CountCollector agv)
				throws QueryEvaluationException {
			// wildcard count
			if (s.isEmpty()) {
				return;
			}
			if (distinctValue.test(s)) {
				agv.value += 1;
			}
		}
	}

	private class MinAggregate extends AggregateFunction<ValueCollector, Value> {

		private final ValueComparator comparator = new ValueComparator();

		public MinAggregate(Function<BindingSet, Value> f, boolean strict) {
			super(f);
			comparator.setStrict(strict);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, ValueCollector min)
				throws QueryEvaluationException {
			Value v = evaluate(s);

			if (v != null && distinctValue.test(v)) {
				if (min.value == null) {
					min.value = v;
				} else if (comparator.compare(v, min.value) < 0) {
					min.value = v;
				}
			}
		}
	}

	/**
	 * MAX: keeps the largest value seen, as ordered by a {@link ValueComparator}.
	 */
	private static class MaxAggregate extends AggregateFunction<ValueCollector, Value> {

		private final ValueComparator comparator = new ValueComparator();

		/**
		 * @param f      evaluates the aggregate's argument for a solution
		 * @param strict whether the comparator compares strictly
		 */
		public MaxAggregate(Function<BindingSet, Value> f, boolean strict) {
			super(f);
			comparator.setStrict(strict);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, ValueCollector max)
				throws QueryEvaluationException {
			final Value candidate = evaluate(s);
			if (candidate == null || !distinctValue.test(candidate)) {
				return;
			}
			// the first value is taken as is, later values only when they compare higher
			if (max.value == null || comparator.compare(candidate, max.value) > 0) {
				max.value = candidate;
			}
		}
	}

	/**
	 * SUM: adds up numeric literals. A non-literal value, or a literal without a numeric XSD datatype, records a
	 * type error on the collector and stops further processing.
	 */
	private static class SumAggregate extends AggregateFunction<IntegerCollector, Value> {
		public SumAggregate(Function<BindingSet, Value> f) {
			super(f);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, IntegerCollector sum)
				throws QueryEvaluationException {
			if (sum.hasError()) {
				// halt further processing if a type error has been raised
				return;
			}

			final Value v = evaluate(s);
			if (v == null) {
				return;
			}
			if (!v.isLiteral()) {
				sum.setTypeError(new ValueExprEvaluationException("not a number: " + v));
				return;
			}
			if (!distinctValue.test(v)) {
				return;
			}

			final Literal addend = (Literal) v;
			final CoreDatatype.XSD xsdType = addend.getCoreDatatype().asXSDDatatypeOrNull();
			final boolean numeric = xsdType != null && xsdType.isNumericDatatype();
			if (numeric) {
				sum.value = MathUtil.compute(sum.value, addend, MathOp.PLUS);
			} else {
				sum.setTypeError(new ValueExprEvaluationException("not a number: " + v));
			}
		}
	}

	/**
	 * AVG: accumulates the sum and count of numeric literals. A non-literal value, or a literal without a numeric
	 * XSD datatype, records a type error on the collector and stops further processing.
	 */
	private static class AvgAggregate extends AggregateFunction<AvgCollector, Value> {

		public AvgAggregate(Function<BindingSet, Value> operator) {
			super(operator);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, AvgCollector avg)
				throws QueryEvaluationException {
			if (avg.hasError()) {
				// Prevent calculating the aggregate further if a type error has
				// occurred.
				return;
			}

			Value v = evaluate(s);
			// Like the other aggregates, skip unbound results before consulting the distinct predicate, so that
			// null is never offered to the underlying set.
			if (v == null || !distinctValue.test(v)) {
				return;
			}

			if (v instanceof Literal) {
				Literal nextLiteral = (Literal) v;
				// check if the literal is numeric.
				CoreDatatype.XSD datatype = nextLiteral.getCoreDatatype().asXSDDatatypeOrNull();

				if (datatype != null && datatype.isNumericDatatype()) {
					avg.sum = MathUtil.compute(avg.sum, nextLiteral, MathOp.PLUS);
				} else {
					avg.setTypeError(new ValueExprEvaluationException("not a number: " + v));
				}
				avg.count++;
			} else {
				// we do not actually throw the exception yet, but record it and
				// stop further processing. The exception will be thrown when
				// getFinalValue() is invoked on the collector.
				avg.setTypeError(new ValueExprEvaluationException("not a number: " + v));
			}
		}
	}

	/**
	 * Collector for the SAMPLE aggregate; requesting the final value fails when no sample was ever stored.
	 */
	private static class SampleCollector implements AggregateCollector {
		private Value sample;

		@Override
		public Value getFinalValue() throws ValueExprEvaluationException {
			if (sample != null) {
				return sample;
			}
			throw new ValueExprEvaluationException("SAMPLE undefined");
		}
	}

	/**
	 * SAMPLE: always takes a value while none is stored yet, afterwards replaces the stored value with a coin flip.
	 * The distinct predicate is not consulted.
	 */
	private static class SampleAggregate extends AggregateFunction<SampleCollector, Value> {

		private final Random random;

		public SampleAggregate(Function<BindingSet, Value> f) {
			super(f);
			this.random = new Random(System.currentTimeMillis());
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinct, SampleCollector sample)
				throws QueryEvaluationException {
			// we flip a coin to determine if we keep the current value or set a
			// new value to report. The coin is only flipped once a sample exists.
			final boolean replace = sample.sample == null || random.nextFloat() < 0.5f;
			if (!replace) {
				return;
			}
			final Value candidate = evaluate(s);
			if (candidate != null) {
				sample.sample = candidate;
			}
		}
	}

	/**
	 * Collector for the GROUP_CONCAT aggregate: a string builder that is created lazily by the aggregate and
	 * reported as a plain literal.
	 */
	private static class StringBuilderCollector implements AggregateCollector {
		// stays null until the aggregate appends the first value
		private StringBuilder concatenated;
		private final ValueFactory vf;

		/**
		 * @param vf the value factory used to create the final literal
		 */
		public StringBuilderCollector(ValueFactory vf) {
			this.vf = vf;
		}

		/**
		 * @return a literal with the concatenated text, or with the empty string when nothing was appended; created
		 *         with the value factory given at construction
		 */
		@Override
		public Value getFinalValue() throws ValueExprEvaluationException {
			// use the injected factory rather than the global SimpleValueFactory
			if (concatenated == null || concatenated.length() == 0) {
				return vf.createLiteral("");
			}
			return vf.createLiteral(concatenated.toString());
		}
	}

	/**
	 * GROUP_CONCAT: appends the string value of every accepted value, putting the separator between consecutive
	 * values.
	 */
	private static class ConcatAggregate extends AggregateFunction<StringBuilderCollector, Value> {

		private static final String DEFAULT_SEPARATOR = " ";
		private final String separator;

		/**
		 * @param f         evaluates the aggregate's argument for a solution
		 * @param seperator the text placed between consecutive values
		 */
		public ConcatAggregate(Function<BindingSet, Value> f, String seperator) throws QueryEvaluationException {
			super(f);
			this.separator = seperator;
		}

		/**
		 * Uses a single space as separator.
		 *
		 * @param f evaluates the aggregate's argument for a solution
		 */
		public ConcatAggregate(Function<BindingSet, Value> f) throws QueryEvaluationException {
			this(f, DEFAULT_SEPARATOR);
		}

		@Override
		public void processAggregate(BindingSet s, Predicate<Value> distinctValue, StringBuilderCollector collector)
				throws QueryEvaluationException {
			final Value part = evaluate(s);
			if (part == null || !distinctValue.test(part)) {
				return;
			}
			if (collector.concatenated != null) {
				// not the first value: separate it from the previous one
				collector.concatenated.append(separator);
			} else {
				collector.concatenated = new StringBuilder();
			}
			collector.concatenated.append(part.stringValue());
		}
	}

	/**
	 * Adapts a precompiled value evaluation step to a {@link Function}, mapping evaluation errors to null.
	 */
	private static class QueryStepEvaluator implements Function<BindingSet, Value> {
		private final QueryValueEvaluationStep step;

		public QueryStepEvaluator(QueryValueEvaluationStep evaluationStep) {
			step = evaluationStep;
		}

		@Override
		public Value apply(BindingSet bindings) {
			Value result;
			try {
				result = step.evaluate(bindings);
			} catch (ValueExprEvaluationException e) {
				// treat missing or invalid expressions as null
				result = null;
			}
			return result;
		}
	}
}