1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/sstable"
21 : )
22 :
23 : // TODO(peter): describe the MANIFEST file format, independently of the C++
24 : // project.
25 :
26 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
27 :
28 : type byteReader interface {
29 : io.ByteReader
30 : io.Reader
31 : }
32 :
33 : // Tags for the versionEdit disk format.
34 : // Tag 8 is no longer used.
35 : const (
36 : // LevelDB tags.
37 : tagComparator = 1
38 : tagLogNumber = 2
39 : tagNextFileNumber = 3
40 : tagLastSequence = 4
41 : tagCompactPointer = 5
42 : tagDeletedFile = 6
43 : tagNewFile = 7
44 : tagPrevLogNumber = 9
45 :
46 : // RocksDB tags.
47 : tagNewFile2 = 100
48 : tagNewFile3 = 102
49 : tagNewFile4 = 103
50 : tagColumnFamily = 200
51 : tagColumnFamilyAdd = 201
52 : tagColumnFamilyDrop = 202
53 : tagMaxColumnFamily = 203
54 :
55 : // Pebble tags.
56 : tagNewFile5 = 104 // Range keys.
57 : tagCreatedBackingTable = 105
58 : tagRemovedBackingTable = 106
59 :
60 : // The custom tags sub-format used by tagNewFile4 and above. All tags less
61 : // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
62 : // a single bytes field.
63 : customTagTerminate = 1
64 : customTagNeedsCompaction = 2
65 : customTagCreationTime = 6
66 : customTagPathID = 65
67 : customTagNonSafeIgnoreMask = 1 << 6
68 : customTagVirtual = 66
69 : customTagPrefixRewrite = 67
70 : customTagSuffixRewrite = 68
71 : )
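
// isSafeToIgnoreCustomTag is an illustrative sketch (hypothetical helper, not
// part of the original file). It spells out the rule implied by
// customTagNonSafeIgnoreMask: custom tags with the mask bit unset carry a
// single length-prefixed bytes field and may be skipped by readers that do
// not understand them; tags with the bit set must be understood, and the
// decoder below treats unknown ones as corruption.
func isSafeToIgnoreCustomTag(customTag uint64) bool {
	return customTag&customTagNonSafeIgnoreMask == 0
}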
72 :
73 : // DeletedFileEntry holds the state for a file deletion from a level. The file
74 : // itself might still be referenced by another level.
75 : type DeletedFileEntry struct {
76 : Level int
77 : FileNum base.FileNum
78 : }
79 :
80 : // NewFileEntry holds the state for a new file or one moved from a different
81 : // level.
82 : type NewFileEntry struct {
83 : Level int
84 : Meta *FileMetadata
85 : // BackingFileNum is only set during manifest replay, and only for virtual
86 : // sstables.
87 : BackingFileNum base.DiskFileNum
88 : }
89 :
90 : // VersionEdit holds the state for an edit to a Version along with other
91 : // on-disk state (log numbers, next file number, and the last sequence number).
92 : type VersionEdit struct {
93 : // ComparerName is the value of Options.Comparer.Name. This is only set in
94 : // the first VersionEdit in a manifest (either when the DB is created, or
95 : // when a new manifest is created) and is used to verify that the comparer
96 : // specified at Open matches the comparer that was previously used.
97 : ComparerName string
98 :
99 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
100 : // mutations that have not been flushed to an sstable.
101 : //
102 : // This is an optional field; 0 indicates that it is not set.
103 : MinUnflushedLogNum base.DiskFileNum
104 :
105 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
106 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
107 : // 6/2011. We keep it around purely for informational purposes when
108 : // displaying MANIFEST contents.
109 : ObsoletePrevLogNum uint64
110 :
111 : // The next file number. A single counter is used to assign file numbers
112 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
113 : NextFileNum uint64
114 :
115 : // LastSeqNum is an upper bound on the sequence numbers that have been
116 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
117 : // recovery) may contain sequence numbers greater than this value.
118 : LastSeqNum uint64
119 :
120 : // A file num may be present in both deleted files and new files when it
121 : // is moved from a lower level to a higher level (when the compaction
122 : // found that there was no overlapping file at the higher level).
123 : DeletedFiles map[DeletedFileEntry]*FileMetadata
124 : NewFiles []NewFileEntry
125 : // CreatedBackingTables can be used to preserve the FileBacking associated
126 : // with a physical sstable. This is useful when virtual sstables in the
127 : // latest version are reconstructed during manifest replay, and we also need
128 : // to reconstruct the FileBacking which is required by these virtual
129 : // sstables.
130 : //
131 : // INVARIANT: The FileBacking associated with a physical sstable must only
132 : // be added as a backing file in the same version edit where the physical
133 : // sstable is first virtualized. This means that the physical sstable must
134 : // be present in DeletedFiles and that there must be at least one virtual
135 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
136 : // file must be present in CreatedBackingTables in exactly one version edit.
137 : // The physical sstable associated with the FileBacking must also not be
138 : // present in NewFiles.
139 : CreatedBackingTables []*FileBacking
140 : // RemovedBackingTables is used to remove the FileBacking associated with a
141 : // virtual sstable. Note that a backing sstable can be removed as soon as
142 : // there are no virtual sstables in the latest version which are using the
143 : // backing sstable, but the backing sstable doesn't necessarily have to be
144 : // removed atomically with the version edit which removes the last virtual
145 : // sstable associated with the backing sstable. The removal can happen in a
146 : // future version edit.
147 : //
148 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
149 : // added to CreatedBackingTables in a prior version edit. The same version
150 : // edit also cannot have the same file present in both CreatedBackingTables
151 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
152 : // in exactly one version edit.
153 : RemovedBackingTables []base.DiskFileNum
154 : }
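
// Hedged illustration of the CreatedBackingTables invariant above, with
// hypothetical metadata values: the edit that first virtualizes a physical
// sstable deletes the physical file, adds the virtual file(s) sharing its
// FileBacking, and records that backing exactly once.
//
//	ve := &VersionEdit{
//		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//			{Level: 5, FileNum: physical.FileNum}: physical,
//		},
//		NewFiles: []NewFileEntry{
//			// virtual.FileBacking == physical.FileBacking
//			{Level: 5, Meta: virtual},
//		},
//		CreatedBackingTables: []*FileBacking{physical.FileBacking},
//	}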
155 :
156 : // Decode decodes an edit from the specified reader.
157 : //
158 : // Note that the Decode step will not set the FileBacking for virtual sstables
159 : // and the responsibility is left to the caller. However, the Decode step will
160 : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
161 1 : func (v *VersionEdit) Decode(r io.Reader) error {
162 1 : br, ok := r.(byteReader)
163 1 : if !ok {
164 1 : br = bufio.NewReader(r)
165 1 : }
166 1 : d := versionEditDecoder{br}
167 1 : for {
168 1 : tag, err := binary.ReadUvarint(br)
169 1 : if err == io.EOF {
170 1 : break
171 : }
172 1 : if err != nil {
173 0 : return err
174 0 : }
175 1 : switch tag {
176 1 : case tagComparator:
177 1 : s, err := d.readBytes()
178 1 : if err != nil {
179 0 : return err
180 0 : }
181 1 : v.ComparerName = string(s)
182 :
183 1 : case tagLogNumber:
184 1 : n, err := d.readUvarint()
185 1 : if err != nil {
186 0 : return err
187 0 : }
188 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
189 :
190 1 : case tagNextFileNumber:
191 1 : n, err := d.readUvarint()
192 1 : if err != nil {
193 0 : return err
194 0 : }
195 1 : v.NextFileNum = n
196 :
197 1 : case tagLastSequence:
198 1 : n, err := d.readUvarint()
199 1 : if err != nil {
200 0 : return err
201 0 : }
202 1 : v.LastSeqNum = n
203 :
204 0 : case tagCompactPointer:
205 0 : if _, err := d.readLevel(); err != nil {
206 0 : return err
207 0 : }
208 0 : if _, err := d.readBytes(); err != nil {
209 0 : return err
210 0 : }
211 : // NB: RocksDB does not use compaction pointers anymore.
212 :
213 1 : case tagRemovedBackingTable:
214 1 : n, err := d.readUvarint()
215 1 : if err != nil {
216 0 : return err
217 0 : }
218 1 : v.RemovedBackingTables = append(
219 1 : v.RemovedBackingTables, base.DiskFileNum(n),
220 1 : )
221 1 : case tagCreatedBackingTable:
222 1 : dfn, err := d.readUvarint()
223 1 : if err != nil {
224 0 : return err
225 0 : }
226 1 : size, err := d.readUvarint()
227 1 : if err != nil {
228 0 : return err
229 0 : }
230 1 : fileBacking := &FileBacking{
231 1 : DiskFileNum: base.DiskFileNum(dfn),
232 1 : Size: size,
233 1 : }
234 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
235 1 : case tagDeletedFile:
236 1 : level, err := d.readLevel()
237 1 : if err != nil {
238 0 : return err
239 0 : }
240 1 : fileNum, err := d.readFileNum()
241 1 : if err != nil {
242 0 : return err
243 0 : }
244 1 : if v.DeletedFiles == nil {
245 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
246 1 : }
247 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
248 :
249 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
250 1 : level, err := d.readLevel()
251 1 : if err != nil {
252 0 : return err
253 0 : }
254 1 : fileNum, err := d.readFileNum()
255 1 : if err != nil {
256 0 : return err
257 0 : }
258 1 : if tag == tagNewFile3 {
259 0 : // The pathID field appears unused in RocksDB.
260 0 : _ /* pathID */, err := d.readUvarint()
261 0 : if err != nil {
262 0 : return err
263 0 : }
264 : }
265 1 : size, err := d.readUvarint()
266 1 : if err != nil {
267 0 : return err
268 0 : }
269 : // We read the smallest / largest key bounds differently depending on
270 : // whether we have point, range or both types of keys present in the
271 : // table.
272 1 : var (
273 1 : smallestPointKey, largestPointKey []byte
274 1 : smallestRangeKey, largestRangeKey []byte
275 1 : parsedPointBounds bool
276 1 : boundsMarker byte
277 1 : )
278 1 : if tag != tagNewFile5 {
279 1 : // Range keys not present in the table. Parse the point key bounds.
280 1 : smallestPointKey, err = d.readBytes()
281 1 : if err != nil {
282 0 : return err
283 0 : }
284 1 : largestPointKey, err = d.readBytes()
285 1 : if err != nil {
286 0 : return err
287 0 : }
288 1 : } else {
289 1 : // Range keys are present in the table. Determine whether we have point
290 1 : // keys to parse, in addition to the bounds.
291 1 : boundsMarker, err = d.ReadByte()
292 1 : if err != nil {
293 0 : return err
294 0 : }
295 : // Parse point key bounds, if present.
296 1 : if boundsMarker&maskContainsPointKeys > 0 {
297 1 : smallestPointKey, err = d.readBytes()
298 1 : if err != nil {
299 0 : return err
300 0 : }
301 1 : largestPointKey, err = d.readBytes()
302 1 : if err != nil {
303 0 : return err
304 0 : }
305 1 : parsedPointBounds = true
306 1 : } else {
307 1 : // The table does not have point keys.
308 1 : // Sanity check: the bounds must be range keys.
309 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
310 0 : return base.CorruptionErrorf(
311 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
312 0 : boundsMarker,
313 0 : )
314 0 : }
315 : }
316 : // Parse range key bounds.
317 1 : smallestRangeKey, err = d.readBytes()
318 1 : if err != nil {
319 0 : return err
320 0 : }
321 1 : largestRangeKey, err = d.readBytes()
322 1 : if err != nil {
323 0 : return err
324 0 : }
325 : }
326 1 : var smallestSeqNum uint64
327 1 : var largestSeqNum uint64
328 1 : if tag != tagNewFile {
329 1 : smallestSeqNum, err = d.readUvarint()
330 1 : if err != nil {
331 0 : return err
332 0 : }
333 1 : largestSeqNum, err = d.readUvarint()
334 1 : if err != nil {
335 0 : return err
336 0 : }
337 : }
338 1 : var markedForCompaction bool
339 1 : var creationTime uint64
340 1 : virtualState := struct {
341 1 : virtual bool
342 1 : backingFileNum uint64
343 1 : }{}
344 1 : var virtualPrefix *sstable.PrefixReplacement
345 1 : var syntheticSuffix []byte
346 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
347 1 : for {
348 1 : customTag, err := d.readUvarint()
349 1 : if err != nil {
350 0 : return err
351 0 : }
352 1 : if customTag == customTagTerminate {
353 1 : break
354 : }
355 1 : switch customTag {
356 1 : case customTagNeedsCompaction:
357 1 : field, err := d.readBytes()
358 1 : if err != nil {
359 0 : return err
360 0 : }
361 1 : if len(field) != 1 {
362 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
363 0 : }
364 1 : markedForCompaction = (field[0] == 1)
365 :
366 1 : case customTagCreationTime:
367 1 : field, err := d.readBytes()
368 1 : if err != nil {
369 0 : return err
370 0 : }
371 1 : var n int
372 1 : creationTime, n = binary.Uvarint(field)
373 1 : if n != len(field) {
374 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
375 0 : }
376 :
377 0 : case customTagPathID:
378 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
379 :
380 1 : case customTagVirtual:
381 1 : virtualState.virtual = true
382 1 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
383 0 : return err
384 0 : }
385 :
386 0 : case customTagPrefixRewrite:
387 0 : content, err := d.readBytes()
388 0 : if err != nil {
389 0 : return err
390 0 : }
391 0 : synthetic, err := d.readBytes()
392 0 : if err != nil {
393 0 : return err
394 0 : }
395 0 : virtualPrefix = &sstable.PrefixReplacement{
396 0 : ContentPrefix: content,
397 0 : SyntheticPrefix: synthetic,
398 0 : }
399 :
400 1 : case customTagSuffixRewrite:
401 1 : if syntheticSuffix, err = d.readBytes(); err != nil {
402 0 : return err
403 0 : }
404 :
405 0 : default:
406 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
407 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
408 0 : }
409 0 : if _, err := d.readBytes(); err != nil {
410 0 : return err
411 0 : }
412 : }
413 : }
414 : }
415 1 : m := &FileMetadata{
416 1 : FileNum: fileNum,
417 1 : Size: size,
418 1 : CreationTime: int64(creationTime),
419 1 : SmallestSeqNum: smallestSeqNum,
420 1 : LargestSeqNum: largestSeqNum,
421 1 : MarkedForCompaction: markedForCompaction,
422 1 : Virtual: virtualState.virtual,
423 1 : PrefixReplacement: virtualPrefix,
424 1 : SyntheticSuffix: syntheticSuffix,
425 1 : }
426 1 : if tag != tagNewFile5 { // no range keys present
427 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
428 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
429 1 : m.HasPointKeys = true
430 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
431 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
432 1 : } else { // range keys present
433 1 : // Set point key bounds, if parsed.
434 1 : if parsedPointBounds {
435 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
436 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
437 1 : m.HasPointKeys = true
438 1 : }
439 : // Set range key bounds.
440 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
441 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
442 1 : m.HasRangeKeys = true
443 1 : // Set overall bounds (by default assume range keys).
444 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
445 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
446 1 : if boundsMarker&maskSmallest == maskSmallest {
447 1 : m.Smallest = m.SmallestPointKey
448 1 : m.boundTypeSmallest = boundTypePointKey
449 1 : }
450 1 : if boundsMarker&maskLargest == maskLargest {
451 1 : m.Largest = m.LargestPointKey
452 1 : m.boundTypeLargest = boundTypePointKey
453 1 : }
454 : }
455 1 : m.boundsSet = true
456 1 : if !virtualState.virtual {
457 1 : m.InitPhysicalBacking()
458 1 : }
459 :
460 1 : nfe := NewFileEntry{
461 1 : Level: level,
462 1 : Meta: m,
463 1 : }
464 1 : if virtualState.virtual {
465 1 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
466 1 : }
467 1 : v.NewFiles = append(v.NewFiles, nfe)
468 :
469 1 : case tagPrevLogNumber:
470 1 : n, err := d.readUvarint()
471 1 : if err != nil {
472 0 : return err
473 0 : }
474 1 : v.ObsoletePrevLogNum = n
475 :
476 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
477 0 : return base.CorruptionErrorf("column families are not supported")
478 :
479 0 : default:
480 0 : return errCorruptManifest
481 : }
482 : }
483 1 : return nil
484 : }
485 :
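// A hedged usage sketch of Decode during manifest replay; the record reader
// construction is elided and the names are hypothetical. Each MANIFEST record
// contains one encoded VersionEdit.
//
//	for {
//		rec, err := rr.Next() // rr iterates over MANIFEST records
//		if err == io.EOF {
//			break
//		} else if err != nil {
//			return err
//		}
//		var ve VersionEdit
//		if err := ve.Decode(rec); err != nil {
//			return err
//		}
//		// ve.NewFiles[i].Meta.FileBacking is unset for virtual sstables;
//		// BulkVersionEdit.Accumulate resolves it via NewFileEntry.BackingFileNum.
//	}
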
486 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
487 1 : var buf bytes.Buffer
488 1 : if v.ComparerName != "" {
489 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
490 1 : }
491 1 : if v.MinUnflushedLogNum != 0 {
492 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
493 1 : }
494 1 : if v.ObsoletePrevLogNum != 0 {
495 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
496 0 : }
497 1 : if v.NextFileNum != 0 {
498 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
499 1 : }
500 1 : if v.LastSeqNum != 0 {
501 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
502 1 : }
503 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
504 1 : for df := range v.DeletedFiles {
505 1 : entries = append(entries, df)
506 1 : }
507 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
508 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
509 1 : return v
510 1 : }
511 0 : return stdcmp.Compare(a.FileNum, b.FileNum)
512 : })
513 1 : for _, df := range entries {
514 1 : fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
515 1 : }
516 1 : for _, nf := range v.NewFiles {
517 1 : fmt.Fprintf(&buf, " added: L%d", nf.Level)
518 1 : if verbose {
519 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
520 1 : } else {
521 1 : fmt.Fprintf(&buf, " %s", nf.Meta.String())
522 1 : }
523 1 : if nf.Meta.CreationTime != 0 {
524 1 : fmt.Fprintf(&buf, " (%s)",
525 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
526 1 : }
527 1 : fmt.Fprintln(&buf)
528 : }
529 1 : return buf.String()
530 : }
531 :
532 : // DebugString is a more verbose version of String(). Use this in tests.
533 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
534 1 : return v.string(true /* verbose */, fmtKey)
535 1 : }
536 :
537 : // String implements fmt.Stringer for a VersionEdit.
538 1 : func (v *VersionEdit) String() string {
539 1 : return v.string(false /* verbose */, base.DefaultFormatter)
540 1 : }
541 :
542 : // Encode encodes an edit to the specified writer.
543 1 : func (v *VersionEdit) Encode(w io.Writer) error {
544 1 : e := versionEditEncoder{new(bytes.Buffer)}
545 1 :
546 1 : if v.ComparerName != "" {
547 1 : e.writeUvarint(tagComparator)
548 1 : e.writeString(v.ComparerName)
549 1 : }
550 1 : if v.MinUnflushedLogNum != 0 {
551 1 : e.writeUvarint(tagLogNumber)
552 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
553 1 : }
554 1 : if v.ObsoletePrevLogNum != 0 {
555 1 : e.writeUvarint(tagPrevLogNumber)
556 1 : e.writeUvarint(v.ObsoletePrevLogNum)
557 1 : }
558 1 : if v.NextFileNum != 0 {
559 1 : e.writeUvarint(tagNextFileNumber)
560 1 : e.writeUvarint(uint64(v.NextFileNum))
561 1 : }
562 1 : for _, dfn := range v.RemovedBackingTables {
563 1 : e.writeUvarint(tagRemovedBackingTable)
564 1 : e.writeUvarint(uint64(dfn))
565 1 : }
566 1 : for _, fileBacking := range v.CreatedBackingTables {
567 1 : e.writeUvarint(tagCreatedBackingTable)
568 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
569 1 : e.writeUvarint(fileBacking.Size)
570 1 : }
571 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
572 : // even though its value is zero. We detect this by encoding LastSeqNum when
573 : // ComparerName is set.
574 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
575 1 : e.writeUvarint(tagLastSequence)
576 1 : e.writeUvarint(v.LastSeqNum)
577 1 : }
578 1 : for x := range v.DeletedFiles {
579 1 : e.writeUvarint(tagDeletedFile)
580 1 : e.writeUvarint(uint64(x.Level))
581 1 : e.writeUvarint(uint64(x.FileNum))
582 1 : }
583 1 : for _, x := range v.NewFiles {
584 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
585 1 : var tag uint64
586 1 : switch {
587 1 : case x.Meta.HasRangeKeys:
588 1 : tag = tagNewFile5
589 1 : case customFields:
590 1 : tag = tagNewFile4
591 0 : default:
592 0 : tag = tagNewFile2
593 : }
594 1 : e.writeUvarint(tag)
595 1 : e.writeUvarint(uint64(x.Level))
596 1 : e.writeUvarint(uint64(x.Meta.FileNum))
597 1 : e.writeUvarint(x.Meta.Size)
598 1 : if !x.Meta.HasRangeKeys {
599 1 : // If we have no range keys, preserve the original format and write the
600 1 : // smallest and largest point keys.
601 1 : e.writeKey(x.Meta.SmallestPointKey)
602 1 : e.writeKey(x.Meta.LargestPointKey)
603 1 : } else {
604 1 : // When range keys are present, we first write a marker byte that
605 1 : // indicates if the table also contains point keys, in addition to how the
606 1 : // overall bounds for the table should be reconstructed. This byte is
607 1 : // followed by the keys themselves.
608 1 : b, err := x.Meta.boundsMarker()
609 1 : if err != nil {
610 0 : return err
611 0 : }
612 1 : if err = e.WriteByte(b); err != nil {
613 0 : return err
614 0 : }
615 : // Write point key bounds (if present).
616 1 : if x.Meta.HasPointKeys {
617 1 : e.writeKey(x.Meta.SmallestPointKey)
618 1 : e.writeKey(x.Meta.LargestPointKey)
619 1 : }
620 : // Write range key bounds.
621 1 : e.writeKey(x.Meta.SmallestRangeKey)
622 1 : e.writeKey(x.Meta.LargestRangeKey)
623 : }
624 1 : e.writeUvarint(x.Meta.SmallestSeqNum)
625 1 : e.writeUvarint(x.Meta.LargestSeqNum)
626 1 : if customFields {
627 1 : if x.Meta.CreationTime != 0 {
628 1 : e.writeUvarint(customTagCreationTime)
629 1 : var buf [binary.MaxVarintLen64]byte
630 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
631 1 : e.writeBytes(buf[:n])
632 1 : }
633 1 : if x.Meta.MarkedForCompaction {
634 1 : e.writeUvarint(customTagNeedsCompaction)
635 1 : e.writeBytes([]byte{1})
636 1 : }
637 1 : if x.Meta.Virtual {
638 1 : e.writeUvarint(customTagVirtual)
639 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
640 1 : }
641 1 : if x.Meta.PrefixReplacement != nil {
642 1 : e.writeUvarint(customTagPrefixRewrite)
643 1 : e.writeBytes(x.Meta.PrefixReplacement.ContentPrefix)
644 1 : e.writeBytes(x.Meta.PrefixReplacement.SyntheticPrefix)
645 1 : }
646 1 : if x.Meta.SyntheticSuffix != nil {
647 1 : e.writeUvarint(customTagSuffixRewrite)
648 1 : e.writeBytes(x.Meta.SyntheticSuffix)
649 1 : }
650 1 : e.writeUvarint(customTagTerminate)
651 : }
652 : }
653 1 : _, err := w.Write(e.Bytes())
654 1 : return err
655 : }
656 :
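// encodeDecodeRoundTrip is an illustrative sketch (hypothetical helper, not
// used elsewhere): it encodes a VersionEdit into a buffer and decodes it
// back, the same pairing the MANIFEST writer and the replay path rely on.
func encodeDecodeRoundTrip(ve *VersionEdit) (*VersionEdit, error) {
	var buf bytes.Buffer
	if err := ve.Encode(&buf); err != nil {
		return nil, err
	}
	decoded := &VersionEdit{}
	// bytes.Buffer implements both io.Reader and io.ByteReader, so Decode
	// reads from it directly without additional buffering.
	if err := decoded.Decode(&buf); err != nil {
		return nil, err
	}
	return decoded, nil
}
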
657 : // versionEditDecoder should be used to decode version edits.
658 : type versionEditDecoder struct {
659 : byteReader
660 : }
661 :
662 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
663 1 : n, err := d.readUvarint()
664 1 : if err != nil {
665 0 : return nil, err
666 0 : }
667 1 : s := make([]byte, n)
668 1 : _, err = io.ReadFull(d, s)
669 1 : if err != nil {
670 0 : if err == io.ErrUnexpectedEOF {
671 0 : return nil, errCorruptManifest
672 0 : }
673 0 : return nil, err
674 : }
675 1 : return s, nil
676 : }
677 :
678 1 : func (d versionEditDecoder) readLevel() (int, error) {
679 1 : u, err := d.readUvarint()
680 1 : if err != nil {
681 0 : return 0, err
682 0 : }
683 1 : if u >= NumLevels {
684 0 : return 0, errCorruptManifest
685 0 : }
686 1 : return int(u), nil
687 : }
688 :
689 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
690 1 : u, err := d.readUvarint()
691 1 : if err != nil {
692 0 : return 0, err
693 0 : }
694 1 : return base.FileNum(u), nil
695 : }
696 :
697 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
698 1 : u, err := binary.ReadUvarint(d)
699 1 : if err != nil {
700 0 : if err == io.EOF {
701 0 : return 0, errCorruptManifest
702 0 : }
703 0 : return 0, err
704 : }
705 1 : return u, nil
706 : }
707 :
708 : type versionEditEncoder struct {
709 : *bytes.Buffer
710 : }
711 :
712 1 : func (e versionEditEncoder) writeBytes(p []byte) {
713 1 : e.writeUvarint(uint64(len(p)))
714 1 : e.Write(p)
715 1 : }
716 :
717 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
718 1 : e.writeUvarint(uint64(k.Size()))
719 1 : e.Write(k.UserKey)
720 1 : buf := k.EncodeTrailer()
721 1 : e.Write(buf[:])
722 1 : }
723 :
724 1 : func (e versionEditEncoder) writeString(s string) {
725 1 : e.writeUvarint(uint64(len(s)))
726 1 : e.WriteString(s)
727 1 : }
728 :
729 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
730 1 : var buf [binary.MaxVarintLen64]byte
731 1 : n := binary.PutUvarint(buf[:], u)
732 1 : e.Write(buf[:n])
733 1 : }
734 :
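// uvarintRoundTrip sketches the varint primitive underlying every numeric
// field in this format (hypothetical helper, for illustration only): values
// are written with binary.PutUvarint and read back with binary.ReadUvarint.
func uvarintRoundTrip(u uint64) (uint64, error) {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], u)
	return binary.ReadUvarint(bytes.NewReader(buf[:n]))
}
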
735 : // BulkVersionEdit summarizes the files added and deleted from a set of version
736 : // edits.
737 : //
738 : // INVARIANTS:
739 : // No file can be added to a level more than once. This is true globally, and
740 : // also true for all of the calls to Accumulate for a single bulk version edit.
741 : //
742 : // No file can be removed from a level more than once. This is true globally,
743 : // and also true for all of the calls to Accumulate for a single bulk version
744 : // edit.
745 : //
746 : // A file must not be added and removed from a given level in the same version
747 : // edit.
748 : //
749 : // A file that is being removed from a level must have been added to that level
750 : // before (in a prior version edit). Note that a given file can be deleted from
751 : // a level and added to another level in a single version edit.
752 : type BulkVersionEdit struct {
753 : Added [NumLevels]map[base.FileNum]*FileMetadata
754 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
755 :
756 : // AddedFileBacking is a map to support lookup so that we can populate the
757 : // FileBacking of virtual sstables during manifest replay.
758 : AddedFileBacking map[base.DiskFileNum]*FileBacking
759 : RemovedFileBacking []base.DiskFileNum
760 :
761 : // AddedByFileNum maps file number to file metadata for all added files
762 : // from accumulated version edits. AddedByFileNum is only populated if set
763 : // to non-nil by a caller. It must be set to non-nil when replaying
764 : // version edits read from a MANIFEST (as opposed to VersionEdits
765 : // constructed in-memory). While replaying a MANIFEST file,
766 : // VersionEdit.DeletedFiles map entries have nil values, because the
767 : // on-disk deletion record encodes only the file number. Accumulate
768 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
769 : // field with non-nil *FileMetadata.
770 : AddedByFileNum map[base.FileNum]*FileMetadata
771 :
772 : // MarkedForCompactionCountDiff holds the aggregated count of files
773 : // marked for compaction added or removed.
774 : MarkedForCompactionCountDiff int
775 : }
776 :
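// Hedged replay sketch (names hypothetical): when accumulating edits decoded
// from a MANIFEST, AddedByFileNum must be non-nil so that deletion records,
// which carry only file numbers, can be resolved back to metadata.
//
//	var bve BulkVersionEdit
//	bve.AddedByFileNum = make(map[base.FileNum]*FileMetadata)
//	for _, ve := range decodedEdits {
//		if err := bve.Accumulate(ve); err != nil {
//			return err
//		}
//	}
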
777 : // Accumulate adds the file addition and deletions in the specified version
778 : // edit to the bulk edit's internal state.
779 : //
780 : // INVARIANTS:
781 : // If a file is added to a given level in a call to Accumulate and then removed
782 : // from that level in a subsequent call, the file will not be present in the
783 : // resulting BulkVersionEdit.Deleted for that level.
784 : //
785 : // After accumulation of version edits, the bulk version edit may have
786 : // information about a file which has been deleted from a level, but it may
787 : // not have information about the same file added to the same level. The add
788 : // could've occurred as part of a previous bulk version edit. In this case,
789 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
790 : // of the accumulation, because we need to decrease the refcount of the
791 : // deleted file in Apply.
792 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
793 1 : for df, m := range ve.DeletedFiles {
794 1 : dmap := b.Deleted[df.Level]
795 1 : if dmap == nil {
796 1 : dmap = make(map[base.FileNum]*FileMetadata)
797 1 : b.Deleted[df.Level] = dmap
798 1 : }
799 :
800 1 : if m == nil {
801 1 : // m is nil only when replaying a MANIFEST.
802 1 : if b.AddedByFileNum == nil {
803 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
804 0 : }
805 1 : m = b.AddedByFileNum[df.FileNum]
806 1 : if m == nil {
807 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
808 1 : }
809 : }
810 1 : if m.MarkedForCompaction {
811 1 : b.MarkedForCompactionCountDiff--
812 1 : }
813 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
814 1 : dmap[df.FileNum] = m
815 1 : } else {
816 1 : // Present in b.Added for the same level.
817 1 : delete(b.Added[df.Level], df.FileNum)
818 1 : }
819 : }
820 :
821 : // Generate state for Added backing files. Note that these must be generated
822 : // before we loop through the NewFiles, because we need to populate the
823 : // FileBackings which might be used by the NewFiles loop.
824 1 : if b.AddedFileBacking == nil {
825 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
826 1 : }
827 1 : for _, fb := range ve.CreatedBackingTables {
828 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
829 0 : // There is already a FileBacking associated with fb.DiskFileNum.
830 0 : // This should never happen. There must always be only one FileBacking
831 0 : // associated with a backing sstable.
832 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
833 : }
834 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
835 : }
836 :
837 1 : for _, nf := range ve.NewFiles {
838 1 : // A new file should not have been deleted in this or a preceding
839 1 : // VersionEdit at the same level (though files can move across levels).
840 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
841 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
842 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
843 0 : }
844 : }
845 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
846 1 : // FileBacking for a virtual sstable must only be nil if we're performing
847 1 : // manifest replay.
848 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
849 1 : if nf.Meta.FileBacking == nil {
850 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
851 0 : }
852 1 : } else if nf.Meta.FileBacking == nil {
853 0 : return errors.Errorf("added file L%d.%s has no FileBacking", nf.Level, nf.Meta.FileNum)
854 0 : }
855 :
856 1 : if b.Added[nf.Level] == nil {
857 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
858 1 : }
859 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
860 1 : if b.AddedByFileNum != nil {
861 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
862 1 : }
863 1 : if nf.Meta.MarkedForCompaction {
864 0 : b.MarkedForCompactionCountDiff++
865 0 : }
866 : }
867 :
868 : // Since a file can be removed from backing files in exactly one version
869 : // edit, it is safe to just append without any de-duplication.
870 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
871 1 :
872 1 : return nil
873 : }
874 :
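// Illustration of the move-compaction case described above, with hypothetical
// values: if one accumulated edit adds file m to L3 and a later one moves it
// to L4, the L3 add and the L3 delete cancel, and only the L4 add survives.
//
//	ve1 := &VersionEdit{NewFiles: []NewFileEntry{{Level: 3, Meta: m}}}
//	ve2 := &VersionEdit{
//		DeletedFiles: map[DeletedFileEntry]*FileMetadata{{Level: 3, FileNum: m.FileNum}: m},
//		NewFiles:     []NewFileEntry{{Level: 4, Meta: m}},
//	}
//	// After bve.Accumulate(ve1) and bve.Accumulate(ve2):
//	// bve.Added[3] and bve.Deleted[3] are empty; bve.Added[4][m.FileNum] == m.
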
875 : // AccumulateIncompleteAndApplySingleVE should be called if a single version edit
876 : // is to be applied to the provided curr Version and if the caller needs to
877 : // update the versionSet.zombieTables map. This function exists separately from
878 : // BulkVersionEdit.Apply because it is easier to reason about properties
879 : // regarding BulkVersionEdit.Accumulate/Apply and zombie table generation if we
880 : // know that exactly one version edit is being accumulated.
881 : //
882 : // Note that the version edit passed into this function may be incomplete
883 : // because compactions don't have the ref counting information necessary to
884 : // populate VersionEdit.RemovedBackingTables. This function will complete such a
885 : // version edit by populating RemovedBackingTables.
886 : //
887 : // Invariant: Any file being deleted through ve must belong to the curr Version.
888 : // We can't have a delete for some arbitrary file which does not exist in curr.
889 : func AccumulateIncompleteAndApplySingleVE(
890 : ve *VersionEdit,
891 : curr *Version,
892 : comparer *base.Comparer,
893 : flushSplitBytes int64,
894 : readCompactionRate int64,
895 : backings *FileBackings,
896 1 : ) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
897 1 : if len(ve.RemovedBackingTables) != 0 {
898 0 : panic("pebble: invalid incomplete version edit")
899 : }
900 1 : var b BulkVersionEdit
901 1 : err := b.Accumulate(ve)
902 1 : if err != nil {
903 0 : return nil, nil, err
904 0 : }
905 1 : zombies = make(map[base.DiskFileNum]uint64)
906 1 : v, err := b.Apply(curr, comparer, flushSplitBytes, readCompactionRate, zombies)
907 1 : if err != nil {
908 0 : return nil, nil, err
909 0 : }
910 :
911 1 : for _, s := range b.AddedFileBacking {
912 1 : backings.Add(s)
913 1 : }
914 :
915 1 : for fileNum := range zombies {
916 1 : if _, ok := backings.Get(fileNum); ok {
917 1 : // This table was backing some virtual sstable in the latest version,
918 1 : // but is now a zombie. We add RemovedBackingTables entries for
919 1 : // these, before the version edit is written to disk.
920 1 : ve.RemovedBackingTables = append(
921 1 : ve.RemovedBackingTables, fileNum,
922 1 : )
923 1 : backings.Remove(fileNum)
924 1 : }
925 : }
926 1 : return v, zombies, nil
927 : }
928 :
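// Hedged sketch of the contract above (names hypothetical): the caller passes
// an edit with an empty RemovedBackingTables; on return the edit has been
// completed and can be logged to the MANIFEST before the new version is
// installed.
//
//	newVers, zombies, err := AccumulateIncompleteAndApplySingleVE(
//		ve, curr, comparer, flushSplitBytes, readCompactionRate, &backings)
//	if err != nil {
//		return err
//	}
//	// ve.RemovedBackingTables is now populated; write ve to the MANIFEST,
//	// install newVers, and track zombies for eventual deletion.
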
929 : // Apply applies the delta b to the current version to produce a new
930 : // version. The new version is consistent with respect to the comparer cmp.
931 : //
932 : // curr may be nil, which is equivalent to a pointer to a zero version.
933 : //
934 : // On success, if a non-nil zombies map is provided to Apply, the map is updated
935 : // with the file numbers and sizes of deleted files. These files are
936 : // considered zombies because they are no longer referenced by the returned
937 : // Version, but cannot yet be deleted from disk, as they may still be in use
938 : // by readers of an older Version.
939 : func (b *BulkVersionEdit) Apply(
940 : curr *Version,
941 : comparer *base.Comparer,
942 : flushSplitBytes int64,
943 : readCompactionRate int64,
944 : zombies map[base.DiskFileNum]uint64,
945 1 : ) (*Version, error) {
946 1 : addZombie := func(state *FileBacking) {
947 1 : if zombies != nil {
948 1 : zombies[state.DiskFileNum] = state.Size
949 1 : }
950 : }
951 1 : removeZombie := func(state *FileBacking) {
952 1 : if zombies != nil {
953 1 : delete(zombies, state.DiskFileNum)
954 1 : }
955 : }
956 :
957 1 : v := &Version{
958 1 : cmp: comparer,
959 1 : }
960 1 :
961 1 : // Adjust the count of files marked for compaction.
962 1 : if curr != nil {
963 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
964 1 : }
965 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
966 1 : if v.Stats.MarkedForCompaction < 0 {
967 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
968 0 : }
969 :
970 1 : for level := range v.Levels {
971 1 : if curr == nil || curr.Levels[level].tree.root == nil {
972 1 : v.Levels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
973 1 : } else {
974 1 : v.Levels[level] = curr.Levels[level].clone()
975 1 : }
976 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
977 1 : v.RangeKeyLevels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
978 1 : } else {
979 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
980 1 : }
981 :
982 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
983 1 : // There are no edits on this level.
984 1 : if level == 0 {
985 1 : // Initialize L0Sublevels.
986 1 : if curr == nil || curr.L0Sublevels == nil {
987 1 : if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
988 0 : return nil, errors.Wrap(err, "pebble: internal error")
989 0 : }
990 1 : } else {
991 1 : v.L0Sublevels = curr.L0Sublevels
992 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
993 1 : }
994 : }
995 1 : continue
996 : }
997 :
998 : // Some edits on this level.
999 1 : lm := &v.Levels[level]
1000 1 : lmRange := &v.RangeKeyLevels[level]
1001 1 :
1002 1 : addedFilesMap := b.Added[level]
1003 1 : deletedFilesMap := b.Deleted[level]
1004 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
1005 1 : return nil, base.CorruptionErrorf(
1006 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1007 1 : errors.Safe(len(deletedFilesMap)))
1008 1 : }
1009 :
1010 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1011 : // for a level, it won't be present in deletedFilesMap for the same
1012 : // level.
1013 :
1014 1 : for _, f := range deletedFilesMap {
1015 1 : if obsolete := v.Levels[level].remove(f); obsolete {
1016 0 : // Deleting a file from the B-Tree may decrement its
1017 0 : // reference count. However, because we cloned the
1018 0 : // previous level's B-Tree, this should never result in a
1019 0 : // file's reference count dropping to zero.
1020 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1021 0 : return nil, err
1022 0 : }
1023 1 : if f.HasRangeKeys {
1024 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1025 0 : // Deleting a file from the B-Tree may decrement its
1026 0 : // reference count. However, because we cloned the
1027 0 : // previous level's B-Tree, this should never result in a
1028 0 : // file's reference count dropping to zero.
1029 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1030 0 : return nil, err
1031 0 : }
1032 : }
1033 :
1034 : // Note that a backing sst will only become a zombie if the number of
1035 : // references to it in the latest version is 0. We will remove the
1036 : // backing sst from the zombie list in the next loop if one of the
1037 : // addedFiles in any of the levels is referencing the backing sst.
1038 : // This is possible if a physical sstable is virtualized, or if it
1039 : // is moved.
1040 1 : latestRefCount := f.LatestRefs()
1041 1 : if latestRefCount <= 0 {
1042 0 : // If a file is present in deletedFilesMap for a level, then it
1043 0 : // must have already been added to the level previously, which
1044 0 : // means that its latest ref count cannot be 0.
1045 0 : err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file %s", f.FileNum)
1046 0 : return nil, err
1047 1 : } else if f.LatestUnref() == 0 {
1048 1 : addZombie(f.FileBacking)
1049 1 : }
1050 : }
1051 :
1052 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1053 1 : for _, f := range addedFilesMap {
1054 1 : addedFiles = append(addedFiles, f)
1055 1 : }
1056 : // Sort addedFiles by file number. This isn't necessary, but tests which
1057 : // replay invalid manifests check the error output, and the error output
1058 : // depends on the order in which files are added to the btree.
1059 1 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1060 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
1061 1 : })
1062 :
1063 1 : var sm, la *FileMetadata
1064 1 : for _, f := range addedFiles {
1065 1 : // NB: allowedSeeks is used for read-triggered compactions. It is set using
1066 1 : // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
1067 1 : var allowedSeeks int64
1068 1 : if readCompactionRate != 0 {
1069 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1070 1 : }
1071 1 : if allowedSeeks < 100 {
1072 1 : allowedSeeks = 100
1073 1 : }
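// Illustrative arithmetic with hypothetical values: at the 32KB default
// rate, a 64MB table gets 64<<20/32768 = 2048 allowed seeks, while a 1MB
// table computes 32 and is clamped up to the floor of 100.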
1074 1 : f.AllowedSeeks.Store(allowedSeeks)
1075 1 : f.InitAllowedSeeks = allowedSeeks
1076 1 :
1077 1 : err := lm.insert(f)
1078 1 : // We're adding this file to the new version, so increment the
1079 1 : // latest refs count.
1080 1 : f.LatestRef()
1081 1 : if err != nil {
1082 1 : return nil, errors.Wrap(err, "pebble")
1083 1 : }
1084 1 : if f.HasRangeKeys {
1085 1 : err = lmRange.insert(f)
1086 1 : if err != nil {
1087 0 : return nil, errors.Wrap(err, "pebble")
1088 0 : }
1089 : }
1090 1 : removeZombie(f.FileBacking)
1091 1 : // Track the files with the smallest and largest keys, so that we can
1092 1 : // check consistency of the modified span.
1093 1 : if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
1094 1 : sm = f
1095 1 : }
1096 1 : if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
1097 1 : la = f
1098 1 : }
1099 : }
1100 :
1101 1 : if level == 0 {
1102 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1103 1 : // Flushes and ingestions that do not delete any L0 files do not require
1104 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1105 1 : // it incrementally.
1106 1 : var err error
1107 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1108 1 : SortBySeqNum(addedFiles)
1109 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1110 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1111 1 : err = v.InitL0Sublevels(flushSplitBytes)
1112 1 : } else if invariants.Enabled && err == nil {
1113 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
1114 1 : if err != nil {
1115 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1116 : }
1117 1 : s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
1118 1 : s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
1119 1 : if s1 != s2 {
1120 0 : // Add verbosity.
1121 0 : s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
1122 0 : s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
1123 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1124 : }
1125 : }
1126 1 : if err != nil {
1127 0 : return nil, errors.Wrap(err, "pebble: internal error")
1128 0 : }
1129 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1130 1 : } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
1131 0 : return nil, errors.Wrap(err, "pebble: internal error")
1132 0 : }
1133 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
1134 0 : return nil, errors.Wrap(err, "pebble: internal error")
1135 0 : }
1136 1 : continue
1137 : }
1138 :
1139 : // Check consistency of the level in the vicinity of our edits.
1140 1 : if sm != nil && la != nil {
1141 1 : overlap := overlaps(v.Levels[level].Iter(), comparer.Compare, sm.Smallest.UserKey,
1142 1 : la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
1143 1 : // overlap contains all of the added files. We want to ensure that
1144 1 : // the added files are consistent with neighboring existing files
1145 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1146 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1147 1 : if m := start.Prev(); m == nil {
1148 1 : start.Next()
1149 1 : }
1150 1 : if m := end.Next(); m == nil {
1151 1 : end.Prev()
1152 1 : }
1153 : })
1154 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
1155 1 : return nil, errors.Wrap(err, "pebble: internal error")
1156 1 : }
1157 : }
1158 : }
1159 1 : return v, nil
1160 : }
|