Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : )
21 :
22 : // TODO(peter): describe the MANIFEST file format, independently of the C++
23 : // project.
24 :
25 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
26 :
27 : type byteReader interface {
28 : io.ByteReader
29 : io.Reader
30 : }
31 :
32 : // Tags for the versionEdit disk format.
33 : // Tag 8 is no longer used.
34 : const (
35 : // LevelDB tags.
36 : tagComparator = 1
37 : tagLogNumber = 2
38 : tagNextFileNumber = 3
39 : tagLastSequence = 4
40 : tagCompactPointer = 5
41 : tagDeletedFile = 6
42 : tagNewFile = 7
43 : tagPrevLogNumber = 9
44 :
45 : // RocksDB tags.
46 : tagNewFile2 = 100
47 : tagNewFile3 = 102
48 : tagNewFile4 = 103
49 : tagColumnFamily = 200
50 : tagColumnFamilyAdd = 201
51 : tagColumnFamilyDrop = 202
52 : tagMaxColumnFamily = 203
53 :
54 : // Pebble tags.
55 : tagNewFile5 = 104 // Range keys.
56 : tagCreatedBackingTable = 105
57 : tagRemovedBackingTable = 106
58 :
59 : // The custom tags sub-format used by tagNewFile4 and above. All tags less
60 : // than customTagNonSafeIgnoreMask are safe to ignore, and their payload must
61 : // be a single length-prefixed bytes field.
62 : customTagTerminate = 1
63 : customTagNeedsCompaction = 2
64 : customTagCreationTime = 6
65 : customTagPathID = 65
66 : customTagNonSafeIgnoreMask = 1 << 6
67 : customTagVirtual = 66
68 : customTagPrefixRewrite = 67
69 : customTagSuffixRewrite = 68
70 : )
71 :
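On disk, every field of a version edit is framed the same way: a uvarint tag from the table above, followed by a tag-specific payload (Encode below shows the full set). A minimal sketch of that framing, assuming it sits inside this package since the tag constants are unexported; the value 42 is an arbitrary next-file-number for illustration:

func exampleTagFraming() []byte {
	var tmp [binary.MaxVarintLen64]byte
	buf := make([]byte, 0, 2*binary.MaxVarintLen64)
	n := binary.PutUvarint(tmp[:], tagNextFileNumber) // uvarint tag...
	buf = append(buf, tmp[:n]...)
	n = binary.PutUvarint(tmp[:], 42) // ...then its uvarint payload
	buf = append(buf, tmp[:n]...)
	return buf
}
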
72 : // DeletedFileEntry holds the state for a file deletion from a level. The file
73 : // itself might still be referenced by another level.
74 : type DeletedFileEntry struct {
75 : Level int
76 : FileNum base.FileNum
77 : }
78 :
79 : // NewFileEntry holds the state for a new file or one moved from a different
80 : // level.
81 : type NewFileEntry struct {
82 : Level int
83 : Meta *FileMetadata
84 : // BackingFileNum is only set during manifest replay, and only for virtual
85 : // sstables.
86 : BackingFileNum base.DiskFileNum
87 : }
88 :
89 : // VersionEdit holds the state for an edit to a Version along with other
90 : // on-disk state (log numbers, next file number, and the last sequence number).
91 : type VersionEdit struct {
92 : // ComparerName is the value of Options.Comparer.Name. This is only set in
93 : // the first VersionEdit in a manifest (either when the DB is created, or
94 : // when a new manifest is created) and is used to verify that the comparer
95 : // specified at Open matches the comparer that was previously used.
96 : ComparerName string
97 :
98 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
99 : // mutations that have not been flushed to an sstable.
100 : //
101 : // This is an optional field; a value of 0 means it is not set.
102 : MinUnflushedLogNum base.DiskFileNum
103 :
104 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
105 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
106 : // 6/2011. We keep it around purely for informational purposes when
107 : // displaying MANIFEST contents.
108 : ObsoletePrevLogNum uint64
109 :
110 : // The next file number. A single counter is used to assign file numbers
111 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
112 : NextFileNum uint64
113 :
114 : // LastSeqNum is an upper bound on the sequence numbers that have been
115 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
116 : // recovery) may contain sequence numbers greater than this value.
117 : LastSeqNum uint64
118 :
119 : // A file num may be present in both deleted files and new files when it
120 : // is moved from a lower level to a higher level (when the compaction
121 : // found that there was no overlapping file at the higher level).
122 : DeletedFiles map[DeletedFileEntry]*FileMetadata
123 : NewFiles []NewFileEntry
124 : // CreatedBackingTables can be used to preserve the FileBacking associated
125 : // with a physical sstable. This is useful when virtual sstables in the
126 : // latest version are reconstructed during manifest replay, and we also need
127 : // to reconstruct the FileBacking which is required by these virtual
128 : // sstables.
129 : //
130 : // INVARIANT: The FileBacking associated with a physical sstable must only
131 : // be added as a backing file in the same version edit where the physical
132 : // sstable is first virtualized. This means that the physical sstable must
133 : // be present in DeletedFiles and that there must be at least one virtual
134 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
135 : // file must be present in CreatedBackingTables in exactly one version edit.
136 : // The physical sstable associated with the FileBacking must also not be
137 : // present in NewFiles.
138 : CreatedBackingTables []*FileBacking
139 : // RemovedBackingTables is used to remove the FileBacking associated with a
140 : // virtual sstable. Note that a backing sstable can be removed as soon as
141 : // there are no virtual sstables in the latest version which are using the
142 : // backing sstable, but the backing sstable doesn't necessarily have to be
143 : // removed atomically with the version edit which removes the last virtual
144 : // sstable associated with the backing sstable. The removal can happen in a
145 : // future version edit.
146 : //
147 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
148 : // added to CreatedBackingTables in a prior version edit. The same version
149 : // edit also cannot have the same file present in both CreatedBackingTables
150 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
151 : // in exactly one version edit.
152 : RemovedBackingTables []base.DiskFileNum
153 : }
154 :
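As a concrete illustration of the DeletedFiles comment above, a hedged sketch of a "move" edit in which the same file number appears in both DeletedFiles and NewFiles; m is assumed to be an existing *FileMetadata taken from the current version:

func exampleMoveEdit(m *FileMetadata) *VersionEdit {
	return &VersionEdit{
		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
			{Level: 0, FileNum: m.FileNum}: m, // removed from L0...
		},
		NewFiles: []NewFileEntry{
			{Level: 6, Meta: m}, // ...and re-added, unchanged, at L6
		},
	}
}
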
155 : // Decode decodes an edit from the specified reader.
156 : //
157 : // Note that the Decode step will not set the FileBacking for virtual sstables
158 : // and the responsibility is left to the caller. However, the Decode step will
159 : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
160 1 : func (v *VersionEdit) Decode(r io.Reader) error {
161 1 : br, ok := r.(byteReader)
162 1 : if !ok {
163 1 : br = bufio.NewReader(r)
164 1 : }
165 1 : d := versionEditDecoder{br}
166 1 : for {
167 1 : tag, err := binary.ReadUvarint(br)
168 1 : if err == io.EOF {
169 1 : break
170 : }
171 1 : if err != nil {
172 0 : return err
173 0 : }
174 1 : switch tag {
175 1 : case tagComparator:
176 1 : s, err := d.readBytes()
177 1 : if err != nil {
178 0 : return err
179 0 : }
180 1 : v.ComparerName = string(s)
181 :
182 1 : case tagLogNumber:
183 1 : n, err := d.readUvarint()
184 1 : if err != nil {
185 0 : return err
186 0 : }
187 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
188 :
189 1 : case tagNextFileNumber:
190 1 : n, err := d.readUvarint()
191 1 : if err != nil {
192 0 : return err
193 0 : }
194 1 : v.NextFileNum = n
195 :
196 1 : case tagLastSequence:
197 1 : n, err := d.readUvarint()
198 1 : if err != nil {
199 0 : return err
200 0 : }
201 1 : v.LastSeqNum = n
202 :
203 0 : case tagCompactPointer:
204 0 : if _, err := d.readLevel(); err != nil {
205 0 : return err
206 0 : }
207 0 : if _, err := d.readBytes(); err != nil {
208 0 : return err
209 0 : }
210 : // NB: RocksDB does not use compaction pointers anymore.
211 :
212 1 : case tagRemovedBackingTable:
213 1 : n, err := d.readUvarint()
214 1 : if err != nil {
215 0 : return err
216 0 : }
217 1 : v.RemovedBackingTables = append(
218 1 : v.RemovedBackingTables, base.DiskFileNum(n),
219 1 : )
220 1 : case tagCreatedBackingTable:
221 1 : dfn, err := d.readUvarint()
222 1 : if err != nil {
223 0 : return err
224 0 : }
225 1 : size, err := d.readUvarint()
226 1 : if err != nil {
227 0 : return err
228 0 : }
229 1 : fileBacking := &FileBacking{
230 1 : DiskFileNum: base.DiskFileNum(dfn),
231 1 : Size: size,
232 1 : }
233 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
234 1 : case tagDeletedFile:
235 1 : level, err := d.readLevel()
236 1 : if err != nil {
237 0 : return err
238 0 : }
239 1 : fileNum, err := d.readFileNum()
240 1 : if err != nil {
241 0 : return err
242 0 : }
243 1 : if v.DeletedFiles == nil {
244 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
245 1 : }
246 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
247 :
248 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
249 1 : level, err := d.readLevel()
250 1 : if err != nil {
251 0 : return err
252 0 : }
253 1 : fileNum, err := d.readFileNum()
254 1 : if err != nil {
255 0 : return err
256 0 : }
257 1 : if tag == tagNewFile3 {
258 0 : // The pathID field appears unused in RocksDB.
259 0 : _ /* pathID */, err := d.readUvarint()
260 0 : if err != nil {
261 0 : return err
262 0 : }
263 : }
264 1 : size, err := d.readUvarint()
265 1 : if err != nil {
266 0 : return err
267 0 : }
268 : // We read the smallest / largest key bounds differently depending on
269 : // whether we have point, range or both types of keys present in the
270 : // table.
271 1 : var (
272 1 : smallestPointKey, largestPointKey []byte
273 1 : smallestRangeKey, largestRangeKey []byte
274 1 : parsedPointBounds bool
275 1 : boundsMarker byte
276 1 : )
277 1 : if tag != tagNewFile5 {
278 1 : // Range keys not present in the table. Parse the point key bounds.
279 1 : smallestPointKey, err = d.readBytes()
280 1 : if err != nil {
281 0 : return err
282 0 : }
283 1 : largestPointKey, err = d.readBytes()
284 1 : if err != nil {
285 0 : return err
286 0 : }
287 1 : } else {
288 1 : // Range keys are present in the table. Determine whether we have point
289 1 : // keys to parse, in addition to the bounds.
290 1 : boundsMarker, err = d.ReadByte()
291 1 : if err != nil {
292 0 : return err
293 0 : }
294 : // Parse point key bounds, if present.
295 1 : if boundsMarker&maskContainsPointKeys > 0 {
296 1 : smallestPointKey, err = d.readBytes()
297 1 : if err != nil {
298 0 : return err
299 0 : }
300 1 : largestPointKey, err = d.readBytes()
301 1 : if err != nil {
302 0 : return err
303 0 : }
304 1 : parsedPointBounds = true
305 1 : } else {
306 1 : // The table does not have point keys.
307 1 : // Sanity check: the bounds must be range keys.
308 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
309 0 : return base.CorruptionErrorf(
310 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
311 0 : boundsMarker,
312 0 : )
313 0 : }
314 : }
315 : // Parse range key bounds.
316 1 : smallestRangeKey, err = d.readBytes()
317 1 : if err != nil {
318 0 : return err
319 0 : }
320 1 : largestRangeKey, err = d.readBytes()
321 1 : if err != nil {
322 0 : return err
323 0 : }
324 : }
325 1 : var smallestSeqNum uint64
326 1 : var largestSeqNum uint64
327 1 : if tag != tagNewFile {
328 1 : smallestSeqNum, err = d.readUvarint()
329 1 : if err != nil {
330 0 : return err
331 0 : }
332 1 : largestSeqNum, err = d.readUvarint()
333 1 : if err != nil {
334 0 : return err
335 0 : }
336 : }
337 1 : var markedForCompaction bool
338 1 : var creationTime uint64
339 1 : virtualState := struct {
340 1 : virtual bool
341 1 : backingFileNum uint64
342 1 : }{}
343 1 : var virtualPrefix *PrefixReplacement
344 1 : var syntheticSuffix []byte
345 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
346 1 : for {
347 1 : customTag, err := d.readUvarint()
348 1 : if err != nil {
349 0 : return err
350 0 : }
351 1 : if customTag == customTagTerminate {
352 1 : break
353 : }
354 1 : switch customTag {
355 1 : case customTagNeedsCompaction:
356 1 : field, err := d.readBytes()
357 1 : if err != nil {
358 0 : return err
359 0 : }
360 1 : if len(field) != 1 {
361 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
362 0 : }
363 1 : markedForCompaction = (field[0] == 1)
364 :
365 1 : case customTagCreationTime:
366 1 : field, err := d.readBytes()
367 1 : if err != nil {
368 0 : return err
369 0 : }
370 1 : var n int
371 1 : creationTime, n = binary.Uvarint(field)
372 1 : if n != len(field) {
373 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
374 0 : }
375 :
376 0 : case customTagPathID:
377 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
378 :
379 1 : case customTagVirtual:
380 1 : virtualState.virtual = true
381 1 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
382 0 : return err
383 0 : }
384 :
385 0 : case customTagPrefixRewrite:
386 0 : content, err := d.readBytes()
387 0 : if err != nil {
388 0 : return err
389 0 : }
390 0 : synthetic, err := d.readBytes()
391 0 : if err != nil {
392 0 : return err
393 0 : }
394 0 : virtualPrefix = &PrefixReplacement{
395 0 : ContentPrefix: content,
396 0 : SyntheticPrefix: synthetic,
397 0 : }
398 :
399 1 : case customTagSuffixRewrite:
400 1 : if syntheticSuffix, err = d.readBytes(); err != nil {
401 0 : return err
402 0 : }
403 :
404 0 : default:
405 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
406 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
407 0 : }
408 0 : if _, err := d.readBytes(); err != nil {
409 0 : return err
410 0 : }
411 : }
412 : }
413 : }
414 1 : m := &FileMetadata{
415 1 : FileNum: fileNum,
416 1 : Size: size,
417 1 : CreationTime: int64(creationTime),
418 1 : SmallestSeqNum: smallestSeqNum,
419 1 : LargestSeqNum: largestSeqNum,
420 1 : MarkedForCompaction: markedForCompaction,
421 1 : Virtual: virtualState.virtual,
422 1 : PrefixReplacement: virtualPrefix,
423 1 : SyntheticSuffix: syntheticSuffix,
424 1 : }
425 1 : if tag != tagNewFile5 { // no range keys present
426 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
427 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
428 1 : m.HasPointKeys = true
429 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
430 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
431 1 : } else { // range keys present
432 1 : // Set point key bounds, if parsed.
433 1 : if parsedPointBounds {
434 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
435 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
436 1 : m.HasPointKeys = true
437 1 : }
438 : // Set range key bounds.
439 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
440 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
441 1 : m.HasRangeKeys = true
442 1 : // Set overall bounds (by default assume range keys).
443 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
444 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
445 1 : if boundsMarker&maskSmallest == maskSmallest {
446 1 : m.Smallest = m.SmallestPointKey
447 1 : m.boundTypeSmallest = boundTypePointKey
448 1 : }
449 1 : if boundsMarker&maskLargest == maskLargest {
450 1 : m.Largest = m.LargestPointKey
451 1 : m.boundTypeLargest = boundTypePointKey
452 1 : }
453 : }
454 1 : m.boundsSet = true
455 1 : if !virtualState.virtual {
456 1 : m.InitPhysicalBacking()
457 1 : }
458 :
459 1 : nfe := NewFileEntry{
460 1 : Level: level,
461 1 : Meta: m,
462 1 : }
463 1 : if virtualState.virtual {
464 1 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
465 1 : }
466 1 : v.NewFiles = append(v.NewFiles, nfe)
467 :
468 1 : case tagPrevLogNumber:
469 1 : n, err := d.readUvarint()
470 1 : if err != nil {
471 0 : return err
472 0 : }
473 1 : v.ObsoletePrevLogNum = n
474 :
475 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
476 0 : return base.CorruptionErrorf("column families are not supported")
477 :
478 0 : default:
479 0 : return errCorruptManifest
480 : }
481 : }
482 1 : return nil
483 : }
484 :
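A usage sketch for Decode: read one serialized edit from a byte slice. In an actual MANIFEST the edits are framed as records by the record package; buf is assumed to already hold a single unframed edit:

func exampleDecode(buf []byte) (*VersionEdit, error) {
	ve := &VersionEdit{}
	if err := ve.Decode(bytes.NewReader(buf)); err != nil {
		return nil, err
	}
	return ve, nil
}
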
485 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
486 1 : var buf bytes.Buffer
487 1 : if v.ComparerName != "" {
488 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
489 1 : }
490 1 : if v.MinUnflushedLogNum != 0 {
491 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
492 1 : }
493 1 : if v.ObsoletePrevLogNum != 0 {
494 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
495 0 : }
496 1 : if v.NextFileNum != 0 {
497 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
498 1 : }
499 1 : if v.LastSeqNum != 0 {
500 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
501 1 : }
502 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
503 1 : for df := range v.DeletedFiles {
504 1 : entries = append(entries, df)
505 1 : }
506 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
507 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
508 1 : return v
509 1 : }
510 0 : return stdcmp.Compare(a.FileNum, b.FileNum)
511 : })
512 1 : for _, df := range entries {
513 1 : fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
514 1 : }
515 1 : for _, nf := range v.NewFiles {
516 1 : fmt.Fprintf(&buf, " added: L%d", nf.Level)
517 1 : if verbose {
518 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
519 1 : } else {
520 1 : fmt.Fprintf(&buf, " %s", nf.Meta.String())
521 1 : }
522 1 : if nf.Meta.CreationTime != 0 {
523 1 : fmt.Fprintf(&buf, " (%s)",
524 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
525 1 : }
526 1 : fmt.Fprintln(&buf)
527 : }
528 1 : return buf.String()
529 : }
530 :
531 : // DebugString is a more verbose version of String(). Use this in tests.
532 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
533 1 : return v.string(true /* verbose */, fmtKey)
534 1 : }
535 :
536 : // String implements fmt.Stringer for a VersionEdit.
537 1 : func (v *VersionEdit) String() string {
538 1 : return v.string(false /* verbose */, base.DefaultFormatter)
539 1 : }
540 :
541 : // Encode encodes an edit to the specified writer.
542 1 : func (v *VersionEdit) Encode(w io.Writer) error {
543 1 : e := versionEditEncoder{new(bytes.Buffer)}
544 1 :
545 1 : if v.ComparerName != "" {
546 1 : e.writeUvarint(tagComparator)
547 1 : e.writeString(v.ComparerName)
548 1 : }
549 1 : if v.MinUnflushedLogNum != 0 {
550 1 : e.writeUvarint(tagLogNumber)
551 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
552 1 : }
553 1 : if v.ObsoletePrevLogNum != 0 {
554 1 : e.writeUvarint(tagPrevLogNumber)
555 1 : e.writeUvarint(v.ObsoletePrevLogNum)
556 1 : }
557 1 : if v.NextFileNum != 0 {
558 1 : e.writeUvarint(tagNextFileNumber)
559 1 : e.writeUvarint(uint64(v.NextFileNum))
560 1 : }
561 1 : for _, dfn := range v.RemovedBackingTables {
562 1 : e.writeUvarint(tagRemovedBackingTable)
563 1 : e.writeUvarint(uint64(dfn))
564 1 : }
565 1 : for _, fileBacking := range v.CreatedBackingTables {
566 1 : e.writeUvarint(tagCreatedBackingTable)
567 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
568 1 : e.writeUvarint(fileBacking.Size)
569 1 : }
570 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
571 : // even though its value is zero. We detect this by encoding LastSeqNum when
572 : // ComparerName is set.
573 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
574 1 : e.writeUvarint(tagLastSequence)
575 1 : e.writeUvarint(v.LastSeqNum)
576 1 : }
577 1 : for x := range v.DeletedFiles {
578 1 : e.writeUvarint(tagDeletedFile)
579 1 : e.writeUvarint(uint64(x.Level))
580 1 : e.writeUvarint(uint64(x.FileNum))
581 1 : }
582 1 : for _, x := range v.NewFiles {
583 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
584 1 : var tag uint64
585 1 : switch {
586 1 : case x.Meta.HasRangeKeys:
587 1 : tag = tagNewFile5
588 1 : case customFields:
589 1 : tag = tagNewFile4
590 0 : default:
591 0 : tag = tagNewFile2
592 : }
593 1 : e.writeUvarint(tag)
594 1 : e.writeUvarint(uint64(x.Level))
595 1 : e.writeUvarint(uint64(x.Meta.FileNum))
596 1 : e.writeUvarint(x.Meta.Size)
597 1 : if !x.Meta.HasRangeKeys {
598 1 : // If we have no range keys, preserve the original format and write the
599 1 : // smallest and largest point keys.
600 1 : e.writeKey(x.Meta.SmallestPointKey)
601 1 : e.writeKey(x.Meta.LargestPointKey)
602 1 : } else {
603 1 : // When range keys are present, we first write a marker byte that
604 1 : // indicates if the table also contains point keys, in addition to how the
605 1 : // overall bounds for the table should be reconstructed. This byte is
606 1 : // followed by the keys themselves.
607 1 : b, err := x.Meta.boundsMarker()
608 1 : if err != nil {
609 0 : return err
610 0 : }
611 1 : if err = e.WriteByte(b); err != nil {
612 0 : return err
613 0 : }
614 : // Write point key bounds (if present).
615 1 : if x.Meta.HasPointKeys {
616 1 : e.writeKey(x.Meta.SmallestPointKey)
617 1 : e.writeKey(x.Meta.LargestPointKey)
618 1 : }
619 : // Write range key bounds.
620 1 : e.writeKey(x.Meta.SmallestRangeKey)
621 1 : e.writeKey(x.Meta.LargestRangeKey)
622 : }
623 1 : e.writeUvarint(x.Meta.SmallestSeqNum)
624 1 : e.writeUvarint(x.Meta.LargestSeqNum)
625 1 : if customFields {
626 1 : if x.Meta.CreationTime != 0 {
627 1 : e.writeUvarint(customTagCreationTime)
628 1 : var buf [binary.MaxVarintLen64]byte
629 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
630 1 : e.writeBytes(buf[:n])
631 1 : }
632 1 : if x.Meta.MarkedForCompaction {
633 1 : e.writeUvarint(customTagNeedsCompaction)
634 1 : e.writeBytes([]byte{1})
635 1 : }
636 1 : if x.Meta.Virtual {
637 1 : e.writeUvarint(customTagVirtual)
638 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
639 1 : }
640 1 : if x.Meta.PrefixReplacement != nil {
641 1 : e.writeUvarint(customTagPrefixRewrite)
642 1 : e.writeBytes(x.Meta.PrefixReplacement.ContentPrefix)
643 1 : e.writeBytes(x.Meta.PrefixReplacement.SyntheticPrefix)
644 1 : }
645 1 : if x.Meta.SyntheticSuffix != nil {
646 1 : e.writeUvarint(customTagSuffixRewrite)
647 1 : e.writeBytes(x.Meta.SyntheticSuffix)
648 1 : }
649 1 : e.writeUvarint(customTagTerminate)
650 : }
651 : }
652 1 : _, err := w.Write(e.Bytes())
653 1 : return err
654 : }
655 :
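A usage sketch showing a round trip through Encode and Decode; per the caveat on Decode above, the FileBacking for virtual sstables is not reconstructed by this round trip:

func exampleRoundTrip(ve *VersionEdit) (*VersionEdit, error) {
	var buf bytes.Buffer
	if err := ve.Encode(&buf); err != nil {
		return nil, err
	}
	decoded := &VersionEdit{}
	if err := decoded.Decode(&buf); err != nil {
		return nil, err
	}
	return decoded, nil
}
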
656 : // versionEditDecoder should be used to decode version edits.
657 : type versionEditDecoder struct {
658 : byteReader
659 : }
660 :
661 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
662 1 : n, err := d.readUvarint()
663 1 : if err != nil {
664 0 : return nil, err
665 0 : }
666 1 : s := make([]byte, n)
667 1 : _, err = io.ReadFull(d, s)
668 1 : if err != nil {
669 0 : if err == io.ErrUnexpectedEOF {
670 0 : return nil, errCorruptManifest
671 0 : }
672 0 : return nil, err
673 : }
674 1 : return s, nil
675 : }
676 :
677 1 : func (d versionEditDecoder) readLevel() (int, error) {
678 1 : u, err := d.readUvarint()
679 1 : if err != nil {
680 0 : return 0, err
681 0 : }
682 1 : if u >= NumLevels {
683 0 : return 0, errCorruptManifest
684 0 : }
685 1 : return int(u), nil
686 : }
687 :
688 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
689 1 : u, err := d.readUvarint()
690 1 : if err != nil {
691 0 : return 0, err
692 0 : }
693 1 : return base.FileNum(u), nil
694 : }
695 :
696 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
697 1 : u, err := binary.ReadUvarint(d)
698 1 : if err != nil {
699 0 : if err == io.EOF {
700 0 : return 0, errCorruptManifest
701 0 : }
702 0 : return 0, err
703 : }
704 1 : return u, nil
705 : }
706 :
707 : type versionEditEncoder struct {
708 : *bytes.Buffer
709 : }
710 :
711 1 : func (e versionEditEncoder) writeBytes(p []byte) {
712 1 : e.writeUvarint(uint64(len(p)))
713 1 : e.Write(p)
714 1 : }
715 :
716 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
717 1 : e.writeUvarint(uint64(k.Size()))
718 1 : e.Write(k.UserKey)
719 1 : buf := k.EncodeTrailer()
720 1 : e.Write(buf[:])
721 1 : }
722 :
723 1 : func (e versionEditEncoder) writeString(s string) {
724 1 : e.writeUvarint(uint64(len(s)))
725 1 : e.WriteString(s)
726 1 : }
727 :
728 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
729 1 : var buf [binary.MaxVarintLen64]byte
730 1 : n := binary.PutUvarint(buf[:], u)
731 1 : e.Write(buf[:n])
732 1 : }
733 :
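Because all of the helpers above use uvarint framing, values below 128 cost a single byte on disk, so most tags and small fields encode to one byte each. A tiny illustration (exampleUvarintSize is a hypothetical helper, not part of this file):

func exampleUvarintSize() int {
	var buf [binary.MaxVarintLen64]byte
	return binary.PutUvarint(buf[:], tagNewFile4) // 103 < 128, so this returns 1
}
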
734 : // BulkVersionEdit summarizes the files added and deleted from a set of version
735 : // edits.
736 : //
737 : // INVARIANTS:
738 : // No file can be added to a level more than once. This is true globally, and
739 : // also true for all of the calls to Accumulate for a single bulk version edit.
740 : //
741 : // No file can be removed from a level more than once. This is true globally,
742 : // and also true for all of the calls to Accumulate for a single bulk version
743 : // edit.
744 : //
745 : // A file must not be added and removed from a given level in the same version
746 : // edit.
747 : //
748 : // A file that is being removed from a level must have been added to that level
749 : // before (in a prior version edit). Note that a given file can be deleted from
750 : // a level and added to another level in a single version edit.
751 : type BulkVersionEdit struct {
752 : Added [NumLevels]map[base.FileNum]*FileMetadata
753 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
754 :
755 : // AddedFileBacking is a map to support lookup so that we can populate the
756 : // FileBacking of virtual sstables during manifest replay.
757 : AddedFileBacking map[base.DiskFileNum]*FileBacking
758 : RemovedFileBacking []base.DiskFileNum
759 :
760 : // AddedByFileNum maps file number to file metadata for all added files
761 : // from accumulated version edits. AddedByFileNum is only populated if set
762 : // to non-nil by a caller. It must be set to non-nil when replaying
763 : // version edits read from a MANIFEST (as opposed to VersionEdits
764 : // constructed in-memory). While replaying a MANIFEST file,
765 : // VersionEdit.DeletedFiles map entries have nil values, because the
766 : // on-disk deletion record encodes only the file number. Accumulate
767 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
768 : // field with non-nil *FileMetadata.
769 : AddedByFileNum map[base.FileNum]*FileMetadata
770 :
771 : // MarkedForCompactionCountDiff holds the aggregated count of files
772 : // marked for compaction added or removed.
773 : MarkedForCompactionCountDiff int
774 : }
775 :
776 : // Accumulate adds the file additions and deletions in the specified version
777 : // edit to the bulk edit's internal state.
778 : //
779 : // INVARIANTS:
780 : // If a file is added to a given level in a call to Accumulate and then removed
781 : // from that level in a subsequent call, the file will not be present in the
782 : // resulting BulkVersionEdit.Deleted for that level.
783 : //
784 : // After accumulation of version edits, the bulk version edit may have
785 : // information about a file which has been deleted from a level, but it may
786 : // not have information about the same file added to the same level. The add
787 : // could've occurred as part of a previous bulk version edit. In this case,
788 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
789 : // of the accumulation, because we need to decrease the refcount of the
790 : // deleted file in Apply.
791 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
792 1 : for df, m := range ve.DeletedFiles {
793 1 : dmap := b.Deleted[df.Level]
794 1 : if dmap == nil {
795 1 : dmap = make(map[base.FileNum]*FileMetadata)
796 1 : b.Deleted[df.Level] = dmap
797 1 : }
798 :
799 1 : if m == nil {
800 1 : // m is nil only when replaying a MANIFEST.
801 1 : if b.AddedByFileNum == nil {
802 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
803 0 : }
804 1 : m = b.AddedByFileNum[df.FileNum]
805 1 : if m == nil {
806 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
807 1 : }
808 : }
809 1 : if m.MarkedForCompaction {
810 1 : b.MarkedForCompactionCountDiff--
811 1 : }
812 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
813 1 : dmap[df.FileNum] = m
814 1 : } else {
815 1 : // Present in b.Added for the same level.
816 1 : delete(b.Added[df.Level], df.FileNum)
817 1 : }
818 : }
819 :
820 : // Generate state for Added backing files. Note that these must be generated
821 : // before we loop through the NewFiles, because we need to populate the
822 : // FileBackings which might be used by the NewFiles loop.
823 1 : if b.AddedFileBacking == nil {
824 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
825 1 : }
826 1 : for _, fb := range ve.CreatedBackingTables {
827 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
828 0 : // There is already a FileBacking associated with fb.DiskFileNum.
829 0 : // This should never happen. There must always be only one FileBacking
830 0 : // associated with a backing sstable.
831 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
832 : }
833 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
834 : }
835 :
836 1 : for _, nf := range ve.NewFiles {
837 1 : // A new file should not have been deleted in this or a preceding
838 1 : // VersionEdit at the same level (though files can move across levels).
839 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
840 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
841 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
842 0 : }
843 : }
844 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
845 1 : // FileBacking for a virtual sstable must only be nil if we're performing
846 1 : // manifest replay.
847 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
848 1 : if nf.Meta.FileBacking == nil {
849 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
850 0 : }
851 1 : } else if nf.Meta.FileBacking == nil {
852 0 : return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
853 0 : }
854 :
855 1 : if b.Added[nf.Level] == nil {
856 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
857 1 : }
858 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
859 1 : if b.AddedByFileNum != nil {
860 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
861 1 : }
862 1 : if nf.Meta.MarkedForCompaction {
863 0 : b.MarkedForCompactionCountDiff++
864 0 : }
865 : }
866 :
867 : // Since a file can be removed from backing files in exactly one version
868 : // edit, it is safe to append without any de-duplication.
869 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
870 1 :
871 1 : return nil
872 : }
873 :
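A usage sketch of Accumulate during manifest replay; it assumes the edits were decoded from a MANIFEST, so DeletedFiles values are nil and AddedByFileNum must be non-nil before the first call:

func exampleReplay(edits []*VersionEdit) (*BulkVersionEdit, error) {
	bve := &BulkVersionEdit{
		AddedByFileNum: make(map[base.FileNum]*FileMetadata),
	}
	for _, ve := range edits {
		if err := bve.Accumulate(ve); err != nil {
			return nil, err
		}
	}
	return bve, nil
}
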
874 : // AccumulateIncompleteAndApplySingleVE should be called if a single version edit
875 : // is to be applied to the provided curr Version and if the caller needs to
876 : // update the versionSet.zombieTables map. This function exists separately from
877 : // BulkVersionEdit.Apply because it is easier to reason about properties
878 : // regarding BulkVersionEdit.Accumulate/Apply and zombie table generation if we
879 : // know that exactly one version edit is being accumulated.
880 : //
881 : // Note that the version edit passed into this function may be incomplete
882 : // because compactions don't have the ref counting information necessary to
883 : // populate VersionEdit.RemovedBackingTables. This function will complete such a
884 : // version edit by populating RemovedBackingTables.
885 : //
886 : // Invariant: Any file being deleted through ve must belong to the curr Version.
887 : // We can't have a delete for some arbitrary file which does not exist in curr.
888 : func AccumulateIncompleteAndApplySingleVE(
889 : ve *VersionEdit,
890 : curr *Version,
891 : cmp Compare,
892 : formatKey base.FormatKey,
893 : flushSplitBytes int64,
894 : readCompactionRate int64,
895 : backingStateMap map[base.DiskFileNum]*FileBacking,
896 : addBackingFunc func(*FileBacking),
897 : removeBackingFunc func(base.DiskFileNum),
898 1 : ) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
899 1 : if len(ve.RemovedBackingTables) != 0 {
900 0 : panic("pebble: invalid incomplete version edit")
901 : }
902 1 : var b BulkVersionEdit
903 1 : err := b.Accumulate(ve)
904 1 : if err != nil {
905 0 : return nil, nil, err
906 0 : }
907 1 : zombies = make(map[base.DiskFileNum]uint64)
908 1 : v, err := b.Apply(curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies)
909 1 : if err != nil {
910 0 : return nil, nil, err
911 0 : }
912 :
913 1 : for _, s := range b.AddedFileBacking {
914 1 : addBackingFunc(s)
915 1 : }
916 :
917 1 : for fileNum := range zombies {
918 1 : if _, ok := backingStateMap[fileNum]; ok {
919 1 : // This table was backing some virtual sstable in the latest version,
920 1 : // but is now a zombie. We add RemovedBackingTables entries for
921 1 : // these, before the version edit is written to disk.
922 1 : ve.RemovedBackingTables = append(
923 1 : ve.RemovedBackingTables, fileNum,
924 1 : )
925 1 : removeBackingFunc(fileNum)
926 1 : }
927 : }
928 1 : return v, zombies, nil
929 : }
930 :
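A hedged usage sketch for AccumulateIncompleteAndApplySingleVE, with a plain map standing in for the caller's (e.g. the versionSet's) backing-table state; the remaining parameters are assumed to come from the DB's options:

func exampleSingleVE(
	ve *VersionEdit, curr *Version, cmp Compare, formatKey base.FormatKey,
	flushSplitBytes, readCompactionRate int64,
	backings map[base.DiskFileNum]*FileBacking,
) (*Version, error) {
	v, zombies, err := AccumulateIncompleteAndApplySingleVE(
		ve, curr, cmp, formatKey, flushSplitBytes, readCompactionRate,
		backings,
		func(fb *FileBacking) { backings[fb.DiskFileNum] = fb },
		func(dfn base.DiskFileNum) { delete(backings, dfn) },
	)
	if err != nil {
		return nil, err
	}
	_ = zombies // callers typically record these for deferred deletion
	return v, nil
}
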
931 : // Apply applies the delta b to the current version to produce a new
932 : // version. The new version is consistent with respect to the comparer cmp.
933 : //
934 : // curr may be nil, which is equivalent to a pointer to a zero version.
935 : //
936 : // On success, if a non-nil zombies map is provided to Apply, the map is updated
937 : // with the file numbers and file sizes of deleted files. These files are
938 : // considered zombies because they are no longer referenced by the returned
939 : // Version, but cannot be deleted from disk as they are still in use by the
940 : // incoming Version.
941 : func (b *BulkVersionEdit) Apply(
942 : curr *Version,
943 : cmp Compare,
944 : formatKey base.FormatKey,
945 : flushSplitBytes int64,
946 : readCompactionRate int64,
947 : zombies map[base.DiskFileNum]uint64,
948 1 : ) (*Version, error) {
949 1 : addZombie := func(state *FileBacking) {
950 1 : if zombies != nil {
951 1 : zombies[state.DiskFileNum] = state.Size
952 1 : }
953 : }
954 1 : removeZombie := func(state *FileBacking) {
955 1 : if zombies != nil {
956 1 : delete(zombies, state.DiskFileNum)
957 1 : }
958 : }
959 :
960 1 : v := new(Version)
961 1 :
962 1 : // Adjust the count of files marked for compaction.
963 1 : if curr != nil {
964 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
965 1 : }
966 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
967 1 : if v.Stats.MarkedForCompaction < 0 {
968 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
969 0 : }
970 :
971 1 : for level := range v.Levels {
972 1 : if curr == nil || curr.Levels[level].tree.root == nil {
973 1 : v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */)
974 1 : } else {
975 1 : v.Levels[level] = curr.Levels[level].clone()
976 1 : }
977 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
978 1 : v.RangeKeyLevels[level] = makeLevelMetadata(cmp, level, nil /* files */)
979 1 : } else {
980 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
981 1 : }
982 :
983 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
984 1 : // There are no edits on this level.
985 1 : if level == 0 {
986 1 : // Initialize L0Sublevels.
987 1 : if curr == nil || curr.L0Sublevels == nil {
988 1 : if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
989 0 : return nil, errors.Wrap(err, "pebble: internal error")
990 0 : }
991 1 : } else {
992 1 : v.L0Sublevels = curr.L0Sublevels
993 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
994 1 : }
995 : }
996 1 : continue
997 : }
998 :
999 : // Some edits on this level.
1000 1 : lm := &v.Levels[level]
1001 1 : lmRange := &v.RangeKeyLevels[level]
1002 1 :
1003 1 : addedFilesMap := b.Added[level]
1004 1 : deletedFilesMap := b.Deleted[level]
1005 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
1006 1 : return nil, base.CorruptionErrorf(
1007 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1008 1 : errors.Safe(len(deletedFilesMap)))
1009 1 : }
1010 :
1011 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1012 : // for a level, it won't be present in deletedFilesMap for the same
1013 : // level.
1014 :
1015 1 : for _, f := range deletedFilesMap {
1016 1 : if obsolete := v.Levels[level].remove(f); obsolete {
1017 0 : // Deleting a file from the B-Tree may decrement its
1018 0 : // reference count. However, because we cloned the
1019 0 : // previous level's B-Tree, this should never result in a
1020 0 : // file's reference count dropping to zero.
1021 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1022 0 : return nil, err
1023 0 : }
1024 1 : if f.HasRangeKeys {
1025 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1026 0 : // Deleting a file from the B-Tree may decrement its
1027 0 : // reference count. However, because we cloned the
1028 0 : // previous level's B-Tree, this should never result in a
1029 0 : // file's reference count dropping to zero.
1030 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1031 0 : return nil, err
1032 0 : }
1033 : }
1034 :
1035 : // Note that a backing sst will only become a zombie if the
1036 : // references to it in the latest version drop to 0. We will remove the
1037 : // backing sst from the zombie list in the next loop if one of the
1038 : // addedFiles in any of the levels is referencing the backing sst.
1039 : // This is possible if a physical sstable is virtualized, or if it
1040 : // is moved.
1041 1 : latestRefCount := f.LatestRefs()
1042 1 : if latestRefCount <= 0 {
1043 0 : // If a file is present in deletedFilesMap for a level, then it
1044 0 : // must have already been added to the level previously, which
1045 0 : // means that its latest ref count cannot be 0.
1046 0 : err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file %s", f.FileNum)
1047 0 : return nil, err
1048 1 : } else if f.LatestUnref() == 0 {
1049 1 : addZombie(f.FileBacking)
1050 1 : }
1051 : }
1052 :
1053 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1054 1 : for _, f := range addedFilesMap {
1055 1 : addedFiles = append(addedFiles, f)
1056 1 : }
1057 : // Sort addedFiles by file number. This isn't necessary, but tests which
1058 : // replay invalid manifests check the error output, and the error output
1059 : // depends on the order in which files are added to the btree.
1060 1 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1061 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
1062 1 : })
1063 :
1064 1 : var sm, la *FileMetadata
1065 1 : for _, f := range addedFiles {
1066 1 : // NB: allowedSeeks is used for read-triggered compactions. It is set using
1067 1 : // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
1068 1 : var allowedSeeks int64
1069 1 : if readCompactionRate != 0 {
1070 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1071 1 : }
1072 1 : if allowedSeeks < 100 {
1073 1 : allowedSeeks = 100
1074 1 : }
1075 1 : f.AllowedSeeks.Store(allowedSeeks)
1076 1 : f.InitAllowedSeeks = allowedSeeks
1077 1 :
1078 1 : err := lm.insert(f)
1079 1 : // We're adding this file to the new version, so increment the
1080 1 : // latest refs count.
1081 1 : f.LatestRef()
1082 1 : if err != nil {
1083 1 : return nil, errors.Wrap(err, "pebble")
1084 1 : }
1085 1 : if f.HasRangeKeys {
1086 1 : err = lmRange.insert(f)
1087 1 : if err != nil {
1088 0 : return nil, errors.Wrap(err, "pebble")
1089 0 : }
1090 : }
1091 1 : removeZombie(f.FileBacking)
1092 1 : // Track the files with the smallest and largest keys, so that we can
1093 1 : // check consistency of the modified span.
1094 1 : if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 {
1095 1 : sm = f
1096 1 : }
1097 1 : if la == nil || base.InternalCompare(cmp, la.Largest, f.Largest) < 0 {
1098 1 : la = f
1099 1 : }
1100 : }
1101 :
1102 1 : if level == 0 {
1103 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1104 1 : // Flushes and ingestions that do not delete any L0 files do not require
1105 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1106 1 : // it incrementally.
1107 1 : var err error
1108 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1109 1 : SortBySeqNum(addedFiles)
1110 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1111 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1112 1 : err = v.InitL0Sublevels(cmp, formatKey, flushSplitBytes)
1113 1 : } else if invariants.Enabled && err == nil {
1114 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], cmp, formatKey, flushSplitBytes)
1115 1 : if err != nil {
1116 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1117 : }
1118 1 : s1 := describeSublevels(base.DefaultFormatter, false /* verbose */, copyOfSublevels.Levels)
1119 1 : s2 := describeSublevels(base.DefaultFormatter, false /* verbose */, v.L0Sublevels.Levels)
1120 1 : if s1 != s2 {
1121 0 : // Add verbosity.
1122 0 : s1 := describeSublevels(base.DefaultFormatter, true /* verbose */, copyOfSublevels.Levels)
1123 0 : s2 := describeSublevels(base.DefaultFormatter, true /* verbose */, v.L0Sublevels.Levels)
1124 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1125 : }
1126 : }
1127 1 : if err != nil {
1128 0 : return nil, errors.Wrap(err, "pebble: internal error")
1129 0 : }
1130 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1131 1 : } else if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
1132 0 : return nil, errors.Wrap(err, "pebble: internal error")
1133 0 : }
1134 1 : if err := CheckOrdering(cmp, formatKey, Level(0), v.Levels[level].Iter()); err != nil {
1135 0 : return nil, errors.Wrap(err, "pebble: internal error")
1136 0 : }
1137 1 : continue
1138 : }
1139 :
1140 : // Check consistency of the level in the vicinity of our edits.
1141 1 : if sm != nil && la != nil {
1142 1 : overlap := overlaps(v.Levels[level].Iter(), cmp, sm.Smallest.UserKey,
1143 1 : la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
1144 1 : // overlap contains all of the added files. We want to ensure that
1145 1 : // the added files are consistent with neighboring existing files
1146 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1147 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1148 1 : if m := start.Prev(); m == nil {
1149 1 : start.Next()
1150 1 : }
1151 1 : if m := end.Next(); m == nil {
1152 1 : end.Prev()
1153 1 : }
1154 : })
1155 1 : if err := CheckOrdering(cmp, formatKey, Level(level), check.Iter()); err != nil {
1156 1 : return nil, errors.Wrap(err, "pebble: internal error")
1157 1 : }
1158 : }
1159 : }
1160 1 : return v, nil
1161 : }
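
Finally, a usage sketch of Apply with a zombies map; cmp, formatKey, flushSplitBytes, and readCompactionRate are assumed to come from the DB's options:

func exampleApply(
	bve *BulkVersionEdit, curr *Version, cmp Compare, formatKey base.FormatKey,
	flushSplitBytes, readCompactionRate int64,
) (*Version, map[base.DiskFileNum]uint64, error) {
	zombies := make(map[base.DiskFileNum]uint64)
	v, err := bve.Apply(curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies)
	if err != nil {
		return nil, nil, err
	}
	// zombies now maps each newly unreferenced backing file to its size;
	// actual deletion must wait until older Versions release their refs.
	return v, zombies, nil
}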
|