TableFinishOperator.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.OperationTimer.OperationTiming;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.StatisticAggregationsDescriptor;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.operator.PageSinkCommitStrategy.LIFESPAN_COMMIT;
import static com.facebook.presto.operator.TableWriterUtils.FRAGMENT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.ROW_COUNT_CHANNEL;
import static com.facebook.presto.operator.TableWriterUtils.extractStatisticsRows;
import static com.facebook.presto.operator.TableWriterUtils.getTableCommitContext;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagateIfPossible;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.Duration.succinctNanos;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;

public class TableFinishOperator
        implements Operator
{
    public static final List<Type> TYPES = ImmutableList.of(BIGINT);

    public static class TableFinishOperatorFactory
            implements OperatorFactory
    {
        private final int operatorId;
        private final PlanNodeId planNodeId;
        private final TableFinisher tableFinisher;
        private final PageSinkCommitter pageSinkCommitter;
        private final OperatorFactory statisticsAggregationOperatorFactory;
        private final StatisticAggregationsDescriptor<Integer> descriptor;
        private final Session session;
        private final JsonCodec<TableCommitContext> tableCommitContextCodec;
        private final boolean memoryTrackingEnabled;

        private boolean closed;

        public TableFinishOperatorFactory(
                int operatorId,
                PlanNodeId planNodeId,
                TableFinisher tableFinisher,
                PageSinkCommitter pageSinkCommitter,
                OperatorFactory statisticsAggregationOperatorFactory,
                StatisticAggregationsDescriptor<Integer> descriptor,
                Session session,
                JsonCodec<TableCommitContext> tableCommitContextCodec,
                boolean memoryTrackingEnabled)
        {
            this.operatorId = operatorId;
            this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
            this.tableFinisher = requireNonNull(tableFinisher, "tableFinisher is null");
            this.pageSinkCommitter = requireNonNull(pageSinkCommitter, "pageSinkCommitter is null");
            this.statisticsAggregationOperatorFactory = requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null");
            this.descriptor = requireNonNull(descriptor, "descriptor is null");
            this.session = requireNonNull(session, "session is null");
            this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
            this.memoryTrackingEnabled = memoryTrackingEnabled;
        }

        @Override
        public Operator createOperator(DriverContext driverContext)
        {
            checkState(!closed, "Factory is already closed");
            OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName());
            Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
            boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
            return new TableFinishOperator(
                    context,
                    tableFinisher,
                    pageSinkCommitter,
                    statisticsAggregationOperator,
                    descriptor,
                    statisticsCpuTimerEnabled,
                    memoryTrackingEnabled,
                    tableCommitContextCodec);
        }

        @Override
        public void noMoreOperators()
        {
            closed = true;
        }

        @Override
        public OperatorFactory duplicate()
        {
            return new TableFinishOperatorFactory(
                    operatorId,
                    planNodeId,
                    tableFinisher,
                    pageSinkCommitter,
                    statisticsAggregationOperatorFactory,
                    descriptor,
                    session,
                    tableCommitContextCodec,
                    memoryTrackingEnabled);
        }
    }

    private enum State
    {
        RUNNING, FINISHING, FINISHED
    }

    private final OperatorContext operatorContext;
    private final TableFinisher tableFinisher;
    private final Operator statisticsAggregationOperator;
    private final StatisticAggregationsDescriptor<Integer> descriptor;

    private State state = State.RUNNING;
    private final AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata = new AtomicReference<>(Optional.empty());
    private final ImmutableList.Builder<ComputedStatistics> computedStatisticsBuilder = ImmutableList.builder();

    private final OperationTiming statisticsTiming = new OperationTiming();
    private final boolean statisticsCpuTimerEnabled;
    private final boolean memoryTrackingEnabled;

    private final JsonCodec<TableCommitContext> tableCommitContextCodec;
    private final LifespanAndStageStateTracker lifespanAndStageStateTracker;

    private final LocalMemoryContext systemMemoryContext;
    private final AtomicLong operatorRetainedMemoryBytes = new AtomicLong();

    private final Supplier<TableFinishInfo> tableFinishInfoSupplier;

    public TableFinishOperator(
            OperatorContext operatorContext,
            TableFinisher tableFinisher,
            PageSinkCommitter pageSinkCommitter,
            Operator statisticsAggregationOperator,
            StatisticAggregationsDescriptor<Integer> descriptor,
            boolean statisticsCpuTimerEnabled,
            boolean memoryTrackingEnabled,
            JsonCodec<TableCommitContext> tableCommitContextCodec)
    {
        this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
        this.tableFinisher = requireNonNull(tableFinisher, "tableCommitter is null");
        this.statisticsAggregationOperator = requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null");
        this.descriptor = requireNonNull(descriptor, "descriptor is null");
        this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
        this.memoryTrackingEnabled = memoryTrackingEnabled;
        this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
        this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, operatorRetainedMemoryBytes);
        this.systemMemoryContext = operatorContext.localSystemMemoryContext();
        this.tableFinishInfoSupplier = createTableFinishInfoSupplier(outputMetadata, statisticsTiming);
        operatorContext.setInfoSupplier(tableFinishInfoSupplier);
    }

    @Override
    public OperatorContext getOperatorContext()
    {
        return operatorContext;
    }

    @Override
    public void finish()
    {
        OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
        statisticsAggregationOperator.finish();
        timer.end(statisticsTiming);

        if (state == State.RUNNING) {
            state = State.FINISHING;
        }
    }

    @Override
    public boolean isFinished()
    {
        if (state == State.FINISHED) {
            verify(statisticsAggregationOperator.isFinished());
            return true;
        }
        return false;
    }

    @Override
    public ListenableFuture<?> isBlocked()
    {
        return statisticsAggregationOperator.isBlocked();
    }

    @Override
    public boolean needsInput()
    {
        if (state != State.RUNNING) {
            return false;
        }
        return statisticsAggregationOperator.needsInput();
    }

    @Override
    public void addInput(Page page)
    {
        requireNonNull(page, "page is null");
        checkState(state == State.RUNNING, "Operator is %s", state);

        TableCommitContext tableCommitContext = getTableCommitContext(page, tableCommitContextCodec);
        if (lifespanAndStageStateTracker.update(page, tableCommitContext)) {
            lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> {
                OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
                statisticsAggregationOperator.addInput(statisticsPage);
                timer.end(statisticsTiming);
            });
        }
        if (memoryTrackingEnabled) {
            systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get());
        }
    }

    @Override
    public Page getOutput()
    {
        if (!isBlocked().isDone()) {
            return null;
        }

        if (!statisticsAggregationOperator.isFinished()) {
            verify(statisticsAggregationOperator.isBlocked().isDone(), "aggregation operator should not be blocked");

            OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
            Page page = statisticsAggregationOperator.getOutput();
            timer.end(statisticsTiming);

            if (page == null) {
                return null;
            }
            for (int position = 0; position < page.getPositionCount(); position++) {
                computedStatisticsBuilder.add(getComputedStatistics(page, position));
            }
            return null;
        }

        if (state != State.FINISHING) {
            return null;
        }
        state = State.FINISHED;

        lifespanAndStageStateTracker.commit();
        outputMetadata.set(tableFinisher.finishTable(lifespanAndStageStateTracker.getFinalFragments(), computedStatisticsBuilder.build()));

        // output page will only be constructed once,
        // so a new PageBuilder is constructed (instead of using PageBuilder.reset)
        PageBuilder page = new PageBuilder(1, TYPES);
        page.declarePosition();
        BIGINT.writeLong(page.getBlockBuilder(0), lifespanAndStageStateTracker.getFinalRowCount());
        return page.build();
    }

    private ComputedStatistics getComputedStatistics(Page page, int position)
    {
        ImmutableList.Builder<String> groupingColumns = ImmutableList.builder();
        ImmutableList.Builder<Block> groupingValues = ImmutableList.builder();
        descriptor.getGrouping().forEach((column, channel) -> {
            groupingColumns.add(column);
            groupingValues.add(page.getBlock(channel).getSingleValueBlock(position));
        });

        ComputedStatistics.Builder statistics = ComputedStatistics.builder(groupingColumns.build(), groupingValues.build());

        descriptor.getTableStatistics().forEach((type, channel) ->
                statistics.addTableStatistic(type, page.getBlock(channel).getSingleValueBlock(position)));

        descriptor.getColumnStatistics().forEach((descriptor) -> statistics.addColumnStatistic(descriptor.getMetadata(), page.getBlock(descriptor.getItem()).getSingleValueBlock(position)));

        return statistics.build();
    }

    @VisibleForTesting
    TableFinishInfo getInfo()
    {
        return tableFinishInfoSupplier.get();
    }

    private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTiming statisticsTiming)
    {
        requireNonNull(outputMetadata, "outputMetadata is null");
        requireNonNull(statisticsTiming, "statisticsTiming is null");
        return () -> new TableFinishInfo(
                outputMetadata.get(),
                succinctNanos(statisticsTiming.getWallNanos()),
                succinctNanos(statisticsTiming.getCpuNanos()));
    }

    @Override
    public void close()
            throws Exception
    {
        statisticsAggregationOperator.close();
        systemMemoryContext.setBytes(0);
    }

    public interface TableFinisher
    {
        Optional<ConnectorOutputMetadata> finishTable(Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);
    }

    public interface PageSinkCommitter
    {
        ListenableFuture<Void> commitAsync(Collection<Slice> fragments);
    }

    // A lifespan in a stage defines the unit for commit and recovery in recoverable grouped execution
    private static class LifespanAndStageStateTracker
    {
        private final Map<LifespanAndStage, LifespanAndStageState> noCommitUnrecoverableLifespanAndStageStates = new HashMap<>();
        private final Map<LifespanAndStage, LifespanAndStageState> taskCommitUnrecoverableLifespanAndStageStates = new HashMap<>();

        // For recoverable execution, it is possible to receive pages of the same lifespan-stage from different tasks. We track all of them and commit the one
        // which finishes sending pages first.
        private final Map<LifespanAndStage, Map<TaskId, LifespanAndStageState>> uncommittedRecoverableLifespanAndStageStates = new HashMap<>();
        private final Map<LifespanAndStage, LifespanAndStageState> committedRecoverableLifespanAndStages = new HashMap<>();

        private final PageSinkCommitter pageSinkCommitter;
        private final List<ListenableFuture<Void>> commitFutures = new ArrayList<>();
        private final AtomicLong operatorRetainedMemoryBytes;

        LifespanAndStageStateTracker(
                PageSinkCommitter pageSinkCommitter,
                AtomicLong operatorRetainedMemoryBytes)
        {
            this.pageSinkCommitter = requireNonNull(pageSinkCommitter, "pageSinkCommitter is null");
            this.operatorRetainedMemoryBytes = requireNonNull(operatorRetainedMemoryBytes, "operatorRetainedMemoryBytes is null");
        }

        public void commit()
        {
            for (LifespanAndStageState lifespanAndStageState : taskCommitUnrecoverableLifespanAndStageStates.values()) {
                commitFutures.add(pageSinkCommitter.commitAsync(lifespanAndStageState.getFragments()));
            }

            ListenableFuture<Void> future = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
            try {
                future.get();
            }
            catch (InterruptedException e) {
                future.cancel(true);
                currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                future.cancel(true);
                propagateIfPossible(e.getCause(), PrestoException.class);
                throw new RuntimeException(e.getCause());
            }
        }

        public boolean update(Page page, TableCommitContext tableCommitContext)
        {
            LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext);
            PageSinkCommitStrategy commitStrategy = tableCommitContext.getPageSinkCommitStrategy();
            switch (commitStrategy) {
                case NO_COMMIT: {
                    // Case 1: lifespan commit is not required, this can be one of the following cases:
                    //  - The source fragment is ungrouped execution (lifespan is TASK_WIDE).
                    //  - The source fragment is grouped execution but not recoverable.
                    noCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(
                            lifespanAndStage, ignored -> new LifespanAndStageState(
                                    tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page);
                    return true;
                }
                case TASK_COMMIT: {
                    // Case 2: Commit is required, but partial recovery is not supported
                    taskCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(
                            lifespanAndStage, ignored -> new LifespanAndStageState(
                                    tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page);
                    return true;
                }
                case LIFESPAN_COMMIT: {
                    // Case 3: Lifespan commit is required
                    checkState(lifespanAndStage.lifespan != Lifespan.taskWide(), "Recoverable lifespan cannot be TASK_WIDE");

                    // Case 3a: Current (stage, lifespan) combination is already committed
                    if (committedRecoverableLifespanAndStages.containsKey(lifespanAndStage)) {
                        checkState(
                                !committedRecoverableLifespanAndStages.get(lifespanAndStage).getTaskId().equals(tableCommitContext.getTaskId()),
                                "Received page from same task of committed lifespan and stage combination");
                        return false;
                    }

                    // Case 3b: Current (stage, lifespan) combination is not yet committed
                    Map<TaskId, LifespanAndStageState> lifespanStageStatesPerTask = uncommittedRecoverableLifespanAndStageStates.computeIfAbsent(lifespanAndStage, ignored -> new HashMap<>());
                    lifespanStageStatesPerTask.computeIfAbsent(
                            tableCommitContext.getTaskId(), ignored -> new LifespanAndStageState(
                                    tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, true)).update(page);

                    if (tableCommitContext.isLastPage()) {
                        checkState(!committedRecoverableLifespanAndStages.containsKey(lifespanAndStage), "LifespanAndStage already finished");
                        LifespanAndStageState lifespanAndStageState = lifespanStageStatesPerTask.get(tableCommitContext.getTaskId());
                        committedRecoverableLifespanAndStages.put(lifespanAndStage, lifespanAndStageState);
                        uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage);
                        commitFutures.add(pageSinkCommitter.commitAsync(lifespanAndStageState.getFragments()));
                    }
                    return true;
                }
                default:
                    throw new IllegalArgumentException("unexpected commit strategy: " + commitStrategy);
            }
        }

        List<Page> getStatisticsPagesToProcess(Page page, TableCommitContext tableCommitContext)
        {
            LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext);
            if (tableCommitContext.getPageSinkCommitStrategy() != LIFESPAN_COMMIT) {
                return extractStatisticsRows(page).map(ImmutableList::of).orElse(ImmutableList.of());
            }
            if (!committedRecoverableLifespanAndStages.containsKey(lifespanAndStage)) {
                return ImmutableList.of();
            }
            checkState(!uncommittedRecoverableLifespanAndStageStates.containsKey(lifespanAndStage), "lifespanAndStage %s is already committed", lifespanAndStage);
            LifespanAndStageState lifespanAndStageState = committedRecoverableLifespanAndStages.get(lifespanAndStage);
            List<Page> pages = lifespanAndStageState.getStatisticsPages();
            // statistics data in this lifespanAndStageState will not be accessed any more
            lifespanAndStageState.resetStatisticsPages();
            return pages;
        }

        public long getFinalRowCount()
        {
            checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final row count");
            return Streams.concat(
                            noCommitUnrecoverableLifespanAndStageStates.values().stream(),
                            taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
                            committedRecoverableLifespanAndStages.values().stream())
                    .mapToLong(LifespanAndStageState::getRowCount)
                    .sum();
        }

        public List<Slice> getFinalFragments()
        {
            checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final fragments");
            return Streams.concat(
                            noCommitUnrecoverableLifespanAndStageStates.values().stream(),
                            taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
                            committedRecoverableLifespanAndStages.values().stream())
                    .map(LifespanAndStageState::getFragments)
                    .flatMap(List::stream)
                    .collect(toImmutableList());
        }

        private static class LifespanAndStage
        {
            private final Lifespan lifespan;
            private final int stageId;

            private LifespanAndStage(Lifespan lifespan, int stageId)
            {
                this.lifespan = requireNonNull(lifespan, "lifespan is null");
                this.stageId = stageId;
            }

            public static LifespanAndStage fromTableCommitContext(TableCommitContext operatorExecutionContext)
            {
                return new LifespanAndStage(operatorExecutionContext.getLifespan(), operatorExecutionContext.getTaskId().getStageExecutionId().getStageId().getId());
            }

            public Lifespan getLifespan()
            {
                return lifespan;
            }

            public int getStageId()
            {
                return stageId;
            }

            @Override
            public boolean equals(Object o)
            {
                if (this == o) {
                    return true;
                }
                if (!(o instanceof LifespanAndStage)) {
                    return false;
                }
                LifespanAndStage that = (LifespanAndStage) o;
                return stageId == that.stageId &&
                        Objects.equals(lifespan, that.lifespan);
            }

            @Override
            public int hashCode()
            {
                return Objects.hash(lifespan, stageId);
            }

            @Override
            public String toString()
            {
                return toStringHelper(this)
                        .add("lifespan", lifespan)
                        .add("stageId", stageId)
                        .toString();
            }
        }

        private static class LifespanAndStageState
        {
            private final ImmutableList.Builder<Slice> fragmentBuilder = ImmutableList.builder();
            private final TaskId taskId;
            private final AtomicLong operatorRetainedMemoryBytes;

            private long retainedMemoryBytesForStatisticsPages;
            private long rowCount;
            private Optional<ImmutableList.Builder<Page>> statisticsPages;

            public LifespanAndStageState(
                    TaskId taskId,
                    AtomicLong operatorRetainedMemoryBytes,
                    boolean trackStatisticsPages)
            {
                this.taskId = requireNonNull(taskId, "taskId is null");
                this.operatorRetainedMemoryBytes = requireNonNull(operatorRetainedMemoryBytes, "operatorRetainedMemoryBytes is null");
                this.statisticsPages = trackStatisticsPages ? Optional.of(ImmutableList.builder()) : Optional.empty();
            }

            public void update(Page page)
            {
                long memoryBytesDelta = 0;
                Block rowCountBlock = page.getBlock(ROW_COUNT_CHANNEL);
                Block fragmentBlock = page.getBlock(FRAGMENT_CHANNEL);
                for (int position = 0; position < page.getPositionCount(); position++) {
                    if (!rowCountBlock.isNull(position)) {
                        rowCount += BIGINT.getLong(rowCountBlock, position);
                    }
                    if (!fragmentBlock.isNull(position)) {
                        Slice fragment = VARBINARY.getSlice(fragmentBlock, position);
                        fragmentBuilder.add(fragment);
                        memoryBytesDelta += fragment.getRetainedSize();
                    }
                }
                if (statisticsPages.isPresent()) {
                    Optional<Page> statisticsPage = extractStatisticsRows(page);
                    if (statisticsPage.isPresent()) {
                        statisticsPages.get().add(statisticsPage.get());
                        long retainedSizeForStatisticsPage = statisticsPage.get().getRetainedSizeInBytes();
                        retainedMemoryBytesForStatisticsPages += retainedSizeForStatisticsPage;
                        memoryBytesDelta += retainedSizeForStatisticsPage;
                    }
                }
                operatorRetainedMemoryBytes.addAndGet(memoryBytesDelta);
            }

            public long getRowCount()
            {
                return rowCount;
            }

            public List<Slice> getFragments()
            {
                return fragmentBuilder.build();
            }

            public List<Page> getStatisticsPages()
            {
                checkState(statisticsPages.isPresent(), "statisticsPages is present for recoverable grouped execution only");
                return statisticsPages.get().build();
            }

            public TaskId getTaskId()
            {
                return taskId;
            }

            public void resetStatisticsPages()
            {
                statisticsPages = Optional.empty();
                operatorRetainedMemoryBytes.addAndGet(-retainedMemoryBytesForStatisticsPages);
                retainedMemoryBytesForStatisticsPages = 0;
            }
        }
    }
}