Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "strings"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble/internal/base"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
24 : // TODO(peter): describe the MANIFEST file format, independently of the C++
25 : // project.
26 :
27 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
28 :
29 : type byteReader interface {
30 : io.ByteReader
31 : io.Reader
32 : }
33 :
34 : // Tags for the versionEdit disk format.
35 : // Tag 8 is no longer used.
36 : const (
37 : // LevelDB tags.
38 : tagComparator = 1
39 : tagLogNumber = 2
40 : tagNextFileNumber = 3
41 : tagLastSequence = 4
42 : tagCompactPointer = 5
43 : tagDeletedFile = 6
44 : tagNewFile = 7
45 : tagPrevLogNumber = 9
46 :
47 : // RocksDB tags.
48 : tagNewFile2 = 100
49 : tagNewFile3 = 102
50 : tagNewFile4 = 103
51 : tagColumnFamily = 200
52 : tagColumnFamilyAdd = 201
53 : tagColumnFamilyDrop = 202
54 : tagMaxColumnFamily = 203
55 :
56 : // Pebble tags.
57 : tagNewFile5 = 104 // Range keys.
58 : tagCreatedBackingTable = 105
59 : tagRemovedBackingTable = 106
60 :
61 : // The custom tags sub-format used by tagNewFile4 and above. All tags less
62 : // than customTagNonSafeIgnoreMask are safe to ignore and their format must be
63 : // a single length-prefixed bytes field.
64 : customTagTerminate = 1
65 : customTagNeedsCompaction = 2
66 : customTagCreationTime = 6
67 : customTagPathID = 65
68 : customTagNonSafeIgnoreMask = 1 << 6
69 : customTagVirtual = 66
70 : customTagSyntheticPrefix = 67
71 : customTagSyntheticSuffix = 68
72 : )
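
// As an illustration of the custom-tag sub-format (a sketch inferred from the
// Encode/Decode implementations below, not a normative spec): a tagNewFile4
// entry for a file that is marked for compaction ends with the bytes
//
//	02       customTagNeedsCompaction
//	01 01    length-prefixed bytes field: one byte, value 1 (true)
//	01       customTagTerminate
//
// Safe-to-ignore tags (values below customTagNonSafeIgnoreMask = 64) always
// carry a length-prefixed bytes payload, which is what allows a decoder to
// skip tags it does not understand.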
73 :
74 : // DeletedFileEntry holds the state for a file deletion from a level. The file
75 : // itself might still be referenced by another level.
76 : type DeletedFileEntry struct {
77 : Level int
78 : FileNum base.FileNum
79 : }
80 :
81 : // NewFileEntry holds the state for a new file or one moved from a different
82 : // level.
83 : type NewFileEntry struct {
84 : Level int
85 : Meta *FileMetadata
86 : // BackingFileNum is only set during manifest replay, and only for virtual
87 : // sstables.
88 : BackingFileNum base.DiskFileNum
89 : }
90 :
91 : // VersionEdit holds the state for an edit to a Version along with other
92 : // on-disk state (log numbers, next file number, and the last sequence number).
93 : type VersionEdit struct {
94 : // ComparerName is the value of Options.Comparer.Name. This is only set in
95 : // the first VersionEdit in a manifest (either when the DB is created, or
96 : // when a new manifest is created) and is used to verify that the comparer
97 : // specified at Open matches the comparer that was previously used.
98 : ComparerName string
99 :
100 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
101 : // mutations that have not been flushed to an sstable.
102 : //
103 : // This is an optional field; a value of 0 means it is not set.
104 : MinUnflushedLogNum base.DiskFileNum
105 :
106 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
107 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
108 : // 6/2011. We keep it around purely for informational purposes when
109 : // displaying MANIFEST contents.
110 : ObsoletePrevLogNum uint64
111 :
112 : // The next file number. A single counter is used to assign file numbers
113 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
114 : NextFileNum uint64
115 :
116 : // LastSeqNum is an upper bound on the sequence numbers that have been
117 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
118 : // recovery) may contain sequence numbers greater than this value.
119 : LastSeqNum base.SeqNum
120 :
121 : // A file num may be present in both deleted files and new files when it
122 : // is moved from a lower level to a higher level (when the compaction
123 : // found that there was no overlapping file at the higher level).
124 : DeletedFiles map[DeletedFileEntry]*FileMetadata
125 : NewFiles []NewFileEntry
126 : // CreatedBackingTables can be used to preserve the FileBacking associated
127 : // with a physical sstable. This is useful when virtual sstables in the
128 : // latest version are reconstructed during manifest replay, and we also need
129 : // to reconstruct the FileBacking which is required by these virtual
130 : // sstables.
131 : //
132 : // INVARIANT: The FileBacking associated with a physical sstable must only
133 : // be added as a backing file in the same version edit where the physical
134 : // sstable is first virtualized. This means that the physical sstable must
135 : // be present in DeletedFiles and that there must be at least one virtual
136 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
137 : // file must be present in CreatedBackingTables in exactly one version edit.
138 : // The physical sstable associated with the FileBacking must also not be
139 : // present in NewFiles.
140 : CreatedBackingTables []*FileBacking
141 : // RemovedBackingTables is used to remove the FileBacking associated with a
142 : // virtual sstable. Note that a backing sstable can be removed as soon as
143 : // there are no virtual sstables in the latest version which are using the
144 : // backing sstable, but the backing sstable doesn't necessarily have to be
145 : // removed atomically with the version edit which removes the last virtual
146 : // sstable associated with the backing sstable. The removal can happen in a
147 : // future version edit.
148 : //
149 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
150 : // added to CreatedBackingTables in a prior version edit. The same version
151 : // edit also cannot have the same file present in both CreatedBackingTables
152 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
153 : // in exactly one version edit.
154 : RemovedBackingTables []base.DiskFileNum
155 : }
156 :
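// exampleRoundTripVersionEdit is a minimal sketch (not part of the original
// file) showing how the fields above round-trip through Encode and Decode.
// The field values are arbitrary.
func exampleRoundTripVersionEdit() (*VersionEdit, error) {
	ve := VersionEdit{
		ComparerName:       "leveldb.BytewiseComparator",
		MinUnflushedLogNum: 4,
		NextFileNum:        7,
		LastSeqNum:         42,
		// A deletion record encodes only the level and file number, so a nil
		// *FileMetadata value suffices here.
		DeletedFiles: map[DeletedFileEntry]*FileMetadata{
			{Level: 0, FileNum: 5}: nil,
		},
	}
	var buf bytes.Buffer
	if err := ve.Encode(&buf); err != nil {
		return nil, err
	}
	var decoded VersionEdit
	if err := decoded.Decode(&buf); err != nil {
		return nil, err
	}
	return &decoded, nil
}
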
157 : // Decode decodes an edit from the specified reader.
158 : //
159 : // Note that Decode does not set the FileBacking for virtual sstables; that
160 : // responsibility is left to the caller. Decode does, however, populate
161 : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
162 1 : func (v *VersionEdit) Decode(r io.Reader) error {
163 1 : br, ok := r.(byteReader)
164 1 : if !ok {
165 1 : br = bufio.NewReader(r)
166 1 : }
167 1 : d := versionEditDecoder{br}
168 1 : for {
169 1 : tag, err := binary.ReadUvarint(br)
170 1 : if err == io.EOF {
171 1 : break
172 : }
173 1 : if err != nil {
174 0 : return err
175 0 : }
176 1 : switch tag {
177 1 : case tagComparator:
178 1 : s, err := d.readBytes()
179 1 : if err != nil {
180 0 : return err
181 0 : }
182 1 : v.ComparerName = string(s)
183 :
184 1 : case tagLogNumber:
185 1 : n, err := d.readUvarint()
186 1 : if err != nil {
187 0 : return err
188 0 : }
189 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
190 :
191 1 : case tagNextFileNumber:
192 1 : n, err := d.readUvarint()
193 1 : if err != nil {
194 0 : return err
195 0 : }
196 1 : v.NextFileNum = n
197 :
198 1 : case tagLastSequence:
199 1 : n, err := d.readUvarint()
200 1 : if err != nil {
201 0 : return err
202 0 : }
203 1 : v.LastSeqNum = base.SeqNum(n)
204 :
205 0 : case tagCompactPointer:
206 0 : if _, err := d.readLevel(); err != nil {
207 0 : return err
208 0 : }
209 0 : if _, err := d.readBytes(); err != nil {
210 0 : return err
211 0 : }
212 : // NB: RocksDB does not use compaction pointers anymore.
213 :
214 1 : case tagRemovedBackingTable:
215 1 : n, err := d.readUvarint()
216 1 : if err != nil {
217 0 : return err
218 0 : }
219 1 : v.RemovedBackingTables = append(
220 1 : v.RemovedBackingTables, base.DiskFileNum(n),
221 1 : )
222 1 : case tagCreatedBackingTable:
223 1 : dfn, err := d.readUvarint()
224 1 : if err != nil {
225 0 : return err
226 0 : }
227 1 : size, err := d.readUvarint()
228 1 : if err != nil {
229 0 : return err
230 0 : }
231 1 : fileBacking := &FileBacking{
232 1 : DiskFileNum: base.DiskFileNum(dfn),
233 1 : Size: size,
234 1 : }
235 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
236 1 : case tagDeletedFile:
237 1 : level, err := d.readLevel()
238 1 : if err != nil {
239 0 : return err
240 0 : }
241 1 : fileNum, err := d.readFileNum()
242 1 : if err != nil {
243 0 : return err
244 0 : }
245 1 : if v.DeletedFiles == nil {
246 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
247 1 : }
248 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
249 :
250 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
251 1 : level, err := d.readLevel()
252 1 : if err != nil {
253 0 : return err
254 0 : }
255 1 : fileNum, err := d.readFileNum()
256 1 : if err != nil {
257 0 : return err
258 0 : }
259 1 : if tag == tagNewFile3 {
260 0 : // The pathID field appears unused in RocksDB.
261 0 : _ /* pathID */, err := d.readUvarint()
262 0 : if err != nil {
263 0 : return err
264 0 : }
265 : }
266 1 : size, err := d.readUvarint()
267 1 : if err != nil {
268 0 : return err
269 0 : }
270 : // We read the smallest / largest key bounds differently depending on
271 : // whether we have point, range or both types of keys present in the
272 : // table.
273 1 : var (
274 1 : smallestPointKey, largestPointKey []byte
275 1 : smallestRangeKey, largestRangeKey []byte
276 1 : parsedPointBounds bool
277 1 : boundsMarker byte
278 1 : )
279 1 : if tag != tagNewFile5 {
280 1 : // Range keys not present in the table. Parse the point key bounds.
281 1 : smallestPointKey, err = d.readBytes()
282 1 : if err != nil {
283 0 : return err
284 0 : }
285 1 : largestPointKey, err = d.readBytes()
286 1 : if err != nil {
287 0 : return err
288 0 : }
289 1 : } else {
290 1 : // Range keys are present in the table. Determine whether we have point
291 1 : // keys to parse, in addition to the bounds.
292 1 : boundsMarker, err = d.ReadByte()
293 1 : if err != nil {
294 0 : return err
295 0 : }
296 : // Parse point key bounds, if present.
297 1 : if boundsMarker&maskContainsPointKeys > 0 {
298 1 : smallestPointKey, err = d.readBytes()
299 1 : if err != nil {
300 0 : return err
301 0 : }
302 1 : largestPointKey, err = d.readBytes()
303 1 : if err != nil {
304 0 : return err
305 0 : }
306 1 : parsedPointBounds = true
307 1 : } else {
308 1 : // The table does not have point keys.
309 1 : // Sanity check: the bounds must be range keys.
310 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
311 0 : return base.CorruptionErrorf(
312 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
313 0 : boundsMarker,
314 0 : )
315 0 : }
316 : }
317 : // Parse range key bounds.
318 1 : smallestRangeKey, err = d.readBytes()
319 1 : if err != nil {
320 0 : return err
321 0 : }
322 1 : largestRangeKey, err = d.readBytes()
323 1 : if err != nil {
324 0 : return err
325 0 : }
326 : }
327 1 : var smallestSeqNum base.SeqNum
328 1 : var largestSeqNum base.SeqNum
329 1 : if tag != tagNewFile {
330 1 : n, err := d.readUvarint()
331 1 : if err != nil {
332 0 : return err
333 0 : }
334 1 : smallestSeqNum = base.SeqNum(n)
335 1 : n, err = d.readUvarint()
336 1 : if err != nil {
337 0 : return err
338 0 : }
339 1 : largestSeqNum = base.SeqNum(n)
340 : }
341 1 : var markedForCompaction bool
342 1 : var creationTime uint64
343 1 : virtualState := struct {
344 1 : virtual bool
345 1 : backingFileNum uint64
346 1 : }{}
347 1 : var syntheticPrefix sstable.SyntheticPrefix
348 1 : var syntheticSuffix sstable.SyntheticSuffix
349 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
350 1 : for {
351 1 : customTag, err := d.readUvarint()
352 1 : if err != nil {
353 0 : return err
354 0 : }
355 1 : if customTag == customTagTerminate {
356 1 : break
357 : }
358 1 : switch customTag {
359 1 : case customTagNeedsCompaction:
360 1 : field, err := d.readBytes()
361 1 : if err != nil {
362 0 : return err
363 0 : }
364 1 : if len(field) != 1 {
365 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
366 0 : }
367 1 : markedForCompaction = (field[0] == 1)
368 :
369 1 : case customTagCreationTime:
370 1 : field, err := d.readBytes()
371 1 : if err != nil {
372 0 : return err
373 0 : }
374 1 : var n int
375 1 : creationTime, n = binary.Uvarint(field)
376 1 : if n != len(field) {
377 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
378 0 : }
379 :
380 0 : case customTagPathID:
381 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
382 :
383 1 : case customTagVirtual:
384 1 : virtualState.virtual = true
385 1 : if virtualState.backingFileNum, err = d.readUvarint(); err != nil {
386 0 : return err
387 0 : }
388 :
389 1 : case customTagSyntheticPrefix:
390 1 : synthetic, err := d.readBytes()
391 1 : if err != nil {
392 0 : return err
393 0 : }
394 1 : syntheticPrefix = synthetic
395 :
396 1 : case customTagSyntheticSuffix:
397 1 : if syntheticSuffix, err = d.readBytes(); err != nil {
398 0 : return err
399 0 : }
400 :
401 0 : default:
402 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
403 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
404 0 : }
405 0 : if _, err := d.readBytes(); err != nil {
406 0 : return err
407 0 : }
408 : }
409 : }
410 : }
411 1 : m := &FileMetadata{
412 1 : FileNum: fileNum,
413 1 : Size: size,
414 1 : CreationTime: int64(creationTime),
415 1 : SmallestSeqNum: smallestSeqNum,
416 1 : LargestSeqNum: largestSeqNum,
417 1 : LargestSeqNumAbsolute: largestSeqNum,
418 1 : MarkedForCompaction: markedForCompaction,
419 1 : Virtual: virtualState.virtual,
420 1 : SyntheticPrefixAndSuffix: sstable.MakeSyntheticPrefixAndSuffix(syntheticPrefix, syntheticSuffix),
421 1 : }
422 1 : if tag != tagNewFile5 { // no range keys present
423 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
424 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
425 1 : m.HasPointKeys = true
426 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
427 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
428 1 : } else { // range keys present
429 1 : // Set point key bounds, if parsed.
430 1 : if parsedPointBounds {
431 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
432 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
433 1 : m.HasPointKeys = true
434 1 : }
435 : // Set range key bounds.
436 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
437 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
438 1 : m.HasRangeKeys = true
439 1 : // Set overall bounds (by default assume range keys).
440 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
441 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
442 1 : if boundsMarker&maskSmallest == maskSmallest {
443 1 : m.Smallest = m.SmallestPointKey
444 1 : m.boundTypeSmallest = boundTypePointKey
445 1 : }
446 1 : if boundsMarker&maskLargest == maskLargest {
447 1 : m.Largest = m.LargestPointKey
448 1 : m.boundTypeLargest = boundTypePointKey
449 1 : }
450 : }
451 1 : m.boundsSet = true
452 1 : if !virtualState.virtual {
453 1 : m.InitPhysicalBacking()
454 1 : }
455 :
456 1 : nfe := NewFileEntry{
457 1 : Level: level,
458 1 : Meta: m,
459 1 : }
460 1 : if virtualState.virtual {
461 1 : nfe.BackingFileNum = base.DiskFileNum(virtualState.backingFileNum)
462 1 : }
463 1 : v.NewFiles = append(v.NewFiles, nfe)
464 :
465 1 : case tagPrevLogNumber:
466 1 : n, err := d.readUvarint()
467 1 : if err != nil {
468 0 : return err
469 0 : }
470 1 : v.ObsoletePrevLogNum = n
471 :
472 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
473 0 : return base.CorruptionErrorf("column families are not supported")
474 :
475 0 : default:
476 0 : return errCorruptManifest
477 : }
478 : }
479 1 : return nil
480 : }
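
// A hedged sketch of the read path: when replaying a MANIFEST, each record
// holds one encoded edit, so a caller loops over the records and decodes each
// into a fresh VersionEdit. The record.Reader API shown here is an assumption
// of this sketch; this file does not import the record package.
//
//	rr := record.NewReader(manifestFile, 0 /* logNum */)
//	for {
//		r, err := rr.Next()
//		if err == io.EOF {
//			break
//		}
//		var ve VersionEdit
//		if err := ve.Decode(r); err != nil {
//			return err
//		}
//		// ... accumulate ve into a BulkVersionEdit ...
//	}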
481 :
482 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
483 1 : var buf bytes.Buffer
484 1 : if v.ComparerName != "" {
485 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
486 1 : }
487 1 : if v.MinUnflushedLogNum != 0 {
488 0 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
489 0 : }
490 1 : if v.ObsoletePrevLogNum != 0 {
491 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
492 0 : }
493 1 : if v.NextFileNum != 0 {
494 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
495 1 : }
496 1 : if v.LastSeqNum != 0 {
497 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
498 1 : }
499 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
500 1 : for df := range v.DeletedFiles {
501 1 : entries = append(entries, df)
502 1 : }
503 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
504 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
505 1 : return v
506 1 : }
507 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
508 : })
509 1 : for _, df := range entries {
510 1 : fmt.Fprintf(&buf, " del-table: L%d %s\n", df.Level, df.FileNum)
511 1 : }
512 1 : for _, nf := range v.NewFiles {
513 1 : fmt.Fprintf(&buf, " add-table: L%d", nf.Level)
514 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, verbose))
515 1 : if nf.Meta.CreationTime != 0 {
516 1 : fmt.Fprintf(&buf, " (%s)",
517 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
518 1 : }
519 1 : fmt.Fprintln(&buf)
520 : }
521 :
522 1 : for _, b := range v.CreatedBackingTables {
523 1 : fmt.Fprintf(&buf, " add-backing: %s\n", b.DiskFileNum)
524 1 : }
525 1 : for _, n := range v.RemovedBackingTables {
526 1 : fmt.Fprintf(&buf, " del-backing: %s\n", n)
527 1 : }
528 1 : return buf.String()
529 : }
530 :
531 : // DebugString is a more verbose version of String(). Use this in tests.
532 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
533 1 : return v.string(true /* verbose */, fmtKey)
534 1 : }
535 :
536 : // String implements fmt.Stringer for a VersionEdit.
537 1 : func (v *VersionEdit) String() string {
538 1 : return v.string(false /* verbose */, base.DefaultFormatter)
539 1 : }
540 :
541 : // ParseVersionEditDebug parses a VersionEdit from its DebugString
542 : // implementation.
543 : //
544 : // It doesn't recognize all fields; this implementation can be filled in as
545 : // needed.
546 1 : func ParseVersionEditDebug(s string) (_ *VersionEdit, err error) {
547 1 : defer func() {
548 1 : if r := recover(); r != nil {
549 0 : err = errors.CombineErrors(err, errFromPanic(r))
550 0 : }
551 : }()
552 :
553 1 : var ve VersionEdit
554 1 : for _, l := range strings.Split(s, "\n") {
555 1 : l = strings.TrimSpace(l)
556 1 : if l == "" {
557 1 : continue
558 : }
559 1 : field, value, ok := strings.Cut(l, ":")
560 1 : if !ok {
561 0 : return nil, errors.Errorf("malformed line %q", l)
562 0 : }
563 1 : field = strings.TrimSpace(field)
564 1 : p := makeDebugParser(value)
565 1 : switch field {
566 1 : case "add-table":
567 1 : level := p.Level()
568 1 : meta, err := ParseFileMetadataDebug(p.Remaining())
569 1 : if err != nil {
570 0 : return nil, err
571 0 : }
572 1 : ve.NewFiles = append(ve.NewFiles, NewFileEntry{
573 1 : Level: level,
574 1 : Meta: meta,
575 1 : })
576 :
577 1 : case "del-table":
578 1 : level := p.Level()
579 1 : num := p.FileNum()
580 1 : if ve.DeletedFiles == nil {
581 1 : ve.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
582 1 : }
583 1 : ve.DeletedFiles[DeletedFileEntry{
584 1 : Level: level,
585 1 : FileNum: num,
586 1 : }] = nil
587 :
588 1 : case "add-backing":
589 1 : n := p.DiskFileNum()
590 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, &FileBacking{
591 1 : DiskFileNum: n,
592 1 : Size: 100,
593 1 : })
594 :
595 1 : case "del-backing":
596 1 : n := p.DiskFileNum()
597 1 : ve.RemovedBackingTables = append(ve.RemovedBackingTables, n)
598 :
599 0 : default:
600 0 : return nil, errors.Errorf("field %q not implemented", field)
601 : }
602 : }
603 1 : return &ve, nil
604 : }
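
// For example, assuming the debug parser accepts zero-padded file numbers (a
// detail of makeDebugParser that lives outside this file), input like the
// following yields an edit with one deleted table and one created backing:
//
//	ve, err := ParseVersionEditDebug(`
//	  del-table: L0 000005
//	  add-backing: 000007
//	`)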
605 :
606 : // Encode encodes an edit to the specified writer.
607 1 : func (v *VersionEdit) Encode(w io.Writer) error {
608 1 : e := versionEditEncoder{new(bytes.Buffer)}
609 1 :
610 1 : if v.ComparerName != "" {
611 1 : e.writeUvarint(tagComparator)
612 1 : e.writeString(v.ComparerName)
613 1 : }
614 1 : if v.MinUnflushedLogNum != 0 {
615 1 : e.writeUvarint(tagLogNumber)
616 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
617 1 : }
618 1 : if v.ObsoletePrevLogNum != 0 {
619 1 : e.writeUvarint(tagPrevLogNumber)
620 1 : e.writeUvarint(v.ObsoletePrevLogNum)
621 1 : }
622 1 : if v.NextFileNum != 0 {
623 1 : e.writeUvarint(tagNextFileNumber)
624 1 : e.writeUvarint(uint64(v.NextFileNum))
625 1 : }
626 1 : for _, dfn := range v.RemovedBackingTables {
627 1 : e.writeUvarint(tagRemovedBackingTable)
628 1 : e.writeUvarint(uint64(dfn))
629 1 : }
630 1 : for _, fileBacking := range v.CreatedBackingTables {
631 1 : e.writeUvarint(tagCreatedBackingTable)
632 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum))
633 1 : e.writeUvarint(fileBacking.Size)
634 1 : }
635 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
636 : // even though its value is zero. We satisfy this by encoding LastSeqNum when
637 : // ComparerName is set.
638 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
639 1 : e.writeUvarint(tagLastSequence)
640 1 : e.writeUvarint(uint64(v.LastSeqNum))
641 1 : }
642 1 : for x := range v.DeletedFiles {
643 1 : e.writeUvarint(tagDeletedFile)
644 1 : e.writeUvarint(uint64(x.Level))
645 1 : e.writeUvarint(uint64(x.FileNum))
646 1 : }
647 1 : for _, x := range v.NewFiles {
648 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
649 1 : var tag uint64
650 1 : switch {
651 1 : case x.Meta.HasRangeKeys:
652 1 : tag = tagNewFile5
653 1 : case customFields:
654 1 : tag = tagNewFile4
655 1 : default:
656 1 : tag = tagNewFile2
657 : }
658 1 : e.writeUvarint(tag)
659 1 : e.writeUvarint(uint64(x.Level))
660 1 : e.writeUvarint(uint64(x.Meta.FileNum))
661 1 : e.writeUvarint(x.Meta.Size)
662 1 : if !x.Meta.HasRangeKeys {
663 1 : // If we have no range keys, preserve the original format and write the
664 1 : // smallest and largest point keys.
665 1 : e.writeKey(x.Meta.SmallestPointKey)
666 1 : e.writeKey(x.Meta.LargestPointKey)
667 1 : } else {
668 1 : // When range keys are present, we first write a marker byte that
669 1 : // indicates if the table also contains point keys, in addition to how the
670 1 : // overall bounds for the table should be reconstructed. This byte is
671 1 : // followed by the keys themselves.
672 1 : b, err := x.Meta.boundsMarker()
673 1 : if err != nil {
674 0 : return err
675 0 : }
676 1 : if err = e.WriteByte(b); err != nil {
677 0 : return err
678 0 : }
679 : // Write point key bounds (if present).
680 1 : if x.Meta.HasPointKeys {
681 1 : e.writeKey(x.Meta.SmallestPointKey)
682 1 : e.writeKey(x.Meta.LargestPointKey)
683 1 : }
684 : // Write range key bounds.
685 1 : e.writeKey(x.Meta.SmallestRangeKey)
686 1 : e.writeKey(x.Meta.LargestRangeKey)
687 : }
688 1 : e.writeUvarint(uint64(x.Meta.SmallestSeqNum))
689 1 : e.writeUvarint(uint64(x.Meta.LargestSeqNum))
690 1 : if customFields {
691 1 : if x.Meta.CreationTime != 0 {
692 1 : e.writeUvarint(customTagCreationTime)
693 1 : var buf [binary.MaxVarintLen64]byte
694 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
695 1 : e.writeBytes(buf[:n])
696 1 : }
697 1 : if x.Meta.MarkedForCompaction {
698 1 : e.writeUvarint(customTagNeedsCompaction)
699 1 : e.writeBytes([]byte{1})
700 1 : }
701 1 : if x.Meta.Virtual {
702 1 : e.writeUvarint(customTagVirtual)
703 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum))
704 1 : }
705 1 : if x.Meta.SyntheticPrefixAndSuffix.HasPrefix() {
706 1 : e.writeUvarint(customTagSyntheticPrefix)
707 1 : e.writeBytes(x.Meta.SyntheticPrefixAndSuffix.Prefix())
708 1 : }
709 1 : if x.Meta.SyntheticPrefixAndSuffix.HasSuffix() {
710 1 : e.writeUvarint(customTagSyntheticSuffix)
711 1 : e.writeBytes(x.Meta.SyntheticPrefixAndSuffix.Suffix())
712 1 : }
713 1 : e.writeUvarint(customTagTerminate)
714 : }
715 : }
716 1 : _, err := w.Write(e.Bytes())
717 1 : return err
718 : }
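
// A hedged sketch of the write path (the record.Writer API shown here is an
// assumption of this sketch; the real manifest-writing callers live outside
// this file). Each edit is encoded into its own record:
//
//	w, err := manifestWriter.Next() // manifestWriter is a *record.Writer
//	if err != nil {
//		return err
//	}
//	if err := ve.Encode(w); err != nil {
//		return err
//	}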
719 :
720 : // versionEditDecoder should be used to decode version edits.
721 : type versionEditDecoder struct {
722 : byteReader
723 : }
724 :
725 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
726 1 : n, err := d.readUvarint()
727 1 : if err != nil {
728 0 : return nil, err
729 0 : }
730 1 : s := make([]byte, n)
731 1 : _, err = io.ReadFull(d, s)
732 1 : if err != nil {
733 0 : if err == io.ErrUnexpectedEOF {
734 0 : return nil, errCorruptManifest
735 0 : }
736 0 : return nil, err
737 : }
738 1 : return s, nil
739 : }
740 :
741 1 : func (d versionEditDecoder) readLevel() (int, error) {
742 1 : u, err := d.readUvarint()
743 1 : if err != nil {
744 0 : return 0, err
745 0 : }
746 1 : if u >= NumLevels {
747 0 : return 0, errCorruptManifest
748 0 : }
749 1 : return int(u), nil
750 : }
751 :
752 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
753 1 : u, err := d.readUvarint()
754 1 : if err != nil {
755 0 : return 0, err
756 0 : }
757 1 : return base.FileNum(u), nil
758 : }
759 :
760 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
761 1 : u, err := binary.ReadUvarint(d)
762 1 : if err != nil {
763 0 : if err == io.EOF {
764 0 : return 0, errCorruptManifest
765 0 : }
766 0 : return 0, err
767 : }
768 1 : return u, nil
769 : }
770 :
771 : type versionEditEncoder struct {
772 : *bytes.Buffer
773 : }
774 :
775 1 : func (e versionEditEncoder) writeBytes(p []byte) {
776 1 : e.writeUvarint(uint64(len(p)))
777 1 : e.Write(p)
778 1 : }
779 :
780 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
781 1 : e.writeUvarint(uint64(k.Size()))
782 1 : e.Write(k.UserKey)
783 1 : buf := k.EncodeTrailer()
784 1 : e.Write(buf[:])
785 1 : }
786 :
787 1 : func (e versionEditEncoder) writeString(s string) {
788 1 : e.writeUvarint(uint64(len(s)))
789 1 : e.WriteString(s)
790 1 : }
791 :
792 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
793 1 : var buf [binary.MaxVarintLen64]byte
794 1 : n := binary.PutUvarint(buf[:], u)
795 1 : e.Write(buf[:n])
796 1 : }
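
// For example, writeUvarint(300) appends the two bytes 0xac 0x02: varints are
// little-endian base-128, so 300 (binary 10_0101100) is emitted as 0x2c with
// the continuation bit set, followed by 0x02.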
797 :
798 : // BulkVersionEdit summarizes the files added and deleted from a set of version
799 : // edits.
800 : //
801 : // INVARIANTS:
802 : // No file can be added to a level more than once. This is true globally, and
803 : // also true for all of the calls to Accumulate for a single bulk version edit.
804 : //
805 : // No file can be removed from a level more than once. This is true globally,
806 : // and also true for all of the calls to Accumulate for a single bulk version
807 : // edit.
808 : //
809 : // A file must not be added and removed from a given level in the same version
810 : // edit.
811 : //
812 : // A file that is being removed from a level must have been added to that level
813 : // before (in a prior version edit). Note that a given file can be deleted from
814 : // a level and added to another level in a single version edit.
815 : type BulkVersionEdit struct {
816 : Added [NumLevels]map[base.FileNum]*FileMetadata
817 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
818 :
819 : // AddedFileBacking is a map to support lookup so that we can populate the
820 : // FileBacking of virtual sstables during manifest replay.
821 : AddedFileBacking map[base.DiskFileNum]*FileBacking
822 : RemovedFileBacking []base.DiskFileNum
823 :
824 : // AddedByFileNum maps file number to file metadata for all added files
825 : // from accumulated version edits. AddedByFileNum is only populated if set
826 : // to non-nil by a caller. It must be set to non-nil when replaying
827 : // version edits read from a MANIFEST (as opposed to VersionEdits
828 : // constructed in-memory). While replaying a MANIFEST file,
829 : // VersionEdit.DeletedFiles map entries have nil values, because the
830 : // on-disk deletion record encodes only the file number. Accumulate
831 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
832 : // field with non-nil *FileMetadata.
833 : AddedByFileNum map[base.FileNum]*FileMetadata
834 :
835 : // MarkedForCompactionCountDiff holds the aggregated count of files
836 : // marked for compaction added or removed.
837 : MarkedForCompactionCountDiff int
838 : }
839 :
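// exampleReplayEdits is a hedged sketch (not part of the original file) of
// the replay flow these fields support: accumulate every edit read from a
// MANIFEST, then apply the result against a nil (zero) version. The
// flush-split and read-compaction parameters are arbitrary here.
func exampleReplayEdits(edits []*VersionEdit, comparer *base.Comparer) (*Version, error) {
	var bve BulkVersionEdit
	// AddedByFileNum must be non-nil during replay so that deletion records,
	// which encode only a file number, can be resolved to file metadata.
	bve.AddedByFileNum = make(map[base.FileNum]*FileMetadata)
	for _, ve := range edits {
		if err := bve.Accumulate(ve); err != nil {
			return nil, err
		}
	}
	return bve.Apply(nil /* curr */, comparer, 0 /* flushSplitBytes */, 0 /* readCompactionRate */)
}
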
840 : // Accumulate adds the file addition and deletions in the specified version
841 : // edit to the bulk edit's internal state.
842 : //
843 : // INVARIANTS:
844 : // If a file is added to a given level in a call to Accumulate and then removed
845 : // from that level in a subsequent call, the file will not be present in the
846 : // resulting BulkVersionEdit.Deleted for that level.
847 : //
848 : // After accumulation of version edits, the bulk version edit may have
849 : // information about a file which has been deleted from a level, but it may
850 : // not have information about the same file added to the same level. The add
851 : // could've occurred as part of a previous bulk version edit. In this case,
852 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
853 : // of the accumulation, because we need to decrease the refcount of the
854 : // deleted file in Apply.
855 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
856 1 : for df, m := range ve.DeletedFiles {
857 1 : dmap := b.Deleted[df.Level]
858 1 : if dmap == nil {
859 1 : dmap = make(map[base.FileNum]*FileMetadata)
860 1 : b.Deleted[df.Level] = dmap
861 1 : }
862 :
863 1 : if m == nil {
864 1 : // m is nil only when replaying a MANIFEST.
865 1 : if b.AddedByFileNum == nil {
866 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
867 0 : }
868 1 : m = b.AddedByFileNum[df.FileNum]
869 1 : if m == nil {
870 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
871 1 : }
872 : }
873 1 : if m.MarkedForCompaction {
874 1 : b.MarkedForCompactionCountDiff--
875 1 : }
876 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
877 1 : dmap[df.FileNum] = m
878 1 : } else {
879 1 : // Present in b.Added for the same level.
880 1 : delete(b.Added[df.Level], df.FileNum)
881 1 : }
882 : }
883 :
884 : // Generate state for Added backing files. Note that these must be generated
885 : // before we loop through the NewFiles, because we need to populate the
886 : // FileBackings which might be used by the NewFiles loop.
887 1 : if b.AddedFileBacking == nil {
888 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
889 1 : }
890 1 : for _, fb := range ve.CreatedBackingTables {
891 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
892 0 : // There is already a FileBacking associated with fb.DiskFileNum.
893 0 : // This should never happen. There must always be only one FileBacking
894 0 : // associated with a backing sstable.
895 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
896 : }
897 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
898 : }
899 :
900 1 : for _, nf := range ve.NewFiles {
901 1 : // A new file should not have been deleted in this or a preceding
902 1 : // VersionEdit at the same level (though files can move across levels).
903 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
904 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
905 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
906 0 : }
907 : }
908 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
909 1 : // FileBacking for a virtual sstable must only be nil if we're performing
910 1 : // manifest replay.
911 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
912 1 : if nf.Meta.FileBacking == nil {
913 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
914 0 : }
915 1 : } else if nf.Meta.FileBacking == nil {
916 0 : return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
917 0 : }
918 :
919 1 : if b.Added[nf.Level] == nil {
920 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
921 1 : }
922 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
923 1 : if b.AddedByFileNum != nil {
924 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
925 1 : }
926 1 : if nf.Meta.MarkedForCompaction {
927 0 : b.MarkedForCompactionCountDiff++
928 0 : }
929 : }
930 :
931 1 : for _, n := range ve.RemovedBackingTables {
932 1 : if _, ok := b.AddedFileBacking[n]; ok {
933 1 : delete(b.AddedFileBacking, n)
934 1 : } else {
935 1 : // Since a file can be removed from backing files in exactly one version
936 1 : // edit, it is safe to just append without any de-duplication.
937 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, n)
938 1 : }
939 : }
940 :
941 1 : return nil
942 : }
943 :
944 : // Apply applies the delta b to the current version to produce a new version.
945 : // The new version is consistent with respect to the comparer.
946 : //
947 : // Apply updates the backing refcounts (Ref/Unref) as files are installed into
948 : // the levels.
949 : //
950 : // curr may be nil, which is equivalent to a pointer to a zero version.
951 : func (b *BulkVersionEdit) Apply(
952 : curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
953 1 : ) (*Version, error) {
954 1 : v := &Version{
955 1 : cmp: comparer,
956 1 : }
957 1 :
958 1 : // Adjust the count of files marked for compaction.
959 1 : if curr != nil {
960 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
961 1 : }
962 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
963 1 : if v.Stats.MarkedForCompaction < 0 {
964 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
965 0 : }
966 :
967 1 : for level := range v.Levels {
968 1 : if curr == nil || curr.Levels[level].tree.root == nil {
969 1 : v.Levels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
970 1 : } else {
971 1 : v.Levels[level] = curr.Levels[level].clone()
972 1 : }
973 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
974 1 : v.RangeKeyLevels[level] = MakeLevelMetadata(comparer.Compare, level, nil /* files */)
975 1 : } else {
976 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
977 1 : }
978 :
979 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
980 1 : // There are no edits on this level.
981 1 : if level == 0 {
982 1 : // Initialize L0Sublevels.
983 1 : if curr == nil || curr.L0Sublevels == nil {
984 1 : if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
985 0 : return nil, errors.Wrap(err, "pebble: internal error")
986 0 : }
987 1 : } else {
988 1 : v.L0Sublevels = curr.L0Sublevels
989 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
990 1 : }
991 : }
992 1 : continue
993 : }
994 :
995 : // Some edits on this level.
996 1 : lm := &v.Levels[level]
997 1 : lmRange := &v.RangeKeyLevels[level]
998 1 :
999 1 : addedFilesMap := b.Added[level]
1000 1 : deletedFilesMap := b.Deleted[level]
1001 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
1002 1 : return nil, base.CorruptionErrorf(
1003 1 : "pebble: internal error: No current or added files but have deleted files: %d",
1004 1 : errors.Safe(len(deletedFilesMap)))
1005 1 : }
1006 :
1007 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
1008 : // for a level, it won't be present in deletedFilesMap for the same
1009 : // level.
1010 :
1011 1 : for _, f := range deletedFilesMap {
1012 1 : if obsolete := v.Levels[level].remove(f); obsolete {
1013 0 : // Deleting a file from the B-Tree may decrement its
1014 0 : // reference count. However, because we cloned the
1015 0 : // previous level's B-Tree, this should never result in a
1016 0 : // file's reference count dropping to zero.
1017 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
1018 0 : return nil, err
1019 0 : }
1020 1 : if f.HasRangeKeys {
1021 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
1022 0 : // Deleting a file from the B-Tree may decrement its
1023 0 : // reference count. However, because we cloned the
1024 0 : // previous level's B-Tree, this should never result in a
1025 0 : // file's reference count dropping to zero.
1026 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
1027 0 : return nil, err
1028 0 : }
1029 : }
1030 : }
1031 :
1032 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1033 1 : for _, f := range addedFilesMap {
1034 1 : addedFiles = append(addedFiles, f)
1035 1 : }
1036 : // Sort addedFiles by file number. This isn't necessary, but tests which
1037 : // replay invalid manifests check the error output, and the error output
1038 : // depends on the order in which files are added to the btree.
1039 1 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1040 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
1041 1 : })
1042 :
1043 1 : var sm, la *FileMetadata
1044 1 : for _, f := range addedFiles {
1045 1 : // NB: allowedSeeks is used for read-triggered compactions. It is set using
1046 1 : // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
1047 1 : var allowedSeeks int64
1048 1 : if readCompactionRate != 0 {
1049 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1050 1 : }
1051 1 : if allowedSeeks < 100 {
1052 1 : allowedSeeks = 100
1053 1 : }
1054 1 : f.AllowedSeeks.Store(allowedSeeks)
1055 1 : f.InitAllowedSeeks = allowedSeeks
1056 1 :
1057 1 : err := lm.insert(f)
1058 1 : if err != nil {
1059 1 : return nil, errors.Wrap(err, "pebble")
1060 1 : }
1061 1 : if f.HasRangeKeys {
1062 1 : err = lmRange.insert(f)
1063 1 : if err != nil {
1064 0 : return nil, errors.Wrap(err, "pebble")
1065 0 : }
1066 : }
1067 : // Track the files with the smallest and largest keys, so that we can
1068 : // check consistency of the modified span.
1069 1 : if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
1070 1 : sm = f
1071 1 : }
1072 1 : if la == nil || base.InternalCompare(comparer.Compare, la.Largest, f.Largest) < 0 {
1073 1 : la = f
1074 1 : }
1075 : }
1076 :
1077 1 : if level == 0 {
1078 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1079 1 : // Flushes and ingestions that do not delete any L0 files do not require
1080 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1081 1 : // it incrementally.
1082 1 : var err error
1083 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1084 1 : SortBySeqNum(addedFiles)
1085 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1086 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1087 1 : err = v.InitL0Sublevels(flushSplitBytes)
1088 1 : } else if invariants.Enabled && err == nil {
1089 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], comparer.Compare, comparer.FormatKey, flushSplitBytes)
1090 1 : if err != nil {
1091 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1092 : }
1093 1 : s1 := describeSublevels(comparer.FormatKey, false /* verbose */, copyOfSublevels.Levels)
1094 1 : s2 := describeSublevels(comparer.FormatKey, false /* verbose */, v.L0Sublevels.Levels)
1095 1 : if s1 != s2 {
1096 0 : // Add verbosity.
1097 0 : s1 := describeSublevels(comparer.FormatKey, true /* verbose */, copyOfSublevels.Levels)
1098 0 : s2 := describeSublevels(comparer.FormatKey, true /* verbose */, v.L0Sublevels.Levels)
1099 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1100 : }
1101 : }
1102 1 : if err != nil {
1103 0 : return nil, errors.Wrap(err, "pebble: internal error")
1104 0 : }
1105 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1106 1 : } else if err := v.InitL0Sublevels(flushSplitBytes); err != nil {
1107 0 : return nil, errors.Wrap(err, "pebble: internal error")
1108 0 : }
1109 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(0), v.Levels[level].Iter()); err != nil {
1110 0 : return nil, errors.Wrap(err, "pebble: internal error")
1111 0 : }
1112 1 : continue
1113 : }
1114 :
1115 : // Check consistency of the level in the vicinity of our edits.
1116 1 : if sm != nil && la != nil {
1117 1 : overlap := v.Levels[level].Slice().Overlaps(comparer.Compare, sm.UserKeyBounds())
1118 1 : // overlap contains all of the added files. We want to ensure that
1119 1 : // the added files are consistent with neighboring existing files
1120 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1121 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1122 1 : if m := start.Prev(); m == nil {
1123 1 : start.Next()
1124 1 : }
1125 1 : if m := end.Next(); m == nil {
1126 1 : end.Prev()
1127 1 : }
1128 : })
1129 1 : if err := CheckOrdering(comparer.Compare, comparer.FormatKey, Level(level), check.Iter()); err != nil {
1130 1 : return nil, errors.Wrap(err, "pebble: internal error")
1131 1 : }
1132 : }
1133 : }
1134 1 : return v, nil
1135 : }
|