Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "strings"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
24 : // TODO(peter): describe the MANIFEST file format, independently of the C++
25 : // project.
26 :
27 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
28 :
29 : type byteReader interface {
30 : io.ByteReader
31 : io.Reader
32 : }
33 :
34 : // Tags for the versionEdit disk format.
35 : // Tag 8 is no longer used.
36 : const (
37 : // LevelDB tags.
38 : tagComparator = 1
39 : tagLogNumber = 2
40 : tagNextFileNumber = 3
41 : tagLastSequence = 4
42 : tagCompactPointer = 5
43 : tagDeletedFile = 6
44 : tagNewFile = 7
45 : tagPrevLogNumber = 9
46 :
47 : // RocksDB tags.
48 : tagNewFile2 = 100
49 : tagNewFile3 = 102
50 : tagNewFile4 = 103
51 : tagColumnFamily = 200
52 : tagColumnFamilyAdd = 201
53 : tagColumnFamilyDrop = 202
54 : tagMaxColumnFamily = 203
55 :
56 : // Pebble tags.
57 : tagNewFile5 = 104 // Range keys.
58 : tagCreatedBackingTable = 105
59 : tagRemovedBackingTable = 106
60 :
61 : // The custom tags sub-format used by tagNewFile4 and above. All tags less
62 : // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
63 : // a single bytes field.
64 : customTagTerminate = 1
65 : customTagNeedsCompaction = 2
66 : customTagCreationTime = 6
67 : customTagPathID = 65
68 : customTagNonSafeIgnoreMask = 1 << 6
69 : customTagVirtual = 66
70 : customTagSyntheticPrefix = 67
71 : customTagSyntheticSuffix = 68
72 : )
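
// Illustrative sketch (not part of the original file): every record in a
// version edit is a uvarint tag followed by a tag-specific payload. For
// example, a comparator record is the tag followed by a length-prefixed
// string, and a next-file-number record is the tag followed by a uvarint.
// The helper below hand-encodes such a pair using only encoding/binary; the
// comparator name and file number are arbitrary example values.
func encodeTagExamples() []byte {
	var buf []byte
	// tagComparator: uvarint tag, uvarint length, string bytes.
	buf = binary.AppendUvarint(buf, tagComparator)
	name := "leveldb.BytewiseComparator"
	buf = binary.AppendUvarint(buf, uint64(len(name)))
	buf = append(buf, name...)
	// tagNextFileNumber: uvarint tag, uvarint value.
	buf = binary.AppendUvarint(buf, tagNextFileNumber)
	buf = binary.AppendUvarint(buf, 42)
	return buf
}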
73 :
74 : // DeletedFileEntry holds the state for a file deletion from a level. The file
75 : // itself might still be referenced by another level.
76 : type DeletedFileEntry struct {
77 : Level int
78 : FileNum base.FileNum
79 : }
80 :
81 : // NewFileEntry holds the state for a new file or one moved from a different
82 : // level.
83 : type NewFileEntry struct {
84 : Level int
85 : Meta *FileMetadata
86 : // BackingFileNum is only set during manifest replay, and only for virtual
87 : // sstables.
88 : BackingFileNum base.DiskFileNum
89 : }
90 :
91 : // VersionEdit holds the state for an edit to a Version along with other
92 : // on-disk state (log numbers, next file number, and the last sequence number).
93 : type VersionEdit struct {
94 : // ComparerName is the value of Options.Comparer.Name. This is only set in
95 : // the first VersionEdit in a manifest (either when the DB is created, or
96 : // when a new manifest is created) and is used to verify that the comparer
97 : // specified at Open matches the comparer that was previously used.
98 : ComparerName string
99 :
100 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
101 : // mutations that have not been flushed to an sstable.
102 : //
103 : // This is an optional field; 0 means it is not set.
104 : MinUnflushedLogNum base.DiskFileNum
105 :
106 : // ObsoletePrevLogNum is a historical artifact from LevelDB that is no longer
107 : // used by Pebble, RocksDB, or even LevelDB itself. Its use in LevelDB was
108 : // deprecated in 6/2011. We keep it around purely for informational purposes
109 : // when displaying MANIFEST contents.
110 : ObsoletePrevLogNum uint64
111 :
112 : // The next file number. A single counter is used to assign file numbers
113 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
114 : NextFileNum uint64
115 :
116 : // LastSeqNum is an upper bound on the sequence numbers that have been
117 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
118 : // recovery) may contain sequence numbers greater than this value.
119 : LastSeqNum base.SeqNum
120 :
121 : // A file num may be present in both deleted files and new files when it
122 : // is moved from a lower level to a higher level (when the compaction
123 : // found that there was no overlapping file at the higher level).
124 : DeletedFiles map[DeletedFileEntry]*FileMetadata
125 : NewFiles []NewFileEntry
126 : // CreatedBackingTables can be used to preserve the FileBacking associated
127 : // with a physical sstable. This is useful when virtual sstables in the
128 : // latest version are reconstructed during manifest replay, and we also need
129 : // to reconstruct the FileBacking which is required by these virtual
130 : // sstables.
131 : //
132 : // INVARIANT: The FileBacking associated with a physical sstable must only
133 : // be added as a backing file in the same version edit where the physical
134 : // sstable is first virtualized. This means that the physical sstable must
135 : // be present in DeletedFiles and that there must be at least one virtual
136 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
137 : // file must be present in CreatedBackingTables in exactly one version edit.
138 : // The physical sstable associated with the FileBacking must also not be
139 : // present in NewFiles.
140 : CreatedBackingTables []*FileBacking
141 : // RemovedBackingTables is used to remove the FileBacking associated with a
142 : // virtual sstable. Note that a backing sstable can be removed as soon as
143 : // there are no virtual sstables in the latest version which are using the
144 : // backing sstable, but the backing sstable doesn't necessarily have to be
145 : // removed atomically with the version edit which removes the last virtual
146 : // sstable associated with the backing sstable. The removal can happen in a
147 : // future version edit.
148 : //
149 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
150 : // added to CreatedBackingTables in a prior version edit. The same version
151 : // edit also cannot have the same file present in both CreatedBackingTables
152 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
153 : // in exactly one version edit.
154 : RemovedBackingTables []base.DiskFileNum
155 : }
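
// Illustrative sketch (not part of the original file): per the comment on
// DeletedFiles above, a move compaction is expressed as the same file number
// appearing in both DeletedFiles (at the source level) and NewFiles (at the
// destination level). Assumes m is an existing *FileMetadata.
func makeMoveEdit(m *FileMetadata, fromLevel, toLevel int) *VersionEdit {
	return &VersionEdit{
		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
			{Level: fromLevel, FileNum: m.FileNum}: m,
		},
		NewFiles: []NewFileEntry{{Level: toLevel, Meta: m}},
	}
}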
156 :
157 : // Decode decodes an edit from the specified reader.
158 : //
159 : // Note that Decode does not set the FileBacking for virtual sstables; that
160 : // responsibility is left to the caller. Decode does, however, populate
161 : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
162 2 : func (v *VersionEdit) Decode(r io.Reader) error {
163 2 : br, ok := r.(byteReader)
164 2 : if !ok {
165 2 : br = bufio.NewReader(r)
166 2 : }
167 2 : d := versionEditDecoder{br}
168 2 : for {
169 2 : tag, err := binary.ReadUvarint(br)
170 2 : if err == io.EOF {
171 2 : break
172 : }
173 2 : if err != nil {
174 0 : return err
175 0 : }
176 2 : switch tag {
177 2 : case tagComparator:
178 2 : s, err := d.readBytes()
179 2 : if err != nil {
180 0 : return err
181 0 : }
182 2 : v.ComparerName = string(s)
183 :
184 2 : case tagLogNumber:
185 2 : n, err := d.readUvarint()
186 2 : if err != nil {
187 0 : return err
188 0 : }
189 2 : v.MinUnflushedLogNum = base.DiskFileNum(n)
190 :
191 2 : case tagNextFileNumber:
192 2 : n, err := d.readUvarint()
193 2 : if err != nil {
194 0 : return err
195 0 : }
196 2 : v.NextFileNum = n
197 :
198 2 : case tagLastSequence:
199 2 : n, err := d.readUvarint()
200 2 : if err != nil {
201 0 : return err
202 0 : }
203 2 : v.LastSeqNum = base.SeqNum(n)
204 :
205 0 : case tagCompactPointer:
206 0 : if _, err := d.readLevel(); err != nil {
207 0 : return err
208 0 : }
209 0 : if _, err := d.readBytes(); err != nil {
210 0 : return err
211 0 : }
212 : // NB: RocksDB does not use compaction pointers anymore.
213 :
214 2 : case tagRemovedBackingTable:
215 2 : n, err := d.readUvarint()
216 2 : if err != nil {
217 0 : return err
218 0 : }
219 2 : v.RemovedBackingTables = append(
220 2 : v.RemovedBackingTables, base.DiskFileNum(n),
221 2 : )
222 2 : case tagCreatedBackingTable:
223 2 : dfn, err := d.readUvarint()
224 2 : if err != nil {
225 0 : return err
226 0 : }
227 2 : size, err := d.readUvarint()
228 2 : if err != nil {
229 0 : return err
230 0 : }
231 2 : fileBacking := &FileBacking{
232 2 : DiskFileNum: base.DiskFileNum(dfn),
233 2 : Size: size,
234 2 : }
235 2 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
236 2 : case tagDeletedFile:
237 2 : level, err := d.readLevel()
238 2 : if err != nil {
239 0 : return err
240 0 : }
241 2 : fileNum, err := d.readFileNum()
242 2 : if err != nil {
243 0 : return err
244 0 : }
245 2 : if v.DeletedFiles == nil {
246 2 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
247 2 : }
248 2 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
249 :
250 2 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
251 2 : level, err := d.readLevel()
252 2 : if err != nil {
253 0 : return err
254 0 : }
255 2 : fileNum, err := d.readFileNum()
256 2 : if err != nil {
257 0 : return err
258 0 : }
259 2 : if tag == tagNewFile3 {
260 0 : // The pathID field appears unused in RocksDB.
261 0 : _ /* pathID */, err := d.readUvarint()
262 0 : if err != nil {
263 0 : return err
264 0 : }
265 : }
266 2 : size, err := d.readUvarint()
267 2 : if err != nil {
268 0 : return err
269 0 : }
270 : // We read the smallest / largest key bounds differently depending on
271 : // whether we have point, range or both types of keys present in the
272 : // table.
273 2 : var (
274 2 : smallestPointKey, largestPointKey []byte
275 2 : smallestRangeKey, largestRangeKey []byte
276 2 : parsedPointBounds bool
277 2 : boundsMarker byte
278 2 : )
279 2 : if tag != tagNewFile5 {
280 2 : // Range keys not present in the table. Parse the point key bounds.
281 2 : smallestPointKey, err = d.readBytes()
282 2 : if err != nil {
283 0 : return err
284 0 : }
285 2 : largestPointKey, err = d.readBytes()
286 2 : if err != nil {
287 0 : return err
288 0 : }
289 2 : } else {
290 2 : // Range keys are present in the table. Determine whether we have point
291 2 : // keys to parse, in addition to the bounds.
292 2 : boundsMarker, err = d.ReadByte()
293 2 : if err != nil {
294 0 : return err
295 0 : }
296 : // Parse point key bounds, if present.
297 2 : if boundsMarker&maskContainsPointKeys > 0 {
298 2 : smallestPointKey, err = d.readBytes()
299 2 : if err != nil {
300 0 : return err
301 0 : }
302 2 : largestPointKey, err = d.readBytes()
303 2 : if err != nil {
304 0 : return err
305 0 : }
306 2 : parsedPointBounds = true
307 2 : } else {
308 2 : // The table does not have point keys.
309 2 : // Sanity check: the bounds must be range keys.
310 2 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
311 0 : return base.CorruptionErrorf(
312 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
313 0 : boundsMarker,
314 0 : )
315 0 : }
316 : }
317 : // Parse range key bounds.
318 2 : smallestRangeKey, err = d.readBytes()
319 2 : if err != nil {
320 0 : return err
321 0 : }
322 2 : largestRangeKey, err = d.readBytes()
323 2 : if err != nil {
324 0 : return err
325 0 : }
326 : }
327 2 : var smallestSeqNum base.SeqNum
328 2 : var largestSeqNum base.SeqNum
329 2 : if tag != tagNewFile {
330 2 : n, err := d.readUvarint()
331 2 : if err != nil {
332 0 : return err
333 0 : }
334 2 : smallestSeqNum = base.SeqNum(n)
335 2 : n, err = d.readUvarint()
336 2 : if err != nil {
337 0 : return err
338 0 : }
339 2 : largestSeqNum = base.SeqNum(n)
340 : }
341 2 : var markedForCompaction bool
342 2 : var creationTime uint64
343 2 : virtualState := struct {
344 2 : virtual bool
345 2 : backingFileNum uint64
346 2 : }{}
347 2 : var syntheticPrefix sstable.SyntheticPrefix
348 2 : var syntheticSuffix sstable.SyntheticSuffix
349 2 : if tag == tagNewFile4 || tag == tagNewFile5 {
350 2 : for {
351 2 : customTag, err := d.readUvarint()
352 2 : if err != nil {
353 0 : return err
354 0 : }
355 2 : if customTag == customTagTerminate {
356 2 : break
357 : }
358 2 : switch customTag {
359 1 : case customTagNeedsCompaction:
360 1 : field, err := d.readBytes()
361 1 : if err != nil {
362 0 : return err
363 0 : }
364 1 : if len(field) != 1 {
365 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
366 0 : }
367 1 : markedForCompaction = (field[0] == 1)
368 :
369 2 : case customTagCreationTime:
370 2 : field, err := d.readBytes()
371 2 : if err != nil {
372 0 : return err
373 0 : }
374 2 : var n int
375 2 : creationTime, n = binary.Uvarint(field)
376 2 : if n != len(field) {
377 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
378 0 : }
379 :
380 0 : case customTagPathID:
381 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
382 :
383 2 : case customTagVirtual:
384 2 : virtualState.virtual = true
385 2 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
386 0 : return err
387 0 : }
388 :
389 2 : case customTagSyntheticPrefix:
390 2 : synthetic, err := d.readBytes()
391 2 : if err != nil {
392 0 : return err
393 0 : }
394 2 : syntheticPrefix = synthetic
395 :
396 2 : case customTagSyntheticSuffix:
397 2 : if syntheticSuffix, err = d.readBytes(); err != nil {
398 0 : return err
399 0 : }
400 :
401 0 : default:
402 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
403 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
404 0 : }
405 0 : if _, err := d.readBytes(); err != nil {
406 0 : return err
407 0 : }
408 : }
409 : }
410 : }
411 2 : m := &FileMetadata{
412 2 : FileNum: fileNum,
413 2 : Size: size,
414 2 : CreationTime: int64(creationTime),
415 2 : SmallestSeqNum: smallestSeqNum,
416 2 : LargestSeqNum: largestSeqNum,
417 2 : LargestSeqNumAbsolute: largestSeqNum,
418 2 : MarkedForCompaction: markedForCompaction,
419 2 : Virtual: virtualState.virtual,
420 2 : SyntheticPrefix: syntheticPrefix,
421 2 : SyntheticSuffix: syntheticSuffix,
422 2 : }
423 2 : if tag != tagNewFile5 { // no range keys present
424 2 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
425 2 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
426 2 : m.HasPointKeys = true
427 2 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
428 2 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
429 2 : } else { // range keys present
430 2 : // Set point key bounds, if parsed.
431 2 : if parsedPointBounds {
432 2 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
433 2 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
434 2 : m.HasPointKeys = true
435 2 : }
436 : // Set range key bounds.
437 2 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
438 2 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
439 2 : m.HasRangeKeys = true
440 2 : // Set overall bounds (by default assume range keys).
441 2 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
442 2 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
443 2 : if boundsMarker&maskSmallest == maskSmallest {
444 2 : m.Smallest = m.SmallestPointKey
445 2 : m.boundTypeSmallest = boundTypePointKey
446 2 : }
447 2 : if boundsMarker&maskLargest == maskLargest {
448 2 : m.Largest = m.LargestPointKey
449 2 : m.boundTypeLargest = boundTypePointKey
450 2 : }
451 : }
452 2 : m.boundsSet = true
453 2 : if !virtualState.virtual {
454 2 : m.InitPhysicalBacking()
455 2 : }
456 :
457 2 : nfe := NewFileEntry{
458 2 : Level: level,
459 2 : Meta: m,
460 2 : }
461 2 : if virtualState.virtual {
462 2 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
463 2 : }
464 2 : v.NewFiles = append(v.NewFiles, nfe)
465 :
466 1 : case tagPrevLogNumber:
467 1 : n, err := d.readUvarint()
468 1 : if err != nil {
469 0 : return err
470 0 : }
471 1 : v.ObsoletePrevLogNum = n
472 :
473 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
474 0 : return base.CorruptionErrorf("column families are not supported")
475 :
476 0 : default:
477 0 : return errCorruptManifest
478 : }
479 : }
480 2 : return nil
481 : }
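
// Illustrative sketch (not part of the original file): as noted above, Decode
// leaves Meta.FileBacking nil for virtual sstables and records only the
// backing's disk file number. A caller replaying a manifest resolves it
// roughly like this (BulkVersionEdit.Accumulate below does the real work);
// backings is an assumed lookup table built from CreatedBackingTables.
func resolveBackings(ve *VersionEdit, backings map[base.DiskFileNum]*FileBacking) error {
	for i := range ve.NewFiles {
		nf := &ve.NewFiles[i]
		if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
			fb, ok := backings[nf.BackingFileNum]
			if !ok {
				return errors.Errorf("no backing for %s", nf.BackingFileNum)
			}
			nf.Meta.FileBacking = fb
		}
	}
	return nil
}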
482 :
483 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
484 1 : var buf bytes.Buffer
485 1 : if v.ComparerName != "" {
486 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
487 1 : }
488 1 : if v.MinUnflushedLogNum != 0 {
489 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
490 1 : }
491 1 : if v.ObsoletePrevLogNum != 0 {
492 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
493 0 : }
494 1 : if v.NextFileNum != 0 {
495 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
496 1 : }
497 1 : if v.LastSeqNum != 0 {
498 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
499 1 : }
500 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
501 1 : for df := range v.DeletedFiles {
502 1 : entries = append(entries, df)
503 1 : }
504 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
505 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
506 1 : return v
507 1 : }
508 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
509 : })
510 1 : for _, df := range entries {
511 1 : fmt.Fprintf(&buf, " del-table: L%d %s\n", df.Level, df.FileNum)
512 1 : }
513 1 : for _, nf := range v.NewFiles {
514 1 : fmt.Fprintf(&buf, " add-table: L%d", nf.Level)
515 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
516 1 : if nf.Meta.CreationTime != 0 {
517 1 : fmt.Fprintf(&buf, " (%s)",
518 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
519 1 : }
520 1 : fmt.Fprintln(&buf)
521 : }
522 :
523 1 : for _, b := range v.CreatedBackingTables {
524 1 : fmt.Fprintf(&buf, " add-backing: %s\n", b.DiskFileNum)
525 1 : }
526 1 : for _, n := range v.RemovedBackingTables {
527 1 : fmt.Fprintf(&buf, " del-backing: %s\n", n)
528 1 : }
529 1 : return buf.String()
530 : }
531 :
532 : // DebugString is a more verbose version of String(). Use this in tests.
533 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
534 1 : return v.string(true /* verbose */, fmtKey)
535 1 : }
536 :
537 : // String implements fmt.Stringer for a VersionEdit.
538 1 : func (v *VersionEdit) String() string {
539 1 : return v.string(false /* verbose */, base.DefaultFormatter)
540 1 : }
541 :
542 : // ParseVersionEditDebug parses a VersionEdit from its DebugString
543 : // implementation.
544 : //
545 : // It doesn't recognize all fields; this implementation can be filled in as
546 : // needed.
547 1 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
548 1 : defer func() {
549 1 : err = errors.CombineErrors(err, maybeRecover())
550 1 : }()
551 :
552 1 : var ve VersionEdit
553 1 : for _, l := range strings.Split(s, "\n") {
554 1 : l = strings.TrimSpace(l)
555 1 : if l == "" {
556 1 : continue
557 : }
558 1 : field, value, ok := strings.Cut(l, ":")
559 1 : if !ok {
560 0 : return nil, errors.Errorf("malformed line %q", l)
561 0 : }
562 1 : field = strings.TrimSpace(field)
563 1 : p := makeDebugParser(value)
564 1 : switch field {
565 1 : case "add-table":
566 1 : level := p.Level()
567 1 : meta, err := ParseFileMetadataDebug(p.Remaining())
568 1 : if err != nil {
569 0 : return nil, err
570 0 : }
571 1 : ve.NewFiles = append(ve.NewFiles, NewFileEntry{
572 1 : Level: level,
573 1 : Meta: meta,
574 1 : })
575 :
576 1 : case "del-table":
577 1 : level := p.Level()
578 1 : num := p.FileNum()
579 1 : if ve.DeletedFiles == nil {
580 1 : ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
581 1 : }
582 1 : ve.DeletedFiles[DeletedFileEntry{
583 1 : Level: level,
584 1 : FileNum: num,
585 1 : }] = nil
586 :
587 1 : case "add-backing":
588 1 : n := p.DiskFileNum()
589 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
590 1 : DiskFileNum: n,
591 1 : Size: 100,
592 1 : })
593 :
594 1 : case "del-backing":
595 1 : n := p.DiskFileNum()
596 1 : ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
597 :
598 0 : default:
599 0 : return nil, errors.Errorf("field %q not implemented", field)
600 : }
601 : }
602 1 : return &ve, nil
603 : }
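
// Usage sketch (illustrative, not part of the original file): the input is
// the same textual form that DebugString produces, one field per line.
// Unrecognized fields return an error rather than being skipped.
func parseVersionEditDebugExample() (*VersionEdit, error) {
	return ParseVersionEditDebug(
		"  del-table: L2 000051\n" +
			"  del-backing: 000050\n")
}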
604 :
605 : // Encode encodes an edit to the specified writer.
606 2 : func (v *VersionEdit) Encode(w io.Writer) error {
607 2 : e := versionEditEncoder{new(bytes.Buffer)}
608 2 :
609 2 : if v.ComparerName != "" {
610 2 : e.writeUvarint(tagComparator)
611 2 : e.writeString(v.ComparerName)
612 2 : }
613 2 : if v.MinUnflushedLogNum != 0 {
614 2 : e.writeUvarint(tagLogNumber)
615 2 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
616 2 : }
617 2 : if v.ObsoletePrevLogNum != 0 {
618 1 : e.writeUvarint(tagPrevLogNumber)
619 1 : e.writeUvarint(v.ObsoletePrevLogNum)
620 1 : }
621 2 : if v.NextFileNum != 0 {
622 2 : e.writeUvarint(tagNextFileNumber)
623 2 : e.writeUvarint(uint64(v.NextFileNum))
624 2 : }
625 2 : for _, dfn := range v.RemovedBackingTables {
626 2 : e.writeUvarint(tagRemovedBackingTable)
627 2 : e.writeUvarint(uint64(dfn))
628 2 : }
629 2 : for _, fileBacking := range v.CreatedBackingTables {
630 2 : e.writeUvarint(tagCreatedBackingTable)
631 2 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
632 2 : e.writeUvarint(fileBacking.Size)
633 2 : }
634 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
635 : // even though its value is zero. We detect this by encoding LastSeqNum when
636 : // ComparerName is set.
637 2 : if v.LastSeqNum != 0 || v.ComparerName != "" {
638 2 : e.writeUvarint(tagLastSequence)
639 2 : e.writeUvarint(uint64(v.LastSeqNum))
640 2 : }
641 2 : for x := range v.DeletedFiles {
642 2 : e.writeUvarint(tagDeletedFile)
643 2 : e.writeUvarint(uint64(x.Level))
644 2 : e.writeUvarint(uint64(x.FileNum))
645 2 : }
646 2 : for _, x := range v.NewFiles {
647 2 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
648 2 : var tag uint64
649 2 : switch {
650 2 : case x.Meta.HasRangeKeys:
651 2 : tag = tagNewFile5
652 2 : case customFields:
653 2 : tag = tagNewFile4
654 1 : default:
655 1 : tag = tagNewFile2
656 : }
657 2 : e.writeUvarint(tag)
658 2 : e.writeUvarint(uint64(x.Level))
659 2 : e.writeUvarint(uint64(x.Meta.FileNum))
660 2 : e.writeUvarint(x.Meta.Size)
661 2 : if !x.Meta.HasRangeKeys {
662 2 : // If we have no range keys, preserve the original format and write the
663 2 : // smallest and largest point keys.
664 2 : e.writeKey(x.Meta.SmallestPointKey)
665 2 : e.writeKey(x.Meta.LargestPointKey)
666 2 : } else {
667 2 : // When range keys are present, we first write a marker byte that
668 2 : // indicates if the table also contains point keys, in addition to how the
669 2 : // overall bounds for the table should be reconstructed. This byte is
670 2 : // followed by the keys themselves.
671 2 : b, err := x.Meta.boundsMarker()
672 2 : if err != nil {
673 0 : return err
674 0 : }
675 2 : if err = e.WriteByte(b); err != nil {
676 0 : return err
677 0 : }
678 : // Write point key bounds (if present).
679 2 : if x.Meta.HasPointKeys {
680 2 : e.writeKey(x.Meta.SmallestPointKey)
681 2 : e.writeKey(x.Meta.LargestPointKey)
682 2 : }
683 : // Write range key bounds.
684 2 : e.writeKey(x.Meta.SmallestRangeKey)
685 2 : e.writeKey(x.Meta.LargestRangeKey)
686 : }
687 2 : e.writeUvarint(uint64(x.Meta.SmallestSeqNum))
688 2 : e.writeUvarint(uint64(x.Meta.LargestSeqNum))
689 2 : if customFields {
690 2 : if x.Meta.CreationTime != 0 {
691 2 : e.writeUvarint(customTagCreationTime)
692 2 : var buf [binary.MaxVarintLen64]byte
693 2 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
694 2 : e.writeBytes(buf[:n])
695 2 : }
696 2 : if x.Meta.MarkedForCompaction {
697 1 : e.writeUvarint(customTagNeedsCompaction)
698 1 : e.writeBytes([]byte{1})
699 1 : }
700 2 : if x.Meta.Virtual {
701 2 : e.writeUvarint(customTagVirtual)
702 2 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
703 2 : }
704 2 : if x.Meta.SyntheticPrefix != nil {
705 2 : e.writeUvarint(customTagSyntheticPrefix)
706 2 : e.writeBytes(x.Meta.SyntheticPrefix)
707 2 : }
708 2 : if x.Meta.SyntheticSuffix != nil {
709 2 : e.writeUvarint(customTagSyntheticSuffix)
710 2 : e.writeBytes(x.Meta.SyntheticSuffix)
711 2 : }
712 2 : e.writeUvarint(customTagTerminate)
713 : }
714 : }
715 2 : _, err := w.Write(e.Bytes())
716 2 : return err
717 : }
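
// Round-trip sketch (illustrative, not part of the original file): an edit
// written by Encode can be read back by Decode. Note the caveat on Decode
// above: FileBacking pointers for virtual sstables are not reconstructed.
func roundTripVersionEdit(ve *VersionEdit) (*VersionEdit, error) {
	var buf bytes.Buffer
	if err := ve.Encode(&buf); err != nil {
		return nil, err
	}
	decoded := &VersionEdit{}
	if err := decoded.Decode(&buf); err != nil {
		return nil, err
	}
	return decoded, nil
}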
718 :
719 : // versionEditDecoder should be used to decode version edits.
720 : type versionEditDecoder struct {
721 : byteReader
722 : }
723 :
724 2 : func (d versionEditDecoder) readBytes() ([]byte, error) {
725 2 : n, err := d.readUvarint()
726 2 : if err != nil {
727 0 : return nil, err
728 0 : }
729 2 : s := make([]byte, n)
730 2 : _, err = io.ReadFull(d, s)
731 2 : if err != nil {
732 0 : if err == io.ErrUnexpectedEOF {
733 0 : return nil, errCorruptManifest
734 0 : }
735 0 : return nil, err
736 : }
737 2 : return s, nil
738 : }
739 :
740 2 : func (d versionEditDecoder) readLevel() (int, error) {
741 2 : u, err := d.readUvarint()
742 2 : if err != nil {
743 0 : return 0, err
744 0 : }
745 2 : if u >= NumLevels {
746 0 : return 0, errCorruptManifest
747 0 : }
748 2 : return int(u), nil
749 : }
750 :
751 2 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
752 2 : u, err := d.readUvarint()
753 2 : if err != nil {
754 0 : return 0, err
755 0 : }
756 2 : return base.FileNum(u), nil
757 : }
758 :
759 2 : func (d versionEditDecoder) readUvarint() (uint64, error) {
760 2 : u, err := binary.ReadUvarint(d)
761 2 : if err != nil {
762 0 : if err == io.EOF {
763 0 : return 0, errCorruptManifest
764 0 : }
765 0 : return 0, err
766 : }
767 2 : return u, nil
768 : }
769 :
770 : type versionEditEncoder struct {
771 : *bytes.Buffer
772 : }
773 :
774 2 : func (e versionEditEncoder) writeBytes(p []byte) {
775 2 : e.writeUvarint(uint64(len(p)))
776 2 : e.Write(p)
777 2 : }
778 :
779 2 : func (e versionEditEncoder) writeKey(k InternalKey) {
780 2 : e.writeUvarint(uint64(k.Size()))
781 2 : e.Write(k.UserKey)
782 2 : buf := k.EncodeTrailer()
783 2 : e.Write(buf[:])
784 2 : }
785 :
786 2 : func (e versionEditEncoder) writeString(s string) {
787 2 : e.writeUvarint(uint64(len(s)))
788 2 : e.WriteString(s)
789 2 : }
790 :
791 2 : func (e versionEditEncoder) writeUvarint(u uint64) {
792 2 : var buf [binary.MaxVarintLen64]byte
793 2 : n := binary.PutUvarint(buf[:], u)
794 2 : e.Write(buf[:n])
795 2 : }
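
// Wire-format note (illustrative, not part of the original file): writeKey
// stores an internal key as one length-prefixed blob of len(UserKey)+8
// bytes: the user key followed by the 8-byte trailer. On the decode side,
// readBytes plus base.DecodeInternalKey split the trailer back off.
func encodeInternalKeyExample(k InternalKey) []byte {
	e := versionEditEncoder{new(bytes.Buffer)}
	e.writeKey(k)
	return e.Bytes()
}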
796 :
797 : // BulkVersionEdit summarizes the files added and deleted from a set of version
798 : // edits.
799 : //
800 : // INVARIANTS:
801 : // No file can be added to a level more than once. This is true globally, and
802 : // also true for all of the calls to Accumulate for a single bulk version edit.
803 : //
804 : // No file can be removed from a level more than once. This is true globally,
805 : // and also true for all of the calls to Accumulate for a single bulk version
806 : // edit.
807 : //
808 : // A file must not be added and removed from a given level in the same version
809 : // edit.
810 : //
811 : // A file that is being removed from a level must have been added to that level
812 : // before (in a prior version edit). Note that a given file can be deleted from
813 : // a level and added to another level in a single version edit.
814 : type BulkVersionEdit struct {
815 : Added [NumLevels]map[base.FileNum]*FileMetadata
816 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
817 :
818 : // AddedFileBacking maps disk file numbers to FileBackings so that we can
819 : // populate the FileBacking of virtual sstables during manifest replay.
820 : AddedFileBacking map[base.DiskFileNum]*FileBacking
821 : RemovedFileBacking []base.DiskFileNum
822 :
823 : // AddedByFileNum maps file number to file metadata for all added files
824 : // from accumulated version edits. AddedByFileNum is only populated if set
825 : // to non-nil by a caller. It must be set to non-nil when replaying
826 : // version edits read from a MANIFEST (as opposed to VersionEdits
827 : // constructed in-memory). While replaying a MANIFEST file,
828 : // VersionEdit.DeletedFiles map entries have nil values, because the
829 : // on-disk deletion record encodes only the file number. Accumulate
830 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
831 : // field with non-nil *FileMetadata.
832 : AddedByFileNum map[base.FileNum]*FileMetadata
833 :
834 : // MarkedForCompactionCountDiff holds the aggregated count of files
835 : // marked for compaction added or removed.
836 : MarkedForCompactionCountDiff int
837 : }
838 :
839 : // Accumulate adds the file addition and deletions in the specified version
840 : // edit to the bulk edit's internal state.
841 : //
842 : // INVARIANTS:
843 : // If a file is added to a given level in a call to Accumulate and then removed
844 : // from that level in a subsequent call, the file will not be present in the
845 : // resulting BulkVersionEdit.Deleted for that level.
846 : //
847 : // After accumulation of version edits, the bulk version edit may have
848 : // information about a file which has been deleted from a level, but it may
849 : // not have information about the same file added to the same level. The add
850 : // could've occurred as part of a previous bulk version edit. In this case,
851 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
852 : // of the accumulation, because we need to decrease the refcount of the
853 : // deleted file in Apply.
854 2 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
855 2 : for df, m := range ve.DeletedFiles {
856 2 : dmap := b.Deleted[df.Level]
857 2 : if dmap == nil {
858 2 : dmap = make(map[base.FileNum]*FileMetadata)
859 2 : b.Deleted[df.Level] = dmap
860 2 : }
861 :
862 2 : if m == nil {
863 2 : // m is nil only when replaying a MANIFEST.
864 2 : if b.AddedByFileNum == nil {
865 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
866 0 : }
867 2 : m = b.AddedByFileNum[df.FileNum]
868 2 : if m == nil {
869 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
870 1 : }
871 : }
872 2 : if m.MarkedForCompaction {
873 1 : b.MarkedForCompactionCountDiff--
874 1 : }
875 2 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
876 2 : dmap[df.FileNum] = m
877 2 : } else {
878 2 : // Present in b.Added for the same level.
879 2 : delete(b.Added[df.Level], df.FileNum)
880 2 : }
881 : }
882 :
883 : // Generate state for Added backing files. Note that these must be generated
884 : // before we loop through the NewFiles, because we need to populate the
885 : // FileBackings which might be used by the NewFiles loop.
886 2 : if b.AddedFileBacking == nil {
887 2 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
888 2 : }
889 2 : for _, fb := range ve.CreatedBackingTables {
890 2 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
891 0 : // There is already a FileBacking associated with fb.DiskFileNum.
892 0 : // This should never happen. There must always be only one FileBacking
893 0 : // associated with a backing sstable.
894 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
895 : }
896 2 : b.AddedFileBacking[fb.DiskFileNum] = fb
897 : }
898 :
899 2 : for _, nf := range ve.NewFiles {
900 2 : // A new file should not have been deleted in this or a preceding
901 2 : // VersionEdit at the same level (though files can move across levels).
902 2 : if dmap := b.Deleted[nf.Level]; dmap != nil {
903 2 : if _, ok := dmap[nf.Meta.FileNum]; ok {
904 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
905 0 : }
906 : }
907 2 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
908 2 : // FileBacking for a virtual sstable must only be nil if we're performing
909 2 : // manifest replay.
910 2 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
911 2 : if nf.Meta.FileBacking == nil {
912 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
913 0 : }
914 2 : } else if nf.Meta.FileBacking == nil {
915 0 : return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
916 0 : }
917 :
918 2 : if b.Added[nf.Level] == nil {
919 2 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
920 2 : }
921 2 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
922 2 : if b.AddedByFileNum != nil {
923 2 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
924 2 : }
925 2 : if nf.Meta.MarkedForCompaction {
926 0 : b.MarkedForCompactionCountDiff++
927 0 : }
928 : }
929 :
930 2 : for _, n := range ve.RemovedBackingTables {
931 2 : if _, ok := b.AddedFileBacking[n]; ok {
932 2 : delete(b.AddedFileBacking, n)
933 2 : } else {
934 2 : // Since a file can be removed from backing files in exactly one version
935 2 : // edit, it is safe to just append without any de-duplication.
936 2 : b.RemovedFileBacking = append(b.RemovedFileBacking, n)
937 2 : }
938 : }
939 :
940 2 : return nil
941 : }
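
// Replay sketch (illustrative, not part of the original file): accumulate a
// sequence of edits decoded from a MANIFEST. AddedByFileNum must be non-nil
// here because on-disk deletion records carry only the file number, and
// Accumulate uses the map to recover each deleted file's metadata.
func accumulateManifestEdits(edits []*VersionEdit) (*BulkVersionEdit, error) {
	bve := &BulkVersionEdit{
		AddedByFileNum: make(map[base.FileNum]*FileMetadata),
	}
	for _, ve := range edits {
		if err := bve.Accumulate(ve); err != nil {
			return nil, err
		}
	}
	return bve, nil
}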
942 :
943 : // Apply applies the delta b to the current version to produce a new version.
944 : // The new version is consistent with respect to the comparer.
945 : //
946 : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
947 : // the levels.
948 : //
949 : // curr may be nil, which is equivalent to a pointer to a zero version.
950 : func (b *BulkVersionEdit) Apply(
951 : curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
952 2 : ) (*Version, error) {
953 2 : v := &Version{
954 2 : cmp: comparer,
955 2 : }
956 2 :
957 2 : // Adjust the count of files marked for compaction.
958 2 : if curr != nil {
959 2 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
960 2 : }
961 2 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
962 2 : if v.Stats.MarkedForCompaction < 0 {
963 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
964 0 : }
965 :
966 2 : for level := range v.Levels {
967 2 : if curr == nil || curr.Levels[level].tree.root == nil {
968 2 : v.Levels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
969 2 : } else {
970 2 : v.Levels[level] = curr.Levels[level].clone()
971 2 : }
972 2 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
973 2 : v.RangeKeyLevels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
974 2 : } else {
975 2 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
976 2 : }
977 :
978 2 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
979 2 : // There are no edits on this level.
980 2 : if level == 0 {
981 2 : // Initialize L0Sublevels.
982 2 : if curr == nil || curr.L0Sublevels == nil {
983 2 : if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
984 0 : return nil, errors.Wrap(err, "pebble: internal error")
985 0 : }
986 2 : } else {
987 2 : v.L0Sublevels = curr.L0Sublevels
988 2 : v.L0SublevelFiles = v.L0Sublevels.Levels
989 2 : }
990 : }
991 2 : continue
992 : }
993 :
994 : // Some edits on this level.
995 2 : lm := &v.Levels[level]
996 2 : lmRange := &v.RangeKeyLevels[level]
997 2 :
998 2 : addedFilesMap := b.Added[level]
999 2 : deletedFilesMap := b.Deleted[level]
1000 2 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
1001 1 : return nil, base.CorruptionErrorf(
1002 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1003 1 : errors.Safe(len(deletedFilesMap)))
1004 1 : }
1005 :
1006 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1007 : // for a level, it won't be present in deletedFilesMap for the same
1008 : // level.
1009 :
1010 2 : for _, f := range deletedFilesMap {
1011 2 : if obsolete := v.Levels[level].remove(f); obsolete {
1012 0 : // Deleting a file from the B-Tree may decrement its
1013 0 : // reference count. However, because we cloned the
1014 0 : // previous level's B-Tree, this should never result in a
1015 0 : // file's reference count dropping to zero.
1016 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1017 0 : return nil, err
1018 0 : }
1019 2 : if f.HasRangeKeys {
1020 2 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1021 0 : // Deleting a file from the B-Tree may decrement its
1022 0 : // reference count. However, because we cloned the
1023 0 : // previous level's B-Tree, this should never result in a
1024 0 : // file's reference count dropping to zero.
1025 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1026 0 : return nil, err
1027 0 : }
1028 : }
1029 : }
1030 :
1031 2 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1032 2 : for _, f := range addedFilesMap {
1033 2 : addedFiles = append(addedFiles, f)
1034 2 : }
1035 : // Sort addedFiles by file number. This isn't necessary, but tests which
1036 : // replay invalid manifests check the error output, and the error output
1037 : // depends on the order in which files are added to the btree.
1038 2 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1039 2 : return stdcmp.Compare(a.FileNum, b.FileNum)
1040 2 : })
1041 :
1042 2 : var sm, la *FileMetadata
1043 2 : for _, f := range addedFiles {
1044 2 : // NB: allowedSeeks is used for read-triggered compactions. It is set using
1045 2 : // Options.Experimental.ReadCompactionRate which defaults to 32KB.
1046 2 : var allowedSeeks int64
1047 2 : if readCompactionRate != 0 {
1048 2 : allowedSeeks = int64(f.Size) / readCompactionRate
1049 2 : }
1050 2 : if allowedSeeks < 100 {
1051 2 : allowedSeeks = 100
1052 2 : }
1053 2 : f.AllowedSeeks.Store(allowedSeeks)
1054 2 : f.InitAllowedSeeks = allowedSeeks
1055 2 :
1056 2 : err := lm.insert(f)
1057 2 : if err != nil {
1058 1 : return nil, errors.Wrap(err, "pebble")
1059 1 : }
1060 2 : if f.HasRangeKeys {
1061 2 : err = lmRange.insert(f)
1062 2 : if err != nil {
1063 0 : return nil, errors.Wrap(err, "pebble")
1064 0 : }
1065 : }
1066 : // Track the keys with the smallest and largest keys, so that we can
1067 : // check consistency of the modified span.
1068 2 : if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
1069 2 : sm = f
1070 2 : }
1071 2 : if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
1072 2 : la = f
1073 2 : }
1074 : }
1075 :
1076 2 : if level == 0 {
1077 2 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1078 2 : // Flushes and ingestions that do not delete any L0 files do not require
1079 2 : // a regeneration of L0Sublevels from scratch. We can instead generate
1080 2 : // it incrementally.
1081 2 : var err error
1082 2 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1083 2 : SortBySeqNum(addedFiles)
1084 2 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1085 2 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1086 2 : err = v.InitL0Sublevels(flushSplitBytes)
1087 2 : } else if invariants.Enabled && err == nil {
1088 2 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
1089 2 : if err != nil {
1090 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1091 : }
1092 2 : s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
1093 2 : s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
1094 2 : if s1 != s2 {
1095 0 : // Add verbosity.
1096 0 : s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
1097 0 : s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
1098 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1099 : }
1100 : }
1101 2 : if err != nil {
1102 0 : return nil, errors.Wrap(err, "pebble: internal error")
1103 0 : }
1104 2 : v.L0SublevelFiles = v.L0Sublevels.Levels
1105 2 : } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
1106 0 : return nil, errors.Wrap(err, "pebble: internal error")
1107 0 : }
1108 2 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
1109 0 : return nil, errors.Wrap(err, "pebble: internal error")
1110 0 : }
1111 2 : continue
1112 : }
1113 :
1114 : // Check consistency of the level in the vicinity of our edits.
1115 2 : if sm != nil && la != nil {
1116 2 : overlap := v.Levels[level].Slice().Overlaps(comparer.Compare, sm.UserKeyBounds())
1117 2 : // overlap contains all of the added files. We want to ensure that
1118 2 : // the added files are consistent with neighboring existing files
1119 2 : // too, so reslice the overlap to pull in a neighbor on each side.
1120 2 : check := overlap.Reslice(func(start, end *LevelIterator) {
1121 2 : if m := start.Prev(); m == nil {
1122 2 : start.Next()
1123 2 : }
1124 2 : if m := end.Next(); m == nil {
1125 2 : end.Prev()
1126 2 : }
1127 : })
1128 2 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
1129 1 : return nil, errors.Wrap(err, "pebble: internal error")
1130 1 : }
1131 : }
1132 : }
1133 2 : return v, nil
1134 : }
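
// End-to-end sketch (illustrative, not part of the original file): building a
// Version from accumulated edits. Passing nil for curr applies the bulk edit
// against an empty version; the flush-split and read-compaction parameters
// below are arbitrary example values, not recommended settings.
func applyExample(bve *BulkVersionEdit, comparer *base.Comparer) (*Version, error) {
	const flushSplitBytes = 4 << 20
	const readCompactionRate = 16000
	return bve.Apply(nil /* curr */, comparer, flushSplitBytes, readCompactionRate)
}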
|