Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "strings"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
24 : // TODO(peter): describe the MANIFEST file format, independently of the C++
25 : // project.
26 :
27 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
28 :
29 : type byteReader interface {
30 : io.ByteReader
31 : io.Reader
32 : }
33 :
34 : // Tags for the versionEdit disk format.
35 : // Tag 8 is no longer used.
36 : const (
37 : // LevelDB tags.
38 : tagComparator = 1
39 : tagLogNumber = 2
40 : tagNextFileNumber = 3
41 : tagLastSequence = 4
42 : tagCompactPointer = 5
43 : tagDeletedFile = 6
44 : tagNewFile = 7
45 : tagPrevLogNumber = 9
46 :
47 : // RocksDB tags.
48 : tagNewFile2 = 100
49 : tagNewFile3 = 102
50 : tagNewFile4 = 103
51 : tagColumnFamily = 200
52 : tagColumnFamilyAdd = 201
53 : tagColumnFamilyDrop = 202
54 : tagMaxColumnFamily = 203
55 :
56 : // Pebble tags.
57 : tagNewFile5 = 104 // Range keys.
58 : tagCreatedBackingTable = 105
59 : tagRemovedBackingTable = 106
60 :
61 : // The custom tags sub-format used by tagNewFile4 and above. All tags less
62 : // than customTagNonSafeIgnoreMask are safe to ignore; their value must be
63 : // encoded as a single length-prefixed bytes field.
64 : customTagTerminate = 1
65 : customTagNeedsCompaction = 2
66 : customTagCreationTime = 6
67 : customTagPathID = 65
68 : customTagNonSafeIgnoreMask = 1 << 6
69 : customTagVirtual = 66
70 : customTagSyntheticPrefix = 67
71 : customTagSyntheticSuffix = 68
72 : )
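// A sketch of how a decoder treats an unknown custom tag, mirroring the
// default case in Decode below: a tag with the non-safe-ignore bit set is
// rejected as corruption, while any other unknown tag is skipped by reading
// its single length-prefixed bytes field.
//
//	if (customTag & customTagNonSafeIgnoreMask) != 0 {
//		return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
//	}
//	if _, err := d.readBytes(); err != nil { // skip the opaque payload
//		return err
//	}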
73 :
74 : // DeletedFileEntry holds the state for a file deletion from a level. The file
75 : // itself might still be referenced by another level.
76 : type DeletedFileEntry struct {
77 : Level int
78 : FileNum base.FileNum
79 : }
80 :
81 : // NewFileEntry holds the state for a new file or one moved from a different
82 : // level.
83 : type NewFileEntry struct {
84 : Level int
85 : Meta *FileMetadata
86 : // BackingFileNum is only set during manifest replay, and only for virtual
87 : // sstables.
88 : BackingFileNum base.DiskFileNum
89 : }
90 :
91 : // VersionEdit holds the state for an edit to a Version along with other
92 : // on-disk state (log numbers, next file number, and the last sequence number).
93 : type VersionEdit struct {
94 : // ComparerName is the value of Options.Comparer.Name. This is only set in
95 : // the first VersionEdit in a manifest (either when the DB is created, or
96 : // when a new manifest is created) and is used to verify that the comparer
97 : // specified at Open matches the comparer that was previously used.
98 : ComparerName string
99 :
100 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
101 : // mutations that have not been flushed to an sstable.
102 : //
103 : // This is an optional field; 0 indicates that it is not set.
104 : MinUnflushedLogNum base.DiskFileNum
105 :
106 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
107 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
108 : // 6/2011. We keep it around purely for informational purposes when
109 : // displaying MANIFEST contents.
110 : ObsoletePrevLogNum uint64
111 :
112 : // The next file number. A single counter is used to assign file numbers
113 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
114 : NextFileNum uint64
115 :
116 : // LastSeqNum is an upper bound on the sequence numbers that have been
117 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
118 : // recovery) may contain sequence numbers greater than this value.
119 : LastSeqNum uint64
120 :
121 : // A file num may be present in both deleted files and new files when it
122 : // is moved from a lower level to a higher level (when the compaction
123 : // found that there was no overlapping file at the higher level).
124 : DeletedFiles map[DeletedFileEntry]*FileMetadata
125 : NewFiles []NewFileEntry
126 : // CreatedBackingTables can be used to preserve the FileBacking associated
127 : // with a physical sstable. This is useful when virtual sstables in the
128 : // latest version are reconstructed during manifest replay, and we also need
129 : // to reconstruct the FileBacking which is required by these virtual
130 : // sstables.
131 : //
132 : // INVARIANT: The FileBacking associated with a physical sstable must only
133 : // be added as a backing file in the same version edit where the physical
134 : // sstable is first virtualized. This means that the physical sstable must
135 : // be present in DeletedFiles and that there must be at least one virtual
136 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
137 : // file must be present in CreatedBackingTables in exactly one version edit.
138 : // The physical sstable associated with the FileBacking must also not be
139 : // present in NewFiles.
140 : CreatedBackingTables []*FileBacking
141 : // RemovedBackingTables is used to remove the FileBacking associated with a
142 : // virtual sstable. Note that a backing sstable can be removed as soon as
143 : // there are no virtual sstables in the latest version which are using the
144 : // backing sstable, but the backing sstable doesn't necessarily have to be
145 : // removed atomically with the version edit which removes the last virtual
146 : // sstable associated with the backing sstable. The removal can happen in a
147 : // future version edit.
148 : //
149 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
150 : // added to CreatedBackingTables in a prior version edit. The same version
151 : // edit also cannot have the same file present in both CreatedBackingTables
152 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
153 : // in exactly one version edit.
154 : RemovedBackingTables []base.DiskFileNum
155 : }
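// A sketch of an edit that virtualizes a physical sstable, satisfying the
// CreatedBackingTables invariant above (the file numbers and the
// physicalMeta/virtual1/virtual2 metadata variables are hypothetical):
//
//	ve := VersionEdit{
//		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//			{Level: 5, FileNum: 7}: physicalMeta,
//		},
//		CreatedBackingTables: []*FileBacking{physicalMeta.FileBacking},
//		NewFiles: []NewFileEntry{
//			{Level: 5, Meta: virtual1}, // shares physicalMeta.FileBacking
//			{Level: 5, Meta: virtual2}, // shares physicalMeta.FileBacking
//		},
//	}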
156 :
157 : // Decode decodes an edit from the specified reader.
158 : //
159 : // Note that Decode will not set the FileBacking for virtual sstables; that
160 : // responsibility is left to the caller. However, Decode will
161 : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
162 2 : func (v *VersionEdit) Decode(r io.Reader) error {
163 2 : br, ok := r.(byteReader)
164 2 : if !ok {
165 2 : br = bufio.NewReader(r)
166 2 : }
167 2 : d := versionEditDecoder{br}
168 2 : for {
169 2 : tag, err := binary.ReadUvarint(br)
170 2 : if err == io.EOF {
171 2 : break
172 : }
173 2 : if err != nil {
174 0 : return err
175 0 : }
176 2 : switch tag {
177 2 : case tagComparator:
178 2 : s, err := d.readBytes()
179 2 : if err != nil {
180 0 : return err
181 0 : }
182 2 : v.ComparerName = string(s)
183 :
184 2 : case tagLogNumber:
185 2 : n, err := d.readUvarint()
186 2 : if err != nil {
187 0 : return err
188 0 : }
189 2 : v.MinUnflushedLogNum = base.DiskFileNum(n)
190 :
191 2 : case tagNextFileNumber:
192 2 : n, err := d.readUvarint()
193 2 : if err != nil {
194 0 : return err
195 0 : }
196 2 : v.NextFileNum = n
197 :
198 2 : case tagLastSequence:
199 2 : n, err := d.readUvarint()
200 2 : if err != nil {
201 0 : return err
202 0 : }
203 2 : v.LastSeqNum = n
204 :
205 0 : case tagCompactPointer:
206 0 : if _, err := d.readLevel(); err != nil {
207 0 : return err
208 0 : }
209 0 : if _, err := d.readBytes(); err != nil {
210 0 : return err
211 0 : }
212 : // NB: RocksDB does not use compaction pointers anymore.
213 :
214 2 : case tagRemovedBackingTable:
215 2 : n, err := d.readUvarint()
216 2 : if err != nil {
217 0 : return err
218 0 : }
219 2 : v.RemovedBackingTables = append(
220 2 : v.RemovedBackingTables, base.DiskFileNum(n),
221 2 : )
222 2 : case tagCreatedBackingTable:
223 2 : dfn, err := d.readUvarint()
224 2 : if err != nil {
225 0 : return err
226 0 : }
227 2 : size, err := d.readUvarint()
228 2 : if err != nil {
229 0 : return err
230 0 : }
231 2 : fileBacking := &FileBacking{
232 2 : DiskFileNum: base.DiskFileNum(dfn),
233 2 : Size: size,
234 2 : }
235 2 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
236 2 : case tagDeletedFile:
237 2 : level, err := d.readLevel()
238 2 : if err != nil {
239 0 : return err
240 0 : }
241 2 : fileNum, err := d.readFileNum()
242 2 : if err != nil {
243 0 : return err
244 0 : }
245 2 : if v.DeletedFiles == nil {
246 2 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
247 2 : }
248 2 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
249 :
250 2 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
251 2 : level, err := d.readLevel()
252 2 : if err != nil {
253 0 : return err
254 0 : }
255 2 : fileNum, err := d.readFileNum()
256 2 : if err != nil {
257 0 : return err
258 0 : }
259 2 : if tag == tagNewFile3 {
260 0 : // The pathID field appears unused in RocksDB.
261 0 : _ /* pathID */, err := d.readUvarint()
262 0 : if err != nil {
263 0 : return err
264 0 : }
265 : }
266 2 : size, err := d.readUvarint()
267 2 : if err != nil {
268 0 : return err
269 0 : }
270 : // We read the smallest / largest key bounds differently depending on
271 : // whether we have point, range or both types of keys present in the
272 : // table.
273 2 : var (
274 2 : smallestPointKey, largestPointKey []byte
275 2 : smallestRangeKey, largestRangeKey []byte
276 2 : parsedPointBounds bool
277 2 : boundsMarker byte
278 2 : )
279 2 : if tag != tagNewFile5 {
280 2 : // Range keys not present in the table. Parse the point key bounds.
281 2 : smallestPointKey, err = d.readBytes()
282 2 : if err != nil {
283 0 : return err
284 0 : }
285 2 : largestPointKey, err = d.readBytes()
286 2 : if err != nil {
287 0 : return err
288 0 : }
289 2 : } else {
290 2 : // Range keys are present in the table. Determine whether we have point
291 2 : // keys to parse, in addition to the bounds.
292 2 : boundsMarker, err = d.ReadByte()
293 2 : if err != nil {
294 0 : return err
295 0 : }
296 : // Parse point key bounds, if present.
297 2 : if boundsMarker&maskContainsPointKeys > 0 {
298 2 : smallestPointKey, err = d.readBytes()
299 2 : if err != nil {
300 0 : return err
301 0 : }
302 2 : largestPointKey, err = d.readBytes()
303 2 : if err != nil {
304 0 : return err
305 0 : }
306 2 : parsedPointBounds = true
307 2 : } else {
308 2 : // The table does not have point keys.
309 2 : // Sanity check: the bounds must be range keys.
310 2 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
311 0 : return base.CorruptionErrorf(
312 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
313 0 : boundsMarker,
314 0 : )
315 0 : }
316 : }
317 : // Parse range key bounds.
318 2 : smallestRangeKey, err = d.readBytes()
319 2 : if err != nil {
320 0 : return err
321 0 : }
322 2 : largestRangeKey, err = d.readBytes()
323 2 : if err != nil {
324 0 : return err
325 0 : }
326 : }
327 2 : var smallestSeqNum uint64
328 2 : var largestSeqNum uint64
329 2 : if tag != tagNewFile {
330 2 : smallestSeqNum, err = d.readUvarint()
331 2 : if err != nil {
332 0 : return err
333 0 : }
334 2 : largestSeqNum, err = d.readUvarint()
335 2 : if err != nil {
336 0 : return err
337 0 : }
338 : }
339 2 : var markedForCompaction bool
340 2 : var creationTime uint64
341 2 : virtualState := struct {
342 2 : virtual bool
343 2 : backingFileNum uint64
344 2 : }{}
345 2 : var syntheticPrefix sstable.SyntheticPrefix
346 2 : var syntheticSuffix sstable.SyntheticSuffix
347 2 : if tag == tagNewFile4 || tag == tagNewFile5 {
348 2 : for {
349 2 : customTag, err := d.readUvarint()
350 2 : if err != nil {
351 0 : return err
352 0 : }
353 2 : if customTag == customTagTerminate {
354 2 : break
355 : }
356 2 : switch customTag {
357 1 : case customTagNeedsCompaction:
358 1 : field, err := d.readBytes()
359 1 : if err != nil {
360 0 : return err
361 0 : }
362 1 : if len(field) != 1 {
363 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
364 0 : }
365 1 : markedForCompaction = (field[0] == 1)
366 :
367 2 : case customTagCreationTime:
368 2 : field, err := d.readBytes()
369 2 : if err != nil {
370 0 : return err
371 0 : }
372 2 : var n int
373 2 : creationTime, n = binary.Uvarint(field)
374 2 : if n != len(field) {
375 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
376 0 : }
377 :
378 0 : case customTagPathID:
379 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
380 :
381 2 : case customTagVirtual:
382 2 : virtualState.virtual = true
383 2 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
384 0 : return err
385 0 : }
386 :
387 2 : case customTagSyntheticPrefix:
388 2 : synthetic, err := d.readBytes()
389 2 : if err != nil {
390 0 : return err
391 0 : }
392 2 : syntheticPrefix = synthetic
393 :
394 2 : case customTagSyntheticSuffix:
395 2 : if syntheticSuffix, err = d.readBytes(); err != nil {
396 0 : return err
397 0 : }
398 :
399 0 : default:
400 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
401 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
402 0 : }
403 0 : if _, err := d.readBytes(); err != nil {
404 0 : return err
405 0 : }
406 : }
407 : }
408 : }
409 2 : m := &FileMetadata{
410 2 : FileNum: fileNum,
411 2 : Size: size,
412 2 : CreationTime: int64(creationTime),
413 2 : SmallestSeqNum: smallestSeqNum,
414 2 : LargestSeqNum: largestSeqNum,
415 2 : MarkedForCompaction: markedForCompaction,
416 2 : Virtual: virtualState.virtual,
417 2 : SyntheticPrefix: syntheticPrefix,
418 2 : SyntheticSuffix: syntheticSuffix,
419 2 : }
420 2 : if tag != tagNewFile5 { // no range keys present
421 2 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
422 2 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
423 2 : m.HasPointKeys = true
424 2 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
425 2 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
426 2 : } else { // range keys present
427 2 : // Set point key bounds, if parsed.
428 2 : if parsedPointBounds {
429 2 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
430 2 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
431 2 : m.HasPointKeys = true
432 2 : }
433 : // Set range key bounds.
434 2 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
435 2 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
436 2 : m.HasRangeKeys = true
437 2 : // Set overall bounds (by default assume range keys).
438 2 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
439 2 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
440 2 : if boundsMarker&maskSmallest == maskSmallest {
441 2 : m.Smallest = m.SmallestPointKey
442 2 : m.boundTypeSmallest = boundTypePointKey
443 2 : }
444 2 : if boundsMarker&maskLargest == maskLargest {
445 2 : m.Largest = m.LargestPointKey
446 2 : m.boundTypeLargest = boundTypePointKey
447 2 : }
448 : }
449 2 : m.boundsSet = true
450 2 : if !virtualState.virtual {
451 2 : m.InitPhysicalBacking()
452 2 : }
453 :
454 2 : nfe := NewFileEntry{
455 2 : Level: level,
456 2 : Meta: m,
457 2 : }
458 2 : if virtualState.virtual {
459 2 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
460 2 : }
461 2 : v.NewFiles = append(v.NewFiles, nfe)
462 :
463 1 : case tagPrevLogNumber:
464 1 : n, err := d.readUvarint()
465 1 : if err != nil {
466 0 : return err
467 0 : }
468 1 : v.ObsoletePrevLogNum = n
469 :
470 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
471 0 : return base.CorruptionErrorf("column families are not supported")
472 :
473 0 : default:
474 0 : return errCorruptManifest
475 : }
476 : }
477 2 : return nil
478 : }
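// A minimal usage sketch, assuming rec holds one serialized edit (for
// example, a record read from a MANIFEST log):
//
//	var ve VersionEdit
//	if err := ve.Decode(bytes.NewReader(rec)); err != nil {
//		return err
//	}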
479 :
480 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
481 1 : var buf bytes.Buffer
482 1 : if v.ComparerName != "" {
483 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
484 1 : }
485 1 : if v.MinUnflushedLogNum != 0 {
486 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
487 1 : }
488 1 : if v.ObsoletePrevLogNum != 0 {
489 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
490 0 : }
491 1 : if v.NextFileNum != 0 {
492 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
493 1 : }
494 1 : if v.LastSeqNum != 0 {
495 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
496 1 : }
497 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
498 1 : for df := range v.DeletedFiles {
499 1 : entries = append(entries, df)
500 1 : }
501 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
502 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
503 1 : return v
504 1 : }
505 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
506 : })
507 1 : for _, df := range entries {
508 1 : fmt.Fprintf(&buf, " del-table: L%d %s\n", df.Level, df.FileNum)
509 1 : }
510 1 : for _, nf := range v.NewFiles {
511 1 : fmt.Fprintf(&buf, " add-table: L%d", nf.Level)
512 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
513 1 : if nf.Meta.CreationTime != 0 {
514 1 : fmt.Fprintf(&buf, " (%s)",
515 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
516 1 : }
517 1 : fmt.Fprintln(&buf)
518 : }
519 :
520 1 : for _, b := range v.CreatedBackingTables {
521 1 : fmt.Fprintf(&buf, " add-backing: %s\n", b.DiskFileNum)
522 1 : }
523 1 : for _, n := range v.RemovedBackingTables {
524 1 : fmt.Fprintf(&buf, " del-backing: %s\n", n)
525 1 : }
526 1 : return buf.String()
527 : }
528 :
529 : // DebugString is a more verbose version of String(). Use this in tests.
530 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
531 1 : return v.string(true /* verbose */, fmtKey)
532 1 : }
533 :
534 : // String implements fmt.Stringer for a VersionEdit.
535 1 : func (v *VersionEdit) String() string {
536 1 : return v.string(false /* verbose */, base.DefaultFormatter)
537 1 : }
538 :
539 : // ParseVersionEditDebug parses a VersionEdit from its DebugString
540 : // implementation.
541 : //
542 : // It doesn't recognize all fields; this implementation can be filled in as
543 : // needed.
544 1 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
545 1 : defer func() {
546 1 : err = errors.CombineErrors(err, maybeRecover())
547 1 : }()
548 :
549 1 : var ve VersionEdit
550 1 : for _, l := range strings.Split(s, "\n") {
551 1 : l = strings.TrimSpace(l)
552 1 : if l == "" {
553 1 : continue
554 : }
555 1 : field, value, ok := strings.Cut(l, ":")
556 1 : if !ok {
557 0 : return nil, errors.Errorf("malformed line %q", l)
558 0 : }
559 1 : field = strings.TrimSpace(field)
560 1 : p := makeDebugParser(value)
561 1 : switch field {
562 1 : case "add-table":
563 1 : level := p.Level()
564 1 : meta, err := ParseFileMetadataDebug(p.Remaining())
565 1 : if err != nil {
566 0 : return nil, err
567 0 : }
568 1 : ve.NewFiles = append(ve.NewFiles, NewFileEntry{
569 1 : Level: level,
570 1 : Meta: meta,
571 1 : })
572 :
573 1 : case "del-table":
574 1 : level := p.Level()
575 1 : num := p.FileNum()
576 1 : if ve.DeletedFiles == nil {
577 1 : ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
578 1 : }
579 1 : ve.DeletedFiles[DeletedFileEntry{
580 1 : Level: level,
581 1 : FileNum: num,
582 1 : }] = nil
583 :
584 1 : case "add-backing":
585 1 : n := p.DiskFileNum()
586 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
587 1 : DiskFileNum: n,
588 1 : Size: 100,
589 1 : })
590 :
591 1 : case "del-backing":
592 1 : n := p.DiskFileNum()
593 1 : ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
594 :
595 0 : default:
596 0 : return nil, errors.Errorf("field %q not implemented", field)
597 : }
598 : }
599 1 : return &ve, nil
600 : }
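// A usage sketch with hypothetical file numbers; the parser accepts the same
// textual forms that DebugString emits:
//
//	ve, err := ParseVersionEditDebug("del-table: L2 000005\ndel-backing: 000004\n")
//	if err != nil {
//		return nil, err
//	}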
601 :
602 : // Encode encodes an edit to the specified writer.
603 2 : func (v *VersionEdit) Encode(w io.Writer) error {
604 2 : e := versionEditEncoder{new(bytes.Buffer)}
605 2 :
606 2 : if v.ComparerName != "" {
607 2 : e.writeUvarint(tagComparator)
608 2 : e.writeString(v.ComparerName)
609 2 : }
610 2 : if v.MinUnflushedLogNum != 0 {
611 2 : e.writeUvarint(tagLogNumber)
612 2 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
613 2 : }
614 2 : if v.ObsoletePrevLogNum != 0 {
615 1 : e.writeUvarint(tagPrevLogNumber)
616 1 : e.writeUvarint(v.ObsoletePrevLogNum)
617 1 : }
618 2 : if v.NextFileNum != 0 {
619 2 : e.writeUvarint(tagNextFileNumber)
620 2 : e.writeUvarint(uint64(v.NextFileNum))
621 2 : }
622 2 : for _, dfn := range v.RemovedBackingTables {
623 2 : e.writeUvarint(tagRemovedBackingTable)
624 2 : e.writeUvarint(uint64(dfn))
625 2 : }
626 2 : for _, fileBacking := range v.CreatedBackingTables {
627 2 : e.writeUvarint(tagCreatedBackingTable)
628 2 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
629 2 : e.writeUvarint(fileBacking.Size)
630 2 : }
631 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
632 : // even though its value is zero. We detect this by encoding LastSeqNum when
633 : // ComparerName is set.
634 2 : if v.LastSeqNum != 0 || v.ComparerName != "" {
635 2 : e.writeUvarint(tagLastSequence)
636 2 : e.writeUvarint(v.LastSeqNum)
637 2 : }
638 2 : for x := range v.DeletedFiles {
639 2 : e.writeUvarint(tagDeletedFile)
640 2 : e.writeUvarint(uint64(x.Level))
641 2 : e.writeUvarint(uint64(x.FileNum))
642 2 : }
643 2 : for _, x := range v.NewFiles {
644 2 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
645 2 : var tag uint64
646 2 : switch {
647 2 : case x.Meta.HasRangeKeys:
648 2 : tag = tagNewFile5
649 2 : case customFields:
650 2 : tag = tagNewFile4
651 1 : default:
652 1 : tag = tagNewFile2
653 : }
654 2 : e.writeUvarint(tag)
655 2 : e.writeUvarint(uint64(x.Level))
656 2 : e.writeUvarint(uint64(x.Meta.FileNum))
657 2 : e.writeUvarint(x.Meta.Size)
658 2 : if !x.Meta.HasRangeKeys {
659 2 : // If we have no range keys, preserve the original format and write the
660 2 : // smallest and largest point keys.
661 2 : e.writeKey(x.Meta.SmallestPointKey)
662 2 : e.writeKey(x.Meta.LargestPointKey)
663 2 : } else {
664 2 : // When range keys are present, we first write a marker byte that
665 2 : // indicates if the table also contains point keys, in addition to how the
666 2 : // overall bounds for the table should be reconstructed. This byte is
667 2 : // followed by the keys themselves.
668 2 : b, err := x.Meta.boundsMarker()
669 2 : if err != nil {
670 0 : return err
671 0 : }
672 2 : if err = e.WriteByte(b); err != nil {
673 0 : return err
674 0 : }
675 : // Write point key bounds (if present).
676 2 : if x.Meta.HasPointKeys {
677 2 : e.writeKey(x.Meta.SmallestPointKey)
678 2 : e.writeKey(x.Meta.LargestPointKey)
679 2 : }
680 : // Write range key bounds.
681 2 : e.writeKey(x.Meta.SmallestRangeKey)
682 2 : e.writeKey(x.Meta.LargestRangeKey)
683 : }
684 2 : e.writeUvarint(x.Meta.SmallestSeqNum)
685 2 : e.writeUvarint(x.Meta.LargestSeqNum)
686 2 : if customFields {
687 2 : if x.Meta.CreationTime != 0 {
688 2 : e.writeUvarint(customTagCreationTime)
689 2 : var buf [binary.MaxVarintLen64]byte
690 2 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
691 2 : e.writeBytes(buf[:n])
692 2 : }
693 2 : if x.Meta.MarkedForCompaction {
694 1 : e.writeUvarint(customTagNeedsCompaction)
695 1 : e.writeBytes([]byte{1})
696 1 : }
697 2 : if x.Meta.Virtual {
698 2 : e.writeUvarint(customTagVirtual)
699 2 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
700 2 : }
701 2 : if x.Meta.SyntheticPrefix != nil {
702 2 : e.writeUvarint(customTagSyntheticPrefix)
703 2 : e.writeBytes(x.Meta.SyntheticPrefix)
704 2 : }
705 2 : if x.Meta.SyntheticSuffix != nil {
706 2 : e.writeUvarint(customTagSyntheticSuffix)
707 2 : e.writeBytes(x.Meta.SyntheticSuffix)
708 2 : }
709 2 : e.writeUvarint(customTagTerminate)
710 : }
711 : }
712 2 : _, err := w.Write(e.Bytes())
713 2 : return err
714 : }
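// Encode is symmetric with Decode, so an edit can be round-tripped through a
// buffer (a sketch; note that Decode leaves the FileBacking of virtual
// sstables for the caller to reattach):
//
//	var buf bytes.Buffer
//	if err := ve.Encode(&buf); err != nil {
//		return err
//	}
//	var decoded VersionEdit
//	if err := decoded.Decode(&buf); err != nil {
//		return err
//	}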
715 :
716 : // versionEditDecoder should be used to decode version edits.
717 : type versionEditDecoder struct {
718 : byteReader
719 : }
720 :
721 2 : func (d versionEditDecoder) readBytes() ([]byte, error) {
722 2 : n, err := d.readUvarint()
723 2 : if err != nil {
724 0 : return nil, err
725 0 : }
726 2 : s := make([]byte, n)
727 2 : _, err = io.ReadFull(d, s)
728 2 : if err != nil {
729 0 : if err == io.ErrUnexpectedEOF {
730 0 : return nil, errCorruptManifest
731 0 : }
732 0 : return nil, err
733 : }
734 2 : return s, nil
735 : }
736 :
737 2 : func (d versionEditDecoder) readLevel() (int, error) {
738 2 : u, err := d.readUvarint()
739 2 : if err != nil {
740 0 : return 0, err
741 0 : }
742 2 : if u >= NumLevels {
743 0 : return 0, errCorruptManifest
744 0 : }
745 2 : return int(u), nil
746 : }
747 :
748 2 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
749 2 : u, err := d.readUvarint()
750 2 : if err != nil {
751 0 : return 0, err
752 0 : }
753 2 : return base.FileNum(u), nil
754 : }
755 :
756 2 : func (d versionEditDecoder) readUvarint() (uint64, error) {
757 2 : u, err := binary.ReadUvarint(d)
758 2 : if err != nil {
759 0 : if err == io.EOF {
760 0 : return 0, errCorruptManifest
761 0 : }
762 0 : return 0, err
763 : }
764 2 : return u, nil
765 : }
766 :
767 : type versionEditEncoder struct {
768 : *bytes.Buffer
769 : }
770 :
771 2 : func (e versionEditEncoder) writeBytes(p []byte) {
772 2 : e.writeUvarint(uint64(len(p)))
773 2 : e.Write(p)
774 2 : }
775 :
776 2 : func (e versionEditEncoder) writeKey(k InternalKey) {
777 2 : e.writeUvarint(uint64(k.Size()))
778 2 : e.Write(k.UserKey)
779 2 : buf := k.EncodeTrailer()
780 2 : e.Write(buf[:])
781 2 : }
782 :
783 2 : func (e versionEditEncoder) writeString(s string) {
784 2 : e.writeUvarint(uint64(len(s)))
785 2 : e.WriteString(s)
786 2 : }
787 :
788 2 : func (e versionEditEncoder) writeUvarint(u uint64) {
789 2 : var buf [binary.MaxVarintLen64]byte
790 2 : n := binary.PutUvarint(buf[:], u)
791 2 : e.Write(buf[:n])
792 2 : }
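// All variable-length fields above share the same framing: a uvarint length
// followed by the raw bytes. A worked example (hypothetical call):
//
//	e.writeBytes([]byte("abc")) // emits 0x03 0x61 0x62 0x63
//
// writeKey uses the same framing with a length of len(UserKey)+8, since the
// 8-byte trailer is appended after the user key.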
793 :
794 : // BulkVersionEdit summarizes the files added and deleted from a set of version
795 : // edits.
796 : //
797 : // INVARIANTS:
798 : // No file can be added to a level more than once. This is true globally, and
799 : // also true for all of the calls to Accumulate for a single bulk version edit.
800 : //
801 : // No file can be removed from a level more than once. This is true globally,
802 : // and also true for all of the calls to Accumulate for a single bulk version
803 : // edit.
804 : //
805 : // A file must not be added and removed from a given level in the same version
806 : // edit.
807 : //
808 : // A file that is being removed from a level must have been added to that level
809 : // before (in a prior version edit). Note that a given file can be deleted from
810 : // a level and added to another level in a single version edit.
811 : type BulkVersionEdit struct {
812 : Added [NumLevels]map[base.FileNum]*FileMetadata
813 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
814 :
815 : // AddedFileBacking is a map to support lookup so that we can populate the
816 : // FileBacking of virtual sstables during manifest replay.
817 : AddedFileBacking map[base.DiskFileNum]*FileBacking
818 : RemovedFileBacking []base.DiskFileNum
819 :
820 : // AddedByFileNum maps file number to file metadata for all added files
821 : // from accumulated version edits. AddedByFileNum is only populated if set
822 : // to non-nil by a caller. It must be set to non-nil when replaying
823 : // version edits read from a MANIFEST (as opposed to VersionEdits
824 : // constructed in-memory). While replaying a MANIFEST file,
825 : // VersionEdit.DeletedFiles map entries have nil values, because the
826 : // on-disk deletion record encodes only the file number. Accumulate
827 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
828 : // field with non-nil *FileMetadata.
829 : AddedByFileNum map[base.FileNum]*FileMetadata
830 :
831 : // MarkedForCompactionCountDiff holds the aggregated count of files
832 : // marked for compaction added or removed.
833 : MarkedForCompactionCountDiff int
834 : }
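// A replay sketch: callers seed AddedByFileNum so that Accumulate can resolve
// the nil-valued DeletedFiles entries decoded from disk (edits is a
// hypothetical slice of decoded *VersionEdit):
//
//	var bve BulkVersionEdit
//	bve.AddedByFileNum = make(map[base.FileNum]*FileMetadata)
//	for _, ve := range edits {
//		if err := bve.Accumulate(ve); err != nil {
//			return err
//		}
//	}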
835 :
836 : // Accumulate adds the file addition and deletions in the specified version
837 : // edit to the bulk edit's internal state.
838 : //
839 : // INVARIANTS:
840 : // If a file is added to a given level in a call to Accumulate and then removed
841 : // from that level in a subsequent call, the file will not be present in the
842 : // resulting BulkVersionEdit.Deleted for that level.
843 : //
844 : // After accumulation of version edits, the bulk version edit may have
845 : // information about a file which has been deleted from a level, but it may
846 : // not have information about the same file added to the same level. The add
847 : // could've occurred as part of a previous bulk version edit. In this case,
848 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
849 : // of the accumulation, because we need to decrease the refcount of the
850 : // deleted file in Apply.
851 2 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
852 2 : for df, m := range ve.DeletedFiles {
853 2 : dmap := b.Deleted[df.Level]
854 2 : if dmap == nil {
855 2 : dmap = make(map[base.FileNum]*FileMetadata)
856 2 : b.Deleted[df.Level] = dmap
857 2 : }
858 :
859 2 : if m == nil {
860 2 : // m is nil only when replaying a MANIFEST.
861 2 : if b.AddedByFileNum == nil {
862 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
863 0 : }
864 2 : m = b.AddedByFileNum[df.FileNum]
865 2 : if m == nil {
866 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
867 1 : }
868 : }
869 2 : if m.MarkedForCompaction {
870 1 : b.MarkedForCompactionCountDiff--
871 1 : }
872 2 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
873 2 : dmap[df.FileNum] = m
874 2 : } else {
875 2 : // Present in b.Added for the same level.
876 2 : delete(b.Added[df.Level], df.FileNum)
877 2 : }
878 : }
879 :
880 : // Generate state for Added backing files. Note that these must be generated
881 : // before we loop through the NewFiles, because we need to populate the
882 : // FileBackings which might be used by the NewFiles loop.
883 2 : if b.AddedFileBacking == nil {
884 2 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
885 2 : }
886 2 : for _, fb := range ve.CreatedBackingTables {
887 2 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
888 0 : // There is already a FileBacking associated with fb.DiskFileNum.
889 0 : // This should never happen. There must always be only one FileBacking
890 0 : // associated with a backing sstable.
891 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
892 : }
893 2 : b.AddedFileBacking[fb.DiskFileNum] = fb
894 : }
895 :
896 2 : for _, nf := range ve.NewFiles {
897 2 : // A new file should not have been deleted in this or a preceding
898 2 : // VersionEdit at the same level (though files can move across levels).
899 2 : if dmap := b.Deleted[nf.Level]; dmap != nil {
900 2 : if _, ok := dmap[nf.Meta.FileNum]; ok {
901 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
902 0 : }
903 : }
904 2 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
905 2 : // FileBacking for a virtual sstable must only be nil if we're performing
906 2 : // manifest replay.
907 2 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
908 2 : if nf.Meta.FileBacking == nil {
909 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
910 0 : }
911 2 : } else if nf.Meta.FileBacking == nil {
912 0 : return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
913 0 : }
914 :
915 2 : if b.Added[nf.Level] == nil {
916 2 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
917 2 : }
918 2 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
919 2 : if b.AddedByFileNum != nil {
920 2 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
921 2 : }
922 2 : if nf.Meta.MarkedForCompaction {
923 0 : b.MarkedForCompactionCountDiff++
924 0 : }
925 : }
926 :
927 2 : for _, n := range ve.RemovedBackingTables {
928 2 : if _, ok := b.AddedFileBacking[n]; ok {
929 2 : delete(b.AddedFileBacking, n)
930 2 : } else {
931 2 : // Since a file can be removed from backing files in exactly one version
932 2 : // edit, it is safe to just append without any de-duplication.
933 2 : b.RemovedFileBacking = append(b.RemovedFileBacking, n)
934 2 : }
935 : }
936 :
937 2 : return nil
938 : }
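// For example, moving file 9 from L4 to L5 in one edit (hypothetical numbers
// and metadata) deletes it from one level and adds it to the other:
//
//	moveEdit := VersionEdit{
//		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//			{Level: 4, FileNum: 9}: meta,
//		},
//		NewFiles: []NewFileEntry{{Level: 5, Meta: meta}},
//	}
//
// If the same BulkVersionEdit had previously accumulated the L4 addition of
// file 9, Accumulate drops the b.Added[4] entry instead of recording a
// deletion, per the invariant above.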
939 :
940 : // Apply applies the delta b to the current version to produce a new version.
941 : // The new version is consistent with respect to the comparer.
942 : //
943 : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
944 : // the levels.
945 : //
946 : // curr may be nil, which is equivalent to a pointer to a zero version.
947 : func (b *BulkVersionEdit) Apply(
948 : curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
949 2 : ) (*Version, error) {
950 2 : v := &Version{
951 2 : cmp: comparer,
952 2 : }
953 2 :
954 2 : // Adjust the count of files marked for compaction.
955 2 : if curr != nil {
956 2 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
957 2 : }
958 2 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
959 2 : if v.Stats.MarkedForCompaction < 0 {
960 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
961 0 : }
962 :
963 2 : for level := range v.Levels {
964 2 : if curr == nil || curr.Levels[level].tree.root == nil {
965 2 : v.Levels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
966 2 : } else {
967 2 : v.Levels[level] = curr.Levels[level].clone()
968 2 : }
969 2 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
970 2 : v.RangeKeyLevels[level] = makeLevelMetadata(comparer.Compare, level, nil /* files */)
971 2 : } else {
972 2 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
973 2 : }
974 :
975 2 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
976 2 : // There are no edits on this level.
977 2 : if level == 0 {
978 2 : // Initialize L0Sublevels.
979 2 : if curr == nil || curr.L0Sublevels == nil {
980 2 : if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
981 0 : return nil, errors.Wrap(err, "pebble: internal error")
982 0 : }
983 2 : } else {
984 2 : v.L0Sublevels = curr.L0Sublevels
985 2 : v.L0SublevelFiles = v.L0Sublevels.Levels
986 2 : }
987 : }
988 2 : continue
989 : }
990 :
991 : // Some edits on this level.
992 2 : lm := &v.Levels[level]
993 2 : lmRange := &v.RangeKeyLevels[level]
994 2 :
995 2 : addedFilesMap := b.Added[level]
996 2 : deletedFilesMap := b.Deleted[level]
997 2 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
998 1 : return nil, base.CorruptionErrorf(
999 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1000 1 : errors.Safe(len(deletedFilesMap)))
1001 1 : }
1002 :
1003 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1004 : // for a level, it won't be present in deletedFilesMap for the same
1005 : // level.
1006 :
1007 2 : for _, f := range deletedFilesMap {
1008 2 : if obsolete := v.Levels[level].remove(f); obsolete {
1009 0 : // Deleting a file from the B-Tree may decrement its
1010 0 : // reference count. However, because we cloned the
1011 0 : // previous level's B-Tree, this should never result in a
1012 0 : // file's reference count dropping to zero.
1013 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1014 0 : return nil, err
1015 0 : }
1016 2 : if f.HasRangeKeys {
1017 2 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1018 0 : // Deleting a file from the B-Tree may decrement its
1019 0 : // reference count. However, because we cloned the
1020 0 : // previous level's B-Tree, this should never result in a
1021 0 : // file's reference count dropping to zero.
1022 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1023 0 : return nil, err
1024 0 : }
1025 : }
1026 : }
1027 :
1028 2 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1029 2 : for _, f := range addedFilesMap {
1030 2 : addedFiles = append(addedFiles, f)
1031 2 : }
1032 : // Sort addedFiles by file number. This isn't necessary, but tests which
1033 : // replay invalid manifests check the error output, and the error output
1034 : // depends on the order in which files are added to the btree.
1035 2 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1036 2 : return stdcmp.Compare(a.FileNum, b.FileNum)
1037 2 : })
1038 :
1039 2 : var sm, la *FileMetadata
1040 2 : for _, f := range addedFiles {
1041 2 : // NB: allowedSeeks is used for read triggered compactions. It is set using
1042 2 : // Options.Experimental.ReadCompactionRate which defaults to 32KB.
1043 2 : var allowedSeeks int64
1044 2 : if readCompactionRate != 0 {
1045 2 : allowedSeeks = int64(f.Size) / readCompactionRate
1046 2 : }
1047 2 : if allowedSeeks < 100 {
1048 2 : allowedSeeks = 100
1049 2 : }
1050 2 : f.AllowedSeeks.Store(allowedSeeks)
1051 2 : f.InitAllowedSeeks = allowedSeeks
1052 2 :
1053 2 : err := lm.insert(f)
1054 2 : if err != nil {
1055 1 : return nil, errors.Wrap(err, "pebble")
1056 1 : }
1057 2 : if f.HasRangeKeys {
1058 2 : err = lmRange.insert(f)
1059 2 : if err != nil {
1060 0 : return nil, errors.Wrap(err, "pebble")
1061 0 : }
1062 : }
1063 : // Track the keys with the smallest and largest keys, so that we can
1064 : // check consistency of the modified span.
1065 2 : if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
1066 2 : sm = f
1067 2 : }
1068 2 : if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
1069 2 : la = f
1070 2 : }
1071 : }
1072 :
1073 2 : if level == 0 {
1074 2 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1075 2 : // Flushes and ingestions that do not delete any L0 files do not require
1076 2 : // a regeneration of L0Sublevels from scratch. We can instead generate
1077 2 : // it incrementally.
1078 2 : var err error
1079 2 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1080 2 : SortBySeqNum(addedFiles)
1081 2 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1082 2 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1083 2 : err = v.InitL0Sublevels(flushSplitBytes)
1084 2 : } else if invariants.Enabled && err == nil {
1085 2 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
1086 2 : if err != nil {
1087 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1088 : }
1089 2 : s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
1090 2 : s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
1091 2 : if s1 != s2 {
1092 0 : // Add verbosity.
1093 0 : s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
1094 0 : s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
1095 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1096 : }
1097 : }
1098 2 : if err != nil {
1099 0 : return nil, errors.Wrap(err, "pebble: internal error")
1100 0 : }
1101 2 : v.L0SublevelFiles = v.L0Sublevels.Levels
1102 2 : } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
1103 0 : return nil, errors.Wrap(err, "pebble: internal error")
1104 0 : }
1105 2 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
1106 0 : return nil, errors.Wrap(err, "pebble: internal error")
1107 0 : }
1108 2 : continue
1109 : }
1110 :
1111 : // Check consistency of the level in the vicinity of our edits.
1112 2 : if sm != nil && la != nil {
1113 2 : overlap := overlaps(v.Levels[level].Iter(), comparer.Compare, sm.Smallest.UserKey,
1114 2 : la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
1115 2 : // overlap contains all of the added files. We want to ensure that
1116 2 : // the added files are consistent with neighboring existing files
1117 2 : // too, so reslice the overlap to pull in a neighbor on each side.
1118 2 : check := overlap.Reslice(func(start, end *LevelIterator) {
1119 2 : if m := start.Prev(); m == nil {
1120 2 : start.Next()
1121 2 : }
1122 2 : if m := end.Next(); m == nil {
1123 2 : end.Prev()
1124 2 : }
1125 : })
1126 2 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
1127 1 : return nil, errors.Wrap(err, "pebble: internal error")
1128 1 : }
1129 : }
1130 : }
1131 2 : return v, nil
1132 : }
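// A typical replay ends by materializing the accumulated edits into a
// Version (a sketch; curr may be nil as documented above, and the comparer
// and tuning values come from the caller's options):
//
//	v, err := bve.Apply(nil /* curr */, comparer, flushSplitBytes, readCompactionRate)
//	if err != nil {
//		return err
//	}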
|