SerializedPageReference.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.execution.buffer;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.spi.page.SerializedPage;
import javax.annotation.concurrent.ThreadSafe;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
/**
 * A reference-counted handle to a {@link SerializedPage} tagged with the
 * {@link Lifespan} it belongs to.
 * <p>
 * The count is held in a volatile int and mutated only through an
 * {@link AtomicIntegerFieldUpdater}. Every mutation validates the current
 * value first and then applies the change with a compare-and-set, so a call
 * that is rejected with an {@link IllegalStateException} leaves the count
 * untouched.
 */
@ThreadSafe
final class SerializedPageReference
{
    private static final AtomicIntegerFieldUpdater<SerializedPageReference> REFERENCE_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SerializedPageReference.class, "referenceCount");

    private final SerializedPage serializedPage;
    private final Lifespan lifespan;
    // Only modified through REFERENCE_COUNT_UPDATER after construction
    private volatile int referenceCount;

    /**
     * @param serializedPage the page being tracked; must not be null
     * @param referenceCount the initial number of references; must be at least 1
     * @param lifespan the lifespan reported to the listener when the page is released; must not be null
     * @throws NullPointerException if {@code serializedPage} or {@code lifespan} is null
     * @throws IllegalArgumentException if {@code referenceCount} is not positive
     */
    public SerializedPageReference(SerializedPage serializedPage, int referenceCount, Lifespan lifespan)
    {
        this.serializedPage = requireNonNull(serializedPage, "page is null");
        this.lifespan = requireNonNull(lifespan, "lifespan is null");
        // Validate before storing so an invalid count is never written to the field
        checkArgument(referenceCount > 0, "referenceCount must be at least 1");
        this.referenceCount = referenceCount;
    }

    /**
     * Adds one reference to the page.
     *
     * @throws IllegalStateException if the reference count is no longer positive;
     * in that case the count is left unchanged
     */
    public void addReference()
    {
        int currentReferences;
        do {
            currentReferences = referenceCount;
            // Check before incrementing: an unconditional increment would bump a
            // count of 0 back to 1 and allow the page to be released a second time
            checkState(currentReferences > 0, "Page has already been dereferenced");
        }
        while (!REFERENCE_COUNT_UPDATER.compareAndSet(this, currentReferences, currentReferences + 1));
    }

    public SerializedPage getSerializedPage()
    {
        return serializedPage;
    }

    /**
     * Returns the position count reported by the underlying page.
     */
    public int getPositionCount()
    {
        return serializedPage.getPositionCount();
    }

    /**
     * Returns the retained size in bytes reported by the underlying page.
     */
    public long getRetainedSizeInBytes()
    {
        return serializedPage.getRetainedSizeInBytes();
    }

    /**
     * Drops one reference.
     *
     * @return {@code true} if this call removed the last reference
     * @throws IllegalStateException if there is no reference left to drop;
     * in that case the count is left unchanged
     */
    private boolean dereferencePage()
    {
        int currentReferences;
        do {
            currentReferences = referenceCount;
            // Refuse the decrement that would drive the count below zero instead of
            // applying it and failing afterwards
            checkState(currentReferences > 0, "Page reference count is negative");
        }
        while (!REFERENCE_COUNT_UPDATER.compareAndSet(this, currentReferences, currentReferences - 1));
        return currentReferences == 1;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("referenceCount", referenceCount)
                .toString();
    }

    /**
     * Drops one reference from each entry of {@code serializedPageReferences}, in
     * list order, and reports the pages whose count reached zero to
     * {@code onPagesReleased}.
     * <p>
     * Released pages are batched: consecutive released pages with an equal
     * lifespan are reported in a single listener call carrying their page count
     * and summed retained size. A released page with a different lifespan
     * flushes the pending batch and starts a new one. Pages that still have
     * references are skipped and do not break a batch. The listener is not
     * invoked when nothing was released.
     *
     * @param serializedPageReferences the references to drop; must not be null
     * @param onPagesReleased the callback receiving each batch; must not be null
     * @throws NullPointerException if either argument is null
     * @throws IllegalStateException if one of the references has no reference left to drop
     */
    public static void dereferencePages(List<SerializedPageReference> serializedPageReferences, PagesReleasedListener onPagesReleased)
    {
        requireNonNull(onPagesReleased, "onPagesReleased is null");
        if (requireNonNull(serializedPageReferences, "serializedPageReferences is null").isEmpty()) {
            return;
        }

        // State of the batch currently being accumulated; null lifespan means "no batch yet"
        Lifespan currentLifespan = null;
        int currentLifespanPages = 0;
        long releasedMemoryBytes = 0;
        for (SerializedPageReference serializedPageReference : serializedPageReferences) {
            if (serializedPageReference.dereferencePage()) {
                if (!serializedPageReference.lifespan.equals(currentLifespan)) {
                    if (currentLifespan != null) {
                        // Flush the current run of pages for the same lifespan
                        onPagesReleased.onPagesReleased(currentLifespan, currentLifespanPages, releasedMemoryBytes);
                    }
                    currentLifespan = serializedPageReference.lifespan;
                    currentLifespanPages = 0;
                    releasedMemoryBytes = 0;
                }
                currentLifespanPages++;
                releasedMemoryBytes += serializedPageReference.getRetainedSizeInBytes();
            }
        }

        // Flush pending updates if present
        if (currentLifespan != null) {
            onPagesReleased.onPagesReleased(currentLifespan, currentLifespanPages, releasedMemoryBytes);
        }
    }

    /**
     * Callback used by {@link #dereferencePages} to report released pages.
     */
    interface PagesReleasedListener
    {
        /**
         * @param lifespan the lifespan shared by all pages in this batch
         * @param releasedPagesCount the number of pages released in this batch
         * @param releasedMemorySizeInBytes the sum of the retained sizes of the pages in this batch
         */
        void onPagesReleased(Lifespan lifespan, int releasedPagesCount, long releasedMemorySizeInBytes);
    }
}