Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package colblk
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "math/bits"
12 : "slices"
13 : "strings"
14 : "unsafe"
15 :
16 : "github.com/cockroachdb/crlib/crbytes"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/binfmt"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/treeprinter"
21 : "github.com/cockroachdb/pebble/sstable/block"
22 : )
23 :
24 : // PrefixBytes holds an array of lexicographically ordered byte slices. It
25 : // provides prefix compression. Prefix compression applies strongly to two cases
26 : // in CockroachDB: removal of the "[/tenantID]/tableID/indexID" prefix that is
27 : // present on all table data keys, and multiple versions of a key that are
28 : // distinguished only by different timestamp suffixes. With columnar blocks
29 : // enabling the timestamp to be placed in a separate column, the multiple
30 : // version problem becomes one of efficiently handling exact duplicate keys.
31 : // PrefixBytes builds off of the RawBytes encoding, introducing additional
32 : // slices for encoding (n+bundleSize-1)/bundleSize bundle prefixes and 1
33 : // block-level shared prefix for the column.
34 : //
35 : // Unlike the original prefix compression performed by rowblk (inherited from
36 : // LevelDB and RocksDB), PrefixBytes does not perform all prefix compression
37 : // relative to the previous key. Rather it performs prefix compression relative
38 : // to the first key of a key's bundle. This can result in less compression, but
39 : // simplifies reverse iteration and allows iteration to be largely stateless.
40 : //
41 : // To understand the PrefixBytes layout, we'll work through an example using
42 : // these 15 keys:
43 : //
44 : // 0123456789
45 : // 0 aaabbbc
46 : // 1 aaabbbcc
47 : // 2 aaabbbcde
48 : // 3 aaabbbce
49 : // 4 aaabbbdee
50 : // 5 aaabbbdee
51 : // 6 aaabbbdee
52 : // 7 aaabbbeff
53 : // 8 aaabbe
54 : // 9 aaabbeef
55 : // 10 aaabbeef
56 : // 11 aaabc
57 : // 12 aabcceef
58 : // 13 aabcceef
59 : // 14 aabcceef
60 : //
61 : // The total length of these keys is 119 bytes. There are 3 keys which occur
62 : // multiple times (rows 4-6, 9-10, 12-14) which models multiple versions of the
63 : // same MVCC key in CockroachDB. There is a shared prefix to all of the keys
64 : // which models the "[/tenantID]/tableID/indexID" present on CockroachDB table
65 : // data keys. There are other shared prefixes which model identical values in
66 : // table key columns.
67 : //
68 : // The table below shows the components of the PrefixBytes encoding for these 15
69 : // keys when using a bundle size of 4 which results in 4 bundles. The 15 keys
70 : // are encoded into 20 slices: 1 block prefix, 4 bundle prefixes, and 15
71 : // suffixes. The first slice in the table is the block prefix that is shared by
72 : // all keys in the block. The first slice in each bundle is the bundle prefix
73 : // which is shared by all keys in the bundle.
74 : //
75 : // idx | row | end offset | data
76 : // -------+-------+------------+----------
77 : // 0 | | 2 | aa
78 : // 1 | | 7 | ..abbbc
79 : // 2 | 0 | 7 | .......
80 : // 3 | 1 | 8 | .......c
81 : // 4 | 2 | 10 | .......de
82 : // 5 | 3 | 11 | .......e
83 : // 6 | | 15 | ..abbb
84 : // 7 | 4 | 18 | ......dee
85 : // 8 | 5 | 18 | .........
86 : // 9 | 6 | 18 | .........
87 : // 10 | 7 | 21 | ......eff
88 : // 11 | | 23 | ..ab
89 : // 12 | 8 | 25 | ....be
90 : // 13 | 9 | 29 | ....beef
91 : // 14 | 10 | 29 | ........
92 : // 15 | 11 | 30 | ....c
93 : // 16 | | 36 | ..bcceef
94 : // 17 | 12 | 36 | ........
95 : // 18 | 13 | 36 | ........
96 : // 19 | 14 | 36 | ........
97 : //
98 : // The 'end offset' column in the table encodes the exclusive offset within the
99 : // string data section where each of the slices end. Each slice starts at the
100 : // previous slice's end offset. The first slice (the block prefix)'s start
101 : // offset is implicitly zero. Note that this differs from the plain RawBytes
102 : // encoding which always stores a zero offset at the beginning of the offsets
103 : // array to avoid special-casing the first slice. The block prefix already
104 : // requires special-casing, so materializing the zero start offset is not
105 : // needed.
106 : //
107 : // The table above defines 20 slices: the 1 block key prefix, the 4 bundle key
108 : // prefixes and the 15 key suffixes. Offset[0] is the length of the first slice
109 : // which is always anchored at data[0]. The data columns display the portion of
110 : // the data array the slice covers. For row slices, an empty suffix column
111 : // indicates that the slice is identical to the slice at the previous index
112 : // which is indicated by the slice's offset being equal to the previous slice's
113 : // offset. Due to the lexicographic sorting, the key at row i can't be a prefix
114 : // of the key at row i-1 or it would have sorted before the key at row i-1. And
115 : // if the key differs then only the differing bytes will be part of the suffix
116 : // and not contained in the bundle prefix.
117 : //
118 : // The end result of this encoding is that we can store the 119 bytes of the 15
119 : // keys plus their start and end offsets (which would naively consume 15*4=60
120 : // bytes for at least the key lengths) in 61 bytes (36 bytes of data + 4 bytes
121 : // of offset constant + 20 bytes of offset delta data + 1 byte of bundle size).
122 : //
123 : // # Physical representation
124 : //
125 : // +==================================================================+
126 : // | Bundle size (1 byte) |
127 : // | |
128 : // | The bundle size indicates how many keys prefix compression may |
129 : // | apply across. Every bundleSize keys, prefix compression restarts.|
130 : // | The bundleSize is required to be a power of two, and this 1- |
131 : // | byte prefix stores log2(bundleSize). |
132 : // +==================================================================+
133 : // | RawBytes |
134 : // | |
135 : // | A modified RawBytes encoding is used to store the data slices. A |
136 : // | PrefixBytes column storing n keys will encode |
137 : // | |
138 : // | 1 block prefix |
139 : // | + |
140 : // | (n + bundleSize-1)/bundleSize bundle prefixes |
141 : // | + |
142 : // | n row suffixes |
143 : // | |
144 : // | slices. Unlike the RawBytes encoding, the first offset encoded |
145 : // | is not guaranteed to be zero. In the PrefixBytes encoding, the |
146 : // | first offset encodes the length of the column-wide prefix. The |
147 : // | column-wide prefix is stored in slice(0, offset(0)). |
148 : // | |
149 : // | +------------------------------------------------------------+ |
150 : // | | Offset table | |
151 : // | | | |
152 : // | | A Uint32 column encoding offsets into the string data, | |
153 : // | | possibly delta8 or delta16 encoded. When a delta encoding | |
154 : // | | is used, the base constant is always zero. | |
155 : // | +------------------------------------------------------------+ |
156 : // | | offsetDelta[0] | offsetDelta[1] | ... | offsetDelta[m] | |
157 : // | +------------------------------------------------------------+ |
158 : // | | prefix-compressed string data | |
159 : // | | ... | |
160 : // | +------------------------------------------------------------+ |
161 : // +==================================================================+
162 : //
163 : // TODO(jackson): Consider stealing the low bit of the offset for a flag
164 : // indicating that a key is a duplicate and then using the remaining bits to
165 : // encode the relative index of the duplicated key's end offset. This would
166 : // avoid the O(bundle size) scan in the case of duplicate keys, but at the cost
167 : // of complicating logic to look up a bundle prefix (which may need to follow a
168 : // duplicate key's relative index to uncover the start offset of the bundle
169 : // prefix).
170 : //
171 : // # Reads
172 : //
173 : // This encoding provides O(1) access to any row by calculating the bundle for
174 : // the row (see bundleOffsetIndexForRow), then the per-row's suffix (see
175 : // rowSuffixIndex). If the per-row suffix's end offset equals the previous
176 : // offset, then the row is a duplicate key and we need to step backward until we
177 : // find a non-empty slice or the start of the bundle (a variable number of
178 : // steps, but bounded by the bundle size).
179 : //
180 : // Forward iteration can easily reuse the previous row's key with a check on
181 : // whether the row's slice is empty. Reverse iteration within a run of equal
182 : // keys can reuse the next row's key. When reverse iteration steps backward from
183 : // a non-empty slice onto an empty slice, it must continue backward until a
184 : // non-empty slice is found (just as in absolute positioning) to discover the
185 : // row suffix that is duplicated.
186 : //
187 : // The Seek{GE,LT} routines first binary search on the first key of each bundle
188 : // which can be retrieved without data movement because the bundle prefix is
189 : // immediately adjacent to it in the data array. We can slightly optimize the
190 : // binary search by skipping over all of the keys in the bundle on prefix
191 : // mismatches.
type PrefixBytes struct {
	bundleCalc
	// rows is the number of logical keys encoded within the column.
	rows int
	// sharedPrefixLen is the length of the block-level shared prefix. It is
	// materialized from the first offset of rawBytes (the end of slice 0) at
	// decode time.
	sharedPrefixLen int
	// rawBytes holds the offset table and the prefix-compressed string data:
	// 1 block prefix, bundleCount(rows) bundle prefixes, and rows suffixes.
	rawBytes RawBytes
}
198 :
// Assert at compile time that PrefixBytes implements Array[[]byte].
var _ Array[[]byte] = PrefixBytes{}
201 :
202 : // DecodePrefixBytes decodes the structure of a PrefixBytes, constructing an
203 : // accessor for an array of lexicographically sorted byte slices constructed by
204 : // PrefixBytesBuilder. Count must be the number of logical slices within the
205 : // array.
206 : func DecodePrefixBytes(
207 : b []byte, offset uint32, count int,
208 1 : ) (prefixBytes PrefixBytes, endOffset uint32) {
209 1 : if count == 0 {
210 0 : panic(errors.AssertionFailedf("empty PrefixBytes"))
211 : }
212 : // The first byte of a PrefixBytes-encoded column is the bundle size
213 : // expressed as log2 of the bundle size (the bundle size must always be a
214 : // power of two)
215 1 : bundleShift := uint32(*((*uint8)(unsafe.Pointer(&b[offset]))))
216 1 : calc := makeBundleCalc(bundleShift)
217 1 : nBundles := int(calc.bundleCount(count))
218 1 :
219 1 : rb, endOffset := DecodeRawBytes(b, offset+1, count+nBundles)
220 1 : pb := PrefixBytes{
221 1 : bundleCalc: calc,
222 1 : rows: count,
223 1 : rawBytes: rb,
224 1 : }
225 1 : pb.sharedPrefixLen = int(pb.rawBytes.offsets.At(0))
226 1 : return pb, endOffset
227 : }
228 :
// Assert at compile time that DecodePrefixBytes implements DecodeFunc.
var _ DecodeFunc[PrefixBytes] = DecodePrefixBytes
231 :
232 : // At returns the i'th []byte slice in the PrefixBytes. At must allocate, so
233 : // callers should prefer accessing a slice's constituent components through
234 : // SharedPrefix, BundlePrefix and RowSuffix.
235 1 : func (b PrefixBytes) At(i int) []byte {
236 1 : return slices.Concat(b.SharedPrefix(), b.RowBundlePrefix(i), b.RowSuffix(i))
237 1 : }
238 :
// UnsafeFirstSlice returns first slice in the PrefixBytes. The returned slice
// points directly into the PrefixBytes buffer and must not be mutated.
func (b *PrefixBytes) UnsafeFirstSlice() []byte {
	// The block prefix, the first bundle prefix and the first row's suffix
	// are laid out contiguously at the start of the data array, so the first
	// full key is the data up to the end offset of the third slice (index 2).
	return b.rawBytes.slice(0, b.rawBytes.offsets.At(2))
}
244 :
// PrefixBytesIter is an iterator and associated buffers for PrefixBytes. It
// provides a means for efficiently iterating over the []byte slices contained
// within a PrefixBytes, avoiding unnecessary copying when portions of slices
// are shared.
type PrefixBytesIter struct {
	// Buf is used for materializing a user key. It is preallocated to the maximum
	// key length in the data block.
	Buf []byte
	// syntheticPrefixLen is the length of the synthetic prefix that Init
	// copied into the start of Buf (zero when no synthetic prefix is set).
	syntheticPrefixLen uint32
	// sharedAndBundlePrefixLen is the combined length of the synthetic
	// prefix, the block-level shared prefix and the current bundle prefix
	// currently materialized at the front of Buf.
	sharedAndBundlePrefixLen uint32
	// offsetIndex is the index into the offsets table of the current row's
	// suffix slice.
	offsetIndex int
	// nextBundleOffsetIndex is the offsets-table index at which the next
	// bundle's prefix begins; SetNext compares offsetIndex against it to
	// detect bundle boundaries cheaply.
	nextBundleOffsetIndex int
}
258 :
// Init initializes the prefix bytes iterator; maxKeyLength must be
// large enough to fit any key in the block after applying any synthetic prefix
// and/or suffix.
func (i *PrefixBytesIter) Init(maxKeyLength int, syntheticPrefix block.SyntheticPrefix) {
	// Allocate a buffer that's large enough to hold the largest user key in the
	// block with 1 byte to spare (so that pointer arithmetic is never pointing
	// beyond the allocation, which would violate Go rules).
	n := maxKeyLength + 1
	if cap(i.Buf) < n {
		// NOTE(review): mallocgc is called directly rather than using make;
		// presumably the nil type and false flag request a pointer-free,
		// non-zeroed allocation (the buffer only ever holds bytes) — confirm
		// against the runtime linkname this package uses.
		ptr := mallocgc(uintptr(n), nil, false)
		i.Buf = unsafe.Slice((*byte)(ptr), n)
	}
	i.Buf = i.Buf[:0]
	i.syntheticPrefixLen = uint32(len(syntheticPrefix))
	if syntheticPrefix.IsSet() {
		// Materialize the synthetic prefix once up front; SetAt/SetNext write
		// key bytes after it and never touch these leading bytes again.
		i.Buf = append(i.Buf, syntheticPrefix...)
	}
}
277 :
// SetAt updates the provided PrefixBytesIter to hold the i'th []byte slice in
// the PrefixBytes. The PrefixBytesIter's buffer must be sufficiently large to
// hold the i'th []byte slice, and the caller is required to statically ensure
// this.
func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
	// Determine the offset and length of the bundle prefix.
	bundleOffsetIndex := b.bundleOffsetIndexForRow(i)
	bundleOffsetStart, bundleOffsetEnd := b.rawBytes.offsets.At2(bundleOffsetIndex)
	bundlePrefixLen := bundleOffsetEnd - bundleOffsetStart

	// Determine the offset and length of the row's individual suffix.
	it.offsetIndex = b.rowSuffixIndex(i)
	// TODO(jackson): rowSuffixOffsets will recompute bundleOffsetIndexForRow in
	// the case that the row is a duplicate key. Is it worth optimizing to avoid
	// this recomputation? The expected case is non-duplicate keys, so it may
	// not be worthwhile.
	rowSuffixStart, rowSuffixEnd := b.rowSuffixOffsets(i, it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	// Resize the buffer to the exact key length. The synthetic prefix (if
	// any) already occupies Buf[:syntheticPrefixLen] and is preserved.
	it.sharedAndBundlePrefixLen = it.syntheticPrefixLen + uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]

	// Advance past the synthetic prefix before copying key components.
	ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(it.syntheticPrefixLen))
	// Copy the shared key prefix.
	memmove(ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
	// Copy the bundle prefix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(b.sharedPrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundleOffsetStart)),
		uintptr(bundlePrefixLen))

	// Copy the per-row suffix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(bundlePrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
	// Set nextBundleOffsetIndex so that a call to SetNext can cheaply determine
	// whether the next row is in the same bundle.
	it.nextBundleOffsetIndex = bundleOffsetIndex + (1 << b.bundleShift) + 1
}
321 :
// SetNext updates the provided PrefixBytesIter to hold the next []byte slice in
// the PrefixBytes. SetNext requires the provided iter to currently hold a slice
// and for a subsequent slice to exist within the PrefixBytes. The
// PrefixBytesIter's buffer must be sufficiently large to hold the next []byte
// slice, and the caller is required to statically ensure this.
func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
	it.offsetIndex++
	// If the next row is in the same bundle, we can take a fast path of only
	// updating the per-row suffix.
	if it.offsetIndex < it.nextBundleOffsetIndex {
		rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
		rowSuffixLen := rowSuffixEnd - rowSuffixStart
		if rowSuffixLen == 0 {
			// The start and end offsets are equal, indicating that the key is a
			// duplicate. Since it's identical to the previous key, there's
			// nothing left to do, we can leave buf as-is.
			return
		}
		it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
		// Copy in the per-row suffix, overwriting the previous row's suffix
		// in place after the (unchanged) shared and bundle prefixes.
		ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
		memmove(
			unsafe.Pointer(uintptr(ptr)+uintptr(it.sharedAndBundlePrefixLen)),
			unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
			uintptr(rowSuffixLen))
		return
	}

	// We've reached the end of the bundle. We need to update the bundle prefix.
	// The offsetIndex is currently pointing to the start of the new bundle
	// prefix. Increment it to point at the start of the new row suffix.
	it.offsetIndex++
	rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	// Read the offsets of the new bundle prefix and update the index of the
	// next bundle. The new bundle prefix ends where the new row suffix
	// begins.
	bundlePrefixStart := b.rawBytes.offsets.At(it.nextBundleOffsetIndex)
	bundlePrefixLen := rowSuffixStart - bundlePrefixStart
	it.nextBundleOffsetIndex = it.offsetIndex + (1 << b.bundleShift)

	it.sharedAndBundlePrefixLen = it.syntheticPrefixLen + uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.Buf = it.Buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
	// Copy in the new bundle prefix (the synthetic and shared prefixes are
	// already in place at the front of the buffer).
	ptr := unsafe.Pointer(unsafe.SliceData(it.Buf))
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(it.syntheticPrefixLen) + uintptr(b.sharedPrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundlePrefixStart)),
		uintptr(bundlePrefixLen))
	// Copy in the per-row suffix.
	ptr = unsafe.Pointer(uintptr(ptr) + uintptr(bundlePrefixLen))
	memmove(
		ptr,
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
}
379 :
380 : // SharedPrefix return a []byte of the shared prefix that was extracted from
381 : // all of the values in the Bytes vector. The returned slice should not be
382 : // mutated.
383 1 : func (b *PrefixBytes) SharedPrefix() []byte {
384 1 : // The very first slice is the prefix for the entire column.
385 1 : return b.rawBytes.slice(0, b.rawBytes.offsets.At(0))
386 1 : }
387 :
388 : // RowBundlePrefix takes a row index and returns a []byte of the prefix shared
389 : // among all the keys in the row's bundle, but without the block-level shared
390 : // prefix for the column. The returned slice should not be mutated.
391 1 : func (b *PrefixBytes) RowBundlePrefix(row int) []byte {
392 1 : i := b.bundleOffsetIndexForRow(row)
393 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(i), b.rawBytes.offsets.At(i+1))
394 1 : }
395 :
396 : // BundlePrefix returns the prefix of the i-th bundle in the column. The
397 : // provided i must be in the range [0, BundleCount()). The returned slice should
398 : // not be mutated.
399 1 : func (b *PrefixBytes) BundlePrefix(i int) []byte {
400 1 : j := b.offsetIndexByBundleIndex(i)
401 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))
402 1 : }
403 :
404 : // RowSuffix returns a []byte of the suffix unique to the row. A row's full key
405 : // is the result of concatenating SharedPrefix(), BundlePrefix() and
406 : // RowSuffix().
407 : //
408 : // The returned slice should not be mutated.
409 1 : func (b *PrefixBytes) RowSuffix(row int) []byte {
410 1 : return b.rawBytes.slice(b.rowSuffixOffsets(row, b.rowSuffixIndex(row)))
411 1 : }
412 :
// rowSuffixOffsets finds the start and end offsets of the row's suffix slice,
// accounting for duplicate keys. It takes the index of the row, and the value
// of rowSuffixIndex(row).
func (b *PrefixBytes) rowSuffixOffsets(row, i int) (low uint32, high uint32) {
	// Retrieve the low and high offsets indicating the start and end of the
	// row's suffix slice.
	low, high = b.rawBytes.offsets.At2(i)
	// If there's a non-empty slice for the row, this row is different than its
	// predecessor.
	if low != high {
		return low, high
	}
	// Otherwise, an empty slice indicates a duplicate key. We need to find the
	// first non-empty predecessor within the bundle, or if all the rows are
	// empty, return arbitrary equal low and high.
	//
	// Compute the index of the first row in the bundle so we know when to stop.
	firstIndex := 1 + b.bundleOffsetIndexForRow(row)
	for i > firstIndex {
		// Step back a row, and check if the slice is non-empty. The previous
		// row's start offset becomes this candidate's end offset.
		i--
		high = low
		low = b.rawBytes.offsets.At(i)
		if low != high {
			return low, high
		}
	}
	// All the rows in the bundle are empty.
	return low, high
}
443 :
// Rows returns the count of rows whose keys are encoded within the
// PrefixBytes. This is the logical key count, excluding the block and bundle
// prefix slices.
func (b *PrefixBytes) Rows() int {
	return b.rows
}
448 :
// BundleCount returns the count of bundles within the PrefixBytes, i.e.
// ceil(rows/bundleSize).
func (b *PrefixBytes) BundleCount() int {
	return b.bundleCount(b.rows)
}
453 :
// Search searches for the first key in the PrefixBytes that is greater than or
// equal to k, returning the index of the key and whether an equal key was
// found. If multiple keys are equal, the index of the first such key is
// returned. If all keys are < k, Search returns Rows() for the row index.
func (b *PrefixBytes) Search(k []byte) (rowIndex int, isEqual bool) {
	// First compare to the block-level shared prefix.
	n := min(len(k), b.sharedPrefixLen)
	c := bytes.Compare(k[:n], unsafe.Slice((*byte)(b.rawBytes.data), b.sharedPrefixLen))
	// Note that c cannot be 0 when n < b.sharedPrefixLen.
	if c != 0 {
		if c < 0 {
			// Search key is less than any prefix in the block.
			return 0, false
		}
		// Search key is greater than any key in the block.
		return b.rows, false
	}
	// Trim the block-level shared prefix from the search key.
	k = k[b.sharedPrefixLen:]

	// Binary search among the first keys of each bundle.
	//
	// Define f(-1) == false and f(upper) == true.
	// Invariant: f(bi-1) == false, f(upper) == true.
	nBundles := b.BundleCount()
	bi, upper := 0, nBundles
	upperEqual := false
	for bi < upper {
		h := int(uint(bi+upper) >> 1) // avoid overflow when computing h
		// bi ≤ h < upper

		// Retrieve the first key in the h-th (zero-indexed) bundle. We take
		// advantage of the fact that the first row is stored contiguously in
		// the data array (modulo the block prefix) to slice the entirety of the
		// first key:
		//
		//     b u n d l e p r e f i x f i r s t k e y r e m a i n d e r
		//     ^                       ^                 ^
		//  offset(j)             offset(j+1)         offset(j+2)
		//
		j := b.offsetIndexByBundleIndex(h)
		bundleFirstKey := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+2))
		c = bytes.Compare(k, bundleFirstKey)
		switch {
		case c > 0:
			bi = h + 1 // preserves f(bi-1) == false
		case c < 0:
			upper = h // preserves f(upper) == true
			upperEqual = false
		default:
			// c == 0
			upper = h // preserves f(upper) == true
			upperEqual = true
		}
	}
	if bi == 0 {
		// The very first key is ≥ k. Return it.
		return 0, upperEqual
	}
	// The first key of the bundle bi is ≥ k, but any of the keys in the
	// previous bundle besides the first could also be ≥ k. We can binary search
	// among them, but if the seek key doesn't share the previous bundle's
	// prefix there's no need.
	j := b.offsetIndexByBundleIndex(bi - 1)
	bundlePrefix := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))

	// The row we are looking for might still be in the previous bundle even
	// though the seek key is greater than the first key. This is possible only
	// if the search key shares the previous bundle's prefix (eg, the search key
	// equals a row in the previous bundle or falls between two rows within the
	// previous bundle).
	if len(bundlePrefix) > len(k) || !bytes.Equal(k[:len(bundlePrefix)], bundlePrefix) {
		// The search key doesn't share the previous bundle's prefix, so all of
		// the keys in the previous bundle must be less than k. We know the
		// first key of bi is ≥ k, so return it.
		if bi >= nBundles {
			return b.rows, false
		}
		return bi << b.bundleShift, upperEqual
	}
	// Binary search among bundle bi-1's key remainders after stripping bundle
	// bi-1's prefix.
	//
	// Define f(l-1) == false and f(u) == true.
	// Invariant: f(l-1) == false, f(u) == true.
	k = k[len(bundlePrefix):]
	l := 1
	u := min(1<<b.bundleShift, b.rows-(bi-1)<<b.bundleShift)
	for l < u {
		h := int(uint(l+u) >> 1) // avoid overflow when computing h
		// l ≤ h < u

		// j is currently the index of the offset of bundle bi-1's prefix.
		//
		//     b u n d l e p r e f i x f i r s t k e y s e c o n d k e y
		//     ^                       ^                 ^
		//  offset(j)             offset(j+1)         offset(j+2)
		//
		// The beginning of the zero-indexed i-th key of the bundle is at
		// offset(j+i+1).
		//
		hStart, hEnd := b.rawBytes.offsets.At2(j + h + 1)
		// There's a complication with duplicate keys. When keys are repeated,
		// the PrefixBytes encoding avoids re-encoding the duplicate key,
		// instead encoding an empty slice. While binary searching, if we land
		// on an empty slice, we need to back up until we find a non-empty slice
		// which is the key at index h. We iterate with p. If we eventually find
		// the duplicated key at index p < h and determine f(p) == true, then we
		// can set u=p (rather than h). If we determine f(p)==false, then we
		// know f(h)==false too and set l=h+1.
		p := h
		if hStart == hEnd {
			// Back up looking for a non-empty slice.
			for hStart == hEnd && p >= l {
				p--
				hEnd = hStart
				hStart = b.rawBytes.offsets.At(j + p + 1)
			}
			// If we backed up to l-1, then all the rows in indexes [l, h] have
			// the same keys as index l-1. We know f(l-1) == false [see the
			// invariants above], so we can move l to h+1 and continue the loop
			// without performing any key comparisons.
			if p < l {
				l = h + 1
				continue
			}
		}
		rem := b.rawBytes.slice(hStart, hEnd)
		c = bytes.Compare(k, rem)
		switch {
		case c > 0:
			l = h + 1 // preserves f(l-1) == false
		case c < 0:
			u = p // preserves f(u) == true
			upperEqual = false
		default:
			// c == 0
			u = p // preserves f(u) == true
			upperEqual = true
		}
	}
	i := (bi-1)<<b.bundleShift + l
	if i < b.rows {
		return i, upperEqual
	}
	return b.rows, false
}
601 :
// prefixBytesToBinFormatter formats a serialized PrefixBytes column for
// human-readable debug output, decoding it in place and emitting the bundle
// size, the offsets table and each data slice (annotating block/bundle
// prefixes, and eliding shared bytes with dots). If sliceFormatter is nil,
// defaultSliceFormatter is used.
func prefixBytesToBinFormatter(
	f *binfmt.Formatter, tp treeprinter.Node, count int, sliceFormatter func([]byte) string,
) {
	if sliceFormatter == nil {
		sliceFormatter = defaultSliceFormatter
	}
	pb, _ := DecodePrefixBytes(f.RelativeData(), uint32(f.RelativeOffset()), count)

	f.HexBytesln(1, "bundle size: %d", 1<<pb.bundleShift)
	f.ToTreePrinter(tp)

	n := tp.Child("offsets table")
	// dataOffset is the position of the string data section relative to the
	// formatter's current region; it lets each offset be annotated with its
	// absolute position.
	dataOffset := uint64(f.RelativeOffset()) + uint64(uintptr(pb.rawBytes.data)-uintptr(pb.rawBytes.start))
	uintsToBinFormatter(f, n, pb.rawBytes.slices+1, func(offsetDelta, offsetBase uint64) string {
		// NB: offsetBase will always be zero for PrefixBytes columns.
		return fmt.Sprintf("%d [%d overall]", offsetDelta+offsetBase, offsetDelta+offsetBase+dataOffset)
	})

	n = tp.Child("data")

	// The first offset encodes the length of the block prefix.
	blockPrefixLen := pb.rawBytes.offsets.At(0)
	f.HexBytesln(int(blockPrefixLen), "data[00]: %s (block prefix)",
		sliceFormatter(pb.rawBytes.slice(0, blockPrefixLen)))

	// k is the total slice count: 1 block prefix + ceil(count/bundleSize)
	// bundle prefixes + count row suffixes.
	k := 2 + (count-1)>>pb.bundleShift + count
	startOff := blockPrefixLen
	prevLen := blockPrefixLen

	// Use dots to indicate string data that's elided because it falls within
	// the block or bundle prefix.
	dots := strings.Repeat(".", int(blockPrefixLen))
	// Iterate through all the slices in the data section, annotating bundle
	// prefixes and using dots to indicate elided data.
	for i := 0; i < k-1; i++ {
		endOff := pb.rawBytes.offsets.At(i + 1)
		if i%(1+(1<<pb.bundleShift)) == 0 {
			// This is a bundle prefix.
			dots = strings.Repeat(".", int(blockPrefixLen))
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s (bundle prefix)", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			dots = strings.Repeat(".", int(endOff-startOff+blockPrefixLen))
			prevLen = endOff - startOff + blockPrefixLen
		} else if startOff == endOff {
			// An empty slice that's not a block or bundle prefix indicates a
			// repeat key.
			f.HexBytesln(0, "data[%02d]: %s", i+1, strings.Repeat(".", int(prevLen)))
		} else {
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			prevLen = uint32(len(dots)) + endOff - startOff
		}
		startOff = endOff
	}
	f.ToTreePrinter(n)
}
656 :
// PrefixBytesBuilder encodes a column of lexicographically-sorted byte slices,
// applying prefix compression to reduce the encoded size.
type PrefixBytesBuilder struct {
	bundleCalc
	// TODO(jackson): If prefix compression is very effective, the encoded size
	// may remain very small while the physical in-memory size of the
	// in-progress data slice may grow very large. This may pose memory usage
	// problems during block building.
	data       []byte // The raw, concatenated keys w/o any prefix compression
	nKeys      int    // The number of keys added to the builder
	bundleSize int    // The number of keys per bundle
	// sizings maintains metadata about the size of the accumulated data at both
	// nKeys and nKeys-1. Information for the state after the most recently
	// added key is stored at (b.nKeys+1)%2.
	sizings [2]prefixBytesSizing
	offsets struct {
		count int // The number of offsets in the builder
		// elemsSize is the size of the array (in count of uint32 elements; not
		// bytes)
		elemsSize int
		// elems provides access to elements without bounds checking. elems is
		// grown automatically in addOffset.
		elems UnsafeRawSlice[uint32]
	}
	// maxShared is initialized to (1<<16)-1 by Init; presumably it caps the
	// length of any shared prefix so lengths fit in a uint16 — TODO confirm
	// against the sizing code that consumes it.
	maxShared uint16
}
683 :
// Assert at compile time that PrefixBytesBuilder implements ColumnWriter.
var _ ColumnWriter = (*PrefixBytesBuilder)(nil)
686 :
// Init initializes the PrefixBytesBuilder with the specified bundle size. The
// builder will produce a prefix-compressed column of data type
// DataTypePrefixBytes. The [bundleSize] indicates the number of keys that form
// a "bundle," across which prefix-compression is applied. All keys in the
// column will share a column-wide prefix if there is one.
func (b *PrefixBytesBuilder) Init(bundleSize int) {
	if bundleSize > 0 && (bundleSize&(bundleSize-1)) != 0 {
		panic(errors.AssertionFailedf("prefixbytes bundle size %d is not a power of 2", bundleSize))
	}
	// NOTE(review): a non-positive bundleSize is not rejected here;
	// bits.TrailingZeros32(0) is 32, which would produce a nonsensical bundle
	// shift — presumably callers always pass a positive power of two.
	//
	// The struct literal zeroes all remaining fields (nKeys, sizings) while
	// retaining the previously-allocated data and offsets buffers for reuse.
	*b = PrefixBytesBuilder{
		bundleCalc: makeBundleCalc(uint32(bits.TrailingZeros32(uint32(bundleSize)))),
		data:       b.data[:0],
		bundleSize: bundleSize,
		offsets:    b.offsets,
		maxShared:  (1 << 16) - 1,
	}
	// offsets.elems/elemsSize were carried over above; only the logical count
	// of populated offsets resets.
	b.offsets.count = 0
}
705 :
// NumColumns implements ColumnWriter. A PrefixBytes column is encoded as a
// single physical column.
func (b *PrefixBytesBuilder) NumColumns() int { return 1 }
708 :
// DataType implements ColumnWriter, identifying the column's encoding as
// DataTypePrefixBytes regardless of the column index.
func (b *PrefixBytesBuilder) DataType(int) DataType { return DataTypePrefixBytes }
711 :
712 : // Reset resets the builder to an empty state, preserving the existing bundle
713 : // size.
714 1 : func (b *PrefixBytesBuilder) Reset() {
715 1 : const maxRetainedData = 512 << 10 // 512 KB
716 1 : *b = PrefixBytesBuilder{
717 1 : bundleCalc: b.bundleCalc,
718 1 : data: b.data[:0],
719 1 : bundleSize: b.bundleSize,
720 1 : offsets: b.offsets,
721 1 : maxShared: b.maxShared,
722 1 : sizings: [2]prefixBytesSizing{},
723 1 : }
724 1 : b.offsets.count = 0
725 1 : if len(b.data) > maxRetainedData {
726 0 : b.data = nil
727 0 : }
728 : }
729 :
// Rows returns the number of keys added to the builder, including duplicate
// keys passed to Put.
func (b *PrefixBytesBuilder) Rows() int { return b.nKeys }
732 :
// prefixBytesSizing maintains metadata about the size of the accumulated data
// and its encoded size. Every key addition computes a new prefixBytesSizing
// struct. The PrefixBytesBuilder maintains two prefixBytesSizing structs, one
// for the state after the most recent key addition, and one for the state after
// the second most recent key addition.
type prefixBytesSizing struct {
	lastKeyOff     int // the offset in PrefixBytesBuilder.data where the last key added begins
	offsetCount    int // the count of offsets required to encode the data
	blockPrefixLen int // the length of the block prefix
	// currentBundleDistinctLen is the summed length of the "current" bundle's
	// distinct keys; currentBundleDistinctKeys counts them. Duplicate keys are
	// excluded from both.
	currentBundleDistinctLen  int // the length of the "current" bundle's distinct keys
	currentBundleDistinctKeys int // the number of distinct keys in the "current" bundle
	// currentBundlePrefixLen is the length of the "current" bundle's prefix.
	// The current bundle holds all keys that are not included within
	// PrefixBytesBuilder.completedBundleLen. If the addition of a key causes
	// the creation of a new bundle, the previous bundle's size is incorporated
	// into completedBundleLen and currentBundlePrefixLen is updated to the
	// length of the new bundle key. This ensures that there's always at least 1
	// key in the "current" bundle allowing Finish to accept rows = nKeys-1.
	//
	// Note that currentBundlePrefixLen is inclusive of the blockPrefixLen.
	//
	// INVARIANT: currentBundlePrefixLen >= blockPrefixLen
	currentBundlePrefixLen    int          // the length of the "current" bundle's prefix
	currentBundlePrefixOffset int          // the index of the offset of the "current" bundle's prefix
	completedBundleLen        int          // the encoded size of completed bundles
	compressedDataLen         int          // the compressed, encoded size of data
	offsetEncoding            UintEncoding // the encoding necessary to encode the offsets
}
761 :
762 0 : func (sz *prefixBytesSizing) String() string {
763 0 : return fmt.Sprintf("lastKeyOff:%d offsetCount:%d blockPrefixLen:%d\n"+
764 0 : "currentBundleDistinct{Len,Keys}: (%d,%d)\n"+
765 0 : "currentBundlePrefix{Len,Offset}: (%d,%d)\n"+
766 0 : "completedBundleLen:%d compressedDataLen:%d offsetEncoding:%s",
767 0 : sz.lastKeyOff, sz.offsetCount, sz.blockPrefixLen, sz.currentBundleDistinctLen,
768 0 : sz.currentBundleDistinctKeys, sz.currentBundlePrefixLen, sz.currentBundlePrefixOffset,
769 0 : sz.completedBundleLen, sz.compressedDataLen, sz.offsetEncoding)
770 0 : }
771 :
// Put adds the provided key to the column. The provided key must be
// lexicographically greater than or equal to the previous key added to the
// builder.
//
// The provided bytesSharedWithPrev must be the length of the byte prefix the
// provided key shares with the previous key. The caller is required to provide
// this because in the primary expected use, the caller will already need to
// compute it for the purpose of determining whether successive keys share the
// same prefix.
func (b *PrefixBytesBuilder) Put(key []byte, bytesSharedWithPrev int) {
	// The sizings array is double-buffered: prev holds the state after the
	// previous key, and curr is overwritten with the state after this key.
	currIdx := b.nKeys & 1 // %2
	curr := &b.sizings[currIdx]
	prev := &b.sizings[1-currIdx]

	// Invariant checks (enabled only in test/invariant builds): keys must be
	// non-empty, added in sorted order, and bytesSharedWithPrev must exactly
	// equal the common prefix with the previously added key.
	if invariants.Enabled {
		if len(key) == 0 {
			panic(errors.AssertionFailedf("key must be non-empty"))
		}
		if b.maxShared == 0 {
			panic(errors.AssertionFailedf("maxShared must be positive"))
		}
		if b.nKeys > 0 {
			if bytes.Compare(key, b.data[prev.lastKeyOff:]) < 0 {
				panic(errors.AssertionFailedf("keys must be added in order: %q < %q", key, b.data[prev.lastKeyOff:]))
			}
			if bytesSharedWithPrev != crbytes.CommonPrefix(key, b.data[prev.lastKeyOff:]) {
				panic(errors.AssertionFailedf("bytesSharedWithPrev %d != %d", bytesSharedWithPrev,
					crbytes.CommonPrefix(key, b.data[prev.lastKeyOff:])))
			}
		}
	}

	switch {
	case b.nKeys == 0:
		// We're adding the first key to the block.
		// Set a placeholder offset for the block prefix length.
		b.addOffset(0)
		// Set a placeholder offset for the bundle prefix length.
		b.addOffset(0)
		b.nKeys++
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
		// With a single key, the entire key serves as both the block prefix
		// and the first bundle's prefix (capped at maxShared).
		*curr = prefixBytesSizing{
			lastKeyOff:                0,
			offsetCount:               b.offsets.count,
			blockPrefixLen:            min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundlePrefixOffset: 1,
			completedBundleLen:        0,
			compressedDataLen:         len(key),
			offsetEncoding:            DetermineUintEncoding(0, uint64(len(key)), UintEncodingRowThreshold),
		}
	case b.nKeys&(b.bundleSize-1) == 0:
		// We're starting a new bundle.

		// Set the bundle prefix length of the previous bundle.
		b.offsets.elems.set(prev.currentBundlePrefixOffset,
			b.offsets.elems.At(prev.currentBundlePrefixOffset-1)+uint32(prev.currentBundlePrefixLen))

		// Finalize the encoded size of the previous bundle: each of its
		// distinct keys beyond the first drops the bundle prefix.
		bundleSizeJustCompleted := prev.currentBundleDistinctLen - (prev.currentBundleDistinctKeys-1)*prev.currentBundlePrefixLen
		completedBundleSize := prev.completedBundleLen + bundleSizeJustCompleted

		// Update the block prefix length if necessary. The caller tells us how
		// many bytes of prefix this key shares with the previous key. The block
		// prefix can only shrink if the bytes shared with the previous key are
		// less than the block prefix length, in which case the new block prefix
		// is the number of bytes shared with the previous key.
		blockPrefixLen := min(prev.blockPrefixLen, bytesSharedWithPrev)
		b.nKeys++
		*curr = prefixBytesSizing{
			lastKeyOff:         len(b.data),
			offsetCount:        b.offsets.count + 2,
			blockPrefixLen:     blockPrefixLen,
			completedBundleLen: completedBundleSize,
			// We're adding the first key to the current bundle. Initialize
			// the current bundle prefix.
			currentBundlePrefixOffset: b.offsets.count,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			compressedDataLen:         completedBundleSize + len(key) - (b.bundleCount(b.nKeys)-1)*blockPrefixLen,
		}
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen), UintEncodingRowThreshold)
		b.data = append(b.data, key...)
		b.addOffset(0) // Placeholder for bundle prefix.
		b.addOffset(uint32(len(b.data)))
	default:
		// Adding a new key to an existing bundle.
		b.nKeys++

		if bytesSharedWithPrev == len(key) {
			// Duplicate key; don't add it to the data slice and don't adjust
			// currentBundleDistinct{Len,Keys}.
			*curr = *prev
			curr.offsetCount++
			// Repeat the previous offset so the duplicate encodes zero bytes.
			b.addOffset(b.offsets.elems.At(b.offsets.count - 1))
			return
		}

		// Update the bundle prefix length. Note that the shared prefix length
		// can only shrink as new values are added. During construction, the
		// bundle prefix value is stored contiguously in the data array so even
		// if the bundle prefix length changes no adjustment is needed to that
		// value or to the first key in the bundle.
		*curr = prefixBytesSizing{
			lastKeyOff:                len(b.data),
			offsetCount:               prev.offsetCount + 1,
			blockPrefixLen:            min(prev.blockPrefixLen, bytesSharedWithPrev),
			currentBundleDistinctLen:  prev.currentBundleDistinctLen + len(key),
			currentBundleDistinctKeys: prev.currentBundleDistinctKeys + 1,
			currentBundlePrefixLen:    min(prev.currentBundlePrefixLen, bytesSharedWithPrev),
			currentBundlePrefixOffset: prev.currentBundlePrefixOffset,
			completedBundleLen:        prev.completedBundleLen,
		}
		// Compute the correct compressedDataLen.
		curr.compressedDataLen = curr.completedBundleLen +
			curr.currentBundleDistinctLen -
			(curr.currentBundleDistinctKeys-1)*curr.currentBundlePrefixLen
		// Currently compressedDataLen is correct, except that it includes the block
		// prefix length for all bundle prefixes. Adjust the length to account for
		// the block prefix being stripped from every bundle except the first one.
		curr.compressedDataLen -= (b.bundleCount(b.nKeys) - 1) * curr.blockPrefixLen
		// The compressedDataLen is the largest offset we'll need to encode in the
		// offset table.
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen), UintEncodingRowThreshold)
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
	}
}
904 :
905 : // UnsafeGet returns the zero-indexed i'th key added to the builder through Put.
906 : // UnsafeGet may only be used to retrieve the Rows()-1'th or Rows()-2'th keys.
907 : // If called with a different i value, UnsafeGet panics. The keys returned by
908 : // UnsafeGet are guaranteed to be stable until Finish or Reset is called. The
909 : // caller must not mutate the returned slice.
910 1 : func (b *PrefixBytesBuilder) UnsafeGet(i int) []byte {
911 1 : switch i {
912 1 : case b.nKeys - 1:
913 1 : lastKeyOff := b.sizings[i&1].lastKeyOff
914 1 : return b.data[lastKeyOff:]
915 1 : case b.nKeys - 2:
916 1 : lastKeyOff := b.sizings[(i+1)&1].lastKeyOff
917 1 : secondLastKeyOff := b.sizings[i&1].lastKeyOff
918 1 : if secondLastKeyOff == lastKeyOff {
919 1 : // The last key is a duplicate of the second-to-last key.
920 1 : return b.data[secondLastKeyOff:]
921 1 : }
922 1 : return b.data[secondLastKeyOff:lastKeyOff]
923 0 : default:
924 0 : panic(errors.AssertionFailedf("UnsafeGet(%d) called on PrefixBytes with %d keys", i, b.nKeys))
925 : }
926 : }
927 :
// addOffset adds an offset to the offsets table. If necessary, addOffset will
// grow the offset table to accommodate the new offset.
func (b *PrefixBytesBuilder) addOffset(offset uint32) {
	if b.offsets.count == b.offsets.elemsSize {
		// Double the size of the allocated array, or initialize it to at least
		// 64 rows if this is the first allocation.
		n2 := max(b.offsets.elemsSize<<1, 64)
		newDataTyped := make([]uint32, n2)
		copy(newDataTyped, b.offsets.elems.Slice(b.offsets.elemsSize))
		// Rebuild the unchecked slice around the new allocation. The copy
		// above preserves the existing offsets; newDataTyped stays reachable
		// through the raw pointer held by elems.
		b.offsets.elems = makeUnsafeRawSlice[uint32](unsafe.Pointer(&newDataTyped[0]))
		b.offsets.elemsSize = n2
	}
	b.offsets.elems.set(b.offsets.count, offset)
	b.offsets.count++
}
943 :
// writePrefixCompressed writes the provided builder's first [rows] rows with
// prefix-compression applied. It writes offsets and string data in tandem,
// writing offsets of width T into [offsetDeltas] and compressed string data
// into [buf]. The builder's internal state is not modified by
// writePrefixCompressed. writePrefixCompressed is generic in terms of the type
// T of the offset deltas.
//
// The caller must have correctly constructed [offsetDeltas] such that writing
// [sizing.offsetCount] offsets of size T does not overwrite the beginning of
// [buf]:
//
//	+-------------------------------------+ <- offsetDeltas.ptr
//	| offsetDeltas[0]                     |
//	+-------------------------------------+
//	| offsetDeltas[1]                     |
//	+-------------------------------------+
//	| ...                                 |
//	+-------------------------------------+
//	| offsetDeltas[sizing.offsetCount-1]  |
//	+-------------------------------------+ <- &buf[0]
//	| buf (string data)                   |
//	| ...                                 |
//	+-------------------------------------+
func writePrefixCompressed[T Uint](
	b *PrefixBytesBuilder,
	rows int,
	sz *prefixBytesSizing,
	offsetDeltas UnsafeRawSlice[T],
	buf []byte,
) {
	if rows == 0 {
		return
	} else if rows == 1 {
		// If there's just 1 row, no prefix compression is necessary and we can
		// just encode the first key as the entire block prefix and first bundle
		// prefix.
		e := b.offsets.elems.At(2)
		offsetDeltas.set(0, T(e))
		offsetDeltas.set(1, T(e))
		offsetDeltas.set(2, T(e))
		copy(buf, b.data[:e])
		return
	}

	// The offset at index 0 is the block prefix length.
	offsetDeltas.set(0, T(sz.blockPrefixLen))
	destOffset := T(copy(buf, b.data[:sz.blockPrefixLen]))
	var lastRowOffset uint32
	var shared int

	// Loop over the slices starting at the bundle prefix of the first bundle.
	// If the slice is a bundle prefix, carve off the suffix that excludes the
	// block prefix. Otherwise, carve off the suffix that excludes the block
	// prefix + bundle prefix.
	for i := 1; i < sz.offsetCount; i++ {
		off := b.offsets.elems.At(i)
		var suffix []byte
		// Offsets repeat in runs of bundleSize+1: one bundle-prefix offset
		// followed by bundleSize row offsets, so (i-1)%(bundleSize+1) == 0
		// identifies the bundle-prefix offsets.
		if (i-1)%(b.bundleSize+1) == 0 {
			// This is a bundle prefix.
			if i == sz.currentBundlePrefixOffset {
				// The in-progress bundle's prefix length is not recorded in
				// the offsets table yet; take it from the sizing instead.
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : lastRowOffset+uint32(sz.currentBundlePrefixLen)]
			} else {
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : off]
			}
			shared = sz.blockPrefixLen + len(suffix)
			// We don't update lastRowOffset here because the bundle prefix
			// was never actually stored separately in the data array.
		} else {
			// If the offset of this key is the same as the offset of the
			// previous key, then the key is a duplicate. All we need to do is
			// set the same offset in the destination.
			if off == lastRowOffset {
				offsetDeltas.set(i, offsetDeltas.At(i-1))
				continue
			}
			suffix = b.data[lastRowOffset+uint32(shared) : off]
			// Update lastRowOffset for the next iteration of this loop.
			lastRowOffset = off
		}
		if invariants.Enabled && len(buf[destOffset:]) < len(suffix) {
			panic(errors.AssertionFailedf("buf is too small: %d < %d", len(buf[destOffset:]), len(suffix)))
		}
		destOffset += T(copy(buf[destOffset:], suffix))
		offsetDeltas.set(i, destOffset)
	}
	// Sanity check: the bytes written must exactly match the precomputed
	// compressed size from the sizing.
	if destOffset != T(sz.compressedDataLen) {
		panic(errors.AssertionFailedf("wrote %d, expected %d", destOffset, sz.compressedDataLen))
	}
}
1033 :
// Finish writes the serialized byte slices to buf starting at offset. The buf
// slice must be sufficiently large to store the serialized output. The caller
// should use [Size] to size buf appropriately before calling Finish.
//
// Finish only supports values of [rows] equal to the number of keys set on the
// builder, or one less.
func (b *PrefixBytesBuilder) Finish(
	col int, rows int, offset uint32, buf []byte,
) (endOffset uint32) {
	if rows < b.nKeys-1 || rows > b.nKeys {
		panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Finish %d", b.nKeys, rows))
	}
	if rows == 0 {
		return offset
	}
	// Encode the bundle shift.
	buf[offset] = byte(b.bundleShift)
	offset++

	// (rows+1)%2 selects the sizing corresponding to the state after [rows]
	// keys (see the sizings field comment on PrefixBytesBuilder).
	sz := &b.sizings[(rows+1)%2]
	stringDataOffset := uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
	if sz.offsetEncoding.IsDelta() {
		panic(errors.AssertionFailedf("offsets never need delta encoding"))
	}

	// Write the offset encoding byte, then the (aligned) offset table and the
	// prefix-compressed string data in tandem.
	width := uint32(sz.offsetEncoding.Width())
	buf[offset] = byte(sz.offsetEncoding)
	offset++
	offset = alignWithZeroes(buf, offset, width)
	switch width {
	case 1:
		offsetDest := makeUnsafeRawSlice[uint8](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint8](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align16:
		offsetDest := makeUnsafeRawSlice[uint16](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint16](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align32:
		offsetDest := makeUnsafeRawSlice[uint32](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint32](b, rows, sz, offsetDest, buf[stringDataOffset:])
	default:
		panic("unreachable")
	}
	return stringDataOffset + uint32(sz.compressedDataLen)
}
1078 :
1079 : // Size computes the size required to encode the byte slices beginning at the
1080 : // provided offset. The offset is required to ensure proper alignment. The
1081 : // returned uint32 is the offset of the first byte after the end of the encoded
1082 : // data. To compute the size in bytes, subtract the [offset] passed into Size
1083 : // from the returned offset.
1084 1 : func (b *PrefixBytesBuilder) Size(rows int, offset uint32) uint32 {
1085 1 : if rows == 0 {
1086 1 : return offset
1087 1 : } else if rows != b.nKeys && rows != b.nKeys-1 {
1088 0 : panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Size %d", b.nKeys, rows))
1089 : }
1090 1 : sz := &b.sizings[(rows+1)%2]
1091 1 : // The 1-byte bundleSize.
1092 1 : offset++
1093 1 : // Compute the size of the offsets table.
1094 1 : offset = uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
1095 1 : return offset + uint32(sz.compressedDataLen)
1096 : }
1097 :
// WriteDebug implements the Encoder interface, writing a one-line summary of
// the column (bundle size and accumulated key count) to w.
func (b *PrefixBytesBuilder) WriteDebug(w io.Writer, rows int) {
	// NOTE(review): rows is unused; the summary always reflects b.nKeys.
	fmt.Fprintf(w, "prefixbytes(%d): %d keys", b.bundleSize, b.nKeys)
}
1102 :
// bundleCalc provides facilities for computing indexes and offsets within a
// PrefixBytes structure. It is a small value type embedded in both the reader
// and builder paths.
type bundleCalc struct {
	bundleShift uint32 // log2(bundleSize)
	// bundleMask is a mask with 1s across the high bits that indicate the
	// bundle and 0s for the bits that indicate the position within the bundle.
	bundleMask uint32
}
1111 :
1112 1 : func makeBundleCalc(bundleShift uint32) bundleCalc {
1113 1 : return bundleCalc{
1114 1 : bundleShift: bundleShift,
1115 1 : bundleMask: ^((1 << bundleShift) - 1),
1116 1 : }
1117 1 : }
1118 :
1119 : // rowSuffixIndex computes the index of the offset encoding the start of a row's
1120 : // suffix. Example usage of retrieving the row's suffix:
1121 : //
1122 : // i := b.rowSuffixIndex(row)
1123 : // l := b.rawBytes.offsets.At(i)
1124 : // h := b.rawBytes.offsets.At(i + 1)
1125 : // suffix := b.rawBytes.slice(l, h)
1126 1 : func (b bundleCalc) rowSuffixIndex(row int) int {
1127 1 : return 1 + (row >> b.bundleShift) + row
1128 1 : }
1129 :
1130 : // bundleOffsetIndexForRow computes the index of the offset encoding the start
1131 : // of a bundle's prefix.
1132 1 : func (b bundleCalc) bundleOffsetIndexForRow(row int) int {
1133 1 : // AND-ing the row with the bundle mask removes the least significant bits
1134 1 : // of the row, which encode the row's index within the bundle.
1135 1 : return int((uint32(row) >> b.bundleShift) + (uint32(row) & b.bundleMask))
1136 1 : }
1137 :
1138 : // offsetIndexByBundleIndex computes the index of the offset encoding the start
1139 : // of a bundle's prefix when given the bundle's index (an index in
1140 : // [0,Rows/BundleSize)).
1141 1 : func (b bundleCalc) offsetIndexByBundleIndex(bi int) int {
1142 1 : return bi<<b.bundleShift + bi
1143 1 : }
1144 :
// bundleCount returns the number of bundles required to hold [rows] rows:
// the ceiling of rows divided by the bundle size.
func (b bundleCalc) bundleCount(rows int) int {
	// Ceiling division via (rows-1)>>shift + 1. Note rows == 0 also yields 0,
	// because Go's arithmetic right shift maps -1 to -1.
	return 1 + (rows-1)>>b.bundleShift
}
|