RowBlockEncodingBuffer.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.repartition;
import com.facebook.presto.common.block.ArrayAllocator;
import com.facebook.presto.common.block.ColumnarRow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects.ToStringHelper;
import io.airlift.slice.SliceOutput;
import org.openjdk.jol.info.ClassLayout;
import java.util.List;
import static com.facebook.presto.common.array.Arrays.ExpansionFactor.LARGE;
import static com.facebook.presto.common.array.Arrays.ExpansionFactor.SMALL;
import static com.facebook.presto.common.array.Arrays.ExpansionOption.NONE;
import static com.facebook.presto.common.array.Arrays.ExpansionOption.PRESERVE;
import static com.facebook.presto.common.array.Arrays.ensureCapacity;
import static com.facebook.presto.operator.UncheckedByteArrays.setIntUnchecked;
import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_INT_INDEX_SCALE;
public class RowBlockEncodingBuffer
extends AbstractBlockEncodingBuffer
{
@VisibleForTesting
static final int POSITION_SIZE = SIZE_OF_INT + SIZE_OF_BYTE;
private static final String NAME = "ROW";
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RowBlockEncodingBuffer.class).instanceSize();
// The buffer for the offsets for all incoming blocks so far
private byte[] offsetsBuffer;
// The address next offset value will be written to.
private int offsetsBufferIndex;
// The estimated maximum size for offsetsBuffer
private int estimatedOffsetBufferMaxCapacity;
// This array holds the condensed offsets for each position for the incoming block.
private int[] offsets;
// The last offset in the offsets buffer
private int lastOffset;
// The AbstractBlockEncodingBuffer for the nested field blocks of the RowBlock
private final AbstractBlockEncodingBuffer[] fieldBuffers;
public RowBlockEncodingBuffer(DecodedBlockNode decodedBlockNode, ArrayAllocator bufferAllocator, boolean isNested)
{
super(bufferAllocator, isNested);
List<DecodedBlockNode> childrenNodes = decodedBlockNode.getChildren();
fieldBuffers = new AbstractBlockEncodingBuffer[childrenNodes.size()];
for (int i = 0; i < childrenNodes.size(); i++) {
fieldBuffers[i] = (AbstractBlockEncodingBuffer) createBlockEncodingBuffers(decodedBlockNode.getChildren().get(i), bufferAllocator, true);
}
}
@Override
public void accumulateSerializedRowSizes(int[] serializedRowSizes)
{
for (int i = 0; i < positionCount; i++) {
serializedRowSizes[i] += POSITION_SIZE;
}
int[] offsetsCopy = ensureCapacity(null, positionCount + 1, SMALL, NONE, bufferAllocator);
try {
for (int i = 0; i < fieldBuffers.length; i++) {
System.arraycopy(offsets, 0, offsetsCopy, 0, positionCount + 1);
((AbstractBlockEncodingBuffer) fieldBuffers[i]).accumulateSerializedRowSizes(offsetsCopy, positionCount, serializedRowSizes);
}
}
finally {
bufferAllocator.returnArray(offsetsCopy);
}
}
@Override
public void setNextBatch(int positionsOffset, int batchSize)
{
this.positionsOffset = positionsOffset;
this.batchSize = batchSize;
this.flushed = false;
// The nested level positionCount could be 0.
if (this.positionCount == 0) {
return;
}
int offset = offsets[positionsOffset];
for (int i = 0; i < fieldBuffers.length; i++) {
fieldBuffers[i].setNextBatch(offset, offsets[positionsOffset + batchSize] - offset);
}
}
@Override
public void appendDataInBatch()
{
if (batchSize == 0) {
return;
}
appendNulls();
appendOffsets();
for (int i = 0; i < fieldBuffers.length; i++) {
fieldBuffers[i].appendDataInBatch();
}
bufferedPositionCount += batchSize;
}
@Override
public void serializeTo(SliceOutput output)
{
writeLengthPrefixedString(output, NAME);
output.writeInt(fieldBuffers.length);
for (int i = 0; i < fieldBuffers.length; i++) {
fieldBuffers[i].serializeTo(output);
}
// positionCount
output.writeInt(bufferedPositionCount);
// offsets
output.writeInt(0); // the base position
if (offsetsBufferIndex > 0) {
output.appendBytes(offsetsBuffer, 0, offsetsBufferIndex);
}
serializeNullsTo(output);
}
@Override
public void resetBuffers()
{
bufferedPositionCount = 0;
offsetsBufferIndex = 0;
lastOffset = 0;
flushed = true;
resetNullsBuffer();
for (int i = 0; i < fieldBuffers.length; i++) {
fieldBuffers[i].resetBuffers();
}
}
@Override
public void noMoreBatches()
{
for (int i = fieldBuffers.length - 1; i >= 0; i--) {
fieldBuffers[i].noMoreBatches();
}
if (flushed && offsetsBuffer != null) {
bufferAllocator.returnArray(offsetsBuffer);
offsetsBuffer = null;
}
super.noMoreBatches();
if (offsets != null) {
bufferAllocator.returnArray(offsets);
offsets = null;
}
}
@Override
public long getRetainedSizeInBytes()
{
int size = INSTANCE_SIZE;
for (int i = 0; i < fieldBuffers.length; i++) {
size += fieldBuffers[i].getRetainedSizeInBytes();
}
return size;
}
@Override
public long getSerializedSizeInBytes()
{
int size = 0;
for (int i = 0; i < fieldBuffers.length; i++) {
size += fieldBuffers[i].getSerializedSizeInBytes();
}
return NAME.length() + SIZE_OF_INT + // encoding name
SIZE_OF_INT + // field count
size + // field blocks
SIZE_OF_INT + // positionCount
SIZE_OF_INT + // offset 0. The offsetsBuffer doesn't contain the offset 0 so we need to add it here.
offsetsBufferIndex + // offsets buffer.
getNullsBufferSerializedSizeInBytes(); // nulls
}
@Override
public String toString()
{
ToStringHelper stringHelper = toStringHelper(this)
.add("super", super.toString())
.add("estimatedOffsetBufferMaxCapacity", estimatedOffsetBufferMaxCapacity)
.add("offsetsBufferCapacity", offsetsBuffer == null ? 0 : offsetsBuffer.length)
.add("offsetsBufferIndex", offsetsBufferIndex)
.add("offsetsCapacity", offsets == null ? 0 : offsets.length)
.add("lastOffset", lastOffset);
for (int i = 0; i < fieldBuffers.length; i++) {
stringHelper.add("fieldBuffer_" + i, fieldBuffers[i]);
}
return stringHelper.toString();
}
@Override
protected void setupDecodedBlockAndMapPositions(DecodedBlockNode decodedBlockNode, int partitionBufferCapacity, double decodedBlockPageSizeFraction)
{
requireNonNull(decodedBlockNode, "decodedBlockNode is null");
decodedBlockNode = mapPositionsToNestedBlock(decodedBlockNode);
ColumnarRow columnarRow = (ColumnarRow) decodedBlockNode.getDecodedBlock();
decodedBlock = columnarRow.getNullCheckBlock();
populateNestedPositions(columnarRow);
long estimatedSerializedSizeInBytes = decodedBlockNode.getEstimatedSerializedSizeInBytes();
long childrenEstimatedSerializedSizeInBytes = 0;
for (int i = 0; i < fieldBuffers.length; i++) {
DecodedBlockNode childDecodedBlockNode = decodedBlockNode.getChildren().get(i);
long childEstimatedSerializedSizeInBytes = childDecodedBlockNode.getEstimatedSerializedSizeInBytes();
childrenEstimatedSerializedSizeInBytes += childEstimatedSerializedSizeInBytes;
fieldBuffers[i].setupDecodedBlockAndMapPositions(childDecodedBlockNode, partitionBufferCapacity, decodedBlockPageSizeFraction * childEstimatedSerializedSizeInBytes / estimatedSerializedSizeInBytes);
}
double targetBufferSize = partitionBufferCapacity * decodedBlockPageSizeFraction * (estimatedSerializedSizeInBytes - childrenEstimatedSerializedSizeInBytes) / estimatedSerializedSizeInBytes;
setEstimatedNullsBufferMaxCapacity(getEstimatedBufferMaxCapacity(targetBufferSize, Byte.BYTES, POSITION_SIZE));
estimatedOffsetBufferMaxCapacity = getEstimatedBufferMaxCapacity(targetBufferSize, Integer.BYTES, POSITION_SIZE);
}
@Override
protected void accumulateSerializedRowSizes(int[] positionOffsets, int positionCount, int[] serializedRowSizes)
{
// If all positions for the RowBlock to be copied are null, the number of positions to copy for its
// nested field blocks could be 0. In such case we don't need to proceed.
if (this.positionCount == 0) {
return;
}
int lastOffset = positionOffsets[0];
for (int i = 0; i < positionCount; i++) {
int offset = positionOffsets[i + 1];
serializedRowSizes[i] += POSITION_SIZE * (offset - lastOffset);
lastOffset = offset;
positionOffsets[i + 1] = this.offsets[offset];
}
int[] offsetsCopy = ensureCapacity(null, positionCount + 1, SMALL, NONE, bufferAllocator);
try {
for (int i = 0; i < fieldBuffers.length; i++) {
System.arraycopy(positionOffsets, 0, offsetsCopy, 0, positionCount + 1);
fieldBuffers[i].accumulateSerializedRowSizes(offsetsCopy, positionCount, serializedRowSizes);
}
}
finally {
bufferAllocator.returnArray(offsetsCopy);
}
}
@Override
int getEstimatedValueBufferMaxCapacity()
{
throw new UnsupportedOperationException();
}
@VisibleForTesting
int getEstimatedOffsetBufferMaxCapacity()
{
return estimatedOffsetBufferMaxCapacity;
}
@VisibleForTesting
BlockEncodingBuffer[] getFieldBuffers()
{
return fieldBuffers;
}
private void populateNestedPositions(ColumnarRow columnarRow)
{
// Reset nested level positions before checking positionCount. Failing to do so may result in elementsBuffers having stale values when positionCount is 0.
for (int i = 0; i < fieldBuffers.length; i++) {
fieldBuffers[i].resetPositions();
}
if (positionCount == 0) {
return;
}
offsets = ensureCapacity(offsets, positionCount + 1, SMALL, NONE, bufferAllocator);
offsets[0] = 0;
int[] positions = getPositions();
int columnarRowBaseOffset = columnarRow.getOffset(0);
for (int i = 0; i < positionCount; i++) {
int position = positions[i];
offsets[i + 1] = offsets[i] + columnarRow.getOffset(position + 1) - columnarRow.getOffset(position);
}
for (int j = 0; j < fieldBuffers.length; j++) {
fieldBuffers[j].ensurePositionsCapacity(offsets[positionCount]);
}
for (int i = 0; i < positionCount; i++) {
int beginOffset = columnarRow.getOffset(positions[i]);
int currentRowSize = offsets[i + 1] - offsets[i];
if (currentRowSize > 0) {
for (int j = 0; j < fieldBuffers.length; j++) {
fieldBuffers[j].appendPositionRange(beginOffset - columnarRowBaseOffset, currentRowSize);
}
}
}
}
private void appendOffsets()
{
offsetsBuffer = ensureCapacity(offsetsBuffer, offsetsBufferIndex + batchSize * ARRAY_INT_INDEX_SCALE, estimatedOffsetBufferMaxCapacity, LARGE, PRESERVE, bufferAllocator);
int baseOffset = lastOffset - offsets[positionsOffset];
for (int i = positionsOffset; i < positionsOffset + batchSize; i++) {
offsetsBufferIndex = setIntUnchecked(offsetsBuffer, offsetsBufferIndex, offsets[i + 1] + baseOffset);
}
lastOffset = offsets[positionsOffset + batchSize] + baseOffset;
}
}