Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package colblk
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "io"
11 : "math/bits"
12 : "slices"
13 : "strings"
14 : "unsafe"
15 :
16 : "github.com/cockroachdb/crlib/crbytes"
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/binfmt"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : )
21 :
22 : // PrefixBytes holds an array of lexicographically ordered byte slices. It
23 : // provides prefix compression. Prefix compression applies strongly to two cases
24 : // in CockroachDB: removal of the "[/tenantID]/tableID/indexID" prefix that is
25 : // present on all table data keys, and multiple versions of a key that are
26 : // distinguished only by different timestamp suffixes. With columnar blocks
27 : // enabling the timestamp to be placed in a separate column, the multiple
28 : // version problem becomes one of efficiently handling exact duplicate keys.
29 : // PrefixBytes builds off of the RawBytes encoding, introducing additional
30 : // slices for encoding (n+bundleSize-1)/bundleSize bundle prefixes and 1
31 : // block-level shared prefix for the column.
32 : //
33 : // Unlike the original prefix compression performed by rowblk (inherited from
34 : // LevelDB and RocksDB), PrefixBytes does not perform all prefix compression
35 : // relative to the previous key. Rather it performs prefix compression relative
36 : // to the first key of a key's bundle. This can result in less compression, but
37 : // simplifies reverse iteration and allows iteration to be largely stateless.
38 : //
39 : // To understand the PrefixBytes layout, we'll work through an example using
40 : // these 15 keys:
41 : //
42 : // 0123456789
43 : // 0 aaabbbc
44 : // 1 aaabbbcc
45 : // 2 aaabbbcde
46 : // 3 aaabbbce
47 : // 4 aaabbbdee
48 : // 5 aaabbbdee
49 : // 6 aaabbbdee
50 : // 7 aaabbbeff
51 : // 8 aaabbe
52 : // 9 aaabbeef
53 : // 10 aaabbeef
54 : // 11 aaabc
55 : // 12 aabcceef
56 : // 13 aabcceef
57 : // 14 aabcceef
58 : //
59 : // The total length of these keys is 119 bytes. There are 3 keys which occur
60 : // multiple times (rows 4-6, 9-10, 12-14) which models multiple versions of the
61 : // same MVCC key in CockroachDB. There is a shared prefix to all of the keys
62 : // which models the "[/tenantID]/tableID/indexID" present on CockroachDB table
63 : // data keys. There are other shared prefixes which model identical values in
64 : // table key columns.
65 : //
// The table below shows the components of the PrefixBytes encoding for these 15
67 : // keys when using a bundle size of 4 which results in 4 bundles. The 15 keys
68 : // are encoded into 20 slices: 1 block prefix, 4 bundle prefixes, and 15
69 : // suffixes. The first slice in the table is the block prefix that is shared by
70 : // all keys in the block. The first slice in each bundle is the bundle prefix
71 : // which is shared by all keys in the bundle.
72 : //
73 : // idx | row | end offset | data
74 : // -------+-------+------------+----------
75 : // 0 | | 2 | aa
76 : // 1 | | 7 | ..abbbc
77 : // 2 | 0 | 7 | .......
78 : // 3 | 1 | 8 | .......c
79 : // 4 | 2 | 10 | .......de
80 : // 5 | 3 | 11 | .......e
81 : // 6 | | 15 | ..abbb
82 : // 7 | 4 | 18 | ......dee
83 : // 8 | 5 | 18 | .........
84 : // 9 | 6 | 18 | .........
85 : // 10 | 7 | 21 | ......eff
86 : // 11 | | 23 | ..ab
87 : // 12 | 8 | 25 | ....be
88 : // 13 | 9 | 29 | ....beef
89 : // 14 | 10 | 29 | ........
90 : // 15 | 11 | 30 | ....c
91 : // 16 | | 36 | ..bcceef
92 : // 17 | 12 | 36 | ........
93 : // 18 | 13 | 36 | ........
94 : // 19 | 14 | 36 | ........
95 : //
96 : // The 'end offset' column in the table encodes the exclusive offset within the
97 : // string data section where each of the slices end. Each slice starts at the
98 : // previous slice's end offset. The first slice (the block prefix)'s start
99 : // offset is implicitly zero. Note that this differs from the plain RawBytes
100 : // encoding which always stores a zero offset at the beginning of the offsets
101 : // array to avoid special-casing the first slice. The block prefix already
102 : // requires special-casing, so materializing the zero start offset is not
103 : // needed.
104 : //
105 : // The table above defines 20 slices: the 1 block key prefix, the 4 bundle key
106 : // prefixes and the 15 key suffixes. Offset[0] is the length of the first slice
107 : // which is always anchored at data[0]. The data columns display the portion of
108 : // the data array the slice covers. For row slices, an empty suffix column
109 : // indicates that the slice is identical to the slice at the previous index
110 : // which is indicated by the slice's offset being equal to the previous slice's
111 : // offset. Due to the lexicographic sorting, the key at row i can't be a prefix
112 : // of the key at row i-1 or it would have sorted before the key at row i-1. And
113 : // if the key differs then only the differing bytes will be part of the suffix
114 : // and not contained in the bundle prefix.
115 : //
116 : // The end result of this encoding is that we can store the 119 bytes of the 15
117 : // keys plus their start and end offsets (which would naively consume 15*4=60
118 : // bytes for at least the key lengths) in 61 bytes (36 bytes of data + 4 bytes
119 : // of offset constant + 20 bytes of offset delta data + 1 byte of bundle size).
120 : //
121 : // # Physical representation
122 : //
123 : // +==================================================================+
124 : // | Bundle size (1 byte) |
125 : // | |
126 : // | The bundle size indicates how many keys prefix compression may |
127 : // | apply across. Every bundleSize keys, prefix compression restarts.|
128 : // | The bundleSize is required to be a power of two, and this 1- |
129 : // | byte prefix stores log2(bundleSize). |
130 : // +==================================================================+
131 : // | RawBytes |
132 : // | |
133 : // | A modified RawBytes encoding is used to store the data slices. A |
134 : // | PrefixBytes column storing n keys will encode |
135 : // | |
136 : // | 1 block prefix |
137 : // | + |
138 : // | (n + bundleSize-1)/bundleSize bundle prefixes |
139 : // | + |
140 : // | n row suffixes |
141 : // | |
142 : // | slices. Unlike the RawBytes encoding, the first offset encoded |
143 : // | is not guaranteed to be zero. In the PrefixBytes encoding, the |
144 : // | first offset encodes the length of the column-wide prefix. The |
145 : // | column-wide prefix is stored in slice(0, offset(0)). |
146 : // | |
147 : // | +------------------------------------------------------------+ |
148 : // | | Offset table | |
149 : // | | | |
150 : // | | A Uint32 column encoding offsets into the string data, | |
151 : // | | possibly delta8 or delta16 encoded. When a delta encoding | |
152 : // | | is used, the base constant is always zero. | |
153 : // | +------------------------------------------------------------+ |
154 : // | | offsetDelta[0] | offsetDelta[1] | ... | offsetDelta[m] | |
155 : // | +------------------------------------------------------------+ |
156 : // | | prefix-compressed string data | |
157 : // | | ... | |
158 : // | +------------------------------------------------------------+ |
159 : // +==================================================================+
160 : //
161 : // TODO(jackson): Consider stealing the low bit of the offset for a flag
162 : // indicating that a key is a duplicate and then using the remaining bits to
163 : // encode the relative index of the duplicated key's end offset. This would
164 : // avoid the O(bundle size) scan in the case of duplicate keys, but at the cost
165 : // of complicating logic to look up a bundle prefix (which may need to follow a
166 : // duplicate key's relative index to uncover the start offset of the bundle
167 : // prefix).
168 : //
169 : // # Reads
170 : //
171 : // This encoding provides O(1) access to any row by calculating the bundle for
172 : // the row (see bundleOffsetIndexForRow), then the per-row's suffix (see
173 : // rowSuffixIndex). If the per-row suffix's end offset equals the previous
174 : // offset, then the row is a duplicate key and we need to step backward until we
175 : // find a non-empty slice or the start of the bundle (a variable number of
176 : // steps, but bounded by the bundle size).
177 : //
178 : // Forward iteration can easily reuse the previous row's key with a check on
179 : // whether the row's slice is empty. Reverse iteration within a run of equal
180 : // keys can reuse the next row's key. When reverse iteration steps backward from
181 : // a non-empty slice onto an empty slice, it must continue backward until a
182 : // non-empty slice is found (just as in absolute positioning) to discover the
183 : // row suffix that is duplicated.
184 : //
185 : // The Seek{GE,LT} routines first binary search on the first key of each bundle
186 : // which can be retrieved without data movement because the bundle prefix is
187 : // immediately adjacent to it in the data array. We can slightly optimize the
188 : // binary search by skipping over all of the keys in the bundle on prefix
189 : // mismatches.
type PrefixBytes struct {
	bundleCalc
	// rows is the number of logical keys encoded in the column.
	rows int
	// sharedPrefixLen is the length of the block-level shared prefix. It is
	// the value of offset(0): the first slice always starts at data[0], so
	// its end offset doubles as its length.
	sharedPrefixLen int
	// rawBytes is the modified RawBytes encoding holding the offset table
	// and the prefix-compressed string data.
	rawBytes RawBytes
}

// Assert that PrefixBytes implements Array[[]byte].
var _ Array[[]byte] = PrefixBytes{}
199 :
// DecodePrefixBytes decodes the structure of a PrefixBytes, constructing an
// accessor for an array of lexicographically sorted byte slices constructed by
// PrefixBytesBuilder. Count must be the number of logical slices within the
// array.
func DecodePrefixBytes(
	b []byte, offset uint32, count int,
) (prefixBytes PrefixBytes, endOffset uint32) {
	if count == 0 {
		panic(errors.AssertionFailedf("empty PrefixBytes"))
	}
	// The first byte of a PrefixBytes-encoded column is the bundle size
	// expressed as log2 of the bundle size (the bundle size must always be a
	// power of two).
	bundleShift := int(*((*uint8)(unsafe.Pointer(&b[offset]))))
	calc := makeBundleCalc(bundleShift)
	nBundles := calc.bundleCount(count)

	// The underlying RawBytes encodes one slice per row suffix plus one per
	// bundle prefix; the block-level prefix is folded into the first offset
	// rather than stored as a separate slice (see sharedPrefixLen below).
	rb, endOffset := DecodeRawBytes(b, offset+1, count+nBundles)
	pb := PrefixBytes{
		bundleCalc: calc,
		rows:       count,
		rawBytes:   rb,
	}
	// offset(0) is the exclusive end of the block-level shared prefix, which
	// is anchored at data[0], so it is also the prefix's length.
	pb.sharedPrefixLen = int(pb.rawBytes.offsets.At(0))
	return pb, endOffset
}

// Assert that DecodePrefixBytes implements DecodeFunc.
var _ DecodeFunc[PrefixBytes] = DecodePrefixBytes
229 :
230 : // At returns the i'th []byte slice in the PrefixBytes. At must allocate, so
231 : // callers should prefer accessing a slice's constituent components through
232 : // SharedPrefix, BundlePrefix and RowSuffix.
233 1 : func (b PrefixBytes) At(i int) []byte {
234 1 : return slices.Concat(b.SharedPrefix(), b.RowBundlePrefix(i), b.RowSuffix(i))
235 1 : }
236 :
// UnsafeFirstSlice returns first slice in the PrefixBytes. The returned slice
// points directly into the PrefixBytes buffer and must not be mutated.
func (b *PrefixBytes) UnsafeFirstSlice() []byte {
	// The block prefix, the first bundle prefix and the first row's suffix
	// are laid out contiguously at the start of the data array, so slicing
	// from 0 up to offset(2) yields the first key in its entirety without
	// any data movement.
	return b.rawBytes.slice(0, b.rawBytes.offsets.At(2))
}
242 :
// PrefixBytesIter is an iterator and associated buffers for PrefixBytes. It
// provides a means for efficiently iterating over the []byte slices contained
// within a PrefixBytes, avoiding unnecessary copying when portions of slices
// are shared.
type PrefixBytesIter struct {
	// buf is used for materializing a user key. It is preallocated to the maximum
	// key length in the data block.
	buf []byte
	// sharedAndBundlePrefixLen is the length of the prefix of buf currently
	// holding the block-level shared prefix followed by the current bundle's
	// prefix; the row suffix is copied in after it.
	sharedAndBundlePrefixLen uint32
	// offsetIndex is the index into the offsets table of the current row's
	// suffix slice.
	offsetIndex int
	// nextBundleOffsetIndex is the offsets-table index at which the next
	// bundle's prefix begins, letting SetNext cheaply detect when it crosses
	// a bundle boundary.
	nextBundleOffsetIndex int
}
255 :
// SetAt updates the provided PrefixBytesIter to hold the i'th []byte slice in
// the PrefixBytes. The PrefixBytesIter's buffer must be sufficiently large to
// hold the i'th []byte slice, and the caller is required to statically ensure
// this.
func (b *PrefixBytes) SetAt(it *PrefixBytesIter, i int) {
	// Determine the offset and length of the bundle prefix.
	bundleOffsetIndex := b.bundleOffsetIndexForRow(i)
	bundleOffsetStart, bundleOffsetEnd := b.rawBytes.offsets.At2(bundleOffsetIndex)
	bundlePrefixLen := bundleOffsetEnd - bundleOffsetStart

	// Determine the offset and length of the row's individual suffix.
	it.offsetIndex = b.rowSuffixIndex(i)
	// TODO(jackson): rowSuffixOffsets will recompute bundleOffsetIndexForRow in
	// the case that the row is a duplicate key. Is it worth optimizing to avoid
	// this recomputation? The expected case is non-duplicate keys, so it may
	// not be worthwhile.
	rowSuffixStart, rowSuffixEnd := b.rowSuffixOffsets(i, it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	// buf is assembled as [shared prefix][bundle prefix][row suffix]; size it
	// to exactly that total length.
	it.sharedAndBundlePrefixLen = uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.buf = it.buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]

	ptr := unsafe.Pointer(unsafe.SliceData(it.buf))
	// Copy the shared key prefix.
	memmove(ptr, b.rawBytes.data, uintptr(b.sharedPrefixLen))
	// Copy the bundle prefix.
	memmove(
		unsafe.Pointer(uintptr(ptr)+uintptr(b.sharedPrefixLen)),
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundleOffsetStart)),
		uintptr(bundlePrefixLen))

	// Copy the per-row suffix.
	memmove(
		unsafe.Pointer(uintptr(ptr)+uintptr(it.sharedAndBundlePrefixLen)),
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
	// Set nextBundleOffsetIndex so that a call to SetNext can cheaply determine
	// whether the next row is in the same bundle.
	it.nextBundleOffsetIndex = bundleOffsetIndex + (1 << b.bundleShift) + 1
}
296 :
// SetNext updates the provided PrefixBytesIter to hold the next []byte slice in
// the PrefixBytes. SetNext requires the provided iter to currently hold a slice
// and for a subsequent slice to exist within the PrefixBytes. The
// PrefixBytesIter's buffer must be sufficiently large to hold the next []byte
// slice, and the caller is required to statically ensure this.
func (b *PrefixBytes) SetNext(it *PrefixBytesIter) {
	it.offsetIndex++
	// If the next row is in the same bundle, we can take a fast path of only
	// updating the per-row suffix.
	if it.offsetIndex < it.nextBundleOffsetIndex {
		rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
		rowSuffixLen := rowSuffixEnd - rowSuffixStart
		if rowSuffixLen == 0 {
			// The start and end offsets are equal, indicating that the key is a
			// duplicate. Since it's identical to the previous key, there's
			// nothing left to do, we can leave buf as-is.
			return
		}
		// Only the suffix needs replacing; the shared and bundle prefixes
		// already occupy buf[:it.sharedAndBundlePrefixLen].
		it.buf = it.buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
		// Copy in the per-row suffix.
		ptr := unsafe.Pointer(unsafe.SliceData(it.buf))
		memmove(
			unsafe.Pointer(uintptr(ptr)+uintptr(it.sharedAndBundlePrefixLen)),
			unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
			uintptr(rowSuffixLen))
		return
	}

	// We've reached the end of the bundle. We need to update the bundle prefix.
	// The offsetIndex is currently pointing to the start of the new bundle
	// prefix. Increment it to point at the start of the new row suffix.
	it.offsetIndex++
	rowSuffixStart, rowSuffixEnd := b.rawBytes.offsets.At2(it.offsetIndex)
	rowSuffixLen := rowSuffixEnd - rowSuffixStart

	// Read the offsets of the new bundle prefix and update the index of the
	// next bundle. The bundle prefix ends where the new row's suffix begins
	// (they're adjacent in the data array), so its length is the difference.
	bundlePrefixStart := b.rawBytes.offsets.At(it.nextBundleOffsetIndex)
	bundlePrefixLen := rowSuffixStart - bundlePrefixStart
	it.nextBundleOffsetIndex = it.offsetIndex + (1 << b.bundleShift)

	it.sharedAndBundlePrefixLen = uint32(b.sharedPrefixLen) + bundlePrefixLen
	it.buf = it.buf[:it.sharedAndBundlePrefixLen+rowSuffixLen]
	// Copy in the new bundle prefix.
	ptr := unsafe.Pointer(unsafe.SliceData(it.buf))
	memmove(
		unsafe.Pointer(uintptr(ptr)+uintptr(b.sharedPrefixLen)),
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(bundlePrefixStart)),
		uintptr(bundlePrefixLen))
	// Copy in the per-row suffix.
	memmove(
		unsafe.Pointer(uintptr(ptr)+uintptr(it.sharedAndBundlePrefixLen)),
		unsafe.Pointer(uintptr(b.rawBytes.data)+uintptr(rowSuffixStart)),
		uintptr(rowSuffixLen))
}
352 :
353 : // SharedPrefix return a []byte of the shared prefix that was extracted from
354 : // all of the values in the Bytes vector. The returned slice should not be
355 : // mutated.
356 1 : func (b *PrefixBytes) SharedPrefix() []byte {
357 1 : // The very first slice is the prefix for the entire column.
358 1 : return b.rawBytes.slice(0, b.rawBytes.offsets.At(0))
359 1 : }
360 :
361 : // RowBundlePrefix takes a row index and returns a []byte of the prefix shared
362 : // among all the keys in the row's bundle, but without the block-level shared
363 : // prefix for the column. The returned slice should not be mutated.
364 1 : func (b *PrefixBytes) RowBundlePrefix(row int) []byte {
365 1 : i := b.bundleOffsetIndexForRow(row)
366 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(i), b.rawBytes.offsets.At(i+1))
367 1 : }
368 :
369 : // BundlePrefix returns the prefix of the i-th bundle in the column. The
370 : // provided i must be in the range [0, BundleCount()). The returned slice should
371 : // not be mutated.
372 1 : func (b *PrefixBytes) BundlePrefix(i int) []byte {
373 1 : j := b.offsetIndexByBundleIndex(i)
374 1 : return b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))
375 1 : }
376 :
377 : // RowSuffix returns a []byte of the suffix unique to the row. A row's full key
378 : // is the result of concatenating SharedPrefix(), BundlePrefix() and
379 : // RowSuffix().
380 : //
381 : // The returned slice should not be mutated.
382 1 : func (b *PrefixBytes) RowSuffix(row int) []byte {
383 1 : return b.rawBytes.slice(b.rowSuffixOffsets(row, b.rowSuffixIndex(row)))
384 1 : }
385 :
// rowSuffixOffsets finds the start and end offsets of the row's suffix slice,
// accounting for duplicate keys. It takes the index of the row, and the value
// of rowSuffixIndex(row).
func (b *PrefixBytes) rowSuffixOffsets(row, i int) (low uint32, high uint32) {
	// Retrieve the low and high offsets indicating the start and end of the
	// row's suffix slice.
	low, high = b.rawBytes.offsets.At2(i)
	// If there's a non-empty slice for the row, this row is different than its
	// predecessor.
	if low != high {
		return low, high
	}
	// Otherwise, an empty slice indicates a duplicate key. We need to find the
	// first non-empty predecessor within the bundle, or if all the rows are
	// empty, return arbitrary equal low and high.
	//
	// Compute the index of the first row in the bundle so we know when to stop.
	firstIndex := 1 + b.bundleOffsetIndexForRow(row)
	for i > firstIndex {
		// Step back a row, and check if the slice is non-empty.
		// NB: the ordering of these three statements matters; each iteration
		// shifts the window [low, high) one slice to the left.
		i--
		high = low
		low = b.rawBytes.offsets.At(i)
		if low != high {
			return low, high
		}
	}
	// All the rows in the bundle are empty.
	return low, high
}
417 : // Rows returns the count of rows whose keys are encoded within the PrefixBytes.
418 1 : func (b *PrefixBytes) Rows() int {
419 1 : return b.rows
420 1 : }
421 :
422 : // BundleCount returns the count of bundles within the PrefixBytes.
423 1 : func (b *PrefixBytes) BundleCount() int {
424 1 : return b.bundleCount(b.rows)
425 1 : }
426 :
// Search searches for the first key in the PrefixBytes that is greater than or
// equal to k, returning the index of the key and whether an equal key was
// found. If multiple keys are equal, the index of the first such key is
// returned. If all keys are < k, Search returns Rows() for the row index.
func (b *PrefixBytes) Search(k []byte) (rowIndex int, isEqual bool) {
	// First compare to the block-level shared prefix.
	n := min(len(k), b.sharedPrefixLen)
	c := bytes.Compare(k[:n], unsafe.Slice((*byte)(b.rawBytes.data), b.sharedPrefixLen))
	switch {
	case c < 0 || (c == 0 && n < b.sharedPrefixLen):
		// Search key is less than any prefix in the block.
		return 0, false
	case c > 0:
		// Search key is greater than any key in the block.
		return b.rows, false
	}
	// Trim the block-level shared prefix from the search key.
	k = k[b.sharedPrefixLen:]

	// Binary search among the first keys of each bundle.
	//
	// Define f(-1) == false and f(upper) == true.
	// Invariant: f(bi-1) == false, f(upper) == true.
	nBundles := b.BundleCount()
	bi, upper := 0, nBundles
	upperEqual := false
	for bi < upper {
		h := int(uint(bi+upper) >> 1) // avoid overflow when computing h
		// bi ≤ h < upper

		// Retrieve the first key in the h-th (zero-indexed) bundle. We take
		// advantage of the fact that the first row is stored contiguously in
		// the data array (modulo the block prefix) to slice the entirety of the
		// first key:
		//
		//   b u n d l e p r e f i x f i r s t k e y r e m a i n d e r
		//   ^                       ^                 ^
		//  offset(j)            offset(j+1)       offset(j+2)
		//
		j := b.offsetIndexByBundleIndex(h)
		bundleFirstKey := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+2))
		c = bytes.Compare(k, bundleFirstKey)
		switch {
		case c > 0:
			bi = h + 1 // preserves f(bi-1) == false
		case c < 0:
			upper = h // preserves f(upper) == true
			upperEqual = false
		default:
			// c == 0
			upper = h // preserves f(upper) == true
			upperEqual = true
		}
	}
	if bi == 0 {
		// The very first key is ≥ k. Return it.
		return 0, upperEqual
	}
	// The first key of the bundle bi is ≥ k, but any of the keys in the
	// previous bundle besides the first could also be ≥ k. We can binary search
	// among them, but if the seek key doesn't share the previous bundle's
	// prefix there's no need.
	j := b.offsetIndexByBundleIndex(bi - 1)
	bundlePrefix := b.rawBytes.slice(b.rawBytes.offsets.At(j), b.rawBytes.offsets.At(j+1))

	// The row we are looking for might still be in the previous bundle even
	// though the seek key is greater than the first key. This is possible only
	// if the search key shares the first bundle's prefix (eg, the search key
	// equals a row in the previous bundle or falls between two rows within the
	// previous bundle).
	if len(bundlePrefix) > len(k) || !bytes.Equal(k[:len(bundlePrefix)], bundlePrefix) {
		// The search key doesn't share the previous bundle's prefix, so all of
		// the keys in the previous bundle must be less than k. We know the
		// first key of bi is ≥ k, so return it.
		if bi >= nBundles {
			return b.rows, false
		}
		return bi << b.bundleShift, upperEqual
	}
	// Binary search among bundle bi-1's key remainders after stripping bundle
	// bi-1's prefix.
	//
	// Define f(l-1) == false and f(u) == true.
	// Invariant: f(l-1) == false, f(u) == true.
	k = k[len(bundlePrefix):]
	l := 1
	u := min(1<<b.bundleShift, b.rows-(bi-1)<<b.bundleShift)
	for l < u {
		h := int(uint(l+u) >> 1) // avoid overflow when computing h
		// l ≤ h < u

		// j is currently the index of the offset of bundle bi-1's prefix.
		//
		//   b u n d l e p r e f i x f i r s t k e y s e c o n d k e y
		//   ^                       ^               ^
		//  offset(j)            offset(j+1)     offset(j+2)
		//
		// The beginning of the zero-indexed i-th key of the bundle is at
		// offset(j+i+1).
		//
		hStart := b.rawBytes.offsets.At(j + h + 1)
		hEnd := b.rawBytes.offsets.At(j + h + 2)
		// There's a complication with duplicate keys. When keys are repeated,
		// the PrefixBytes encoding avoids re-encoding the duplicate key,
		// instead encoding an empty slice. While binary searching, if we land
		// on an empty slice, we need to back up until we find a non-empty slice
		// which is the key at index h. We iterate with p. If we eventually find
		// the duplicated key at index p < h and determine f(p) == true, then we
		// can set u=p (rather than h). If we determine f(p)==false, then we
		// know f(h)==false too and set l=h+1.
		p := h
		if hStart == hEnd {
			// Back up looking for a non-empty slice.
			for hStart == hEnd && p >= l {
				p--
				hEnd = hStart
				hStart = b.rawBytes.offsets.At(j + p + 1)
			}
			// If we backed up to l-1, then all the rows in indexes [l, h] have
			// the same keys as index l-1. We know f(l-1) == false [see the
			// invariants above], so we can move l to h+1 and continue the loop
			// without performing any key comparisons.
			if p < l {
				l = h + 1
				continue
			}
		}
		rem := b.rawBytes.slice(hStart, hEnd)
		c = bytes.Compare(k, rem)
		switch {
		case c > 0:
			l = h + 1 // preserves f(l-1) == false
		case c < 0:
			u = p // preserves f(u) == true
			upperEqual = false
		default:
			// c == 0
			u = p // preserves f(u) == true
			upperEqual = true
		}
	}
	i := (bi-1)<<b.bundleShift + l
	if i < b.rows {
		return i, upperEqual
	}
	return b.rows, false
}
574 :
// prefixBytesToBinFormatter annotates a PrefixBytes-encoded column within the
// binfmt.Formatter's data, decoding the column and printing the bundle size,
// the offsets table and each data slice. If sliceFormatter is nil,
// defaultSliceFormatter is used to render slice contents.
func prefixBytesToBinFormatter(f *binfmt.Formatter, count int, sliceFormatter func([]byte) string) {
	if sliceFormatter == nil {
		sliceFormatter = defaultSliceFormatter
	}
	pb, _ := DecodePrefixBytes(f.RelativeData(), uint32(f.RelativeOffset()), count)
	f.CommentLine("PrefixBytes")
	f.HexBytesln(1, "bundleSize: %d", 1<<pb.bundleShift)
	f.CommentLine("Offsets table")
	dataOffset := uint64(f.RelativeOffset()) + uint64(uintptr(pb.rawBytes.data)-uintptr(pb.rawBytes.start))
	uintsToBinFormatter(f, pb.rawBytes.slices+1,
		func(offsetDelta, offsetBase uint64) string {
			// NB: offsetBase will always be zero for PrefixBytes columns.
			return fmt.Sprintf("%d [%d overall]", offsetDelta+offsetBase, offsetDelta+offsetBase+dataOffset)
		})
	f.CommentLine("Data")

	// The first offset encodes the length of the block prefix.
	blockPrefixLen := pb.rawBytes.offsets.At(0)
	f.HexBytesln(int(blockPrefixLen), "data[00]: %s (block prefix)",
		sliceFormatter(pb.rawBytes.slice(0, blockPrefixLen)))

	// k is the total count of encoded slices: the block prefix, one bundle
	// prefix per bundle, and one suffix per row.
	k := 2 + (count-1)>>pb.bundleShift + count
	startOff := blockPrefixLen
	prevLen := blockPrefixLen

	// Use dots to indicate string data that's elided because it falls within
	// the block or bundle prefix.
	dots := strings.Repeat(".", int(blockPrefixLen))
	// Iterate through all the slices in the data section, annotating bundle
	// prefixes and using dots to indicate elided data.
	for i := 0; i < k-1; i++ {
		endOff := pb.rawBytes.offsets.At(i + 1)
		if i%(1+(1<<pb.bundleShift)) == 0 {
			// This is a bundle prefix.
			dots = strings.Repeat(".", int(blockPrefixLen))
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s (bundle prefix)", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			dots = strings.Repeat(".", int(endOff-startOff+blockPrefixLen))
			prevLen = endOff - startOff + blockPrefixLen
		} else if startOff == endOff {
			// An empty slice that's not a block or bundle prefix indicates a
			// repeat key.
			f.HexBytesln(0, "data[%02d]: %s", i+1, strings.Repeat(".", int(prevLen)))
		} else {
			f.HexBytesln(int(endOff-startOff), "data[%02d]: %s%s", i+1, dots, sliceFormatter(pb.rawBytes.At(i)))
			prevLen = uint32(len(dots)) + endOff - startOff
		}
		startOff = endOff
	}
}
624 :
// PrefixBytesBuilder encodes a column of lexicographically-sorted byte slices,
// applying prefix compression to reduce the encoded size.
type PrefixBytesBuilder struct {
	bundleCalc
	// TODO(jackson): If prefix compression is very effective, the encoded size
	// may remain very small while the physical in-memory size of the
	// in-progress data slice may grow very large. This may pose memory usage
	// problems during block building.
	data       []byte // The raw, concatenated keys w/o any prefix compression
	nKeys      int    // The number of keys added to the builder
	bundleSize int    // The number of keys per bundle
	// sizings maintains metadata about the size of the accumulated data at both
	// nKeys and nKeys-1. Information for the state after the most recently
	// added key is stored at (b.nKeys+1)%2.
	sizings [2]prefixBytesSizing
	offsets struct {
		count int // The number of offsets in the builder
		// elemsSize is the size of the array (in count of uint32 elements; not
		// bytes)
		elemsSize int
		// elems provides access to elements without bounds checking. elems is
		// grown automatically in addOffset.
		elems UnsafeRawSlice[uint32]
	}
	// maxShared bounds the length of any shared prefix; initialized to the
	// maximum uint16 value in Init.
	maxShared uint16
}

// Assert that PrefixBytesBuilder implements ColumnWriter.
var _ ColumnWriter = (*PrefixBytesBuilder)(nil)
654 :
// Init initializes the PrefixBytesBuilder with the specified bundle size. The
// builder will produce a prefix-compressed column of data type
// DataTypePrefixBytes. The [bundleSize] indicates the number of keys that form
// a "bundle," across which prefix-compression is applied. All keys in the
// column will share a column-wide prefix if there is one.
func (b *PrefixBytesBuilder) Init(bundleSize int) {
	// Bundle membership is computed by bit-shifting, so the bundle size must
	// be a power of two.
	//
	// NOTE(review): bundleSize == 0 passes this check but makes
	// bits.TrailingZeros32 below return 32; presumably callers always pass a
	// positive bundle size — confirm.
	if bundleSize > 0 && (bundleSize&(bundleSize-1)) != 0 {
		panic(errors.AssertionFailedf("prefixbytes bundle size %d is not a power of 2", bundleSize))
	}
	// Reuse the existing data and offsets allocations across Init calls; only
	// the logical contents are reset.
	*b = PrefixBytesBuilder{
		bundleCalc: makeBundleCalc(bits.TrailingZeros32(uint32(bundleSize))),
		data:       b.data[:0],
		bundleSize: bundleSize,
		offsets:    b.offsets,
		maxShared:  (1 << 16) - 1,
	}
	b.offsets.count = 0
}
673 :
674 : // NumColumns implements ColumnWriter.
675 1 : func (b *PrefixBytesBuilder) NumColumns() int { return 1 }
676 :
677 : // DataType implements ColumnWriter.
678 1 : func (b *PrefixBytesBuilder) DataType(int) DataType { return DataTypePrefixBytes }
679 :
680 : // Reset resets the builder to an empty state, preserving the existing bundle
681 : // size.
682 1 : func (b *PrefixBytesBuilder) Reset() {
683 1 : const maxRetainedData = 512 << 10 // 512 KB
684 1 : *b = PrefixBytesBuilder{
685 1 : bundleCalc: b.bundleCalc,
686 1 : data: b.data[:0],
687 1 : bundleSize: b.bundleSize,
688 1 : offsets: b.offsets,
689 1 : maxShared: b.maxShared,
690 1 : sizings: [2]prefixBytesSizing{},
691 1 : }
692 1 : b.offsets.count = 0
693 1 : if len(b.data) > maxRetainedData {
694 0 : b.data = nil
695 0 : }
696 : }
697 :
698 : // Rows returns the number of keys added to the builder.
699 1 : func (b *PrefixBytesBuilder) Rows() int { return b.nKeys }
700 :
// prefixBytesSizing maintains metadata about the size of the accumulated data
// and its encoded size. Every key addition computes a new prefixBytesSizing
// struct. The PrefixBytesBuilder maintains two prefixBytesSizing structs, one
// for the state after the most recent key addition, and one for the state
// after the second most recent key addition. Keeping both allows Finish to
// serialize either nKeys or nKeys-1 rows without recomputation.
type prefixBytesSizing struct {
	lastKeyLen int // the length of the last key added
	offsetCount int // the count of offsets required to encode the data
	blockPrefixLen int // the length of the block prefix (shared by all keys)
	currentBundleDistinctLen int // the length of the "current" bundle's distinct keys
	currentBundleDistinctKeys int // the number of distinct keys in the "current" bundle
	// currentBundlePrefixLen is the length of the "current" bundle's prefix.
	// The current bundle holds all keys that are not included within
	// PrefixBytesBuilder.completedBundleLen. If the addition of a key causes
	// the creation of a new bundle, the previous bundle's size is incorporated
	// into completedBundleLen and currentBundlePrefixLen is updated to the
	// length of the new bundle key. This ensures that there's always at least 1
	// key in the "current" bundle allowing Finish to accept rows = nKeys-1.
	//
	// Note that currentBundlePrefixLen is inclusive of the blockPrefixLen.
	//
	// INVARIANT: currentBundlePrefixLen >= blockPrefixLen
	currentBundlePrefixLen int // the length of the "current" bundle's prefix
	currentBundlePrefixOffset int // the index of the offset of the "current" bundle's prefix
	completedBundleLen int // the encoded size of completed bundles
	compressedDataLen int // the compressed, encoded size of data
	offsetEncoding UintEncoding // the encoding necessary to encode the offsets
}
729 :
730 0 : func (sz *prefixBytesSizing) String() string {
731 0 : return fmt.Sprintf("lastKeyLen:%d offsetCount:%d blockPrefixLen:%d\n"+
732 0 : "currentBundleDistinct{Len,Keys}: (%d,%d)\n"+
733 0 : "currentBundlePrefix{Len,Offset}: (%d,%d)\n"+
734 0 : "completedBundleLen:%d compressedDataLen:%d offsetEncoding:%s",
735 0 : sz.lastKeyLen, sz.offsetCount, sz.blockPrefixLen, sz.currentBundleDistinctLen,
736 0 : sz.currentBundleDistinctKeys, sz.currentBundlePrefixLen, sz.currentBundlePrefixOffset,
737 0 : sz.completedBundleLen, sz.compressedDataLen, sz.offsetEncoding)
738 0 : }
739 :
// Put adds the provided key to the column. The provided key must be
// lexicographically greater than or equal to the previous key added to the
// builder.
//
// The provided bytesSharedWithPrev must be the length of the byte prefix the
// provided key shares with the previous key. The caller is required to provide
// this because in the primary expected use, the caller will already need to
// compute it for the purpose of determining whether successive keys share the
// same prefix.
func (b *PrefixBytesBuilder) Put(key []byte, bytesSharedWithPrev int) {
	// The builder alternates between the two sizings structs: [curr] is
	// overwritten with the state resulting from this key's addition, while
	// [prev] holds the state after the previous addition.
	currIdx := b.nKeys & 1 // %2
	curr := &b.sizings[currIdx]
	prev := &b.sizings[1-currIdx]

	// Under invariants builds, validate the caller's contract: keys are
	// non-empty, added in sorted order, and bytesSharedWithPrev is exactly
	// the shared prefix length with the previous key.
	if invariants.Enabled {
		if len(key) == 0 {
			panic(errors.AssertionFailedf("key must be non-empty"))
		}
		if b.maxShared == 0 {
			panic(errors.AssertionFailedf("maxShared must be positive"))
		}
		if b.nKeys > 0 {
			// The previous key occupies the trailing prev.lastKeyLen bytes
			// of b.data.
			if bytes.Compare(key, b.data[len(b.data)-prev.lastKeyLen:]) < 0 {
				panic(errors.AssertionFailedf("keys must be added in order: %q < %q", key, b.data[len(b.data)-prev.lastKeyLen:]))
			}
			if bytesSharedWithPrev != crbytes.CommonPrefix(key, b.data[len(b.data)-prev.lastKeyLen:]) {
				panic(errors.AssertionFailedf("bytesSharedWithPrev %d != %d", bytesSharedWithPrev,
					crbytes.CommonPrefix(key, b.data[len(b.data)-prev.lastKeyLen:])))
			}
		}
	}

	switch {
	case b.nKeys == 0:
		// We're adding the first key to the block.
		// Set a placeholder offset for the block prefix length.
		b.addOffset(0)
		// Set a placeholder offset for the bundle prefix length.
		b.addOffset(0)
		b.nKeys++
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
		// With a single key, the block prefix and the bundle prefix are both
		// (up to maxShared bytes of) the key itself.
		*curr = prefixBytesSizing{
			lastKeyLen:                len(key),
			offsetCount:               b.offsets.count,
			blockPrefixLen:            min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundlePrefixOffset: 1,
			completedBundleLen:        0,
			compressedDataLen:         len(key),
			offsetEncoding:            DetermineUintEncoding(0, uint64(len(key))),
		}
	case b.nKeys&(b.bundleSize-1) == 0:
		// We're starting a new bundle.

		// Set the bundle prefix length of the previous bundle.
		b.offsets.elems.set(prev.currentBundlePrefixOffset,
			b.offsets.elems.At(prev.currentBundlePrefixOffset-1)+uint32(prev.currentBundlePrefixLen))

		// Finalize the encoded size of the previous bundle: each distinct key
		// after the first drops its bundle-prefix bytes.
		bundleSizeJustCompleted := prev.currentBundleDistinctLen - (prev.currentBundleDistinctKeys-1)*prev.currentBundlePrefixLen
		completedBundleSize := prev.completedBundleLen + bundleSizeJustCompleted

		// Update the block prefix length if necessary. The caller tells us how
		// many bytes of prefix this key shares with the previous key. The block
		// prefix can only shrink if the bytes shared with the previous key are
		// less than the block prefix length, in which case the new block prefix
		// is the number of bytes shared with the previous key.
		blockPrefixLen := min(prev.blockPrefixLen, bytesSharedWithPrev)
		b.nKeys++
		*curr = prefixBytesSizing{
			lastKeyLen:         len(key),
			offsetCount:        b.offsets.count + 2,
			blockPrefixLen:     blockPrefixLen,
			completedBundleLen: completedBundleSize,
			// We're adding the first key to the current bundle. Initialize
			// the current bundle prefix.
			currentBundlePrefixOffset: b.offsets.count,
			currentBundlePrefixLen:    min(len(key), int(b.maxShared)),
			currentBundleDistinctLen:  len(key),
			currentBundleDistinctKeys: 1,
			// Every bundle except the first drops blockPrefixLen bytes from
			// its bundle prefix in the final encoding.
			compressedDataLen: completedBundleSize + len(key) - (b.bundleCount(b.nKeys)-1)*blockPrefixLen,
		}
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen))
		b.data = append(b.data, key...)
		b.addOffset(0) // Placeholder for bundle prefix.
		b.addOffset(uint32(len(b.data)))
	default:
		// Adding a new key to an existing bundle.
		b.nKeys++

		if bytesSharedWithPrev == len(key) {
			// Duplicate key; don't add it to the data slice and don't adjust
			// currentBundleDistinct{Len,Keys}. Repeating the previous offset
			// encodes a zero-length suffix for this row.
			*curr = *prev
			curr.offsetCount++
			b.addOffset(b.offsets.elems.At(b.offsets.count - 1))
			return
		}

		// Update the bundle prefix length. Note that the shared prefix length
		// can only shrink as new values are added. During construction, the
		// bundle prefix value is stored contiguously in the data array so even
		// if the bundle prefix length changes no adjustment is needed to that
		// value or to the first key in the bundle.
		*curr = prefixBytesSizing{
			lastKeyLen:                len(key),
			offsetCount:               prev.offsetCount + 1,
			blockPrefixLen:            min(prev.blockPrefixLen, bytesSharedWithPrev),
			currentBundleDistinctLen:  prev.currentBundleDistinctLen + len(key),
			currentBundleDistinctKeys: prev.currentBundleDistinctKeys + 1,
			currentBundlePrefixLen:    min(prev.currentBundlePrefixLen, bytesSharedWithPrev),
			currentBundlePrefixOffset: prev.currentBundlePrefixOffset,
			completedBundleLen:        prev.completedBundleLen,
		}
		// Compute the correct compressedDataLen.
		curr.compressedDataLen = curr.completedBundleLen +
			curr.currentBundleDistinctLen -
			(curr.currentBundleDistinctKeys-1)*curr.currentBundlePrefixLen
		// Currently compressedDataLen is correct, except that it includes the block
		// prefix length for all bundle prefixes. Adjust the length to account for
		// the block prefix being stripped from every bundle except the first one.
		curr.compressedDataLen -= (b.bundleCount(b.nKeys) - 1) * curr.blockPrefixLen
		// The compressedDataLen is the largest offset we'll need to encode in the
		// offset table.
		curr.offsetEncoding = DetermineUintEncoding(0, uint64(curr.compressedDataLen))
		b.data = append(b.data, key...)
		b.addOffset(uint32(len(b.data)))
	}
}
872 :
873 : // UnsafeGet returns the zero-indexed i'th key added to the builder through Put.
874 : // UnsafeGet may only be used to retrieve the Rows()-1'th or Rows()-2'th keys.
875 : // If called with a different i value, UnsafeGet panics. The keys returned by
876 : // UnsafeGet are guaranteed to be stable until Finish or Reset is called. The
877 : // caller must not mutate the returned slice.
878 1 : func (b *PrefixBytesBuilder) UnsafeGet(i int) []byte {
879 1 : switch i {
880 1 : case b.nKeys - 1:
881 1 : // The last key is the [lastKeyLen] bytes.
882 1 : return b.data[len(b.data)-b.sizings[i&1].lastKeyLen:]
883 1 : case b.nKeys - 2:
884 1 : // Check if the very last key is a duplicate of the second-to-last key.
885 1 : lastKeyLen := b.sizings[(i+1)&1].lastKeyLen
886 1 : if b.offsets.elems.At(b.rowSuffixIndex(i+1)) == b.offsets.elems.At(b.rowSuffixIndex(i+2)) {
887 1 : return b.data[len(b.data)-b.sizings[i&1].lastKeyLen:]
888 1 : }
889 1 : lastLastKeyLen := b.sizings[i&1].lastKeyLen
890 1 : return b.data[len(b.data)-lastKeyLen-lastLastKeyLen : len(b.data)-lastKeyLen]
891 0 : default:
892 0 : panic(errors.AssertionFailedf("UnsafeGet(%d) called on PrefixBytes with %d keys", i, b.nKeys))
893 : }
894 : }
895 :
896 : // addOffset adds an offset to the offsets table. If necessary, addOffset will
897 : // grow the offset table to accommodate the new offset.
898 1 : func (b *PrefixBytesBuilder) addOffset(offset uint32) {
899 1 : if b.offsets.count == b.offsets.elemsSize {
900 1 : // Double the size of the allocated array, or initialize it to at least
901 1 : // 64 rows if this is the first allocation.
902 1 : n2 := max(b.offsets.elemsSize<<1, 64)
903 1 : newDataTyped := make([]uint32, n2)
904 1 : copy(newDataTyped, b.offsets.elems.Slice(b.offsets.elemsSize))
905 1 : b.offsets.elems = makeUnsafeRawSlice[uint32](unsafe.Pointer(&newDataTyped[0]))
906 1 : b.offsets.elemsSize = n2
907 1 : }
908 1 : b.offsets.elems.set(b.offsets.count, offset)
909 1 : b.offsets.count++
910 : }
911 :
// writePrefixCompressed writes the provided builder's first [rows] rows with
// prefix-compression applied. It writes offsets and string data in tandem,
// writing offsets of width T into [offsetDeltas] and compressed string data
// into [buf]. The builder's internal state is not modified by
// writePrefixCompressed. writePrefixCompressed is generic in terms of the type
// T of the offset deltas.
//
// The caller must have correctly constructed [offsetDeltas] such that writing
// [sizing.offsetCount] offsets of size T does not overwrite the beginning of
// [buf]:
//
//	+-------------------------------------+ <- offsetDeltas.ptr
//	| offsetDeltas[0]                     |
//	+-------------------------------------+
//	| offsetDeltas[1]                     |
//	+-------------------------------------+
//	| ...                                 |
//	+-------------------------------------+
//	| offsetDeltas[sizing.offsetCount-1]  |
//	+-------------------------------------+ <- &buf[0]
//	| buf (string data)                   |
//	| ...                                 |
//	+-------------------------------------+
func writePrefixCompressed[T Uint](
	b *PrefixBytesBuilder,
	rows int,
	sz *prefixBytesSizing,
	offsetDeltas UnsafeRawSlice[T],
	buf []byte,
) {
	if rows == 0 {
		return
	} else if rows == 1 {
		// If there's just 1 row, no prefix compression is necessary and we can
		// just encode the first key as the entire block prefix and first bundle
		// prefix. Offset index 2 holds the end of the first key's data.
		e := b.offsets.elems.At(2)
		offsetDeltas.set(0, T(e))
		offsetDeltas.set(1, T(e))
		offsetDeltas.set(2, T(e))
		copy(buf, b.data[:e])
		return
	}

	// The offset at index 0 is the block prefix length.
	offsetDeltas.set(0, T(sz.blockPrefixLen))
	// Write the block prefix itself first; destOffset tracks the write
	// position within buf.
	destOffset := T(copy(buf, b.data[:sz.blockPrefixLen]))
	// lastRowOffset is the end offset (in b.data) of the previously written
	// row; shared is the number of leading bytes (block + bundle prefix) to
	// strip from each row in the current bundle.
	var lastRowOffset uint32
	var shared int

	// Loop over the slices starting at the bundle prefix of the first bundle.
	// If the slice is a bundle prefix, carve off the suffix that excludes the
	// block prefix. Otherwise, carve off the suffix that excludes the block
	// prefix + bundle prefix.
	for i := 1; i < sz.offsetCount; i++ {
		off := b.offsets.elems.At(i)
		var suffix []byte
		// Offset layout repeats with period bundleSize+1: one bundle-prefix
		// slot followed by bundleSize row slots.
		if (i-1)%(b.bundleSize+1) == 0 {
			// This is a bundle prefix.
			if i == sz.currentBundlePrefixOffset {
				// The current (last) bundle's prefix slot still holds a
				// placeholder; derive the prefix from the sizing instead.
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : lastRowOffset+uint32(sz.currentBundlePrefixLen)]
			} else {
				suffix = b.data[lastRowOffset+uint32(sz.blockPrefixLen) : off]
			}
			shared = sz.blockPrefixLen + len(suffix)
			// We don't update lastRowOffset here because the bundle prefix
			// was never actually stored separately in the data array.
		} else {
			// If the offset of this key is the same as the offset of the
			// previous key, then the key is a duplicate. All we need to do is
			// set the same offset in the destination.
			if off == lastRowOffset {
				offsetDeltas.set(i, offsetDeltas.At(i-1))
				continue
			}
			suffix = b.data[lastRowOffset+uint32(shared) : off]
			// Update lastRowOffset for the next iteration of this loop.
			lastRowOffset = off
		}
		if invariants.Enabled && len(buf[destOffset:]) < len(suffix) {
			panic(errors.AssertionFailedf("buf is too small: %d < %d", len(buf[destOffset:]), len(suffix)))
		}
		destOffset += T(copy(buf[destOffset:], suffix))
		offsetDeltas.set(i, destOffset)
	}
	// Sanity check: the bytes written must match the precomputed size.
	if destOffset != T(sz.compressedDataLen) {
		panic(errors.AssertionFailedf("wrote %d, expected %d", destOffset, sz.compressedDataLen))
	}
}
1001 :
// Finish writes the serialized byte slices to buf starting at offset. The buf
// slice must be sufficiently large to store the serialized output. The caller
// should use [Size] to size buf appropriately before calling Finish.
//
// Finish only supports values of [rows] equal to the number of keys set on the
// builder, or one less.
func (b *PrefixBytesBuilder) Finish(
	col int, rows int, offset uint32, buf []byte,
) (endOffset uint32) {
	if rows < b.nKeys-1 || rows > b.nKeys {
		panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Finish %d", b.nKeys, rows))
	}
	if rows == 0 {
		return offset
	}
	// Encode the bundle shift.
	buf[offset] = byte(b.bundleShift)
	offset++

	// (rows+1)%2 == (rows-1)&1: select the sizing recorded after the rows'th
	// key was added.
	sz := &b.sizings[(rows+1)%2]
	// Compute where the string data will begin, after the offsets table.
	stringDataOffset := uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
	if sz.offsetEncoding.IsDelta() {
		// Offsets are monotonically increasing from 0 so a delta base is
		// never chosen by DetermineUintEncoding.
		panic(errors.AssertionFailedf("offsets never need delta encoding"))
	}

	width := uint32(sz.offsetEncoding.Width())
	buf[offset] = byte(sz.offsetEncoding)
	offset++
	// Pad with zeroes so the offsets table is aligned to its element width.
	offset = alignWithZeroes(buf, offset, width)
	// Dispatch on the offset element width; each arm instantiates the generic
	// writer with the matching unsigned integer type.
	switch width {
	case 1:
		offsetDest := makeUnsafeRawSlice[uint8](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint8](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align16:
		offsetDest := makeUnsafeRawSlice[uint16](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint16](b, rows, sz, offsetDest, buf[stringDataOffset:])
	case align32:
		offsetDest := makeUnsafeRawSlice[uint32](unsafe.Pointer(&buf[offset]))
		writePrefixCompressed[uint32](b, rows, sz, offsetDest, buf[stringDataOffset:])
	default:
		panic("unreachable")
	}
	return stringDataOffset + uint32(sz.compressedDataLen)
}
1046 :
1047 : // Size computes the size required to encode the byte slices beginning at the
1048 : // provided offset. The offset is required to ensure proper alignment. The
1049 : // returned uint32 is the offset of the first byte after the end of the encoded
1050 : // data. To compute the size in bytes, subtract the [offset] passed into Size
1051 : // from the returned offset.
1052 1 : func (b *PrefixBytesBuilder) Size(rows int, offset uint32) uint32 {
1053 1 : if rows == 0 {
1054 1 : return offset
1055 1 : } else if rows != b.nKeys && rows != b.nKeys-1 {
1056 0 : panic(errors.AssertionFailedf("PrefixBytes has accumulated %d keys, asked to Size %d", b.nKeys, rows))
1057 : }
1058 1 : sz := &b.sizings[(rows+1)%2]
1059 1 : // The 1-byte bundleSize.
1060 1 : offset++
1061 1 : // Compute the size of the offsets table.
1062 1 : offset = uintColumnSize(uint32(sz.offsetCount), offset, sz.offsetEncoding)
1063 1 : return offset + uint32(sz.compressedDataLen)
1064 : }
1065 :
1066 : // WriteDebug implements the Encoder interface.
1067 1 : func (b *PrefixBytesBuilder) WriteDebug(w io.Writer, rows int) {
1068 1 : fmt.Fprintf(w, "prefixbytes(%d): %d keys", b.bundleSize, b.nKeys)
1069 1 : }
1070 :
// bundleCalc provides facilities for computing indexes and offsets within a
// PrefixBytes structure.
type bundleCalc struct {
	bundleShift int // log2(bundleSize)
	// bundleMask is a mask with 1s across the high bits that indicate the
	// bundle and 0s for the bits that indicate the position within the bundle.
	bundleMask int
}

// makeBundleCalc returns a bundleCalc for the provided log2 bundle size.
func makeBundleCalc(bundleShift int) bundleCalc {
	bc := bundleCalc{bundleShift: bundleShift}
	// Clear the low bundleShift bits; the remaining high bits identify the
	// bundle a row belongs to.
	bc.bundleMask = ^((1 << bundleShift) - 1)
	return bc
}

// rowSuffixIndex computes the index of the offset encoding the start of a row's
// suffix. Example usage of retrieving the row's suffix:
//
//	i := b.rowSuffixIndex(row)
//	l := b.rawBytes.offsets.At(i)
//	h := b.rawBytes.offsets.At(i + 1)
//	suffix := b.rawBytes.slice(l, h)
func (b bundleCalc) rowSuffixIndex(row int) int {
	// One slot for the block prefix, plus one bundle-prefix slot per bundle
	// preceding (and including) this row's bundle, plus one slot per row.
	return row + (row >> b.bundleShift) + 1
}

// bundleOffsetIndexForRow computes the index of the offset encoding the start
// of a bundle's prefix.
func (b bundleCalc) bundleOffsetIndexForRow(row int) int {
	// AND-ing the row with the bundle mask removes the least significant bits
	// of the row, which encode the row's index within the bundle, yielding
	// the index of the bundle's first row.
	firstRowInBundle := row & b.bundleMask
	return (row >> b.bundleShift) + firstRowInBundle
}

// offsetIndexByBundleIndex computes the index of the offset encoding the start
// of a bundle's prefix when given the bundle's index (an index in
// [0,Rows/BundleSize)).
func (b bundleCalc) offsetIndexByBundleIndex(bi int) int {
	return (bi << b.bundleShift) + bi
}

// bundleCount returns the number of bundles needed to hold [rows] rows.
func (b bundleCalc) bundleCount(rows int) int {
	return ((rows - 1) >> b.bundleShift) + 1
}
|