Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "strings"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
24 : // TODO(peter): describe the MANIFEST file format, independently of the C++
25 : // project.
26 :
27 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
28 :
29 : type byteReader interface {
30 : io.ByteReader
31 : io.Reader
32 : }
33 :
34 : // Tags for the versionEdit disk format.
35 : // Tag 8 is no longer used.
36 : const (
37 : // LevelDB tags.
38 : tagComparator = 1
39 : tagLogNumber = 2
40 : tagNextFileNumber = 3
41 : tagLastSequence = 4
42 : tagCompactPointer = 5
43 : tagDeletedFile = 6
44 : tagNewFile = 7
45 : tagPrevLogNumber = 9
46 :
47 : // RocksDB tags.
48 : tagNewFile2 = 100
49 : tagNewFile3 = 102
50 : tagNewFile4 = 103
51 : tagColumnFamily = 200
52 : tagColumnFamilyAdd = 201
53 : tagColumnFamilyDrop = 202
54 : tagMaxColumnFamily = 203
55 :
56 : // Pebble tags.
57 : tagNewFile5 = 104 // Range keys.
58 : tagCreatedBackingTable = 105
59 : tagRemovedBackingTable = 106
60 :
61 : // The custom-tag sub-format used by tagNewFile4 and above. All tags below
62 : // customTagNonSafeIgnoreMask are safe to ignore, and their payload must be
63 : // a single length-prefixed bytes field.
64 : customTagTerminate = 1
65 : customTagNeedsCompaction = 2
66 : customTagCreationTime = 6
67 : customTagPathID = 65
68 : customTagNonSafeIgnoreMask = 1 << 6
69 : customTagVirtual = 66
70 : customTagSyntheticPrefix = 67
71 : customTagSyntheticSuffix = 68
72 : )
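// Illustrative sketch (not part of the Pebble source; added for clarity): the
// custom-tag sub-format that follows a tagNewFile4/tagNewFile5 entry is a
// sequence of (uvarint tag, payload) pairs ending with customTagTerminate.
// Safe-to-ignore tags carry a length-prefixed bytes payload, as shown here.
func exampleCustomFields() []byte {
	var buf bytes.Buffer
	var tmp [binary.MaxVarintLen64]byte
	put := func(u uint64) {
		n := binary.PutUvarint(tmp[:], u)
		buf.Write(tmp[:n])
	}
	put(customTagNeedsCompaction) // tag 2
	put(1)                        // payload length
	buf.WriteByte(1)              // payload: marked for compaction
	put(customTagTerminate)       // tag 1 ends the custom-field section
	return buf.Bytes()            // 02 01 01 01
}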
73 :
74 : // DeletedFileEntry holds the state for a file deletion from a level. The file
75 : // itself might still be referenced by another level.
76 : type DeletedFileEntry struct {
77 : Level int
78 : FileNum base.FileNum
79 : }
80 :
81 : // NewFileEntry holds the state for a new file or one moved from a different
82 : // level.
83 : type NewFileEntry struct {
84 : Level int
85 : Meta *FileMetadata
86 : // BackingFileNum is only set during manifest replay, and only for virtual
87 : // sstables.
88 : BackingFileNum base.DiskFileNum
89 : }
90 :
91 : // VersionEdit holds the state for an edit to a Version along with other
92 : // on-disk state (log numbers, next file number, and the last sequence number).
93 : type VersionEdit struct {
94 : // ComparerName is the value of Options.Comparer.Name. This is only set in
95 : // the first VersionEdit in a manifest (either when the DB is created, or
96 : // when a new manifest is created) and is used to verify that the comparer
97 : // specified at Open matches the comparer that was previously used.
98 : ComparerName string
99 :
100 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
101 : // mutations that have not been flushed to an sstable.
102 : //
103 : // This is an optional field; 0 means it is not set.
104 : MinUnflushedLogNum base.DiskFileNum
105 :
106 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
107 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
108 : // 6/2011. We keep it around purely for informational purposes when
109 : // displaying MANIFEST contents.
110 : ObsoletePrevLogNum uint64
111 :
112 : // The next file number. A single counter is used to assign file numbers
113 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
114 : NextFileNum uint64
115 :
116 : // LastSeqNum is an upper bound on the sequence numbers that have been
117 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
118 : // recovery) may contain sequence numbers greater than this value.
119 : LastSeqNum base.SeqNum
120 :
121 : // A file num may be present in both deleted files and new files when it
122 : // is moved from a lower level to a higher level (when the compaction
123 : // found that there was no overlapping file at the higher level).
124 : DeletedFiles map[DeletedFileEntry]*FileMetadata
125 : NewFiles []NewFileEntry
126 : // CreatedBackingTables can be used to preserve the FileBacking associated
127 : // with a physical sstable. This is useful when virtual sstables in the
128 : // latest version are reconstructed during manifest replay, and we also need
129 : // to reconstruct the FileBacking which is required by these virtual
130 : // sstables.
131 : //
132 : // INVARIANT: The FileBacking associated with a physical sstable must only
133 : // be added as a backing file in the same version edit where the physical
134 : // sstable is first virtualized. This means that the physical sstable must
135 : // be present in DeletedFiles and that there must be at least one virtual
136 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
137 : // file must be present in CreatedBackingTables in exactly one version edit.
138 : // The physical sstable associated with the FileBacking must also not be
139 : // present in NewFiles.
140 : CreatedBackingTables []*FileBacking
141 : // RemovedBackingTables is used to remove the FileBacking associated with a
142 : // virtual sstable. Note that a backing sstable can be removed as soon as
143 : // there are no virtual sstables in the latest version which are using the
144 : // backing sstable, but the backing sstable doesn't necessarily have to be
145 : // removed atomically with the version edit which removes the last virtual
146 : // sstable associated with the backing sstable. The removal can happen in a
147 : // future version edit.
148 : //
149 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
150 : // added to CreatedBackingTables in a prior version edit. The same version
151 : // edit also cannot have the same file present in both CreatedBackingTables
152 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
153 : // in exactly one version edit.
154 : RemovedBackingTables []base.DiskFileNum
155 : }
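// Illustrative sketch (hypothetical helper, not part of Pebble): a flush-style
// edit that installs one new table in L0. Assumes m was fully constructed
// elsewhere (bounds set and, for a physical table, InitPhysicalBacking
// called); the numeric values are made up.
func exampleFlushEdit(m *FileMetadata) *VersionEdit {
	return &VersionEdit{
		MinUnflushedLogNum: base.DiskFileNum(12), // WALs below 12 are flushed
		NextFileNum:        14,
		LastSeqNum:         100,
		NewFiles:           []NewFileEntry{{Level: 0, Meta: m}},
	}
}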
156 :
157 : // Decode decodes an edit from the specified reader.
158 : //
159 : // Note that Decode will not set the FileBacking for virtual sstables; that
160 : // responsibility is left to the caller. Decode will, however, populate
161 : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
162 1 : func (v *VersionEdit) Decode(r io.Reader) error {
163 1 : br, ok := r.(byteReader)
164 1 : if !ok {
165 1 : br = bufio.NewReader(r)
166 1 : }
167 1 : d := versionEditDecoder{br}
168 1 : for {
169 1 : tag, err := binary.ReadUvarint(br)
170 1 : if err == io.EOF {
171 1 : break
172 : }
173 1 : if err != nil {
174 0 : return err
175 0 : }
176 1 : switch tag {
177 1 : case tagComparator:
178 1 : s, err := d.readBytes()
179 1 : if err != nil {
180 0 : return err
181 0 : }
182 1 : v.ComparerName = string(s)
183 :
184 1 : case tagLogNumber:
185 1 : n, err := d.readUvarint()
186 1 : if err != nil {
187 0 : return err
188 0 : }
189 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
190 :
191 1 : case tagNextFileNumber:
192 1 : n, err := d.readUvarint()
193 1 : if err != nil {
194 0 : return err
195 0 : }
196 1 : v.NextFileNum = n
197 :
198 1 : case tagLastSequence:
199 1 : n, err := d.readUvarint()
200 1 : if err != nil {
201 0 : return err
202 0 : }
203 1 : v.LastSeqNum = base.SeqNum(n)
204 :
205 0 : case tagCompactPointer:
206 0 : if _, err := d.readLevel(); err != nil {
207 0 : return err
208 0 : }
209 0 : if _, err := d.readBytes(); err != nil {
210 0 : return err
211 0 : }
212 : // NB: RocksDB does not use compaction pointers anymore.
213 :
214 1 : case tagRemovedBackingTable:
215 1 : n, err := d.readUvarint()
216 1 : if err != nil {
217 0 : return err
218 0 : }
219 1 : v.RemovedBackingTables = append(
220 1 : v.RemovedBackingTables, base.DiskFileNum(n),
221 1 : )
222 1 : case tagCreatedBackingTable:
223 1 : dfn, err := d.readUvarint()
224 1 : if err != nil {
225 0 : return err
226 0 : }
227 1 : size, err := d.readUvarint()
228 1 : if err != nil {
229 0 : return err
230 0 : }
231 1 : fileBacking := &FileBacking{
232 1 : DiskFileNum: base.DiskFileNum(dfn),
233 1 : Size: size,
234 1 : }
235 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
236 1 : case tagDeletedFile:
237 1 : level, err := d.readLevel()
238 1 : if err != nil {
239 0 : return err
240 0 : }
241 1 : fileNum, err := d.readFileNum()
242 1 : if err != nil {
243 0 : return err
244 0 : }
245 1 : if v.DeletedFiles == nil {
246 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
247 1 : }
248 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
249 :
250 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
251 1 : level, err := d.readLevel()
252 1 : if err != nil {
253 0 : return err
254 0 : }
255 1 : fileNum, err := d.readFileNum()
256 1 : if err != nil {
257 0 : return err
258 0 : }
259 1 : if tag == tagNewFile3 {
260 0 : // The pathID field appears unused in RocksDB.
261 0 : _ /* pathID */, err := d.readUvarint()
262 0 : if err != nil {
263 0 : return err
264 0 : }
265 : }
266 1 : size, err := d.readUvarint()
267 1 : if err != nil {
268 0 : return err
269 0 : }
270 : // We read the smallest / largest key bounds differently depending on
271 : // whether we have point, range or both types of keys present in the
272 : // table.
273 1 : var (
274 1 : smallestPointKey, largestPointKey []byte
275 1 : smallestRangeKey, largestRangeKey []byte
276 1 : parsedPointBounds bool
277 1 : boundsMarker byte
278 1 : )
279 1 : if tag != tagNewFile5 {
280 1 : // Range keys not present in the table. Parse the point key bounds.
281 1 : smallestPointKey, err = d.readBytes()
282 1 : if err != nil {
283 0 : return err
284 0 : }
285 1 : largestPointKey, err = d.readBytes()
286 1 : if err != nil {
287 0 : return err
288 0 : }
289 1 : } else {
290 1 : // Range keys are present in the table. Determine whether we have point
291 1 : // keys to parse, in addition to the bounds.
292 1 : boundsMarker, err = d.ReadByte()
293 1 : if err != nil {
294 0 : return err
295 0 : }
296 : // Parse point key bounds, if present.
297 1 : if boundsMarker&maskContainsPointKeys > 0 {
298 1 : smallestPointKey, err = d.readBytes()
299 1 : if err != nil {
300 0 : return err
301 0 : }
302 1 : largestPointKey, err = d.readBytes()
303 1 : if err != nil {
304 0 : return err
305 0 : }
306 1 : parsedPointBounds = true
307 1 : } else {
308 1 : // The table does not have point keys.
309 1 : // Sanity check: the bounds must be range keys.
310 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
311 0 : return base.CorruptionErrorf(
312 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
313 0 : boundsMarker,
314 0 : )
315 0 : }
316 : }
317 : // Parse range key bounds.
318 1 : smallestRangeKey, err = d.readBytes()
319 1 : if err != nil {
320 0 : return err
321 0 : }
322 1 : largestRangeKey, err = d.readBytes()
323 1 : if err != nil {
324 0 : return err
325 0 : }
326 : }
327 1 : var smallestSeqNum base.SeqNum
328 1 : var largestSeqNum base.SeqNum
329 1 : if tag != tagNewFile {
330 1 : n, err := d.readUvarint()
331 1 : if err != nil {
332 0 : return err
333 0 : }
334 1 : smallestSeqNum = base.SeqNum(n)
335 1 : n, err = d.readUvarint()
336 1 : if err != nil {
337 0 : return err
338 0 : }
339 1 : largestSeqNum = base.SeqNum(n)
340 : }
341 1 : var markedForCompaction bool
342 1 : var creationTime uint64
343 1 : virtualState := struct {
344 1 : virtual bool
345 1 : backingFileNum uint64
346 1 : }{}
347 1 : var syntheticPrefix sstable.SyntheticPrefix
348 1 : var syntheticSuffix sstable.SyntheticSuffix
349 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
350 1 : for {
351 1 : customTag, err := d.readUvarint()
352 1 : if err != nil {
353 0 : return err
354 0 : }
355 1 : if customTag == customTagTerminate {
356 1 : break
357 : }
358 1 : switch customTag {
359 1 : case customTagNeedsCompaction:
360 1 : field, err := d.readBytes()
361 1 : if err != nil {
362 0 : return err
363 0 : }
364 1 : if len(field) != 1 {
365 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
366 0 : }
367 1 : markedForCompaction = (field[0] == 1)
368 :
369 1 : case customTagCreationTime:
370 1 : field, err := d.readBytes()
371 1 : if err != nil {
372 0 : return err
373 0 : }
374 1 : var n int
375 1 : creationTime, n = binary.Uvarint(field)
376 1 : if n != len(field) {
377 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
378 0 : }
379 :
380 0 : case customTagPathID:
381 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
382 :
383 1 : case customTagVirtual:
384 1 : virtualState.virtual = true
385 1 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
386 0 : return err
387 0 : }
388 :
389 1 : case customTagSyntheticPrefix:
390 1 : synthetic, err := d.readBytes()
391 1 : if err != nil {
392 0 : return err
393 0 : }
394 1 : syntheticPrefix = synthetic
395 :
396 1 : case customTagSyntheticSuffix:
397 1 : if syntheticSuffix, err = d.readBytes(); err != nil {
398 0 : return err
399 0 : }
400 :
401 0 : default:
402 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
403 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
404 0 : }
405 0 : if _, err := d.readBytes(); err != nil {
406 0 : return err
407 0 : }
408 : }
409 : }
410 : }
411 1 : m := &FileMetadata{
412 1 : FileNum: fileNum,
413 1 : Size: size,
414 1 : CreationTime: int64(creationTime),
415 1 : SmallestSeqNum: smallestSeqNum,
416 1 : LargestSeqNum: largestSeqNum,
417 1 : LargestSeqNumAbsolute: largestSeqNum,
418 1 : MarkedForCompaction: markedForCompaction,
419 1 : Virtual: virtualState.virtual,
420 1 : SyntheticPrefix: syntheticPrefix,
421 1 : SyntheticSuffix: syntheticSuffix,
422 1 : }
423 1 : if tag != tagNewFile5 { // no range keys present
424 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
425 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
426 1 : m.HasPointKeys = true
427 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
428 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
429 1 : } else { // range keys present
430 1 : // Set point key bounds, if parsed.
431 1 : if parsedPointBounds {
432 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
433 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
434 1 : m.HasPointKeys = true
435 1 : }
436 : // Set range key bounds.
437 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
438 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
439 1 : m.HasRangeKeys = true
440 1 : // Set overall bounds (by default assume range keys).
441 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
442 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
443 1 : if boundsMarker&maskSmallest == maskSmallest {
444 1 : m.Smallest = m.SmallestPointKey
445 1 : m.boundTypeSmallest = boundTypePointKey
446 1 : }
447 1 : if boundsMarker&maskLargest == maskLargest {
448 1 : m.Largest = m.LargestPointKey
449 1 : m.boundTypeLargest = boundTypePointKey
450 1 : }
451 : }
452 1 : m.boundsSet = true
453 1 : if !virtualState.virtual {
454 1 : m.InitPhysicalBacking()
455 1 : }
456 :
457 1 : nfe := NewFileEntry{
458 1 : Level: level,
459 1 : Meta: m,
460 1 : }
461 1 : if virtualState.virtual {
462 1 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
463 1 : }
464 1 : v.NewFiles = append(v.NewFiles, nfe)
465 :
466 1 : case tagPrevLogNumber:
467 1 : n, err := d.readUvarint()
468 1 : if err != nil {
469 0 : return err
470 0 : }
471 1 : v.ObsoletePrevLogNum = n
472 :
473 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
474 0 : return base.CorruptionErrorf("column families are not supported")
475 :
476 0 : default:
477 0 : return errCorruptManifest
478 : }
479 : }
480 1 : return nil
481 : }
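// Illustrative usage sketch (hypothetical helper): decode a single edit from a
// raw record payload. In Pebble proper the payload comes from the record
// package, which handles chunk framing and checksums.
func exampleDecode(payload []byte) (*VersionEdit, error) {
	ve := &VersionEdit{}
	if err := ve.Decode(bytes.NewReader(payload)); err != nil {
		return nil, err
	}
	return ve, nil
}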
482 :
483 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
484 1 : var buf bytes.Buffer
485 1 : if v.ComparerName != "" {
486 1 : fmt.Fprintf(&buf, " comparer: %s\n", v.ComparerName)
487 1 : }
488 1 : if v.MinUnflushedLogNum != 0 {
489 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
490 1 : }
491 1 : if v.ObsoletePrevLogNum != 0 {
492 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
493 0 : }
494 1 : if v.NextFileNum != 0 {
495 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
496 1 : }
497 1 : if v.LastSeqNum != 0 {
498 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
499 1 : }
500 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
501 1 : for df := range v.DeletedFiles {
502 1 : entries = append(entries, df)
503 1 : }
504 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
505 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
506 1 : return v
507 1 : }
508 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
509 : })
510 1 : for _, df := range entries {
511 1 : fmt.Fprintf(&buf, " del-table: L%d %s\n", df.Level, df.FileNum)
512 1 : }
513 1 : for _, nf := range v.NewFiles {
514 1 : fmt.Fprintf(&buf, " add-table: L%d", nf.Level)
515 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
516 1 : if nf.Meta.CreationTime != 0 {
517 1 : fmt.Fprintf(&buf, " (%s)",
518 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
519 1 : }
520 1 : fmt.Fprintln(&buf)
521 : }
522 :
523 1 : for _, b := range v.CreatedBackingTables {
524 1 : fmt.Fprintf(&buf, " add-backing: %s\n", b.DiskFileNum)
525 1 : }
526 1 : for _, n := range v.RemovedBackingTables {
527 1 : fmt.Fprintf(&buf, " del-backing: %s\n", n)
528 1 : }
529 1 : return buf.String()
530 : }
531 :
532 : // DebugString is a more verbose version of String(). Use this in tests.
533 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
534 1 : return v.string(true /* verbose */, fmtKey)
535 1 : }
536 :
537 : // String implements fmt.Stringer for a VersionEdit.
538 1 : func (v *VersionEdit) String() string {
539 1 : return v.string(false /* verbose */, base.DefaultFormatter)
540 1 : }
541 :
542 : // ParseVersionEditDebug parses a VersionEdit from its DebugString
543 : // implementation.
544 : //
545 : // It doesn't recognize all fields; this implementation can be filled in as
546 : // needed.
547 1 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
548 1 : defer func() {
549 1 : err = errors.CombineErrors(err, maybeRecover())
550 1 : }()
551 :
552 1 : var ve VersionEdit
553 1 : for _, l := range strings.Split(s, "\n") {
554 1 : l = strings.TrimSpace(l)
555 1 : if l == "" {
556 1 : continue
557 : }
558 1 : field, value, ok := strings.Cut(l, ":")
559 1 : if !ok {
560 0 : return nil, errors.Errorf("malformed line %q", l)
561 0 : }
562 1 : field = strings.TrimSpace(field)
563 1 : p := makeDebugParser(value)
564 1 : switch field {
565 1 : case "add-table":
566 1 : level := p.Level()
567 1 : meta, err := ParseFileMetadataDebug(p.Remaining())
568 1 : if err != nil {
569 0 : return nil, err
570 0 : }
571 1 : ve.NewFiles = append(ve.NewFiles, NewFileEntry{
572 1 : Level: level,
573 1 : Meta: meta,
574 1 : })
575 :
576 1 : case "del-table":
577 1 : level := p.Level()
578 1 : num := p.FileNum()
579 1 : if ve.DeletedFiles == nil {
580 1 : ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
581 1 : }
582 1 : ve.DeletedFiles[DeletedFileEntry{
583 1 : Level: level,
584 1 : FileNum: num,
585 1 : }] = nil
586 :
587 1 : case "add-backing":
588 1 : n := p.DiskFileNum()
589 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
590 1 : DiskFileNum: n,
591 1 : Size: 100,
592 1 : })
593 :
594 1 : case "del-backing":
595 1 : n := p.DiskFileNum()
596 1 : ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
597 :
598 0 : default:
599 0 : return nil, errors.Errorf("field %q not implemented", field)
600 : }
601 : }
602 1 : return &ve, nil
603 : }
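// Illustrative usage sketch (hypothetical input; the file numbers are made
// up): the accepted format mirrors DebugString output, one field per line.
func exampleParseDebug() (*VersionEdit, error) {
	return ParseVersionEditDebug("  del-table: L0 000005\n  add-backing: 000007\n")
}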
604 :
605 : // Encode encodes an edit to the specified writer.
606 1 : func (v *VersionEdit) Encode(w io.Writer) error {
607 1 : e := versionEditEncoder{new(bytes.Buffer)}
608 1 :
609 1 : if v.ComparerName != "" {
610 1 : e.writeUvarint(tagComparator)
611 1 : e.writeString(v.ComparerName)
612 1 : }
613 1 : if v.MinUnflushedLogNum != 0 {
614 1 : e.writeUvarint(tagLogNumber)
615 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
616 1 : }
617 1 : if v.ObsoletePrevLogNum != 0 {
618 1 : e.writeUvarint(tagPrevLogNumber)
619 1 : e.writeUvarint(v.ObsoletePrevLogNum)
620 1 : }
621 1 : if v.NextFileNum != 0 {
622 1 : e.writeUvarint(tagNextFileNumber)
623 1 : e.writeUvarint(uint64(v.NextFileNum))
624 1 : }
625 1 : for _, dfn := range v.RemovedBackingTables {
626 1 : e.writeUvarint(tagRemovedBackingTable)
627 1 : e.writeUvarint(uint64(dfn))
628 1 : }
629 1 : for _, fileBacking := range v.CreatedBackingTables {
630 1 : e.writeUvarint(tagCreatedBackingTable)
631 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
632 1 : e.writeUvarint(fileBacking.Size)
633 1 : }
634 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
635 : // even though its value is zero. We detect this by encoding LastSeqNum when
636 : // ComparerName is set.
637 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
638 1 : e.writeUvarint(tagLastSequence)
639 1 : e.writeUvarint(uint64(v.LastSeqNum))
640 1 : }
641 1 : for x := range v.DeletedFiles {
642 1 : e.writeUvarint(tagDeletedFile)
643 1 : e.writeUvarint(uint64(x.Level))
644 1 : e.writeUvarint(uint64(x.FileNum))
645 1 : }
646 1 : for _, x := range v.NewFiles {
647 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
648 1 : var tag uint64
649 1 : switch {
650 1 : case x.Meta.HasRangeKeys:
651 1 : tag = tagNewFile5
652 1 : case customFields:
653 1 : tag = tagNewFile4
654 1 : default:
655 1 : tag = tagNewFile2
656 : }
657 1 : e.writeUvarint(tag)
658 1 : e.writeUvarint(uint64(x.Level))
659 1 : e.writeUvarint(uint64(x.Meta.FileNum))
660 1 : e.writeUvarint(x.Meta.Size)
661 1 : if !x.Meta.HasRangeKeys {
662 1 : // If we have no range keys, preserve the original format and write the
663 1 : // smallest and largest point keys.
664 1 : e.writeKey(x.Meta.SmallestPointKey)
665 1 : e.writeKey(x.Meta.LargestPointKey)
666 1 : } else {
667 1 : // When range keys are present, we first write a marker byte that
668 1 : // indicates if the table also contains point keys, in addition to how the
669 1 : // overall bounds for the table should be reconstructed. This byte is
670 1 : // followed by the keys themselves.
671 1 : b, err := x.Meta.boundsMarker()
672 1 : if err != nil {
673 0 : return err
674 0 : }
675 1 : if err = e.WriteByte(b); err != nil {
676 0 : return err
677 0 : }
678 : // Write point key bounds (if present).
679 1 : if x.Meta.HasPointKeys {
680 1 : e.writeKey(x.Meta.SmallestPointKey)
681 1 : e.writeKey(x.Meta.LargestPointKey)
682 1 : }
683 : // Write range key bounds.
684 1 : e.writeKey(x.Meta.SmallestRangeKey)
685 1 : e.writeKey(x.Meta.LargestRangeKey)
686 : }
687 1 : e.writeUvarint(uint64(x.Meta.SmallestSeqNum))
688 1 : e.writeUvarint(uint64(x.Meta.LargestSeqNum))
689 1 : if customFields {
690 1 : if x.Meta.CreationTime != 0 {
691 1 : e.writeUvarint(customTagCreationTime)
692 1 : var buf [binary.MaxVarintLen64]byte
693 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
694 1 : e.writeBytes(buf[:n])
695 1 : }
696 1 : if x.Meta.MarkedForCompaction {
697 1 : e.writeUvarint(customTagNeedsCompaction)
698 1 : e.writeBytes([]byte{1})
699 1 : }
700 1 : if x.Meta.Virtual {
701 1 : e.writeUvarint(customTagVirtual)
702 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
703 1 : }
704 1 : if x.Meta.SyntheticPrefix != nil {
705 1 : e.writeUvarint(customTagSyntheticPrefix)
706 1 : e.writeBytes(x.Meta.SyntheticPrefix)
707 1 : }
708 1 : if x.Meta.SyntheticSuffix != nil {
709 1 : e.writeUvarint(customTagSyntheticSuffix)
710 1 : e.writeBytes(x.Meta.SyntheticSuffix)
711 1 : }
712 1 : e.writeUvarint(customTagTerminate)
713 : }
714 : }
715 1 : _, err := w.Write(e.Bytes())
716 1 : return err
717 : }
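// Illustrative round-trip sketch (hypothetical helper): Encode writes the tag
// stream to any io.Writer and Decode reads it back. Record framing is the
// caller's concern and is omitted here.
func exampleRoundTrip(ve *VersionEdit) (*VersionEdit, error) {
	var buf bytes.Buffer
	if err := ve.Encode(&buf); err != nil {
		return nil, err
	}
	out := &VersionEdit{}
	if err := out.Decode(&buf); err != nil {
		return nil, err
	}
	return out, nil
}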
718 :
719 : // versionEditDecoder should be used to decode version edits.
720 : type versionEditDecoder struct {
721 : byteReader
722 : }
723 :
724 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
725 1 : n, err := d.readUvarint()
726 1 : if err != nil {
727 0 : return nil, err
728 0 : }
729 1 : s := make([]byte, n)
730 1 : _, err = io.ReadFull(d, s)
731 1 : if err != nil {
732 0 : if err == io.ErrUnexpectedEOF {
733 0 : return nil, errCorruptManifest
734 0 : }
735 0 : return nil, err
736 : }
737 1 : return s, nil
738 : }
739 :
740 1 : func (d versionEditDecoder) readLevel() (int, error) {
741 1 : u, err := d.readUvarint()
742 1 : if err != nil {
743 0 : return 0, err
744 0 : }
745 1 : if u >= NumLevels {
746 0 : return 0, errCorruptManifest
747 0 : }
748 1 : return int(u), nil
749 : }
750 :
751 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
752 1 : u, err := d.readUvarint()
753 1 : if err != nil {
754 0 : return 0, err
755 0 : }
756 1 : return base.FileNum(u), nil
757 : }
758 :
759 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
760 1 : u, err := binary.ReadUvarint(d)
761 1 : if err != nil {
762 0 : if err == io.EOF {
763 0 : return 0, errCorruptManifest
764 0 : }
765 0 : return 0, err
766 : }
767 1 : return u, nil
768 : }
769 :
770 : type versionEditEncoder struct {
771 : *bytes.Buffer
772 : }
773 :
774 1 : func (e versionEditEncoder) writeBytes(p []byte) {
775 1 : e.writeUvarint(uint64(len(p)))
776 1 : e.Write(p)
777 1 : }
778 :
779 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
780 1 : e.writeUvarint(uint64(k.Size()))
781 1 : e.Write(k.UserKey)
782 1 : buf := k.EncodeTrailer()
783 1 : e.Write(buf[:])
784 1 : }
785 :
786 1 : func (e versionEditEncoder) writeString(s string) {
787 1 : e.writeUvarint(uint64(len(s)))
788 1 : e.WriteString(s)
789 1 : }
790 :
791 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
792 1 : var buf [binary.MaxVarintLen64]byte
793 1 : n := binary.PutUvarint(buf[:], u)
794 1 : e.Write(buf[:n])
795 1 : }
796 :
797 : // BulkVersionEdit summarizes the files added and deleted from a set of version
798 : // edits.
799 : //
800 : // INVARIANTS:
801 : // No file can be added to a level more than once. This is true globally, and
802 : // also true for all of the calls to Accumulate for a single bulk version edit.
803 : //
804 : // No file can be removed from a level more than once. This is true globally,
805 : // and also true for all of the calls to Accumulate for a single bulk version
806 : // edit.
807 : //
808 : // A file must not be added and removed from a given level in the same version
809 : // edit.
810 : //
811 : // A file that is being removed from a level must have been added to that level
812 : // before (in a prior version edit). Note that a given file can be deleted from
813 : // a level and added to another level in a single version edit.
814 : type BulkVersionEdit struct {
815 : Added [NumLevels]map[base.FileNum]*FileMetadata
816 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
817 :
818 : // AddedFileBacking is a map to support lookup so that we can populate the
819 : // FileBacking of virtual sstables during manifest replay.
820 : AddedFileBacking map[base.DiskFileNum]*FileBacking
821 : RemovedFileBacking []base.DiskFileNum
822 :
823 : // AddedByFileNum maps file number to file metadata for all added files
824 : // from accumulated version edits. AddedByFileNum is only populated if set
825 : // to non-nil by a caller. It must be set to non-nil when replaying
826 : // version edits read from a MANIFEST (as opposed to VersionEdits
827 : // constructed in-memory). While replaying a MANIFEST file,
828 : // VersionEdit.DeletedFiles map entries have nil values, because the
829 : // on-disk deletion record encodes only the file number. Accumulate
830 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
831 : // field with non-nil *FileMetadata.
832 : AddedByFileNum map[base.FileNum]*FileMetadata
833 :
834 : // MarkedForCompactionCountDiff holds the aggregated count of files
835 : // marked for compaction added or removed.
836 : MarkedForCompactionCountDiff int
837 : }
838 :
839 : // Accumulate adds the file addition and deletions in the specified version
840 : // edit to the bulk edit's internal state.
841 : //
842 : // INVARIANTS:
843 : // If a file is added to a given level in a call to Accumulate and then removed
844 : // from that level in a subsequent call, the file will not be present in the
845 : // resulting BulkVersionEdit.Deleted for that level.
846 : //
847 : // After accumulation of version edits, the bulk version edit may have
848 : // information about a file which has been deleted from a level, but it may
849 : // not have information about the same file added to the same level. The add
850 : // could've occurred as part of a previous bulk version edit. In this case,
851 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
852 : // of the accumulation, because we need to decrease the refcount of the
853 : // deleted file in Apply.
854 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
855 1 : for df, m := range ve.DeletedFiles {
856 1 : dmap := b.Deleted[df.Level]
857 1 : if dmap == nil {
858 1 : dmap = make(map[base.FileNum]*FileMetadata)
859 1 : b.Deleted[df.Level] = dmap
860 1 : }
861 :
862 1 : if m == nil {
863 1 : // m is nil only when replaying a MANIFEST.
864 1 : if b.AddedByFileNum == nil {
865 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
866 0 : }
867 1 : m = b.AddedByFileNum[df.FileNum]
868 1 : if m == nil {
869 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
870 1 : }
871 : }
872 1 : if m.MarkedForCompaction {
873 1 : b.MarkedForCompactionCountDiff--
874 1 : }
875 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
876 1 : dmap[df.FileNum] = m
877 1 : } else {
878 1 : // Present in b.Added for the same level.
879 1 : delete(b.Added[df.Level], df.FileNum)
880 1 : }
881 : }
882 :
883 : // Generate state for Added backing files. Note that these must be generated
884 : // before we loop through the NewFiles, because we need to populate the
885 : // FileBackings which might be used by the NewFiles loop.
886 1 : if b.AddedFileBacking == nil {
887 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
888 1 : }
889 1 : for _, fb := range ve.CreatedBackingTables {
890 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
891 0 : // There is already a FileBacking associated with fb.DiskFileNum.
892 0 : // This should never happen. There must always be only one FileBacking
893 0 : // associated with a backing sstable.
894 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
895 : }
896 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
897 : }
898 :
899 1 : for _, nf := range ve.NewFiles {
900 1 : // A new file should not have been deleted in this or a preceding
901 1 : // VersionEdit at the same level (though files can move across levels).
902 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
903 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
904 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
905 0 : }
906 : }
907 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
908 1 : // FileBacking for a virtual sstable must only be nil if we're performing
909 1 : // manifest replay.
910 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
911 1 : if nf.Meta.FileBacking == nil {
912 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
913 0 : }
914 1 : } else if nf.Meta.FileBacking == nil {
915 0 : return errors.Errorf("added file L%d.%s has no FileBacking", nf.Level, nf.Meta.FileNum)
916 0 : }
917 :
918 1 : if b.Added[nf.Level] == nil {
919 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
920 1 : }
921 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
922 1 : if b.AddedByFileNum != nil {
923 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
924 1 : }
925 1 : if nf.Meta.MarkedForCompaction {
926 0 : b.MarkedForCompactionCountDiff++
927 0 : }
928 : }
929 :
930 1 : for _, n := range ve.RemovedBackingTables {
931 1 : if _, ok := b.AddedFileBacking[n]; ok {
932 1 : delete(b.AddedFileBacking, n)
933 1 : } else {
934 1 : // Since a file can be removed from backing files in exactly one version
935 1 : // edit, it is safe to just append without any de-duplication.
936 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, n)
937 1 : }
938 : }
939 :
940 1 : return nil
941 : }
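// Illustrative replay sketch (hypothetical helper): when accumulating edits
// read from a MANIFEST, AddedByFileNum must be non-nil so that on-disk
// deletion records (which carry only a file number) can be resolved to the
// previously added *FileMetadata.
func exampleAccumulate(edits []*VersionEdit) (*BulkVersionEdit, error) {
	bve := &BulkVersionEdit{
		AddedByFileNum: make(map[base.FileNum]*FileMetadata),
	}
	for _, ve := range edits {
		if err := bve.Accumulate(ve); err != nil {
			return nil, err
		}
	}
	return bve, nil
}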
942 :
943 : // Apply applies the delta b to the current version to produce a new version.
944 : // The new version is consistent with respect to the comparer.
945 : //
946 : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
947 : // the levels.
948 : //
949 : // curr may be nil, which is equivalent to a pointer to a zero version.
950 : func (b *BulkVersionEdit) Apply(
951 : curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
952 1 : ) (*Version, error) {
953 1 : v := &Version{
954 1 : cmp: comparer,
955 1 : }
956 1 :
957 1 : // Adjust the count of files marked for compaction.
958 1 : if curr != nil {
959 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
960 1 : }
961 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
962 1 : if v.Stats.MarkedForCompaction < 0 {
963 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
964 0 : }
965 :
966 1 : for level := range v.Levels {
967 1 : if curr == nil || curr.Levels[level].tree.root == nil {
968 1 : v.Levels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
969 1 : } else {
970 1 : v.Levels[level] = curr.Levels[level].clone()
971 1 : }
972 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
973 1 : v.RangeKeyLevels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
974 1 : } else {
975 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
976 1 : }
977 :
978 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
979 1 : // There are no edits on this level.
980 1 : if level == 0 {
981 1 : // Initialize L0Sublevels.
982 1 : if curr == nil || curr.L0Sublevels == nil {
983 1 : if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
984 0 : return nil, errors.Wrap(err, "pebble: internal error")
985 0 : }
986 1 : } else {
987 1 : v.L0Sublevels = curr.L0Sublevels
988 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
989 1 : }
990 : }
991 1 : continue
992 : }
993 :
994 : // Some edits on this level.
995 1 : lm := &v.Levels[level]
996 1 : lmRange := &v.RangeKeyLevels[level]
997 1 :
998 1 : addedFilesMap := b.Added[level]
999 1 : deletedFilesMap := b.Deleted[level]
1000 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
1001 1 : return nil, base.CorruptionErrorf(
1002 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1003 1 : errors.Safe(len(deletedFilesMap)))
1004 1 : }
1005 :
1006 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1007 : // for a level, it won't be present in deletedFilesMap for the same
1008 : // level.
1009 :
1010 1 : for _, f := range deletedFilesMap {
1011 1 : if obsolete := v.Levels[level].remove(f); obsolete {
1012 0 : // Deleting a file from the B-Tree may decrement its
1013 0 : // reference count. However, because we cloned the
1014 0 : // previous level's B-Tree, this should never result in a
1015 0 : // file's reference count dropping to zero.
1016 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1017 0 : return nil, err
1018 0 : }
1019 1 : if f.HasRangeKeys {
1020 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1021 0 : // Deleting a file from the B-Tree may decrement its
1022 0 : // reference count. However, because we cloned the
1023 0 : // previous level's B-Tree, this should never result in a
1024 0 : // file's reference count dropping to zero.
1025 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1026 0 : return nil, err
1027 0 : }
1028 : }
1029 : }
1030 :
1031 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1032 1 : for _, f := range addedFilesMap {
1033 1 : addedFiles = append(addedFiles, f)
1034 1 : }
1035 : // Sort addedFiles by file number. This isn't necessary, but tests which
1036 : // replay invalid manifests check the error output, and the error output
1037 : // depends on the order in which files are added to the btree.
1038 1 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1039 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
1040 1 : })
1041 :
1042 1 : var sm, la *FileMetadata
1043 1 : for _, f := range addedFiles {
1044 1 : // NB: allowedSeeks is used for read-triggered compactions. It is set using
1045 1 : // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
1046 1 : var allowedSeeks int64
1047 1 : if readCompactionRate != 0 {
1048 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1049 1 : }
1050 1 : if allowedSeeks < 100 {
1051 1 : allowedSeeks = 100
1052 1 : }
1053 1 : f.AllowedSeeks.Store(allowedSeeks)
1054 1 : f.InitAllowedSeeks = allowedSeeks
1055 1 :
1056 1 : err := lm.insert(f)
1057 1 : if err != nil {
1058 1 : return nil, errors.Wrap(err, "pebble")
1059 1 : }
1060 1 : if f.HasRangeKeys {
1061 1 : err = lmRange.insert(f)
1062 1 : if err != nil {
1063 0 : return nil, errors.Wrap(err, "pebble")
1064 0 : }
1065 : }
1066 : // Track the keys with the smallest and largest keys, so that we can
1067 : // check consistency of the modified span.
1068 1 : if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
1069 1 : sm = f
1070 1 : }
1071 1 : if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
1072 1 : la = f
1073 1 : }
1074 : }
1075 :
1076 1 : if level == 0 {
1077 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1078 1 : // Flushes and ingestions that do not delete any L0 files do not require
1079 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1080 1 : // it incrementally.
1081 1 : var err error
1082 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1083 1 : SortBySeqNum(addedFiles)
1084 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1085 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1086 1 : err = v.InitL0Sublevels(flushSplitBytes)
1087 1 : } else if invariants.Enabled && err == nil {
1088 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
1089 1 : if err != nil {
1090 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1091 : }
1092 1 : s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
1093 1 : s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
1094 1 : if s1 != s2 {
1095 0 : // Add verbosity.
1096 0 : s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
1097 0 : s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
1098 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1099 : }
1100 : }
1101 1 : if err != nil {
1102 0 : return nil, errors.Wrap(err, "pebble: internal error")
1103 0 : }
1104 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1105 1 : } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
1106 0 : return nil, errors.Wrap(err, "pebble: internal error")
1107 0 : }
1108 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
1109 0 : return nil, errors.Wrap(err, "pebble: internal error")
1110 0 : }
1111 1 : continue
1112 : }
1113 :
1114 : // Check consistency of the level in the vicinity of our edits.
1115 1 : if sm != nil && la != nil {
1116 1 : overlap := v.Levels[level].Slice().Overlaps(comparer.Compare, sm.UserKeyBounds())
1117 1 : // overlap contains all of the added files. We want to ensure that
1118 1 : // the added files are consistent with neighboring existing files
1119 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1120 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1121 1 : if m := start.Prev(); m == nil {
1122 1 : start.Next()
1123 1 : }
1124 1 : if m := end.Next(); m == nil {
1125 1 : end.Prev()
1126 1 : }
1127 : })
1128 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
1129 1 : return nil, errors.Wrap(err, "pebble: internal error")
1130 1 : }
1131 : }
1132 : }
1133 1 : return v, nil
1134 : }
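// Illustrative sketch (hypothetical helper; the flush-split and
// read-compaction values are made up): produce the next Version from the
// accumulated edits. curr may be nil, which is treated as a zero version.
func exampleApply(
	bve *BulkVersionEdit, curr *Version, comparer *base.Comparer,
) (*Version, error) {
	return bve.Apply(curr, comparer, 4<<20 /* flushSplitBytes */, 32<<10 /* readCompactionRate */)
}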
|