BaseValueVector.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector;
import java.util.Collections;
import java.util.Iterator;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
import org.apache.arrow.vector.util.TransferPair;
import org.apache.arrow.vector.util.ValueVectorUtility;
/**
* Base class for other Arrow Vector Types. Provides basic functionality around memory management.
*/
public abstract class BaseValueVector implements ValueVector {
public static final String MAX_ALLOCATION_SIZE_PROPERTY = "arrow.vector.max_allocation_bytes";
public static final long MAX_ALLOCATION_SIZE =
Long.getLong(MAX_ALLOCATION_SIZE_PROPERTY, Long.MAX_VALUE);
/*
* For all fixed width vectors, the value and validity buffers are sliced from a single buffer.
* Similarly, for variable width vectors, the offsets and validity buffers are sliced from a
* single buffer. To ensure the single buffer is power-of-2 size, the initial value allocation
* should be less than power-of-2. For IntVectors, this comes to 3970*4 (15880) for the data
* buffer and 504 bytes for the validity buffer, totalling to 16384 (2^16).
*/
public static final int INITIAL_VALUE_ALLOCATION = 3970;
protected final BufferAllocator allocator;
protected volatile FieldReader fieldReader;
protected ArrowBuf validityBuffer;
protected int valueCount;
protected BaseValueVector(BufferAllocator allocator) {
this.allocator = Preconditions.checkNotNull(allocator, "allocator cannot be null");
}
@Override
public abstract String getName();
/** Representation of vector suitable for debugging. */
@Override
public String toString() {
return ValueVectorUtility.getToString(this, 0, getValueCount());
}
@Override
public void clear() {}
@Override
public void close() {
clear();
}
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return getTransferPair(getName(), allocator);
}
@Override
public Iterator<ValueVector> iterator() {
return Collections.emptyIterator();
}
/**
* Checks to ensure that every buffer <code>vv</code> uses has a positive reference count, throws
* if this precondition isn't met. Returns true otherwise.
*/
public static boolean checkBufRefs(final ValueVector vv) {
for (final ArrowBuf buffer : vv.getBuffers(false)) {
if (buffer.refCnt() <= 0) {
throw new IllegalStateException("zero refcount");
}
}
return true;
}
@Override
public BufferAllocator getAllocator() {
return allocator;
}
void compareTypes(BaseValueVector target, String caller) {
if (this.getMinorType() != target.getMinorType()) {
throw new UnsupportedOperationException(caller + " should have vectors of exact same type");
}
}
protected ArrowBuf releaseBuffer(ArrowBuf buffer) {
buffer.getReferenceManager().release();
buffer = allocator.getEmpty();
return buffer;
}
/**
* Compute the size of validity buffer required to manage a given number of elements in a vector.
*
* @param valueCount number of elements in the vector
* @return buffer size
* @deprecated -- use {@link BitVectorHelper#getValidityBufferSizeFromCount} instead.
*/
@Deprecated(forRemoval = true, since = "18.4.0")
protected static int getValidityBufferSizeFromCount(final int valueCount) {
return DataSizeRoundingUtil.divideBy8Ceil(valueCount);
}
/* round up bytes for the validity buffer for the given valueCount */
private static long roundUp8ForValidityBuffer(long valueCount) {
return ((valueCount + 63) >> 6) << 3;
}
long computeCombinedBufferSize(int valueCount, int typeWidth) {
Preconditions.checkArgument(valueCount >= 0, "valueCount must be >= 0");
Preconditions.checkArgument(typeWidth >= 0, "typeWidth must be >= 0");
// compute size of validity buffer.
long bufferSize = roundUp8ForValidityBuffer(valueCount);
// add the size of the value buffer.
if (typeWidth == 0) {
// for boolean type, value-buffer and validity-buffer are of same size.
bufferSize *= 2;
} else {
bufferSize += DataSizeRoundingUtil.roundUpTo8Multiple((long) valueCount * typeWidth);
}
return allocator.getRoundingPolicy().getRoundedSize(bufferSize);
}
/**
* Each vector has a different reader that implements the FieldReader interface. Overridden
* methods must make sure to return the correct concrete reader implementation.
*
* @return Returns a lambda that initializes a reader when called.
*/
protected abstract FieldReader getReaderImpl();
/**
* Default implementation to create a reader for the vector. Depends on the individual vector
* class' implementation of {@link #getReaderImpl} to initialize the reader appropriately.
*
* @return Concrete instance of FieldReader by using double-checked locking.
*/
@Override
public FieldReader getReader() {
FieldReader reader = fieldReader;
if (reader != null) {
return reader;
}
synchronized (this) {
if (fieldReader == null) {
fieldReader = getReaderImpl();
}
return fieldReader;
}
}
/** Container for primitive vectors (1 for the validity bit-mask and one to hold the values). */
static class DataAndValidityBuffers {
private ArrowBuf dataBuf;
private ArrowBuf validityBuf;
DataAndValidityBuffers(ArrowBuf dataBuf, ArrowBuf validityBuf) {
this.dataBuf = dataBuf;
this.validityBuf = validityBuf;
}
ArrowBuf getDataBuf() {
return dataBuf;
}
ArrowBuf getValidityBuf() {
return validityBuf;
}
}
DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWidth) {
long bufferSize = computeCombinedBufferSize(valueCount, typeWidth);
assert bufferSize <= MAX_ALLOCATION_SIZE;
long validityBufferSize;
long dataBufferSize;
if (typeWidth == 0) {
validityBufferSize = dataBufferSize = bufferSize / 2;
} else {
// Due to the rounding policy, the bufferSize could be greater than the
// requested size. Utilize the allocated buffer fully.;
long actualCount = (long) ((bufferSize * 8.0) / (8 * typeWidth + 1));
do {
validityBufferSize = roundUp8ForValidityBuffer(actualCount);
dataBufferSize = DataSizeRoundingUtil.roundUpTo8Multiple(actualCount * typeWidth);
if (validityBufferSize + dataBufferSize <= bufferSize) {
break;
}
--actualCount;
} while (true);
}
/* allocate combined buffer */
ArrowBuf combinedBuffer = allocator.buffer(bufferSize);
/* slice into requested lengths */
ArrowBuf dataBuf = null;
ArrowBuf validityBuf = null;
long bufferOffset = 0;
for (int numBuffers = 0; numBuffers < 2; ++numBuffers) {
long len = (numBuffers == 0 ? dataBufferSize : validityBufferSize);
ArrowBuf buf = combinedBuffer.slice(bufferOffset, len);
buf.getReferenceManager().retain();
buf.readerIndex(0);
buf.writerIndex(0);
bufferOffset += len;
if (numBuffers == 0) {
dataBuf = buf;
} else {
validityBuf = buf;
}
}
combinedBuffer.getReferenceManager().release();
return new DataAndValidityBuffers(dataBuf, validityBuf);
}
public static ArrowBuf transferBuffer(
final ArrowBuf srcBuffer, final BufferAllocator targetAllocator) {
final ReferenceManager referenceManager = srcBuffer.getReferenceManager();
return referenceManager.transferOwnership(srcBuffer, targetAllocator).getTransferredBuffer();
}
@Override
public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}
@Override
public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}
/**
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte
* aligned, then the buffer is sliced from that index and ownership is transferred. If not,
* individual bytes are copied.
*
* @param startIndex starting index
* @param length number of elements to be copied
* @param target target vector
*/
protected void splitAndTransferValidityBuffer(
int startIndex, int length, BaseValueVector target) {
int offset = startIndex % 8;
if (length <= 0) {
return;
}
if (offset == 0) {
sliceAndTransferValidityBuffer(startIndex, length, target);
} else {
copyValidityBuffer(startIndex, length, target);
}
}
/**
* If the start index is 8 byte aligned, slice `validityBuffer` and transfer ownership to
* `target`'s `validityBuffer`.
*
* @param startIndex starting index
* @param length number of elements to be copied
* @param target target vector
*/
protected void sliceAndTransferValidityBuffer(
int startIndex, int length, BaseValueVector target) {
final int firstByteSource = BitVectorHelper.byteIndex(startIndex);
final int byteSizeTarget = getValidityBufferSizeFromCount(length);
if (target.validityBuffer != null) {
target.validityBuffer.getReferenceManager().release();
}
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
target.validityBuffer.getReferenceManager().retain(1);
}
/**
* Allocate new validity buffer for `target` and copy bytes from `validityBuffer`. Precise details
* in the comments below.
*
* @param startIndex starting index
* @param length number of elements to be copied
* @param target target vector
*/
protected void copyValidityBuffer(int startIndex, int length, BaseValueVector target) {
final int firstByteSource = BitVectorHelper.byteIndex(startIndex);
final int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
final int byteSizeTarget = getValidityBufferSizeFromCount(length);
final int offset = startIndex % 8;
/* Copy data
* When the first bit starts from the middle of a byte (offset != 0),
* copy data from src BitVector.
* Each byte in the target is composed by a part in i-th byte,
* another part in (i+1)-th byte.
*/
target.allocateValidityBuffer(byteSizeTarget);
for (int i = 0; i < byteSizeTarget - 1; i++) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(this.validityBuffer, firstByteSource + i, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(this.validityBuffer, firstByteSource + i + 1, offset);
target.validityBuffer.setByte(i, (b1 + b2));
}
/* Copying the last piece is done in the following manner:
* if the source vector has 1 or more bytes remaining, we copy
* the last piece as a byte formed by shifting data
* from the current byte and the next byte.
*
* if the source vector has no more bytes remaining
* (we are at the last byte), we copy the last piece as a byte
* by shifting data from the current byte.
*/
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
this.validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
this.validityBuffer, firstByteSource + byteSizeTarget, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
} else {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
this.validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
}
}
/**
* Allocate new validity buffer for when the bytes need to be copied over.
*
* @param byteSizeTarget desired size of the buffer
*/
protected void allocateValidityBuffer(long byteSizeTarget) {
validityBuffer = allocator.buffer(byteSizeTarget);
validityBuffer.readerIndex(0);
validityBuffer.setZero(0, validityBuffer.capacity());
}
}