Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package colblk
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "math"
15 : "sync"
16 : "unsafe"
17 :
18 : "github.com/cockroachdb/crlib/crbytes"
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/binfmt"
22 : "github.com/cockroachdb/pebble/internal/invariants"
23 : "github.com/cockroachdb/pebble/internal/treeprinter"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : )
26 :
27 : // KeySchema defines the schema of a user key, as defined by the user's
28 : // application.
29 : //
30 : // TODO(jackson): Consider making this KVSchema. It feels like there's an
31 : // opportunity to generalize the ShortAttribute so that when a value is stored
32 : // out-of-band, the DataBlockWriter calls user-provided code to store the short
33 : // attributes inlined within the data block. For inlined-values, the
34 : // user-defined value columns would be implicitly null.
35 : type KeySchema struct {
36 : ColumnTypes []DataType
37 : NewKeyWriter func() KeyWriter
38 : NewKeySeeker func() KeySeeker
39 : }
40 :
41 : // A KeyWriter maintains ColumnWriters for a data block for writing user keys
42 : // into the database-specific key schema. Users may define their own key schema
43 : // and implement KeyWriter to encode keys into custom columns that are aware of
44 : // the structure of user keys.
45 : type KeyWriter interface {
46 : ColumnWriter
47 : // ComparePrev compares the provided user to the previously-written user
48 : // key. The returned KeyComparison's UserKeyComparison field is equivalent
49 : // to Compare(key, prevKey) where prevKey is the last key passed to
50 : // WriteKey.
51 : //
52 : // If no key has been written yet, ComparePrev returns a KeyComparison with
53 : // PrefixLen set and UserKeyComparison=1.
54 : ComparePrev(key []byte) KeyComparison
55 : // WriteKey writes a user key into the KeyWriter's columns. The
56 : // keyPrefixLenSharedWithPrev parameter takes the number of bytes prefixing
57 : // the key's logical prefix (as defined by (base.Comparer).Split) that the
58 : // previously-written key's prefix shares.
59 : //
60 : // WriteKey is guaranteed to be called sequentially with increasing row
61 : // indexes, beginning at zero.
62 : WriteKey(row int, key []byte, keyPrefixLen, keyPrefixLenSharedWithPrev int32)
63 : // MaterializeKey appends the zero-indexed row'th key written to dst,
64 : // returning the result.
65 : MaterializeKey(dst []byte, row int) []byte
66 : }
67 :
// KeyComparison describes a key and the result of comparing it to another
// key.
type KeyComparison struct {
	// PrefixLen is the length of the key's prefix, i.e. the result of calling
	// base.Split on the key.
	PrefixLen int32
	// CommonPrefixLen is the number of leading bytes of the logical prefix
	// that are physically (byte-wise) shared with the other key. For example,
	// for "apple@1" and "applied@3" the value is 4 (the length of "appl").
	// For "apple@1" and "apple@10" the value is 5 (the length of "apple"),
	// because bytes shared within the suffix are not counted.
	CommonPrefixLen int32
	// UserKeyComparison is the result of comparing the two user keys. It
	// should be equivalent to
	//
	//	Compare(key, otherKey)
	UserKeyComparison int32
}

// PrefixEqual reports whether the comparison found the two keys to have equal
// prefixes, which is the case when the whole prefix of the key is shared.
func (kcmp KeyComparison) PrefixEqual() bool {
	return kcmp.CommonPrefixLen == kcmp.PrefixLen
}
90 :
91 : // KeySeeker iterates over the keys in a columnar data block.
92 : //
93 : // Users of Pebble who define their own key schema must implement KeySeeker to
94 : // seek over their decomposed keys.
95 : //
96 : // KeySeeker implementations must be safe for concurrent use by multiple
97 : // goroutines. In practice, multiple DataBlockIterators may use the same
98 : // KeySeeker.
99 : type KeySeeker interface {
100 : // Init initializes the iterator to read from the provided DataBlockReader.
101 : Init(b *DataBlockReader) error
102 : // CompareFirstUserKey compares the provided key to the first user key
103 : // contained within the data block. It's equivalent to performing
104 : // Compare(firstUserKey, k)
105 : CompareFirstUserKey(k []byte) int
106 : // SeekGE returns the index of the first row with a key greater than or
107 : // equal to [key].
108 : //
109 : // If the caller externally knows a bound on where the key is located, it
110 : // may indicate it through [boundRow] and [searchDir]. A [searchDir] value
111 : // of -1 indicates that the sought row must be at an index ≤ [boundRow]. A
112 : // [searchDir] value of +1 indicates that the sought row must be at an index
113 : // ≥ [boundRow]. Implementations may use this information to constrain the
114 : // search. See (base.SeekGEFlags).TrySeekUsingNext for context on when this
115 : // may be set in practice.
116 : SeekGE(key []byte, boundRow int, searchDir int8) (row int)
117 : // MaterializeUserKey materializes the user key of the specified row,
118 : // returning a slice of the materialized user key.
119 : //
120 : // The provided keyIter must have a buffer large enough to hold the key.
121 : //
122 : // The prevRow parameter is the row MaterializeUserKey was last invoked with.
123 : // Implementations may take advantage of that knowledge to reduce work.
124 : MaterializeUserKey(keyIter *PrefixBytesIter, prevRow, row int) []byte
125 : // Release releases the KeySeeker. It's called when the seeker is no longer
126 : // in use. Implementations may pool KeySeeker objects.
127 : Release()
128 : }
129 :
// Column indexes of the default key schema.
const (
	// defaultKeySchemaColumnPrefix is the index of the key prefix column.
	defaultKeySchemaColumnPrefix int = iota
	// defaultKeySchemaColumnSuffix is the index of the key suffix column.
	defaultKeySchemaColumnSuffix
)
134 :
135 : var defaultSchemaColumnTypes = []DataType{
136 : defaultKeySchemaColumnPrefix: DataTypePrefixBytes,
137 : defaultKeySchemaColumnSuffix: DataTypeBytes,
138 : }
139 :
140 : var defaultKeySeekerPool = sync.Pool{
141 1 : New: func() interface{} {
142 1 : return &defaultKeySeeker{}
143 1 : },
144 : }
145 :
146 : // DefaultKeySchema returns the default key schema that decomposes a user key
147 : // into its prefix and suffix. Prefixes are sorted in lexicographical order.
148 1 : func DefaultKeySchema(comparer *base.Comparer, prefixBundleSize int) KeySchema {
149 1 : return KeySchema{
150 1 : ColumnTypes: defaultSchemaColumnTypes,
151 1 : NewKeyWriter: func() KeyWriter {
152 1 : kw := &defaultKeyWriter{comparer: comparer}
153 1 : kw.prefixes.Init(prefixBundleSize)
154 1 : kw.suffixes.Init()
155 1 : return kw
156 1 : },
157 1 : NewKeySeeker: func() KeySeeker {
158 1 : ks := defaultKeySeekerPool.Get().(*defaultKeySeeker)
159 1 : ks.comparer = comparer
160 1 : return ks
161 1 : },
162 : }
163 : }
164 :
165 : // Assert that *defaultKeyWriter implements the KeyWriter interface.
166 : var _ KeyWriter = (*defaultKeyWriter)(nil)
167 :
168 : type defaultKeyWriter struct {
169 : comparer *base.Comparer
170 : prefixes PrefixBytesBuilder
171 : suffixes RawBytesBuilder
172 : }
173 :
174 1 : func (w *defaultKeyWriter) ComparePrev(key []byte) KeyComparison {
175 1 : var cmpv KeyComparison
176 1 : cmpv.PrefixLen = int32(w.comparer.Split(key))
177 1 : if w.prefixes.nKeys == 0 {
178 1 : // The first key has no previous key to compare to.
179 1 : cmpv.UserKeyComparison = 1
180 1 : return cmpv
181 1 : }
182 1 : lp := w.prefixes.UnsafeGet(w.prefixes.nKeys - 1)
183 1 : cmpv.CommonPrefixLen = int32(crbytes.CommonPrefix(lp, key[:cmpv.PrefixLen]))
184 1 :
185 1 : if invariants.Enabled && bytes.Compare(lp, key[:cmpv.PrefixLen]) > 0 {
186 0 : panic(errors.AssertionFailedf("keys are not in order: %q > %q", lp, key[:cmpv.PrefixLen]))
187 : }
188 : // Keys are written in order and prefixes must be sorted lexicograpgically,
189 : // so CommonPrefixLen == PrefixLen implies that the keys share the same
190 : // logical prefix. (If the previous key had a prefix longer than
191 : // CommonPrefixLen, it would sort after [key].)
192 1 : if cmpv.CommonPrefixLen == cmpv.PrefixLen {
193 1 : // The keys share the same MVCC prefix. Compare the suffixes.
194 1 : cmpv.UserKeyComparison = int32(w.comparer.CompareSuffixes(key[cmpv.PrefixLen:],
195 1 : w.suffixes.UnsafeGet(w.suffixes.rows-1)))
196 1 : if invariants.Enabled {
197 1 : if !w.comparer.Equal(lp, key[:cmpv.PrefixLen]) {
198 0 : panic(errors.AssertionFailedf("keys have different logical prefixes: %q != %q", lp, key[:cmpv.PrefixLen]))
199 : }
200 : }
201 1 : return cmpv
202 : }
203 :
204 : // The keys have different MVCC prefixes. We haven't determined which is
205 : // greater, but we know the index at which they diverge. The base.Comparer
206 : // contract dictates that prefixes must be lexicographically ordered.
207 1 : if len(lp) == int(cmpv.CommonPrefixLen) {
208 1 : // cmpv.PrefixLen > cmpv.PrefixLenShared; key is greater.
209 1 : cmpv.UserKeyComparison = +1
210 1 : } else {
211 1 : // Both keys have at least 1 additional byte at which they diverge.
212 1 : // Compare the diverging byte.
213 1 : cmpv.UserKeyComparison = int32(cmp.Compare(key[cmpv.CommonPrefixLen], lp[cmpv.CommonPrefixLen]))
214 1 : }
215 1 : if invariants.Enabled {
216 1 : // In this case we've determined that the keys have different prefixes,
217 1 : // so the UserKeyComparison should be equal to the result of comparing
218 1 : // the prefixes and nonzero.
219 1 : if cmpv.UserKeyComparison == 0 {
220 0 : panic(errors.AssertionFailedf("user keys should not be equal: %q+%q, %q",
221 0 : lp, w.suffixes.UnsafeGet(w.suffixes.rows-1), key))
222 : }
223 1 : if v := w.comparer.Compare(key, lp); v != int(cmpv.UserKeyComparison) {
224 0 : panic(errors.AssertionFailedf("user key comparison mismatch: Compare(%q, %q) = %d ≠ %d",
225 0 : key, lp, v, cmpv.UserKeyComparison))
226 : }
227 : }
228 1 : return cmpv
229 : }
230 :
231 : func (w *defaultKeyWriter) WriteKey(
232 : row int, key []byte, keyPrefixLen, keyPrefixLenSharedWithPrev int32,
233 1 : ) {
234 1 : w.prefixes.Put(key[:keyPrefixLen], int(keyPrefixLenSharedWithPrev))
235 1 : w.suffixes.Put(key[keyPrefixLen:])
236 1 : }
237 :
238 1 : func (w *defaultKeyWriter) MaterializeKey(dst []byte, row int) []byte {
239 1 : dst = append(dst, w.prefixes.UnsafeGet(row)...)
240 1 : dst = append(dst, w.suffixes.UnsafeGet(row)...)
241 1 : return dst
242 1 : }
243 :
244 1 : func (w *defaultKeyWriter) NumColumns() int {
245 1 : return 2
246 1 : }
247 :
248 1 : func (w *defaultKeyWriter) DataType(col int) DataType {
249 1 : return defaultSchemaColumnTypes[col]
250 1 : }
251 :
252 1 : func (w *defaultKeyWriter) Reset() {
253 1 : w.prefixes.Reset()
254 1 : w.suffixes.Reset()
255 1 : }
256 :
257 1 : func (w *defaultKeyWriter) WriteDebug(dst io.Writer, rows int) {
258 1 : fmt.Fprint(dst, "0: prefixes: ")
259 1 : w.prefixes.WriteDebug(dst, rows)
260 1 : fmt.Fprintln(dst)
261 1 : fmt.Fprint(dst, "1: suffixes: ")
262 1 : w.suffixes.WriteDebug(dst, rows)
263 1 : fmt.Fprintln(dst)
264 1 : }
265 :
266 1 : func (w *defaultKeyWriter) Size(rows int, offset uint32) uint32 {
267 1 : offset = w.prefixes.Size(rows, offset)
268 1 : offset = w.suffixes.Size(rows, offset)
269 1 : return offset
270 1 : }
271 :
272 1 : func (w *defaultKeyWriter) Finish(col, rows int, offset uint32, buf []byte) (nextOffset uint32) {
273 1 : switch col {
274 1 : case defaultKeySchemaColumnPrefix:
275 1 : return w.prefixes.Finish(0, rows, offset, buf)
276 1 : case defaultKeySchemaColumnSuffix:
277 1 : return w.suffixes.Finish(0, rows, offset, buf)
278 0 : default:
279 0 : panic(fmt.Sprintf("unknown default key column: %d", col))
280 : }
281 : }
282 :
283 : // Assert that *defaultKeySeeker implements KeySeeker.
284 : var _ KeySeeker = (*defaultKeySeeker)(nil)
285 :
286 : type defaultKeySeeker struct {
287 : comparer *base.Comparer
288 : reader *DataBlockReader
289 : prefixes PrefixBytes
290 : suffixes RawBytes
291 : sharedPrefix []byte
292 : }
293 :
294 1 : func (ks *defaultKeySeeker) Init(r *DataBlockReader) error {
295 1 : ks.reader = r
296 1 : ks.prefixes = r.r.PrefixBytes(defaultKeySchemaColumnPrefix)
297 1 : ks.suffixes = r.r.RawBytes(defaultKeySchemaColumnSuffix)
298 1 : ks.sharedPrefix = ks.prefixes.SharedPrefix()
299 1 : return nil
300 1 : }
301 :
302 : // CompareFirstUserKey compares the provided key to the first user key
303 : // contained within the data block. It's equivalent to performing
304 : //
305 : // Compare(firstUserKey, k)
306 0 : func (ks *defaultKeySeeker) CompareFirstUserKey(k []byte) int {
307 0 : si := ks.comparer.Split(k)
308 0 : if v := ks.comparer.Compare(ks.prefixes.UnsafeFirstSlice(), k[:si]); v != 0 {
309 0 : return v
310 0 : }
311 0 : return ks.comparer.Compare(ks.suffixes.At(0), k[si:])
312 : }
313 :
314 1 : func (ks *defaultKeySeeker) SeekGE(key []byte, currRow int, dir int8) (row int) {
315 1 : si := ks.comparer.Split(key)
316 1 : row, eq := ks.prefixes.Search(key[:si])
317 1 : if eq {
318 1 : return ks.seekGEOnSuffix(row, key[si:])
319 1 : }
320 1 : return row
321 : }
322 :
323 : // seekGEOnSuffix is a helper function for SeekGE when a seek key's prefix
324 : // exactly matches a row. seekGEOnSuffix finds the first row at index or later
325 : // with the same prefix as index and a suffix greater than or equal to [suffix],
326 : // or if no such row exists, the next row with a different prefix.
327 1 : func (ks *defaultKeySeeker) seekGEOnSuffix(index int, suffix []byte) (row int) {
328 1 : // The search key's prefix exactly matches the prefix of the row at index.
329 1 : // If the row at index has a suffix >= [suffix], then return the row.
330 1 : if ks.comparer.CompareSuffixes(ks.suffixes.At(index), suffix) >= 0 {
331 1 : return index
332 1 : }
333 : // Otherwise, the row at [index] sorts before the search key and we need to
334 : // search forward. Binary search between [index+1, prefixChanged.SeekSetBitGE(index+1)].
335 : //
336 : // Define f(l-1) == false and f(u) == true.
337 : // Invariant: f(l-1) == false, f(u) == true.
338 1 : l := index + 1
339 1 : u := ks.reader.prefixChanged.SeekSetBitGE(index + 1)
340 1 : for l < u {
341 1 : h := int(uint(l+u) >> 1) // avoid overflow when computing h
342 1 : // l ≤ h < u
343 1 : if ks.comparer.CompareSuffixes(ks.suffixes.At(h), suffix) >= 0 {
344 1 : u = h // preserves f(u) == true
345 1 : } else {
346 1 : l = h + 1 // preserves f(l-1) == false
347 1 : }
348 : }
349 1 : return l
350 : }
351 :
352 : // MaterializeUserKey is part of the colblk.KeySeeker interface.
353 1 : func (ks *defaultKeySeeker) MaterializeUserKey(keyIter *PrefixBytesIter, prevRow, row int) []byte {
354 1 : if row == prevRow+1 && prevRow >= 0 {
355 1 : ks.prefixes.SetNext(keyIter)
356 1 : } else {
357 1 : ks.prefixes.SetAt(keyIter, row)
358 1 : }
359 1 : suffix := ks.suffixes.At(row)
360 1 : res := keyIter.buf[:len(keyIter.buf)+len(suffix)]
361 1 : memmove(
362 1 : unsafe.Pointer(uintptr(unsafe.Pointer(unsafe.SliceData(keyIter.buf)))+uintptr(len(keyIter.buf))),
363 1 : unsafe.Pointer(unsafe.SliceData(suffix)),
364 1 : uintptr(len(suffix)),
365 1 : )
366 1 : return res
367 : }
368 :
369 1 : func (ks *defaultKeySeeker) Release() {
370 1 : *ks = defaultKeySeeker{}
371 1 : defaultKeySeekerPool.Put(ks)
372 1 : }
373 :
374 : // DataBlockWriter writes columnar data blocks, encoding keys using a
375 : // user-defined schema.
376 : type DataBlockWriter struct {
377 : Schema KeySchema
378 : KeyWriter KeyWriter
379 : // trailers is the column writer for InternalKey uint64 trailers.
380 : trailers UintBuilder
381 : // prefixSame is the column writer for the prefix-changed bitmap that
382 : // indicates when a new key prefix begins. During block building, the bitmap
383 : // represents when the prefix stays the same, which is expected to be a
384 : // rarer case. Before Finish-ing the column, we invert the bitmap.
385 : prefixSame BitmapBuilder
386 : // values is the column writer for values. Iff the isValueExternal bitmap
387 : // indicates a value is external, the value is prefixed with a ValuePrefix
388 : // byte.
389 : values RawBytesBuilder
390 : // isValueExternal is the column writer for the is-value-external bitmap
391 : // that indicates when a value is stored out-of-band in a value block.
392 : isValueExternal BitmapBuilder
393 : // isObsolete is the column writer for the is-obsolete bitmap that indicates
394 : // when a key is known to be obsolete/non-live (i.e., shadowed by another
395 : // identical point key or range deletion with a higher sequence number).
396 : isObsolete BitmapBuilder
397 :
398 : enc blockEncoder
399 : rows int
400 : maximumKeyLength int
401 : valuePrefixTmp [1]byte
402 : lastUserKeyTmp []byte
403 : }
404 :
// Indexes of the data block's own columns, relative to the end of the key
// schema's columns. Add len(KeySchema.ColumnTypes) to obtain the absolute
// column index within a block.
const (
	dataBlockColumnTrailer = iota
	dataBlockColumnPrefixChanged
	dataBlockColumnValue
	dataBlockColumnIsValueExternal
	dataBlockColumnIsObsolete
	// dataBlockColumnMax is the number of columns listed above.
	dataBlockColumnMax
)

// dataBlockCustomHeaderSize is the size of the data block header: a 4-byte
// uint32 encoding the maximum length of a key contained within the block.
// Iterators use it to size their key buffer up front, so that the buffer
// never needs to grow while iterating over the block.
const dataBlockCustomHeaderSize = 4
419 :
420 : // Init initializes the data block writer.
421 1 : func (w *DataBlockWriter) Init(schema KeySchema) {
422 1 : w.Schema = schema
423 1 : w.KeyWriter = schema.NewKeyWriter()
424 1 : w.trailers.Init()
425 1 : w.prefixSame.Reset()
426 1 : w.values.Init()
427 1 : w.isValueExternal.Reset()
428 1 : w.isObsolete.Reset()
429 1 : w.rows = 0
430 1 : w.maximumKeyLength = 0
431 1 : w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
432 1 : w.enc.reset()
433 1 : }
434 :
435 : // Reset resets the data block writer to its initial state, retaining buffers.
436 1 : func (w *DataBlockWriter) Reset() {
437 1 : w.KeyWriter.Reset()
438 1 : w.trailers.Reset()
439 1 : w.prefixSame.Reset()
440 1 : w.values.Reset()
441 1 : w.isValueExternal.Reset()
442 1 : w.isObsolete.Reset()
443 1 : w.rows = 0
444 1 : w.maximumKeyLength = 0
445 1 : w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
446 1 : w.enc.reset()
447 1 : }
448 :
449 : // String outputs a human-readable summary of internal DataBlockWriter state.
450 1 : func (w *DataBlockWriter) String() string {
451 1 : var buf bytes.Buffer
452 1 : size := uint32(w.Size())
453 1 : fmt.Fprintf(&buf, "size=%d:\n", size)
454 1 : w.KeyWriter.WriteDebug(&buf, w.rows)
455 1 :
456 1 : fmt.Fprintf(&buf, "%d: trailers: ", len(w.Schema.ColumnTypes)+dataBlockColumnTrailer)
457 1 : w.trailers.WriteDebug(&buf, w.rows)
458 1 : fmt.Fprintln(&buf)
459 1 :
460 1 : fmt.Fprintf(&buf, "%d: prefix changed: ", len(w.Schema.ColumnTypes)+dataBlockColumnPrefixChanged)
461 1 : w.prefixSame.WriteDebug(&buf, w.rows)
462 1 : fmt.Fprintln(&buf)
463 1 :
464 1 : fmt.Fprintf(&buf, "%d: values: ", len(w.Schema.ColumnTypes)+dataBlockColumnValue)
465 1 : w.values.WriteDebug(&buf, w.rows)
466 1 : fmt.Fprintln(&buf)
467 1 :
468 1 : fmt.Fprintf(&buf, "%d: is-value-ext: ", len(w.Schema.ColumnTypes)+dataBlockColumnIsValueExternal)
469 1 : w.isValueExternal.WriteDebug(&buf, w.rows)
470 1 : fmt.Fprintln(&buf)
471 1 :
472 1 : fmt.Fprintf(&buf, "%d: is-obsolete: ", len(w.Schema.ColumnTypes)+dataBlockColumnIsObsolete)
473 1 : w.isObsolete.WriteDebug(&buf, w.rows)
474 1 : fmt.Fprintln(&buf)
475 1 :
476 1 : return buf.String()
477 1 : }
478 :
479 : // Add adds the provided key to the data block. Keys must be added in order. The
480 : // caller must supply a KeyComparison containing the comparison of the key to
481 : // the previously-added key, obtainable through
482 : //
483 : // KeyWriter.ComparePrev(ikey.UserKey)
484 : //
485 : // The caller is required to pass this in because in expected use cases, the
486 : // caller will also require the same information.
487 : func (w *DataBlockWriter) Add(
488 : ikey base.InternalKey,
489 : value []byte,
490 : valuePrefix block.ValuePrefix,
491 : kcmp KeyComparison,
492 : isObsolete bool,
493 1 : ) {
494 1 : w.KeyWriter.WriteKey(w.rows, ikey.UserKey, kcmp.PrefixLen, kcmp.CommonPrefixLen)
495 1 : if kcmp.PrefixEqual() {
496 1 : w.prefixSame.Set(w.rows)
497 1 : }
498 1 : if isObsolete {
499 1 : w.isObsolete.Set(w.rows)
500 1 : }
501 1 : w.trailers.Set(w.rows, uint64(ikey.Trailer))
502 1 : if valuePrefix.IsValueHandle() {
503 1 : w.isValueExternal.Set(w.rows)
504 1 : // Write the value with the value prefix byte preceding the value.
505 1 : w.valuePrefixTmp[0] = byte(valuePrefix)
506 1 : w.values.PutConcat(w.valuePrefixTmp[:], value)
507 1 : } else {
508 1 : // Elide the value prefix. Readers will examine the isValueExternal
509 1 : // bitmap and know there is no value prefix byte if !isValueExternal.
510 1 : w.values.Put(value)
511 1 : }
512 1 : if len(ikey.UserKey) > int(w.maximumKeyLength) {
513 1 : w.maximumKeyLength = len(ikey.UserKey)
514 1 : }
515 1 : w.rows++
516 : }
517 :
518 : // Rows returns the number of rows in the current pending data block.
519 1 : func (w *DataBlockWriter) Rows() int {
520 1 : return w.rows
521 1 : }
522 :
523 : // Size returns the size of the current pending data block.
524 1 : func (w *DataBlockWriter) Size() int {
525 1 : off := blockHeaderSize(len(w.Schema.ColumnTypes)+dataBlockColumnMax, dataBlockCustomHeaderSize)
526 1 : off = w.KeyWriter.Size(w.rows, off)
527 1 : off = w.trailers.Size(w.rows, off)
528 1 : off = w.prefixSame.InvertedSize(w.rows, off)
529 1 : off = w.values.Size(w.rows, off)
530 1 : off = w.isValueExternal.Size(w.rows, off)
531 1 : off = w.isObsolete.Size(w.rows, off)
532 1 : off++ // trailer padding byte
533 1 : return int(off)
534 1 : }
535 :
536 : // Finish serializes the pending data block, including the first [rows] rows.
537 : // The value of [rows] must be Rows() or Rows()-1. The provided size must be the
538 : // size of the data block with the provided row count (i.e., the return value of
539 : // [Size] when DataBlockWriter.Rows() = [rows]).
540 : //
541 : // Finish the returns the serialized, uncompressed data block and the
542 : // InternalKey of the last key contained within the data block. The memory of
543 : // the lastKey's UserKey is owned by the DataBlockWriter. The caller must
544 : // copy it if they require it to outlive a Reset of the writer.
545 1 : func (w *DataBlockWriter) Finish(rows, size int) (finished []byte, lastKey base.InternalKey) {
546 1 : if invariants.Enabled && rows != w.rows && rows != w.rows-1 {
547 0 : panic(errors.AssertionFailedf("data block has %d rows; asked to finish %d", w.rows, rows))
548 : }
549 :
550 1 : cols := len(w.Schema.ColumnTypes) + dataBlockColumnMax
551 1 : h := Header{
552 1 : Version: Version1,
553 1 : Columns: uint16(cols),
554 1 : Rows: uint32(rows),
555 1 : }
556 1 :
557 1 : // Invert the prefix-same bitmap before writing it out, because we want it
558 1 : // to represent when the prefix changes.
559 1 : w.prefixSame.Invert(rows)
560 1 :
561 1 : w.enc.init(size, h, dataBlockCustomHeaderSize)
562 1 :
563 1 : // Write the max key length in the custom header.
564 1 : binary.LittleEndian.PutUint32(w.enc.data()[:dataBlockCustomHeaderSize], uint32(w.maximumKeyLength))
565 1 :
566 1 : w.enc.encode(rows, w.KeyWriter)
567 1 : w.enc.encode(rows, &w.trailers)
568 1 : w.enc.encode(rows, &w.prefixSame)
569 1 : w.enc.encode(rows, &w.values)
570 1 : w.enc.encode(rows, &w.isValueExternal)
571 1 : w.enc.encode(rows, &w.isObsolete)
572 1 : finished = w.enc.finish()
573 1 :
574 1 : w.lastUserKeyTmp = w.lastUserKeyTmp[:0]
575 1 : w.lastUserKeyTmp = w.KeyWriter.MaterializeKey(w.lastUserKeyTmp[:0], rows-1)
576 1 : lastKey = base.InternalKey{
577 1 : UserKey: w.lastUserKeyTmp,
578 1 : Trailer: base.InternalKeyTrailer(w.trailers.Get(rows - 1)),
579 1 : }
580 1 : return finished, lastKey
581 : }
582 :
583 : // DataBlockReaderSize is the size of a DataBlockReader struct. If allocating
584 : // memory for a data block, the caller may want to additionally allocate memory
585 : // for the corresponding DataBlockReader.
586 : const DataBlockReaderSize = unsafe.Sizeof(DataBlockReader{})
587 :
588 : // A DataBlockReader holds state for reading a columnar data block. It may be
589 : // shared among multiple DataBlockIters.
590 : type DataBlockReader struct {
591 : r BlockReader
592 : // trailers holds an array of the InternalKey trailers, encoding the key
593 : // kind and sequence number of each key.
594 : trailers UnsafeUints
595 : // prefixChanged is a bitmap indicating when the prefix (as defined by
596 : // Split) of a key changes, relative to the preceding key. This is used to
597 : // bound seeks within a prefix, and to optimize NextPrefix.
598 : prefixChanged Bitmap
599 : // values is the column reader for values. If the isValueExternal bitmap
600 : // indicates a value is external, the value is prefixed with a ValuePrefix
601 : // byte.
602 : values RawBytes
603 : // isValueExternal is the column reader for the is-value-external bitmap
604 : // that indicates whether a value is stored out-of-band in a value block. If
605 : // true, the value contains a ValuePrefix byte followed by an encoded value
606 : // handle indicating the value's location within the value block(s).
607 : isValueExternal Bitmap
608 : // isObsolete is the column reader for the is-obsolete bitmap
609 : // that indicates whether a key is obsolete/non-live.
610 : isObsolete Bitmap
611 : // maximumKeyLength is the maximum length of a user key in the block.
612 : // Iterators may use it to allocate a sufficiently large buffer up front,
613 : // and elide size checks during iteration.
614 : maximumKeyLength uint32
615 : }
616 :
617 : // BlockReader returns a pointer to the underlying BlockReader.
618 1 : func (r *DataBlockReader) BlockReader() *BlockReader {
619 1 : return &r.r
620 1 : }
621 :
622 : // Init initializes the data block reader with the given serialized data block.
623 1 : func (r *DataBlockReader) Init(schema KeySchema, data []byte) {
624 1 : r.r.Init(data, dataBlockCustomHeaderSize)
625 1 : r.trailers = r.r.Uints(len(schema.ColumnTypes) + dataBlockColumnTrailer)
626 1 : r.prefixChanged = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnPrefixChanged)
627 1 : r.values = r.r.RawBytes(len(schema.ColumnTypes) + dataBlockColumnValue)
628 1 : r.isValueExternal = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsValueExternal)
629 1 : r.isObsolete = r.r.Bitmap(len(schema.ColumnTypes) + dataBlockColumnIsObsolete)
630 1 : r.maximumKeyLength = binary.LittleEndian.Uint32(data[:dataBlockCustomHeaderSize])
631 1 : }
632 :
633 : // Describe descirbes the binary format of the data block, assuming f.Offset()
634 : // is positioned at the beginning of the same data block described by r.
635 1 : func (r *DataBlockReader) Describe(f *binfmt.Formatter) {
636 1 : // Set the relative offset. When loaded into memory, the beginning of blocks
637 1 : // are aligned. Padding that ensures alignment is done relative to the
638 1 : // current offset. Setting the relative offset ensures that if we're
639 1 : // describing this block within a larger structure (eg, f.Offset()>0), we
640 1 : // compute padding appropriately assuming the current byte f.Offset() is
641 1 : // aligned.
642 1 : f.SetAnchorOffset()
643 1 :
644 1 : f.CommentLine("data block header")
645 1 : f.HexBytesln(4, "maximum key length: %d", r.maximumKeyLength)
646 1 : r.r.headerToBinFormatter(f)
647 1 : for i := 0; i < int(r.r.header.Columns); i++ {
648 1 : r.r.columnToBinFormatter(f, i, int(r.r.header.Rows))
649 1 : }
650 1 : f.HexBytesln(1, "block padding byte")
651 : }
652 :
653 : // MultiDataBlockIter wraps a DataBlockIter, implementing the block package's
654 : // DataBlockIterator. MultiDataBlockIter retains configuration (the KeySchema
655 : // and lazy value retriever) across initialization for additional blocks. It
656 : // also is responsible for maintaining handles to block buffer handles.
657 : //
658 : // In constrast, the DataBlockIter type it wraps operates solely within the
659 : // context of iterating over an individual block. While a DataBlockIter may be
660 : // reused, it does not retain any configuration beyond a reset.
661 : type MultiDataBlockIter struct {
662 : DataBlockIter
663 : h block.BufferHandle
664 : r DataBlockReader
665 :
666 : // KeySchema configures the DataBlockIterConfig to use the provided
667 : // KeySchema when initializing the DataBlockIter for iteration over a new
668 : // block.
669 : KeySchema KeySchema
670 : // GetLazyValuer configures the DataBlockIterConfig to initialize the
671 : // DataBlockIter to use the provided handler for retrieving lazy values.
672 : GetLazyValuer block.GetLazyValueForPrefixAndValueHandler
673 : }
674 :
675 : // Assert that *DataBlockIterConfig implements block.DataBlockIterator.
676 : var _ block.DataBlockIterator = (*MultiDataBlockIter)(nil)
677 :
678 : // InitHandle initializes the block from the provided buffer handle.
679 : func (c *MultiDataBlockIter) InitHandle(
680 : cmp base.Compare, split base.Split, h block.BufferHandle, t block.IterTransforms,
681 0 : ) error {
682 0 : c.h.Release()
683 0 : c.h = h
684 0 : c.r.Init(c.KeySchema, h.Get())
685 0 : return c.DataBlockIter.Init(&c.r, c.KeySchema.NewKeySeeker(), c.GetLazyValuer)
686 0 : }
687 :
688 : // Handle returns the handle to the block.
689 0 : func (c *MultiDataBlockIter) Handle() block.BufferHandle {
690 0 : return c.h
691 0 : }
692 :
693 : // Invalidate invalidates the block iterator, removing references to the block
694 : // it was initialized with.
695 0 : func (c *MultiDataBlockIter) Invalidate() {
696 0 : c.DataBlockIter = DataBlockIter{}
697 0 : }
698 :
699 : // IsDataInvalidated returns true when the iterator has been invalidated
700 : // using an Invalidate call.
701 0 : func (c *MultiDataBlockIter) IsDataInvalidated() bool {
702 0 : return c.DataBlockIter.r == nil
703 0 : }
704 :
705 : // ResetForReuse resets the iterator for reuse, retaining buffers and
706 : // configuration, to avoid future allocations.
707 0 : func (i *MultiDataBlockIter) ResetForReuse() MultiDataBlockIter {
708 0 : return MultiDataBlockIter{
709 0 : DataBlockIter: DataBlockIter{
710 0 : keyIter: PrefixBytesIter{buf: i.DataBlockIter.keyIter.buf},
711 0 : },
712 0 : KeySchema: i.KeySchema,
713 0 : GetLazyValuer: i.GetLazyValuer,
714 0 : }
715 0 : }
716 :
717 : // Close implements the base.InternalIterator interface.
718 0 : func (i *MultiDataBlockIter) Close() error {
719 0 : if i.keySeeker != nil {
720 0 : i.keySeeker.Release()
721 0 : i.keySeeker = nil
722 0 : }
723 0 : i.DataBlockIter = DataBlockIter{}
724 0 : i.h.Release()
725 0 : i.h = block.BufferHandle{}
726 0 : return nil
727 : }
728 :
729 : // DataBlockIter iterates over a columnar data block.
730 : type DataBlockIter struct {
731 : // configuration
732 : r *DataBlockReader
733 : maxRow int
734 : keySeeker KeySeeker
735 : getLazyValuer block.GetLazyValueForPrefixAndValueHandler
736 :
737 : // state
738 : keyIter PrefixBytesIter
739 : row int
740 : kv base.InternalKV
741 : kvRow int // the row currently held in kv
742 : }
743 :
744 : // Init initializes the data block iterator, configuring it to read from the
745 : // provided reader.
746 : func (i *DataBlockIter) Init(
747 : r *DataBlockReader,
748 : keyIterator KeySeeker,
749 : getLazyValuer block.GetLazyValueForPrefixAndValueHandler,
750 1 : ) error {
751 1 : *i = DataBlockIter{
752 1 : r: r,
753 1 : maxRow: int(r.r.header.Rows) - 1,
754 1 : keySeeker: keyIterator,
755 1 : getLazyValuer: getLazyValuer,
756 1 : row: -1,
757 1 : kvRow: math.MinInt,
758 1 : kv: base.InternalKV{},
759 1 : keyIter: PrefixBytesIter{},
760 1 : }
761 1 : // Allocate a keyIter buffer that's large enough to hold the largest user
762 1 : // key in the block with 1 byte to spare (so that pointer arithmetic is
763 1 : // never pointing beyond the allocation, which would violate Go rules).
764 1 : n := int(r.maximumKeyLength) + 1
765 1 : if cap(i.keyIter.buf) < n {
766 1 : ptr := mallocgc(uintptr(n), nil, false)
767 1 : i.keyIter.buf = unsafe.Slice((*byte)(ptr), n)[:0]
768 1 : } else {
769 0 : i.keyIter.buf = i.keyIter.buf[:0]
770 0 : }
771 1 : return i.keySeeker.Init(r)
772 : }
773 :
// CompareFirstUserKey compares the provided key to the first user key
// contained within the data block. It's equivalent to performing
// Compare(firstUserKey, k).
//
// The comparison is delegated to the iterator's key seeker, so the iterator
// must have been initialized with Init.
func (i *DataBlockIter) CompareFirstUserKey(k []byte) int {
	return i.keySeeker.CompareFirstUserKey(k)
}
780 :
781 : // SeekGE implements the base.InternalIterator interface.
782 1 : func (i *DataBlockIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
783 1 : searchDir := int8(0)
784 1 : if flags.TrySeekUsingNext() {
785 0 : searchDir = +1
786 0 : }
787 1 : return i.decodeRow(i.keySeeker.SeekGE(key, i.row, searchDir))
788 : }
789 :
// SeekPrefixGE implements the base.InternalIterator interface. It is
// unimplemented and always panics.
func (i *DataBlockIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
	// This should never be called as prefix iteration is handled by
	// sstable.Iterator.

	// TODO(jackson): We can implement this and avoid propagating keys without
	// the prefix up to the merging iterator. It will avoid unnecessary key
	// comparisons fixing up the merging iterator heap. We can also short
	// circuit the search if the prefix isn't found within the prefix column.
	// There's some subtlety around ensuring we continue to benefit from the
	// TrySeekUsingNext optimization.
	panic("pebble: SeekPrefixGE unimplemented")
}
803 :
804 : // SeekLT implements the base.InternalIterator interface.
805 1 : func (i *DataBlockIter) SeekLT(key []byte, _ base.SeekLTFlags) *base.InternalKV {
806 1 : row := i.keySeeker.SeekGE(key, i.row, 0 /* searchDir */)
807 1 : return i.decodeRow(row - 1)
808 1 : }
809 :
// First implements the base.InternalIterator interface. It positions the
// iterator at row 0 and returns the key-value pair there, or nil if decodeRow
// finds row 0 to be out of range.
func (i *DataBlockIter) First() *base.InternalKV {
	return i.decodeRow(0)
}
814 :
815 : // Last implements the base.InternalIterator interface.
816 1 : func (i *DataBlockIter) Last() *base.InternalKV {
817 1 : return i.decodeRow(i.r.BlockReader().Rows() - 1)
818 1 : }
819 :
820 : // Next advances to the next KV pair in the block.
821 1 : func (i *DataBlockIter) Next() *base.InternalKV {
822 1 : // Inline decodeRow, but avoiding unnecessary checks against i.row.
823 1 : if i.row >= i.maxRow {
824 1 : i.row = i.maxRow + 1
825 1 : return nil
826 1 : }
827 1 : i.row++
828 1 : i.kv.K = base.InternalKey{
829 1 : UserKey: i.keySeeker.MaterializeUserKey(&i.keyIter, i.kvRow, i.row),
830 1 : Trailer: base.InternalKeyTrailer(i.r.trailers.At(i.row)),
831 1 : }
832 1 : // Inline i.r.values.At(row).
833 1 : startOffset, endOffset := i.r.values.offsets.At2(i.row)
834 1 : v := unsafe.Slice((*byte)(i.r.values.ptr(startOffset)), endOffset-startOffset)
835 1 : if i.r.isValueExternal.At(i.row) {
836 1 : i.kv.V = i.getLazyValuer.GetLazyValueForPrefixAndValueHandle(v)
837 1 : } else {
838 1 : i.kv.V = base.MakeInPlaceValue(v)
839 1 : }
840 1 : i.kvRow = i.row
841 1 : return &i.kv
842 : }
843 :
// NextPrefix moves the iterator to the next row with a different prefix than
// the key at the current iterator position.
//
// The columnar block implementation uses a prefixChanged bitmap to identify
// the next row with a differing prefix from the current row's key. If
// prefixChanged[i] is set then row i's key prefix is different than row
// i-1's. The bitmap is organized as a slice of 64-bit words. If a 64-bit word
// in the bitmap is zero then all of the rows corresponding to the bits in that
// word have the same prefix and we can skip ahead. If a word is non-zero, a
// small amount of bit shifting and masking combined with bits.TrailingZeros64
// can identify the next bit that is set after the current row. The bitmap
// uses 1 bit/row (0.125 bytes/row). A 32KB block containing 1300 rows (25
// bytes/row) would need a bitmap of 21 64-bit words. Even in the worst case
// where every word is 0 this bitmap can be scanned in ~20 ns (1 ns/word)
// leading to a total NextPrefix time of ~30 ns if a row is found and
// decodeRow is called. In more normal cases, NextPrefix takes ~15% longer
// than a single Next call.
//
// For comparison, the rowblk nextPrefixV3 optimizations work by setting a bit
// in the value prefix byte that indicates that the current key has the same
// prefix as the previous key. Additionally, a bit is stolen from the restart
// table entries indicating whether a restart table entry has the same key
// prefix as the previous entry. Checking the value prefix byte bit requires
// locating that byte which requires decoding 3 varints per key/value pair.
func (i *DataBlockIter) NextPrefix(_ []byte) *base.InternalKV {
	return i.decodeRow(i.r.prefixChanged.SeekSetBitGE(i.row + 1))
}
870 :
// Prev moves the iterator to the previous KV pair in the block. It returns
// nil, leaving the iterator positioned at row -1, if there is no previous row.
func (i *DataBlockIter) Prev() *base.InternalKV {
	return i.decodeRow(i.row - 1)
}
875 :
// Error implements the base.InternalIterator interface. A DataBlockIter is
// infallible and always returns a nil error.
func (i *DataBlockIter) Error() error {
	return nil // infallible
}
881 :
// SetBounds implements the base.InternalIterator interface. It is
// unimplemented and always panics.
func (i *DataBlockIter) SetBounds(lower, upper []byte) {
	// This should never be called as bounds are handled by sstable.Iterator.
	panic("pebble: SetBounds unimplemented")
}
887 :
// SetContext implements the base.InternalIterator interface. It is a no-op:
// the provided context is ignored.
func (i *DataBlockIter) SetContext(_ context.Context) {}
890 :
891 : var dataBlockTypeString string = fmt.Sprintf("%T", (*DataBlockIter)(nil))
892 :
// String implements the base.InternalIterator interface. It returns the
// precomputed type name held in dataBlockTypeString.
func (i *DataBlockIter) String() string {
	return dataBlockTypeString
}
897 :
// DebugTree is part of the InternalIterator interface. It adds a single child
// to tp, labelled with the iterator's type and pointer address.
func (i *DataBlockIter) DebugTree(tp treeprinter.Node) {
	tp.Childf("%T(%p)", i, i)
}
902 :
903 1 : func (i *DataBlockIter) decodeRow(row int) *base.InternalKV {
904 1 : switch {
905 1 : case row < 0:
906 1 : i.row = -1
907 1 : return nil
908 1 : case row >= i.r.BlockReader().Rows():
909 1 : i.row = i.r.BlockReader().Rows()
910 1 : return nil
911 1 : case i.kvRow == row:
912 1 : i.row = row
913 1 : // Already synthesized the kv at row.
914 1 : return &i.kv
915 1 : default:
916 1 : i.kv.K = base.InternalKey{
917 1 : UserKey: i.keySeeker.MaterializeUserKey(&i.keyIter, i.kvRow, row),
918 1 : Trailer: base.InternalKeyTrailer(i.r.trailers.At(row)),
919 1 : }
920 1 : // Inline i.r.values.At(row).
921 1 : startOffset := i.r.values.offsets.At(row)
922 1 : v := unsafe.Slice((*byte)(i.r.values.ptr(startOffset)), i.r.values.offsets.At(row+1)-startOffset)
923 1 : if i.r.isValueExternal.At(row) {
924 1 : i.kv.V = i.getLazyValuer.GetLazyValueForPrefixAndValueHandle(v)
925 1 : } else {
926 1 : i.kv.V = base.MakeInPlaceValue(v)
927 1 : }
928 1 : i.row = row
929 1 : i.kvRow = row
930 1 : return &i.kv
931 : }
932 : }
933 :
934 : // Valid returns true if the iterator is currently positioned at a valid KV.
935 0 : func (i *DataBlockIter) Valid() bool {
936 0 : return i.row < 0 || i.row > i.maxRow
937 0 : }
938 :
// KV returns the key-value pair at the current iterator position. The
// iterator must be positioned over a valid KV.
//
// The returned pointer refers to the iterator's internal kv field, which the
// positioning methods overwrite when they decode a different row.
func (i *DataBlockIter) KV() *base.InternalKV {
	return &i.kv
}
944 :
945 : // Close implements the base.InternalIterator interface.
946 1 : func (i *DataBlockIter) Close() error {
947 1 : i.keySeeker.Release()
948 1 : i.keySeeker = nil
949 1 : return nil
950 1 : }
|