Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package colblk
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "math/bits"
12 : "slices"
13 : "strings"
14 : "unsafe"
15 :
16 : "github.com/cockroachdb/crlib/crbytes"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/binfmt"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/treeprinter"
21 : "github.com/cockroachdb/pebble/sstable/block"
22 : )
23 :
24 : // PrefixBytes holds an array of lexicographically ordered byte slices. It
25 : // provides prefix compression. Prefix compression applies strongly to two cases
26 : // in CockroachDB: removal of the "[/tenantID]/tableID/indexID" prefix that is
27 : // present on all table data keys, and multiple versions of a key that are
28 : // distinguished only by different timestamp suffixes. With columnar blocks
29 : // enabling the timestamp to be placed in a separate column, the multiple
30 : // version problem becomes one of efficiently handling exact duplicate keys.
31 : // PrefixBytes builds off of the RawBytes encoding, introducing additional
32 : // slices for encoding (n+bundleSize-1)/bundleSize bundle prefixes and 1
33 : // block-level shared prefix for the column.
34 : //
35 : // Unlike the original prefix compression performed by rowblk (inherited from
36 : // LevelDB and RocksDB), PrefixBytes does not perform all prefix compression
37 : // relative to the previous key. Rather it performs prefix compression relative
38 : // to the first key of a key's bundle. This can result in less compression, but
39 : // simplifies reverse iteration and allows iteration to be largely stateless.
40 : //
41 : // To understand the PrefixBytes layout, we'll work through an example using
42 : // these 15 keys:
43 : //
44 : // 0123456789
45 : // 0 aaabbbc
46 : // 1 aaabbbcc
47 : // 2 aaabbbcde
48 : // 3 aaabbbce
49 : // 4 aaabbbdee
50 : // 5 aaabbbdee
51 : // 6 aaabbbdee
52 : // 7 aaabbbeff
53 : // 8 aaabbe
54 : // 9 aaabbeef
55 : // 10 aaabbeef
56 : // 11 aaabc
57 : // 12 aabcceef
58 : // 13 aabcceef
59 : // 14 aabcceef
60 : //
61 : // The total length of these keys is 119 bytes. There are 3 keys which occur
62 : // multiple times (rows 4-6, 9-10, 12-14) which models multiple versions of the
63 : // same MVCC key in CockroachDB. There is a shared prefix to all of the keys
64 : // which models the "[/tenantID]/tableID/indexID" present on CockroachDB table
65 : // data keys. There are other shared prefixes which model identical values in
66 : // table key columns.
67 : //
68 : // The table below shows the components of the KeyBytes encoding for these 15
69 : // keys when using a bundle size of 4 which results in 4 bundles. The 15 keys
70 : // are encoded into 20 slices: 1 block prefix, 4 bundle prefixes, and 15
71 : // suffixes. The first slice in the table is the block prefix that is shared by
72 : // all keys in the block. The first slice in each bundle is the bundle prefix
73 : // which is shared by all keys in the bundle.
74 : //
75 : // idx | row | end offset | data
76 : // -------+-------+------------+----------
77 : // 0 | | 2 | aa
78 : // 1 | | 7 | ..abbbc
79 : // 2 | 0 | 7 | .......
80 : // 3 | 1 | 8 | .......c
81 : // 4 | 2 | 10 | .......de
82 : // 5 | 3 | 11 | .......e
83 : // 6 | | 15 | ..abbb
84 : // 7 | 4 | 18 | ......dee
85 : // 8 | 5 | 18 | .........
86 : // 9 | 6 | 18 | .........
87 : // 10 | 7 | 21 | ......eff
88 : // 11 | | 23 | ..ab
89 : // 12 | 8 | 25 | ....be
90 : // 13 | 9 | 29 | ....beef
91 : // 14 | 10 | 29 | ........
92 : // 15 | 11 | 30 | ....c
93 : // 16 | | 36 | ..bcceef
94 : // 17 | 12 | 36 | ........
95 : // 18 | 13 | 36 | ........
96 : // 19 | 14 | 36 | ........
97 : //
98 : // The 'end offset' column in the table encodes the exclusive offset within the
99 : // string data section where each of the slices end. Each slice starts at the
100 : // previous slice's end offset. The first slice (the block prefix)'s start
101 : // offset is implicitly zero. Note that this differs from the plain RawBytes
102 : // encoding which always stores a zero offset at the beginning of the offsets
103 : // array to avoid special-casing the first slice. The block prefix already
104 : // requires special-casing, so materializing the zero start offset is not
105 : // needed.
106 : //
107 : // The table above defines 20 slices: the 1 block key prefix, the 4 bundle key
108 : // prefixes and the 15 key suffixes. Offset[0] is the length of the first slice
109 : // which is always anchored at data[0]. The data columns display the portion of
110 : // the data array the slice covers. For row slices, an empty suffix column
111 : // indicates that the slice is identical to the slice at the previous index
112 : // which is indicated by the slice's offset being equal to the previous slice's
113 : // offset. Due to the lexicographic sorting, the key at row i can't be a prefix
114 : // of the key at row i-1 or it would have sorted before the key at row i-1. And
115 : // if the key differs then only the differing bytes will be part of the suffix
116 : // and not contained in the bundle prefix.
117 : //
118 : // The end result of this encoding is that we can store the 119 bytes of the 15
119 : // keys plus their start and end offsets (which would naively consume 15*4=60
120 : // bytes for at least the key lengths) in 61 bytes (36 bytes of data + 4 bytes
121 : // of offset constant + 20 bytes of offset delta data + 1 byte of bundle size).
122 : //
123 : // # Physical representation
124 : //
125 : // +==================================================================+
126 : // | Bundle size (1 byte) |
127 : // | |
128 : // | The bundle size indicates how many keys prefix compression may |
129 : // | apply across. Every bundleSize keys, prefix compression restarts.|
130 : // | The bundleSize is required to be a power of two, and this 1- |
131 : // | byte prefix stores log2(bundleSize). |
132 : // +==================================================================+
133 : // | RawBytes |
134 : // | |
135 : // | A modified RawBytes encoding is used to store the data slices. A |
136 : // | PrefixBytes column storing n keys will encode |
137 : // | |
138 : // | 1 block prefix |
139 : // | + |
140 : // | (n + bundleSize-1)/bundleSize bundle prefixes |
141 : // | + |
142 : // | n row suffixes |
143 : // | |
144 : // | slices. Unlike the RawBytes encoding, the first offset encoded |
145 : // | is not guaranteed to be zero. In the PrefixBytes encoding, the |
146 : // | first offset encodes the length of the column-wide prefix. The |
147 : // | column-wide prefix is stored in slice(0, offset(0)). |
148 : // | |
149 : // | +------------------------------------------------------------+ |
150 : // | | Offset table | |
151 : // | | | |
152 : // | | A Uint32 column encoding offsets into the string data, | |
153 : // | | possibly delta8 or delta16 encoded. When a delta encoding | |
154 : // | | is used, the base constant is always zero. | |
155 : // | +------------------------------------------------------------+ |
156 : // | | offsetDelta[0] | offsetDelta[1] | ... | offsetDelta[m] | |
157 : // | +------------------------------------------------------------+ |
158 : // | | prefix-compressed string data | |
159 : // | | ... | |
160 : // | +------------------------------------------------------------+ |
161 : // +==================================================================+
162 : //
163 : // TODO(jackson): Consider stealing the low bit of the offset for a flag
164 : // indicating that a key is a duplicate and then using the remaining bits to
165 : // encode the relative index of the duplicated key's end offset. This would
166 : // avoid the O(bundle size) scan in the case of duplicate keys, but at the cost
167 : // of complicating logic to look up a bundle prefix (which may need to follow a
168 : // duplicate key's relative index to uncover the start offset of the bundle
169 : // prefix).
170 : //
171 : // # Reads
172 : //
173 : // This encoding provides O(1) access to any row by calculating the bundle for
174 : // the row (see bundleOffsetIndexForRow), then the per-row's suffix (see
175 : // rowSuffixIndex). If the per-row suffix's end offset equals the previous
176 : // offset, then the row is a duplicate key and we need to step backward until we
177 : // find a non-empty slice or the start of the bundle (a variable number of
178 : // steps, but bounded by the bundle size).
179 : //
180 : // Forward iteration can easily reuse the previous row's key with a check on
181 : // whether the row's slice is empty. Reverse iteration within a run of equal
182 : // keys can reuse the next row's key. When reverse iteration steps backward from
183 : // a non-empty slice onto an empty slice, it must continue backward until a
184 : // non-empty slice is found (just as in absolute positioning) to discover the
185 : // row suffix that is duplicated.
186 : //
187 : // The Seek{GE,LT} routines first binary search on the first key of each bundle
188 : // which can be retrieved without data movement because the bundle prefix is
189 : // immediately adjacent to it in the data array. We can slightly optimize the
190 : // binary search by skipping over all of the keys in the bundle on prefix
191 : // mismatches.
type PrefixBytes struct {
	bundleCalc
	// rows is the number of logical keys encoded within the column.
	rows int
	// sharedPrefixLen is the length of the block-level shared prefix. It is
	// cached from rawBytes.offsets.At(0) at decode time.
	sharedPrefixLen int
	// rawBytes holds the offset table and the prefix-compressed string data.
	rawBytes RawBytes
}
198 :
199 : // Assert that PrefixBytes implements Array[[]byte].
200 : var _ Array[[]byte] = PrefixBytes{}
201 :
202 : // DecodePrefixBytes decodes the structure of a PrefixBytes, constructing an
203 : // accessor for an array of lexicographically sorted byte slices constructed by
204 : // PrefixBytesBuilder. Count must be the number of logical slices within the
205 : // array.
206 : func DecodePrefixBytes(
207 : b []byte, offset uint32, count int,
208 2 : ) (prefixBytes PrefixBytes, endOffset uint32) {
209 2 : if count == 0 {
210 0 : panic(errors.AssertionFailedf("empty PrefixBytes"))
211 : }
212 : // The first byte of a PrefixBytes-encoded column is the bundle size
213 : // expressed as log2 of the bundle size (the bundle size must always be a
214 : // power of two)
215 2 : bundleShift := uint32(*((*uint8)(unsafe.Pointer(&b[offset]))))
216 2 : calc := makeBundleCalc(bundleShift)
217 2 : nBundles := int(calc.bundleCount(count))
218 2 :
219 2 : rb, endOffset := DecodeRawBytes(b, offset+1, count+nBundles)
220 2 : pb := PrefixBytes{
221 2 : bundleCalc: calc,
222 2 : rows: count,
223 2 : rawBytes: rb,
224 2 : }
225 2 : pb.sharedPrefixLen = int(pb.rawBytes.offsets.At(0))
226 2 : return pb, endOffset
227 : }
228 :
229 : // Assert that DecodePrefixBytes implements DecodeFunc.
230 : var _ DecodeFunc[PrefixBytes] = DecodePrefixBytes
231 :
232 : // At returns the i'th []byte slice in the PrefixBytes. At must allocate, so
233 : // callers should prefer accessing a slice's constituent components through
234 : // SharedPrefix, BundlePrefix and RowSuffix.
235 1 : func (b PrefixBytes) At(i int) []byte {
236 1 : return slices.Concat(b.SharedPrefix(), b.RowBundlePrefix(i), b.RowSuffix(i))
237 1 : }
238 :
// UnsafeFirstSlice returns first slice in the PrefixBytes. The returned slice
// points directly into the PrefixBytes buffer and must not be mutated.
func (b *PrefixBytes) UnsafeFirstSlice() []byte {
	// The first key is stored contiguously at the front of the data section:
	// the block prefix ends at offsets.At(0), the first bundle prefix at
	// offsets.At(1) and the first row suffix at offsets.At(2). Slicing from 0
	// to offsets.At(2) therefore yields the complete first key with no
	// copying.
	return b.rawBytes.slice(0, b.rawBytes.offsets.At(2))
}
244 :
// PrefixBytesIter is an iterator and associated buffers for PrefixBytes. It
// provides a means for efficiently iterating over the []byte slices contained
// within a PrefixBytes, avoiding unnecessary copying when portions of slices
// are shared.
type PrefixBytesIter struct {
	// Buf is used for materializing a user key. It is preallocated to the maximum
	// key length in the data block.
	Buf []byte
	// syntheticPrefixLen is the length of the synthetic prefix that Init
	// copied to the front of Buf (zero if none).
	syntheticPrefixLen uint32
	// sharedAndBundlePrefixLen is the combined length of the synthetic
	// prefix, the block-level shared prefix and the current bundle's prefix;
	// the current row's suffix begins at this offset within Buf.
	sharedAndBundlePrefixLen uint32
	// offsetIndex is the index into the offset table of the current row's
	// suffix slice.
	offsetIndex int
	// nextBundleOffsetIndex is the offset-table index at which the next
	// bundle's prefix slot resides; SetNext compares offsetIndex against it
	// to detect bundle boundaries cheaply.
	nextBundleOffsetIndex int
}
258 :
// Init initializes the prefix bytes iterator; maxKeyLength must be
// large enough to fit any key in the block after applying any synthetic prefix
// and/or suffix.
func (i *PrefixBytesIter) Init(maxKeyLength int, syntheticPrefix block.SyntheticPrefix) {
	// Allocate a buffer that's large enough to hold the largest user key in the
	// block with 1 byte to spare (so that pointer arithmetic is never pointing
	// beyond the allocation, which would violate Go rules).
	n := maxKeyLength + 1
	if cap(i.Buf) < n {
		// mallocgc with a nil type and needzero=false returns uninitialized
		// memory, skipping the zeroing a make([]byte, n) would perform.
		ptr := mallocgc(uintptr(n), nil, false)
		i.Buf = unsafe.Slice((*byte)(ptr), n)
	}
	i.Buf = i.Buf[:0]
	i.syntheticPrefixLen = uint32(len(syntheticPrefix))
	if syntheticPrefix.IsSet() {
		// The synthetic prefix is copied once here; SetAt/SetNext materialize
		// keys after this fixed leading portion of Buf.
		i.Buf = append(i.Buf, syntheticPrefix...)
	}
}
277 :
// SetAt updates the provided PrefixBytesIter to hold the i'th []byte slice in
// the PrefixBytes. The PrefixBytesIter's buffer must be sufficiently large to
// hold the i'th []byte slice, and the caller is required to statically ensure
// this.
func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
	// Determine the offset and length of the bundle prefix.
	bundleOffsetIndex := b.bundleOffsetIndexForRow(i)
	bundleOffsetStart, bundleOffsetEnd := b.rawBytes.offsets.At2(bundleOffsetIndex)
	bundlePrefixLen := bundleOffsetEnd - bundleOffsetStart

	// Determine the offset and length of the row's individual suffix.
	it.offsetIndex = b.rowSuffixIndex(i)
	// TODO(jackson): rowSuffixOffsets will recompute bundleOffsetIndexForRow in
	// the case that the row is a duplicate key. Is it worth optimizing to avoid
	// this recomputation? The expected case is non-duplicate keys, so it may
	// not be worthwhile.
	rowSuffixStart, rowSuffixEnd := b.rowSuffixOffsets(i, it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	it.sharedAndBundlePrefixLen = it.syntheticPrefixLen + uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]

	// Materialize the key into Buf after the synthetic prefix that Init
	// already placed at the front of the buffer.
	ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(it.syntheticPrefixLen))
	// Copy the shared key prefix.
	memmove(ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
	// Copy the bundle prefix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(b.sharedPrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundleOffsetStart)),
		uintptr(bundlePrefixLen))

	// Copy the per-row suffix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(bundlePrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
	// Set nextBundleOffsetIndex so that a call to SetNext can cheaply determine
	// whether the next row is in the same bundle: the next bundle's prefix
	// slot lies one bundle-prefix slot plus bundleSize row slots ahead.
	it.nextBundleOffsetIndex = bundleOffsetIndex + (1 << b.bundleShift) + 1
}
321 :
// SetNext updates the provided PrefixBytesIter to hold the next []byte slice in
// the PrefixBytes. SetNext requires the provided iter to currently hold a slice
// and for a subsequent slice to exist within the PrefixBytes. The
// PrefixBytesIter's buffer must be sufficiently large to hold the next []byte
// slice, and the caller is required to statically ensure this.
func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
	it.offsetIndex++
	// If the next row is in the same bundle, we can take a fast path of only
	// updating the per-row suffix.
	if it.offsetIndex < it.nextBundleOffsetIndex {
		rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
		rowSuffixLen := rowSuffixEnd - rowSuffixStart
		if rowSuffixLen == 0 {
			// The start and end offsets are equal, indicating that the key is a
			// duplicate. Since it's identical to the previous key, there's
			// nothing left to do, we can leave buf as-is.
			return
		}
		it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
		// Copy in the per-row suffix.
		ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
		memmove(
			unsafe.Pointer(uintptr(ptr)+uintptr(it.sharedAndBundlePrefixLen)),
			unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
			uintptr(rowSuffixLen))
		return
	}

	// We've reached the end of the bundle. We need to update the bundle prefix.
	// The offsetIndex is currently pointing to the start of the new bundle
	// prefix. Increment it to point at the start of the new row suffix.
	it.offsetIndex++
	rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	// Read the offsets of the new bundle prefix and update the index of the
	// next bundle.
	bundlePrefixStart := b.rawBytes.offsets.At(it.nextBundleOffsetIndex)
	bundlePrefixLen := rowSuffixStart - bundlePrefixStart
	it.nextBundleOffsetIndex = it.offsetIndex + (1 << b.bundleShift)

	it.sharedAndBundlePrefixLen = it.syntheticPrefixLen + uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
	// Copy in the new bundle prefix. (The synthetic prefix and block-level
	// shared prefix are already in place at the front of Buf.)
	ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(it.syntheticPrefixLen) + uintptr(b.sharedPrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundlePrefixStart)),
		uintptr(bundlePrefixLen))
	// Copy in the per-row suffix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(bundlePrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
}
379 :
// SharedPrefix returns a []byte of the shared prefix that was extracted from
// all of the values in the Bytes vector. The returned slice should not be
// mutated.
func (b *PrefixBytes) SharedPrefix() []byte {
	// The very first slice is the prefix for the entire column; it occupies
	// data[0:offsets.At(0)].
	return b.rawBytes.slice(0, b.rawBytes.offsets.At(0))
}
387 :
388 : // RowBundlePrefix takes a row index and returns a []byte of the prefix shared
389 : // among all the keys in the row's bundle, but without the block-level shared
390 : // prefix for the column. The returned slice should not be mutated.
391 1 : func (b *PrefixBytes) RowBundlePrefix(row int) []byte {
392 1 : i := b.bundleOffsetIndexForRow(row)
393 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(i), b.rawBytes.offsets.At(i+1))
394 1 : }
395 :
396 : // BundlePrefix returns the prefix of the i-th bundle in the column. The
397 : // provided i must be in the range [0, BundleCount()). The returned slice should
398 : // not be mutated.
399 1 : func (b *PrefixBytes) BundlePrefix(i int) []byte {
400 1 : j := b.offsetIndexByBundleIndex(i)
401 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))
402 1 : }
403 :
// RowSuffix returns a []byte of the suffix unique to the row. A row's full key
// is the result of concatenating SharedPrefix(), BundlePrefix() and
// RowSuffix().
//
// The returned slice should not be mutated.
func (b *PrefixBytes) RowSuffix(row int) []byte {
	// rowSuffixOffsets resolves duplicate keys (encoded as empty slices) to
	// the offsets of the nearest non-empty predecessor within the bundle.
	return b.rawBytes.slice(b.rowSuffixOffsets(row, b.rowSuffixIndex(row)))
}
412 :
// rowSuffixOffsets finds the start and end offsets of the row's suffix slice,
// accounting for duplicate keys. It takes the index of the row, and the value
// of rowSuffixIndex(row).
func (b *PrefixBytes) rowSuffixOffsets(row, i int) (low uint32, high uint32) {
	// Retrieve the low and high offsets indicating the start and end of the
	// row's suffix slice.
	low, high = b.rawBytes.offsets.At2(i)
	// If there's a non-empty slice for the row, this row is different than its
	// predecessor.
	if low != high {
		return low, high
	}
	// Otherwise, an empty slice indicates a duplicate key. We need to find the
	// first non-empty predecessor within the bundle, or if all the rows are
	// empty, return arbitrary equal low and high.
	//
	// Compute the index of the first row in the bundle so we know when to stop.
	// The 1+ skips over the bundle prefix's own slot in the offset table.
	firstIndex := 1 + b.bundleOffsetIndexForRow(row)
	for i > firstIndex {
		// Step back a row, and check if the slice is non-empty.
		i--
		high = low
		low = b.rawBytes.offsets.At(i)
		if low != high {
			return low, high
		}
	}
	// All the rows in the bundle are empty.
	return low, high
}
443 :
// Rows returns the count of rows whose keys are encoded within the PrefixBytes.
func (b *PrefixBytes) Rows() int {
	// rows was captured from the count supplied to DecodePrefixBytes.
	return b.rows
}
448 :
// BundleCount returns the count of bundles within the PrefixBytes.
func (b *PrefixBytes) BundleCount() int {
	// Per the encoding, this is (rows + bundleSize - 1) / bundleSize.
	return b.bundleCount(b.rows)
}
453 :
// Search searches for the first key in the PrefixBytes that is greater than or
// equal to k, returning the index of the key and whether an equal key was
// found. If multiple keys are equal, the index of the first such key is
// returned. If all keys are < k, Search returns Rows() for the row index.
func (b *PrefixBytes) Search(k []byte) (rowIndex int, isEqual bool) {
	// First compare to the block-level shared prefix.
	n := min(len(k), b.sharedPrefixLen)
	c := bytes.Compare(k[:n], unsafe.Slice((*byte)(b.rawBytes.data), b.sharedPrefixLen))
	// Note that c cannot be 0 when n < b.sharedPrefixLen.
	if c != 0 {
		if c < 0 {
			// Search key is less than any prefix in the block.
			return 0, false
		}
		// Search key is greater than any key in the block.
		return b.rows, false
	}
	// Trim the block-level shared prefix from the search key.
	k = k[b.sharedPrefixLen:]

	// Binary search among the first keys of each bundle.
	//
	// Define f(-1) == false and f(upper) == true.
	// Invariant: f(bi-1) == false, f(upper) == true.
	nBundles := b.BundleCount()
	bi, upper := 0, nBundles
	upperEqual := false
	for bi < upper {
		h := int(uint(bi+upper) >> 1) // avoid overflow when computing h
		// bi ≤ h < upper

		// Retrieve the first key in the h-th (zero-indexed) bundle. We take
		// advantage of the fact that the first row is stored contiguously in
		// the data array (modulo the block prefix) to slice the entirety of the
		// first key:
		//
		//   b u n d l e p r e f i x f i r s t k e y r e m a i n d e r
		//   ^                       ^                                 ^
		// offset(j)             offset(j+1)                     offset(j+2)
		//
		j := b.offsetIndexByBundleIndex(h)
		bundleFirstKey := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+2))
		c = bytes.Compare(k, bundleFirstKey)
		switch {
		case c > 0:
			bi = h + 1 // preserves f(bi-1) == false
		case c < 0:
			upper = h // preserves f(upper) == true
			upperEqual = false
		default:
			// c == 0
			upper = h // preserves f(upper) == true
			upperEqual = true
		}
	}
	if bi == 0 {
		// The very first key is ≥ k. Return it.
		return 0, upperEqual
	}
	// The first key of the bundle bi is ≥ k, but any of the keys in the
	// previous bundle besides the first could also be ≥ k. We can binary search
	// among them, but if the seek key doesn't share the previous bundle's
	// prefix there's no need.
	j := b.offsetIndexByBundleIndex(bi - 1)
	bundlePrefix := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))

	// The row we are looking for might still be in the previous bundle even
	// though the seek key is greater than the first key. This is possible only
	// if the search key shares the first bundle's prefix (eg, the search key
	// equals a row in the previous bundle or falls between two rows within the
	// previous bundle).
	if len(bundlePrefix) > len(k) || !bytes.Equal(k[:len(bundlePrefix)], bundlePrefix) {
		// The search key doesn't share the previous bundle's prefix, so all of
		// the keys in the previous bundle must be less than k. We know the
		// first key of bi is ≥ k, so return it.
		if bi >= nBundles {
			return b.rows, false
		}
		return bi << b.bundleShift, upperEqual
	}
	// Binary search among bundle bi-1's key remainders after stripping bundle
	// bi-1's prefix.
	//
	// Define f(l-1) == false and f(u) == true.
	// Invariant: f(l-1) == false, f(u) == true.
	k = k[len(bundlePrefix):]
	l := 1
	u := min(1<<b.bundleShift, b.rows-(bi-1)<<b.bundleShift)
	for l < u {
		h := int(uint(l+u) >> 1) // avoid overflow when computing h
		// l ≤ h < u

		// j is currently the index of the offset of bundle bi-1's prefix.
		//
		//   b u n d l e p r e f i x f i r s t k e y s e c o n d k e y
		//   ^                       ^               ^
		// offset(j)             offset(j+1)     offset(j+2)
		//
		// The beginning of the zero-indexed i-th key of the bundle is at
		// offset(j+i+1).
		//
		hStart, hEnd := b.rawBytes.offsets.At2(j + h + 1)
		// There's a complication with duplicate keys. When keys are repeated,
		// the PrefixBytes encoding avoids re-encoding the duplicate key,
		// instead encoding an empty slice. While binary searching, if we land
		// on an empty slice, we need to back up until we find a non-empty slice
		// which is the key at index h. We iterate with p. If we eventually find
		// the duplicated key at index p < h and determine f(p) == true, then we
		// can set u=p (rather than h). If we determine f(p)==false, then we
		// know f(h)==false too and set l=h+1.
		p := h
		if hStart == hEnd {
			// Back up looking for a non-empty slice.
			for hStart == hEnd && p >= l {
				p--
				hEnd = hStart
				hStart = b.rawBytes.offsets.At(j + p + 1)
			}
			// If we backed up to l-1, then all the rows in indexes [l, h] have
			// the same keys as index l-1. We know f(l-1) == false [see the
			// invariants above], so we can move l to h+1 and continue the loop
			// without performing any key comparisons.
			if p < l {
				l = h + 1
				continue
			}
		}
		rem := b.rawBytes.slice(hStart, hEnd)
		c = bytes.Compare(k, rem)
		switch {
		case c > 0:
			l = h + 1 // preserves f(l-1) == false
		case c < 0:
			u = p // preserves f(u) == true
			upperEqual = false
		default:
			// c == 0
			u = p // preserves f(u) == true
			upperEqual = true
		}
	}
	i := (bi-1)<<b.bundleShift + l
	if i < b.rows {
		return i, upperEqual
	}
	return b.rows, false
}
601 :
// prefixBytesToBinFormatter formats a PrefixBytes column for human-readable
// debug output: the bundle size byte, the offsets table, and every data
// slice, annotating block/bundle prefixes and substituting dots for bytes
// elided because they fall within an enclosing prefix.
func prefixBytesToBinFormatter(
	f *binfmt.Formatter, tp treeprinter.Node, count int, sliceFormatter func([]byte) string,
) {
	if sliceFormatter == nil {
		sliceFormatter = defaultSliceFormatter
	}
	pb, _ := DecodePrefixBytes(f.RelativeData(), uint32(f.RelativeOffset()), count)

	f.HexBytesln(1, "bundle size: %d", 1<<pb.bundleShift)
	f.ToTreePrinter(tp)

	n := tp.Child("offsets table")
	dataOffset := uint64(f.RelativeOffset()) + uint64(uintptr(pb.rawBytes.data)-uintptr(pb.rawBytes.start))
	uintsToBinFormatter(f, n, pb.rawBytes.slices+1, func(offsetDelta, offsetBase uint64) string {
		// NB: offsetBase will always be zero for PrefixBytes columns.
		return fmt.Sprintf("%d [%d overall]", offsetDelta+offsetBase, offsetDelta+offsetBase+dataOffset)
	})

	n = tp.Child("data")

	// The first offset encodes the length of the block prefix.
	blockPrefixLen := pb.rawBytes.offsets.At(0)
	f.HexBytesln(int(blockPrefixLen), "data[00]: %s (block prefix)",
		sliceFormatter(pb.rawBytes.slice(0, blockPrefixLen)))

	// k-1 is the number of slices that follow the block prefix: one prefix
	// per bundle plus one suffix per row.
	k := 2 + (count-1)>>pb.bundleShift + count
	startOff := blockPrefixLen
	prevLen := blockPrefixLen

	// Use dots to indicate string data that's elided because it falls within
	// the block or bundle prefix.
	dots := strings.Repeat(".", int(blockPrefixLen))
	// Iterate through all the slices in the data section, annotating bundle
	// prefixes and using dots to indicate elided data.
	for i := 0; i < k-1; i++ {
		endOff := pb.rawBytes.offsets.At(i + 1)
		if i%(1+(1<<pb.bundleShift)) == 0 {
			// This is a bundle prefix.
			dots = strings.Repeat(".", int(blockPrefixLen))
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s (bundle prefix)", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			dots = strings.Repeat(".", int(endOff-startOff+blockPrefixLen))
			prevLen = endOff - startOff + blockPrefixLen
		} else if startOff == endOff {
			// An empty slice that's not a block or bundle prefix indicates a
			// repeat key.
			f.HexBytesln(0, "data[%02d]: %s", i+1, strings.Repeat(".", int(prevLen)))
		} else {
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			prevLen = uint32(len(dots)) + endOff - startOff
		}
		startOff = endOff
	}
	f.ToTreePrinter(n)
}
656 :
// PrefixBytesBuilder encodes a column of lexicographically-sorted byte slices,
// applying prefix compression to reduce the encoded size.
type PrefixBytesBuilder struct {
	bundleCalc
	// TODO(jackson): If prefix compression is very effective, the encoded size
	// may remain very small while the physical in-memory size of the
	// in-progress data slice may grow very large. This may pose memory usage
	// problems during block building.
	data       []byte // The raw, concatenated keys w/o any prefix compression
	nKeys      int    // The number of keys added to the builder
	bundleSize int    // The number of keys per bundle
	// sizings maintains metadata about the size of the accumulated data at both
	// nKeys and nKeys-1. Information for the state after the most recently
	// added key is stored at (b.nKeys+1)%2.
	sizings [2]prefixBytesSizing
	offsets struct {
		count int // The number of offsets in the builder
		// elemsSize is the size of the array (in count of uint32 elements; not
		// bytes)
		elemsSize int
		// elems provides access to elements without bounds checking. elems is
		// grown automatically in addOffset.
		elems UnsafeRawSlice[uint32]
	}
	// maxShared bounds how many bytes may be shared via prefixes; Init sets
	// it to 2^16-1. NOTE(review): presumably this keeps shared-prefix lengths
	// representable in 16 bits — confirm against the sizing logic.
	maxShared uint16
}
683 :
684 : // Assert that PrefixBytesBuilder implements ColumnWriter.
685 : var _ ColumnWriter = (*PrefixBytesBuilder)(nil)
686 :
// Init initializes the PrefixBytesBuilder with the specified bundle size. The
// builder will produce a prefix-compressed column of data type
// DataTypePrefixBytes. The [bundleSize] indicates the number of keys that form
// a "bundle," across which prefix-compression is applied. All keys in the
// column will share a column-wide prefix if there is one.
func (b *PrefixBytesBuilder) Init(bundleSize int) {
	if bundleSize > 0 && (bundleSize&(bundleSize-1)) != 0 {
		panic(errors.AssertionFailedf("prefixbytes bundle size %d is not a power of 2", bundleSize))
	}
	// NB: bundleSize == 0 is not rejected above; bits.TrailingZeros32(0) is
	// 32, which would yield a degenerate bundle shift. Callers are presumably
	// required to pass a positive power of two — TODO confirm.
	*b = PrefixBytesBuilder{
		bundleCalc: makeBundleCalc(uint32(bits.TrailingZeros32(uint32(bundleSize)))),
		data:       b.data[:0],       // retain the existing allocation
		bundleSize: bundleSize,
		offsets:    b.offsets,        // retain the existing offsets allocation
		maxShared:  (1 << 16) - 1,
	}
	b.offsets.count = 0
}
705 :
706 : // NumColumns implements ColumnWriter.
707 1 : func (b *PrefixBytesBuilder) NumColumns() int { return 1 }
708 :
709 : // DataType implements ColumnWriter.
710 1 : func (b *PrefixBytesBuilder) DataType(int) DataType { return DataTypePrefixBytes }
711 :
712 : // Reset resets the builder to an empty state, preserving the existing bundle
713 : // size.
714 2 : func (b *PrefixBytesBuilder) Reset() {
715 2 : const maxRetainedData = 512 << 10 // 512 KB
716 2 : *b = PrefixBytesBuilder{
717 2 : bundleCalc: b.bundleCalc,
718 2 : data: b.data[:0],
719 2 : bundleSize: b.bundleSize,
720 2 : offsets: b.offsets,
721 2 : maxShared: b.maxShared,
722 2 : sizings: [2]prefixBytesSizing{},
723 2 : }
724 2 : b.offsets.count = 0
725 2 : if len(b.data) > maxRetainedData {
726 0 : b.data = nil
727 0 : }
728 : }
729 :
730 : // Rows returns the number of keys added to the builder.
731 1 : func (b *PrefixBytesBuilder) Rows() int { return b.nKeys }
732 :
// prefixBytesSizing maintains metadata about the size of the accumulated data
// and its encoded size. Every key addition computes a new prefixBytesSizing
// struct. The PrefixBytesBuilder maintains two prefixBytesSizing structs, one
// for the state after the most recent key addition, and one for the state after
// the second most recent key addition.
type prefixBytesSizing struct {
	lastKeyOff     int // the offset in data where the last key added begins
	offsetCount    int // the count of offsets required to encode the data
	blockPrefixLen int // the length of the block prefix
	// currentBundleDistinctLen is the length of the "current" bundle's
	// distinct keys (duplicates contribute nothing).
	currentBundleDistinctLen int // the length of the "current" bundle's distinct keys
	// currentBundleDistinctKeys counts distinct keys only; duplicate Puts
	// reuse the previous key's bytes.
	currentBundleDistinctKeys int // the number of distinct keys in the "current" bundle
	// currentBundlePrefixLen is the length of the "current" bundle's prefix.
	// The current bundle holds all keys that are not included within
	// PrefixBytesBuilder.completedBundleLen. If the addition of a key causes
	// the creation of a new bundle, the previous bundle's size is incorporated
	// into completedBundleLen and currentBundlePrefixLen is updated to the
	// length of the new bundle key. This ensures that there's always at least 1
	// key in the "current" bundle allowing Finish to accept rows = nKeys-1.
	//
	// Note that currentBundlePrefixLen is inclusive of the blockPrefixLen.
	//
	// INVARIANT: currentBundlePrefixLen >= blockPrefixLen
	currentBundlePrefixLen    int          // the length of the "current" bundle's prefix
	currentBundlePrefixOffset int          // the index of the offset of the "current" bundle's prefix
	completedBundleLen        int          // the encoded size of completed bundles
	compressedDataLen         int          // the compressed, encoded size of data
	offsetEncoding            UintEncoding // the encoding necessary to encode the offsets
}
761 :
762 0 : func (sz *prefixBytesSizing) String() string {
763 0 : return fmt.Sprintf("lastKeyOff:%d offsetCount:%d blockPrefixLen:%d\n"+
764 0 : "currentBundleDistinct{Len,Keys}: (%d,%d)\n"+
765 0 : "currentBundlePrefix{Len,Offset}: (%d,%d)\n"+
766 0 : "completedBundleLen:%d compressedDataLen:%d offsetEncoding:%s",
767 0 : sz.lastKeyOff, sz.offsetCount, sz.blockPrefixLen, sz.currentBundleDistinctLen,
768 0 : sz.currentBundleDistinctKeys, sz.currentBundlePrefixLen, sz.currentBundlePrefixOffset,
769 0 : sz.completedBundleLen, sz.compressedDataLen, sz.offsetEncoding)
770 0 : }
771 :
// Put adds the provided key to the column. The provided key must be
// lexicographically greater than or equal to the previous key added to the
// builder.
//
// The provided bytesSharedWithPrev must be the length of the byte prefix the
// provided key shares with the previous key. The caller is required to provide
// this because in the primary expected use, the caller will already need to
// compute it for the purpose of determining whether successive keys share the
// same prefix.
func (b *PrefixBytesBuilder) Put(key []byte, bytesSharedWithPrev int) {
	// The sizing slot for the state after this key is (nKeys+1)%2, which —
	// before the increment below — is the current nKeys&1. prev holds the
	// state as of the previously added key.
	currIdx := b.nKeys & 1 // %2
	curr := &b.sizings[currIdx]
	prev := &b.sizings[1-currIdx]

	if invariants.Enabled {
		if len(key) == 0 {
			panic(errors.AssertionFailedf("key must be non-empty"))
		}
		if b.maxShared == 0 {
			panic(errors.AssertionFailedf("maxShared must be positive"))
		}
		if b.nKeys > 0 {
			// Verify ordering and the caller-supplied shared-prefix length
			// against the previous key, which begins at prev.lastKeyOff.
			if bytes.Compare(key, b.data[prev.lastKeyOff:]) < 0 {
				panic(errors.AssertionFailedf("keys must be added in order: %q < %q", key, b.data[prev.lastKeyOff:]))
			}
			if bytesSharedWithPrev != crbytes.CommonPrefix(key, b.data[prev.lastKeyOff:]) {
				panic(errors.AssertionFailedf("bytesSharedWithPrev %d != %d", bytesSharedWithPrev,
					crbytes.CommonPrefix(key, b.data[prev.lastKeyOff:])))
			}
		}
	}

	switch {
	case b.nKeys == 0:
		// We're adding the first key to the block.
		// Set a placeholder offset for the block prefix length.
		b.addOffset(0)
		// Set a placeholder offset for the bundle prefix length.
		b.addOffset(0)
		b.nKeys++
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
		// With a single key, the block prefix and the bundle prefix are both
		// (up to maxShared bytes of) the key itself.
		*curr = prefixBytesSizing{
			lastKeyOff:                0,
			offsetCount:               b.offsets.count,
			blockPrefixLen:            min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundlePrefixOffset: 1,
			completedBundleLen:        0,
			compressedDataLen:         len(key),
			offsetEncoding:            DetermineUintEncoding(0, uint64(len(key)), UintEncodingRowThreshold),
		}
	case b.nKeys&(b.bundleSize-1) == 0:
		// We're starting a new bundle.

		// Set the bundle prefix length of the previous bundle.
		b.offsets.elems.set(prev.currentBundlePrefixOffset,
			b.offsets.elems.At(prev.currentBundlePrefixOffset-1)+uint32(prev.currentBundlePrefixLen))

		// Finalize the encoded size of the previous bundle: each distinct key
		// after the first drops its bundle-prefix bytes.
		bundleSizeJustCompleted := prev.currentBundleDistinctLen - (prev.currentBundleDistinctKeys-1)*prev.currentBundlePrefixLen
		completedBundleSize := prev.completedBundleLen + bundleSizeJustCompleted

		// Update the block prefix length if necessary. The caller tells us how
		// many bytes of prefix this key shares with the previous key. The block
		// prefix can only shrink if the bytes shared with the previous key are
		// less than the block prefix length, in which case the new block prefix
		// is the number of bytes shared with the previous key.
		blockPrefixLen := min(prev.blockPrefixLen, bytesSharedWithPrev)
		b.nKeys++
		*curr = prefixBytesSizing{
			lastKeyOff:         len(b.data),
			offsetCount:        b.offsets.count + 2,
			blockPrefixLen:     blockPrefixLen,
			completedBundleLen: completedBundleSize,
			// We're adding the first key to the current bundle. Initialize
			// the current bundle prefix.
			currentBundlePrefixOffset: b.offsets.count,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			compressedDataLen:         completedBundleSize + len(key) - (b.bundleCount(b.nKeys)-1)*blockPrefixLen,
		}
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen), UintEncodingRowThreshold)
		b.data = append(b.data, key...)
		b.addOffset(0) // Placeholder for bundle prefix.
		b.addOffset(uint32(len(b.data)))
	default:
		// Adding a new key to an existing bundle.
		b.nKeys++

		if bytesSharedWithPrev == len(key) {
			// Duplicate key; don't add it to the data slice and don't adjust
			// currentBundleDistinct{Len,Keys}.
			*curr = *prev
			curr.offsetCount++
			// Repeat the previous offset so the duplicate row decodes to the
			// same slice.
			b.addOffset(b.offsets.elems.At(b.offsets.count - 1))
			return
		}

		// Update the bundle prefix length. Note that the shared prefix length
		// can only shrink as new values are added. During construction, the
		// bundle prefix value is stored contiguously in the data array so even
		// if the bundle prefix length changes no adjustment is needed to that
		// value or to the first key in the bundle.
		*curr = prefixBytesSizing{
			lastKeyOff:                len(b.data),
			offsetCount:               prev.offsetCount + 1,
			blockPrefixLen:            min(prev.blockPrefixLen, bytesSharedWithPrev),
			currentBundleDistinctLen:  prev.currentBundleDistinctLen + len(key),
			currentBundleDistinctKeys: prev.currentBundleDistinctKeys + 1,
			currentBundlePrefixLen:    min(prev.currentBundlePrefixLen, bytesSharedWithPrev),
			currentBundlePrefixOffset: prev.currentBundlePrefixOffset,
			completedBundleLen:        prev.completedBundleLen,
		}
		// Compute the correct compressedDataLen.
		curr.compressedDataLen = curr.completedBundleLen +
			curr.currentBundleDistinctLen -
			(curr.currentBundleDistinctKeys-1)*curr.currentBundlePrefixLen
		// Currently compressedDataLen is correct, except that it includes the block
		// prefix length for all bundle prefixes. Adjust the length to account for
		// the block prefix being stripped from every bundle except the first one.
		curr.compressedDataLen -= (b.bundleCount(b.nKeys) - 1) * curr.blockPrefixLen
		// The compressedDataLen is the largest offset we'll need to encode in the
		// offset table.
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen), UintEncodingRowThreshold)
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
	}
}
904 :
905 : // UnsafeGet returns the zero-indexed i'th key added to the builder through Put.
906 : // UnsafeGet may only be used to retrieve the Rows()-1'th or Rows()-2'th keys.
907 : // If called with a different i value, UnsafeGet panics. The keys returned by
908 : // UnsafeGet are guaranteed to be stable until Finish or Reset is called. The
909 : // caller must not mutate the returned slice.
910 2 : func (b *PrefixBytesBuilder) UnsafeGet(i int) []byte {
911 2 : switch i {
912 2 : case b.nKeys - 1:
913 2 : lastKeyOff := b.sizings[i&1].lastKeyOff
914 2 : return b.data[lastKeyOff:]
915 2 : case b.nKeys - 2:
916 2 : lastKeyOff := b.sizings[(i+1)&1].lastKeyOff
917 2 : secondLastKeyOff := b.sizings[i&1].lastKeyOff
918 2 : if secondLastKeyOff == lastKeyOff {
919 2 : // The last key is a duplicate of the second-to-last key.
920 2 : return b.data[secondLastKeyOff:]
921 2 : }
922 2 : return b.data[secondLastKeyOff:lastKeyOff]
923 0 : default:
924 0 : panic(errors.AssertionFailedf("UnsafeGet(%d) called on PrefixBytes with %d keys", i, b.nKeys))
925 : }
926 : }
927 :
928 : // addOffset adds an offset to the offsets table. If necessary, addOffset will
929 : // grow the offset table to accommodate the new offset.
930 2 : func (b *PrefixBytesBuilder) addOffset(offset uint32) {
931 2 : if b.offsets.count == b.offsets.elemsSize {
932 2 : // Double the size of the allocated array, or initialize it to at least
933 2 : // 64 rows if this is the first allocation.
934 2 : n2 := max(b.offsets.elemsSize<<1, 64)
935 2 : newDataTyped := make([]uint32, n2)
936 2 : copy(newDataTyped, b.offsets.elems.Slice(b.offsets.elemsSize))
937 2 : b.offsets.elems = makeUnsafeRawSlice[uint32](unsafe.Pointer(&newDataTyped[0]))
938 2 : b.offsets.elemsSize = n2
939 2 : }
940 2 : b.offsets.elems.set(b.offsets.count, offset)
941 2 : b.offsets.count++
942 : }
943 :
// writePrefixCompressed writes the provided builder's first [rows] rows with
// prefix-compression applied. It writes offsets and string data in tandem,
// writing offsets of width T into [offsetDeltas] and compressed string data
// into [buf]. The builder's internal state is not modified by
// writePrefixCompressed. writePrefixCompressed is generic in terms of the type
// T of the offset deltas.
//
// The caller must have correctly constructed [offsetDeltas] such that writing
// [sizing.offsetCount] offsets of size T does not overwrite the beginning of
// [buf]:
//
//	+-------------------------------------+ <- offsetDeltas.ptr
//	| offsetDeltas[0]                     |
//	+-------------------------------------+
//	| offsetDeltas[1]                     |
//	+-------------------------------------+
//	| ...                                 |
//	+-------------------------------------+
//	| offsetDeltas[sizing.offsetCount-1]  |
//	+-------------------------------------+ <- &buf[0]
//	| buf (string data)                   |
//	| ...                                 |
//	+-------------------------------------+
func writePrefixCompressed[T Uint](
	b *PrefixBytesBuilder,
	rows int,
	sz *prefixBytesSizing,
	offsetDeltas UnsafeRawSlice[T],
	buf []byte,
) {
	if rows == 0 {
		return
	} else if rows == 1 {
		// If there's just 1 row, no prefix compression is necessary and we can
		// just encode the first key as the entire block prefix and first bundle
		// prefix.
		e := b.offsets.elems.At(2)
		offsetDeltas.set(0, T(e))
		offsetDeltas.set(1, T(e))
		offsetDeltas.set(2, T(e))
		copy(buf, b.data[:e])
		return
	}

	// The offset at index 0 is the block prefix length.
	offsetDeltas.set(0, T(sz.blockPrefixLen))
	destOffset := T(copy(buf, b.data[:sz.blockPrefixLen]))
	// lastRowOffset tracks the end offset (in b.data) of the most recently
	// copied row; shared tracks how many leading bytes of the next row are
	// covered by the block prefix + current bundle prefix.
	var lastRowOffset uint32
	var shared int

	// Loop over the slices starting at the bundle prefix of the first bundle.
	// If the slice is a bundle prefix, carve off the suffix that excludes the
	// block prefix. Otherwise, carve off the suffix that excludes the block
	// prefix + bundle prefix.
	for i := 1; i < sz.offsetCount; i++ {
		off := b.offsets.elems.At(i)
		var suffix []byte
		if (i-1)%(b.bundleSize+1) == 0 {
			// This is a bundle prefix.
			if i == sz.currentBundlePrefixOffset {
				// The final (in-progress) bundle's prefix length is recorded
				// in the sizing, not in the offsets table, so slice it out of
				// the first key of that bundle directly.
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : lastRowOffset+uint32(sz.currentBundlePrefixLen)]
			} else {
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : off]
			}
			shared = sz.blockPrefixLen + len(suffix)
			// We don't update lastRowOffset here because the bundle prefix
			// was never actually stored separately in the data array.
		} else {
			// If the offset of this key is the same as the offset of the
			// previous key, then the key is a duplicate. All we need to do is
			// set the same offset in the destination.
			if off == lastRowOffset {
				offsetDeltas.set(i, offsetDeltas.At(i-1))
				continue
			}
			suffix = b.data[lastRowOffset+uint32(shared) : off]
			// Update lastRowOffset for the next iteration of this loop.
			lastRowOffset = off
		}
		if invariants.Enabled && len(buf[destOffset:]) < len(suffix) {
			panic(errors.AssertionFailedf("buf is too small: %d < %d", len(buf[destOffset:]), len(suffix)))
		}
		destOffset += T(copy(buf[destOffset:], suffix))
		offsetDeltas.set(i, destOffset)
	}
	// Sanity check: the bytes written must match the precomputed size.
	if destOffset != T(sz.compressedDataLen) {
		panic(errors.AssertionFailedf("wrote %d, expected %d", destOffset, sz.compressedDataLen))
	}
}
1033 :
// Finish writes the serialized byte slices to buf starting at offset. The buf
// slice must be sufficiently large to store the serialized output. The caller
// should use [Size] to size buf appropriately before calling Finish.
//
// Finish only supports values of [rows] equal to the number of keys set on the
// builder, or one less.
func (b *PrefixBytesBuilder) Finish(
	col int, rows int, offset uint32, buf []byte,
) (endOffset uint32) {
	if rows < b.nKeys-1 || rows > b.nKeys {
		panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Finish %d", b.nKeys, rows))
	}
	if rows == 0 {
		return offset
	}
	// Encode the bundle shift.
	buf[offset] = byte(b.bundleShift)
	offset++

	// The sizing for the state after the rows'th key lives at (rows+1)%2.
	sz := &b.sizings[(rows+1)%2]
	// Compute where the string data will begin, after the offsets table.
	stringDataOffset := uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
	if sz.offsetEncoding.IsDelta() {
		panic(errors.AssertionFailedf("offsets never need delta encoding"))
	}

	// Write the offset encoding byte, then pad with zeroes so the offsets
	// table is naturally aligned for its element width.
	width := uint32(sz.offsetEncoding.Width())
	buf[offset] = byte(sz.offsetEncoding)
	offset++
	offset = alignWithZeroes(buf, offset, width)
	switch width {
	case 1:
		offsetDest := makeUnsafeRawSlice[uint8](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint8](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align16:
		offsetDest := makeUnsafeRawSlice[uint16](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint16](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align32:
		offsetDest := makeUnsafeRawSlice[uint32](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint32](b, rows, sz, offsetDest, buf[stringDataOffset:])
	default:
		panic("unreachable")
	}
	return stringDataOffset + uint32(sz.compressedDataLen)
}
1078 :
1079 : // Size computes the size required to encode the byte slices beginning at the
1080 : // provided offset. The offset is required to ensure proper alignment. The
1081 : // returned uint32 is the offset of the first byte after the end of the encoded
1082 : // data. To compute the size in bytes, subtract the [offset] passed into Size
1083 : // from the returned offset.
1084 2 : func (b *PrefixBytesBuilder) Size(rows int, offset uint32) uint32 {
1085 2 : if rows == 0 {
1086 1 : return offset
1087 2 : } else if rows != b.nKeys && rows != b.nKeys-1 {
1088 0 : panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Size %d", b.nKeys, rows))
1089 : }
1090 2 : sz := &b.sizings[(rows+1)%2]
1091 2 : // The 1-byte bundleSize.
1092 2 : offset++
1093 2 : // Compute the size of the offsets table.
1094 2 : offset = uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
1095 2 : return offset + uint32(sz.compressedDataLen)
1096 : }
1097 :
1098 : // WriteDebug implements the Encoder interface.
1099 1 : func (b *PrefixBytesBuilder) WriteDebug(w io.Writer, rows int) {
1100 1 : fmt.Fprintf(w, "prefixbytes(%d): %d keys", b.bundleSize, b.nKeys)
1101 1 : }
1102 :
// bundleCalc provides facilities for computing indexes and offsets within a
// PrefixBytes structure. It is embedded by both the builder and (per the
// package layout) readers so the index arithmetic stays in one place.
type bundleCalc struct {
	bundleShift uint32 // log2(bundleSize)
	// bundleMask is a mask with 1s across the high bits that indicate the
	// bundle and 0s for the bits that indicate the position within the bundle.
	bundleMask uint32
}
1111 :
1112 2 : func makeBundleCalc(bundleShift uint32) bundleCalc {
1113 2 : return bundleCalc{
1114 2 : bundleShift: bundleShift,
1115 2 : bundleMask: ^((1 << bundleShift) - 1),
1116 2 : }
1117 2 : }
1118 :
1119 : // rowSuffixIndex computes the index of the offset encoding the start of a row's
1120 : // suffix. Example usage of retrieving the row's suffix:
1121 : //
1122 : // i := b.rowSuffixIndex(row)
1123 : // l := b.rawBytes.offsets.At(i)
1124 : // h := b.rawBytes.offsets.At(i + 1)
1125 : // suffix := b.rawBytes.slice(l, h)
1126 2 : func (b bundleCalc) rowSuffixIndex(row int) int {
1127 2 : return 1 + (row >> b.bundleShift) + row
1128 2 : }
1129 :
1130 : // bundleOffsetIndexForRow computes the index of the offset encoding the start
1131 : // of a bundle's prefix.
1132 2 : func (b bundleCalc) bundleOffsetIndexForRow(row int) int {
1133 2 : // AND-ing the row with the bundle mask removes the least significant bits
1134 2 : // of the row, which encode the row's index within the bundle.
1135 2 : return int((uint32(row) >> b.bundleShift) + (uint32(row) & b.bundleMask))
1136 2 : }
1137 :
1138 : // offsetIndexByBundleIndex computes the index of the offset encoding the start
1139 : // of a bundle's prefix when given the bundle's index (an index in
1140 : // [0,Rows/BundleSize)).
1141 2 : func (b bundleCalc) offsetIndexByBundleIndex(bi int) int {
1142 2 : return bi<<b.bundleShift + bi
1143 2 : }
1144 :
1145 2 : func (b bundleCalc) bundleCount(rows int) int {
1146 2 : return 1 + (rows-1)>>b.bundleShift
1147 2 : }
|