Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : stdcmp "cmp"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "slices"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : )
21 :
22 : // TODO(peter): describe the MANIFEST file format, independently of the C++
23 : // project.
24 :
25 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
26 :
27 : type byteReader interface {
28 : io.ByteReader
29 : io.Reader
30 : }
31 :
32 : // Tags for the versionEdit disk format.
33 : // Tag 8 is no longer used.
34 : const (
35 : // LevelDB tags.
36 : tagComparator = 1
37 : tagLogNumber = 2
38 : tagNextFileNumber = 3
39 : tagLastSequence = 4
40 : tagCompactPointer = 5
41 : tagDeletedFile = 6
42 : tagNewFile = 7
43 : tagPrevLogNumber = 9
44 :
45 : // RocksDB tags.
46 : tagNewFile2 = 100
47 : tagNewFile3 = 102
48 : tagNewFile4 = 103
49 : tagColumnFamily = 200
50 : tagColumnFamilyAdd = 201
51 : tagColumnFamilyDrop = 202
52 : tagMaxColumnFamily = 203
53 :
54 : // Pebble tags.
55 : tagNewFile5 = 104 // Range keys.
56 : tagCreatedBackingTable = 105
57 : tagRemovedBackingTable = 106
58 :
59 : // The custom tags sub-format used by tagNewFile4 and above.
60 : customTagTerminate = 1
61 : customTagNeedsCompaction = 2
62 : customTagCreationTime = 6
63 : customTagPathID = 65
64 : customTagNonSafeIgnoreMask = 1 << 6
65 : customTagVirtual = 66
66 : )
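// The custom-tag sub-format above encodes a sequence of (tag, field) pairs
// terminated by customTagTerminate, where each field other than the virtual
// file number is length-prefixed. A hedged sketch of the rule implied by
// customTagNonSafeIgnoreMask, mirroring the default case in Decode below:
//
//	if customTag&customTagNonSafeIgnoreMask != 0 {
//	    // Bit 6 set: the field is semantically significant and a reader
//	    // that does not understand it must fail rather than skip it.
//	    return base.CorruptionErrorf("custom field not supported: %d", customTag)
//	}
//	// Bit 6 clear: the length-prefixed field was read and can be discarded.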
67 :
68 : // DeletedFileEntry holds the state for a file deletion from a level. The file
69 : // itself might still be referenced by another level.
70 : type DeletedFileEntry struct {
71 : Level int
72 : FileNum base.FileNum
73 : }
74 :
75 : // NewFileEntry holds the state for a new file or one moved from a different
76 : // level.
77 : type NewFileEntry struct {
78 : Level int
79 : Meta *FileMetadata
80 : // BackingFileNum is only set during manifest replay, and only for virtual
81 : // sstables.
82 : BackingFileNum base.DiskFileNum
83 : }
84 :
85 : // VersionEdit holds the state for an edit to a Version along with other
86 : // on-disk state (log numbers, next file number, and the last sequence number).
87 : type VersionEdit struct {
88 : // ComparerName is the value of Options.Comparer.Name. This is only set in
89 : // the first VersionEdit in a manifest (either when the DB is created, or
90 : // when a new manifest is created) and is used to verify that the comparer
91 : // specified at Open matches the comparer that was previously used.
92 : ComparerName string
93 :
94 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
95 : // mutations that have not been flushed to an sstable.
96 : //
97 : // This is an optional field; a value of 0 means it is not set.
98 : MinUnflushedLogNum base.DiskFileNum
99 :
100 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
101 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
102 : // 6/2011. We keep it around purely for informational purposes when
103 : // displaying MANIFEST contents.
104 : ObsoletePrevLogNum uint64
105 :
106 : // The next file number. A single counter is used to assign file numbers
107 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
108 : NextFileNum uint64
109 :
110 : // LastSeqNum is an upper bound on the sequence numbers that have been
111 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
112 : // recovery) may contain sequence numbers greater than this value.
113 : LastSeqNum uint64
114 :
115 : // A file num may be present in both deleted files and new files when it
116 : // is moved from a lower level to a higher level (when the compaction
117 : // found that there was no overlapping file at the higher level).
118 : DeletedFiles map[DeletedFileEntry]*FileMetadata
119 : NewFiles []NewFileEntry
120 : // CreatedBackingTables can be used to preserve the FileBacking associated
121 : // with a physical sstable. This is useful when virtual sstables in the
122 : // latest version are reconstructed during manifest replay, and we also need
123 : // to reconstruct the FileBacking which is required by these virtual
124 : // sstables.
125 : //
126 : // INVARIANT: The FileBacking associated with a physical sstable must only
127 : // be added as a backing file in the same version edit where the physical
128 : // sstable is first virtualized. This means that the physical sstable must
129 : // be present in DeletedFiles and that there must be at least one virtual
130 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
131 : // file must be present in CreatedBackingTables in exactly one version edit.
132 : // The physical sstable associated with the FileBacking must also not be
133 : // present in NewFiles.
134 : CreatedBackingTables []*FileBacking
135 : // RemovedBackingTables is used to remove the FileBacking associated with a
136 : // virtual sstable. Note that a backing sstable can be removed as soon as
137 : // there are no virtual sstables in the latest version which are using the
138 : // backing sstable, but the backing sstable doesn't necessarily have to be
139 : // removed atomically with the version edit which removes the last virtual
140 : // sstable associated with the backing sstable. The removal can happen in a
141 : // future version edit.
142 : //
143 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
144 : // added to CreatedBackingTables in a prior version edit. The same version
145 : // edit also cannot have the same file present in both CreatedBackingTables
146 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
147 : // in exactly one version edit.
148 : RemovedBackingTables []base.DiskFileNum
149 : }
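// As an illustration (field values hypothetical), a flush that adds a single
// sstable to L0 produces a version edit along these lines, where meta is the
// *FileMetadata describing the new table:
//
//	ve := &VersionEdit{
//	    MinUnflushedLogNum: base.DiskFileNum(12),
//	    NextFileNum:        14,
//	    LastSeqNum:         2048,
//	    NewFiles:           []NewFileEntry{{Level: 0, Meta: meta}},
//	}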
150 :
151 : // Decode decodes an edit from the specified reader.
152 : //
153 : // Note that Decode does not set the FileBacking for virtual sstables; that
154 : // responsibility is left to the caller. Decode does, however, populate
155 : // NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
156 1 : func (v *VersionEdit) Decode(r io.Reader) error {
157 1 : br, ok := r.(byteReader)
158 1 : if !ok {
159 1 : br = bufio.NewReader(r)
160 1 : }
161 1 : d := versionEditDecoder{br}
162 1 : for {
163 1 : tag, err := binary.ReadUvarint(br)
164 1 : if err == io.EOF {
165 1 : break
166 : }
167 1 : if err != nil {
168 0 : return err
169 0 : }
170 1 : switch tag {
171 1 : case tagComparator:
172 1 : s, err := d.readBytes()
173 1 : if err != nil {
174 0 : return err
175 0 : }
176 1 : v.ComparerName = string(s)
177 :
178 1 : case tagLogNumber:
179 1 : n, err := d.readUvarint()
180 1 : if err != nil {
181 0 : return err
182 0 : }
183 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
184 :
185 1 : case tagNextFileNumber:
186 1 : n, err := d.readUvarint()
187 1 : if err != nil {
188 0 : return err
189 0 : }
190 1 : v.NextFileNum = n
191 :
192 1 : case tagLastSequence:
193 1 : n, err := d.readUvarint()
194 1 : if err != nil {
195 0 : return err
196 0 : }
197 1 : v.LastSeqNum = n
198 :
199 0 : case tagCompactPointer:
200 0 : if _, err := d.readLevel(); err != nil {
201 0 : return err
202 0 : }
203 0 : if _, err := d.readBytes(); err != nil {
204 0 : return err
205 0 : }
206 : // NB: RocksDB does not use compaction pointers anymore.
207 :
208 1 : case tagRemovedBackingTable:
209 1 : n, err := d.readUvarint()
210 1 : if err != nil {
211 0 : return err
212 0 : }
213 1 : v.RemovedBackingTables = append(
214 1 : v.RemovedBackingTables, base.FileNum(n).DiskFileNum(),
215 1 : )
216 1 : case tagCreatedBackingTable:
217 1 : dfn, err := d.readUvarint()
218 1 : if err != nil {
219 0 : return err
220 0 : }
221 1 : size, err := d.readUvarint()
222 1 : if err != nil {
223 0 : return err
224 0 : }
225 1 : fileBacking := &FileBacking{
226 1 : DiskFileNum: base.FileNum(dfn).DiskFileNum(),
227 1 : Size: size,
228 1 : }
229 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
230 1 : case tagDeletedFile:
231 1 : level, err := d.readLevel()
232 1 : if err != nil {
233 0 : return err
234 0 : }
235 1 : fileNum, err := d.readFileNum()
236 1 : if err != nil {
237 0 : return err
238 0 : }
239 1 : if v.DeletedFiles == nil {
240 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
241 1 : }
242 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
243 :
244 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
245 1 : level, err := d.readLevel()
246 1 : if err != nil {
247 0 : return err
248 0 : }
249 1 : fileNum, err := d.readFileNum()
250 1 : if err != nil {
251 0 : return err
252 0 : }
253 1 : if tag == tagNewFile3 {
254 0 : // The pathID field appears unused in RocksDB.
255 0 : _ /* pathID */, err := d.readUvarint()
256 0 : if err != nil {
257 0 : return err
258 0 : }
259 : }
260 1 : size, err := d.readUvarint()
261 1 : if err != nil {
262 0 : return err
263 0 : }
264 : // We read the smallest / largest key bounds differently depending on
265 : // whether we have point, range or both types of keys present in the
266 : // table.
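// A sketch of the marker byte's layout as assumed here (the mask constants
// are defined elsewhere in this package): maskContainsPointKeys indicates
// that point key bounds follow in addition to range key bounds, while
// maskSmallest and maskLargest indicate that the overall smallest
// (respectively largest) table bound is the point key bound rather than the
// range key bound.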
267 1 : var (
268 1 : smallestPointKey, largestPointKey []byte
269 1 : smallestRangeKey, largestRangeKey []byte
270 1 : parsedPointBounds bool
271 1 : boundsMarker byte
272 1 : )
273 1 : if tag != tagNewFile5 {
274 1 : // Range keys not present in the table. Parse the point key bounds.
275 1 : smallestPointKey, err = d.readBytes()
276 1 : if err != nil {
277 0 : return err
278 0 : }
279 1 : largestPointKey, err = d.readBytes()
280 1 : if err != nil {
281 0 : return err
282 0 : }
283 1 : } else {
284 1 : // Range keys are present in the table. Determine whether we have point
285 1 : // keys to parse, in addition to the bounds.
286 1 : boundsMarker, err = d.ReadByte()
287 1 : if err != nil {
288 0 : return err
289 0 : }
290 : // Parse point key bounds, if present.
291 1 : if boundsMarker&maskContainsPointKeys > 0 {
292 1 : smallestPointKey, err = d.readBytes()
293 1 : if err != nil {
294 0 : return err
295 0 : }
296 1 : largestPointKey, err = d.readBytes()
297 1 : if err != nil {
298 0 : return err
299 0 : }
300 1 : parsedPointBounds = true
301 1 : } else {
302 1 : // The table does not have point keys.
303 1 : // Sanity check: the bounds must be range keys.
304 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
305 0 : return base.CorruptionErrorf(
306 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
307 0 : boundsMarker,
308 0 : )
309 0 : }
310 : }
311 : // Parse range key bounds.
312 1 : smallestRangeKey, err = d.readBytes()
313 1 : if err != nil {
314 0 : return err
315 0 : }
316 1 : largestRangeKey, err = d.readBytes()
317 1 : if err != nil {
318 0 : return err
319 0 : }
320 : }
321 1 : var smallestSeqNum uint64
322 1 : var largestSeqNum uint64
323 1 : if tag != tagNewFile {
324 1 : smallestSeqNum, err = d.readUvarint()
325 1 : if err != nil {
326 0 : return err
327 0 : }
328 1 : largestSeqNum, err = d.readUvarint()
329 1 : if err != nil {
330 0 : return err
331 0 : }
332 : }
333 1 : var markedForCompaction bool
334 1 : var creationTime uint64
335 1 : virtualState := struct {
336 1 : virtual bool
337 1 : backingFileNum uint64
338 1 : }{}
339 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
340 1 : for {
341 1 : customTag, err := d.readUvarint()
342 1 : if err != nil {
343 0 : return err
344 0 : }
345 1 : if customTag == customTagTerminate {
346 1 : break
347 1 : } else if customTag == customTagVirtual {
348 1 : virtualState.virtual = true
349 1 : n, err := d.readUvarint()
350 1 : if err != nil {
351 0 : return err
352 0 : }
353 1 : virtualState.backingFileNum = n
354 1 : continue
355 : }
356 :
357 1 : field, err := d.readBytes()
358 1 : if err != nil {
359 0 : return err
360 0 : }
361 1 : switch customTag {
362 1 : case customTagNeedsCompaction:
363 1 : if len(field) != 1 {
364 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
365 0 : }
366 1 : markedForCompaction = (field[0] == 1)
367 :
368 1 : case customTagCreationTime:
369 1 : var n int
370 1 : creationTime, n = binary.Uvarint(field)
371 1 : if n != len(field) {
372 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
373 0 : }
374 :
375 0 : case customTagPathID:
376 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
377 :
378 0 : default:
379 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
380 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
381 0 : }
382 : }
383 : }
384 : }
385 1 : m := &FileMetadata{
386 1 : FileNum: fileNum,
387 1 : Size: size,
388 1 : CreationTime: int64(creationTime),
389 1 : SmallestSeqNum: smallestSeqNum,
390 1 : LargestSeqNum: largestSeqNum,
391 1 : MarkedForCompaction: markedForCompaction,
392 1 : Virtual: virtualState.virtual,
393 1 : }
394 1 : if tag != tagNewFile5 { // no range keys present
395 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
396 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
397 1 : m.HasPointKeys = true
398 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
399 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
400 1 : } else { // range keys present
401 1 : // Set point key bounds, if parsed.
402 1 : if parsedPointBounds {
403 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
404 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
405 1 : m.HasPointKeys = true
406 1 : }
407 : // Set range key bounds.
408 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
409 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
410 1 : m.HasRangeKeys = true
411 1 : // Set overall bounds (by default assume range keys).
412 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
413 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
414 1 : if boundsMarker&maskSmallest == maskSmallest {
415 1 : m.Smallest = m.SmallestPointKey
416 1 : m.boundTypeSmallest = boundTypePointKey
417 1 : }
418 1 : if boundsMarker&maskLargest == maskLargest {
419 1 : m.Largest = m.LargestPointKey
420 1 : m.boundTypeLargest = boundTypePointKey
421 1 : }
422 : }
423 1 : m.boundsSet = true
424 1 : if !virtualState.virtual {
425 1 : m.InitPhysicalBacking()
426 1 : }
427 :
428 1 : nfe := NewFileEntry{
429 1 : Level: level,
430 1 : Meta: m,
431 1 : }
432 1 : if virtualState.virtual {
433 1 : nfe.BackingFileNum = base.FileNum(virtualState.backingFileNum).DiskFileNum()
434 1 : }
435 1 : v.NewFiles = append(v.NewFiles, nfe)
436 :
437 1 : case tagPrevLogNumber:
438 1 : n, err := d.readUvarint()
439 1 : if err != nil {
440 0 : return err
441 0 : }
442 1 : v.ObsoletePrevLogNum = n
443 :
444 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
445 0 : return base.CorruptionErrorf("column families are not supported")
446 :
447 0 : default:
448 0 : return errCorruptManifest
449 : }
450 : }
451 1 : return nil
452 : }
453 :
454 1 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
455 1 : var buf bytes.Buffer
456 1 : if v.ComparerName != "" {
457 1 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
458 1 : }
459 1 : if v.MinUnflushedLogNum != 0 {
460 1 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
461 1 : }
462 1 : if v.ObsoletePrevLogNum != 0 {
463 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
464 0 : }
465 1 : if v.NextFileNum != 0 {
466 1 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
467 1 : }
468 1 : if v.LastSeqNum != 0 {
469 1 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
470 1 : }
471 1 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
472 1 : for df := range v.DeletedFiles {
473 1 : entries = append(entries, df)
474 1 : }
475 1 : slices.SortFunc(entries, func(a, b DeletedFileEntry) int {
476 1 : if v := stdcmp.Compare(a.Level, b.Level); v != 0 {
477 1 : return v
478 1 : }
479 0 : return stdcmp.Compare(a.FileNum, b.FileNum)
480 : })
481 1 : for _, df := range entries {
482 1 : fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
483 1 : }
484 1 : for _, nf := range v.NewFiles {
485 1 : fmt.Fprintf(&buf, " added: L%d", nf.Level)
486 1 : if verbose {
487 1 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
488 1 : } else {
489 1 : fmt.Fprintf(&buf, " %s", nf.Meta.String())
490 1 : }
491 1 : if nf.Meta.CreationTime != 0 {
492 1 : fmt.Fprintf(&buf, " (%s)",
493 1 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
494 1 : }
495 1 : fmt.Fprintln(&buf)
496 : }
497 1 : return buf.String()
498 : }
499 :
500 : // DebugString is a more verbose version of String(). Use this in tests.
501 1 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
502 1 : return v.string(true /* verbose */, fmtKey)
503 1 : }
504 :
505 : // String implements fmt.Stringer for a VersionEdit.
506 1 : func (v *VersionEdit) String() string {
507 1 : return v.string(false /* verbose */, base.DefaultFormatter)
508 1 : }
509 :
510 : // Encode encodes an edit to the specified writer.
511 1 : func (v *VersionEdit) Encode(w io.Writer) error {
512 1 : e := versionEditEncoder{new(bytes.Buffer)}
513 1 :
514 1 : if v.ComparerName != "" {
515 1 : e.writeUvarint(tagComparator)
516 1 : e.writeString(v.ComparerName)
517 1 : }
518 1 : if v.MinUnflushedLogNum != 0 {
519 1 : e.writeUvarint(tagLogNumber)
520 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
521 1 : }
522 1 : if v.ObsoletePrevLogNum != 0 {
523 1 : e.writeUvarint(tagPrevLogNumber)
524 1 : e.writeUvarint(v.ObsoletePrevLogNum)
525 1 : }
526 1 : if v.NextFileNum != 0 {
527 1 : e.writeUvarint(tagNextFileNumber)
528 1 : e.writeUvarint(uint64(v.NextFileNum))
529 1 : }
530 1 : for _, dfn := range v.RemovedBackingTables {
531 1 : e.writeUvarint(tagRemovedBackingTable)
532 1 : e.writeUvarint(uint64(dfn.FileNum()))
533 1 : }
534 1 : for _, fileBacking := range v.CreatedBackingTables {
535 1 : e.writeUvarint(tagCreatedBackingTable)
536 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum.FileNum()))
537 1 : e.writeUvarint(fileBacking.Size)
538 1 : }
539 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
540 : // even though its value is zero. We detect this by encoding LastSeqNum when
541 : // ComparerName is set.
542 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
543 1 : e.writeUvarint(tagLastSequence)
544 1 : e.writeUvarint(v.LastSeqNum)
545 1 : }
546 1 : for x := range v.DeletedFiles {
547 1 : e.writeUvarint(tagDeletedFile)
548 1 : e.writeUvarint(uint64(x.Level))
549 1 : e.writeUvarint(uint64(x.FileNum))
550 1 : }
551 1 : for _, x := range v.NewFiles {
552 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
553 1 : var tag uint64
554 1 : switch {
555 1 : case x.Meta.HasRangeKeys:
556 1 : tag = tagNewFile5
557 1 : case customFields:
558 1 : tag = tagNewFile4
559 1 : default:
560 1 : tag = tagNewFile2
561 : }
562 1 : e.writeUvarint(tag)
563 1 : e.writeUvarint(uint64(x.Level))
564 1 : e.writeUvarint(uint64(x.Meta.FileNum))
565 1 : e.writeUvarint(x.Meta.Size)
566 1 : if !x.Meta.HasRangeKeys {
567 1 : // If we have no range keys, preserve the original format and write the
568 1 : // smallest and largest point keys.
569 1 : e.writeKey(x.Meta.SmallestPointKey)
570 1 : e.writeKey(x.Meta.LargestPointKey)
571 1 : } else {
572 1 : // When range keys are present, we first write a marker byte that
573 1 : // indicates if the table also contains point keys, in addition to how the
574 1 : // overall bounds for the table should be reconstructed. This byte is
575 1 : // followed by the keys themselves.
576 1 : b, err := x.Meta.boundsMarker()
577 1 : if err != nil {
578 0 : return err
579 0 : }
580 1 : if err = e.WriteByte(b); err != nil {
581 0 : return err
582 0 : }
583 : // Write point key bounds (if present).
584 1 : if x.Meta.HasPointKeys {
585 1 : e.writeKey(x.Meta.SmallestPointKey)
586 1 : e.writeKey(x.Meta.LargestPointKey)
587 1 : }
588 : // Write range key bounds.
589 1 : e.writeKey(x.Meta.SmallestRangeKey)
590 1 : e.writeKey(x.Meta.LargestRangeKey)
591 : }
592 1 : e.writeUvarint(x.Meta.SmallestSeqNum)
593 1 : e.writeUvarint(x.Meta.LargestSeqNum)
594 1 : if customFields {
595 1 : if x.Meta.CreationTime != 0 {
596 1 : e.writeUvarint(customTagCreationTime)
597 1 : var buf [binary.MaxVarintLen64]byte
598 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
599 1 : e.writeBytes(buf[:n])
600 1 : }
601 1 : if x.Meta.MarkedForCompaction {
602 1 : e.writeUvarint(customTagNeedsCompaction)
603 1 : e.writeBytes([]byte{1})
604 1 : }
605 1 : if x.Meta.Virtual {
606 1 : e.writeUvarint(customTagVirtual)
607 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum.FileNum()))
608 1 : }
609 1 : e.writeUvarint(customTagTerminate)
610 : }
611 : }
612 1 : _, err := w.Write(e.Bytes())
613 1 : return err
614 : }
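// A minimal round-trip sketch (error handling elided; in Pebble proper the
// encoded payload is framed by the MANIFEST record writer rather than
// written raw):
//
//	var buf bytes.Buffer
//	_ = ve.Encode(&buf)
//	var decoded VersionEdit
//	_ = decoded.Decode(&buf)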
615 :
616 : // versionEditDecoder should be used to decode version edits.
617 : type versionEditDecoder struct {
618 : byteReader
619 : }
620 :
621 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
622 1 : n, err := d.readUvarint()
623 1 : if err != nil {
624 0 : return nil, err
625 0 : }
626 1 : s := make([]byte, n)
627 1 : _, err = io.ReadFull(d, s)
628 1 : if err != nil {
629 0 : if err == io.ErrUnexpectedEOF {
630 0 : return nil, errCorruptManifest
631 0 : }
632 0 : return nil, err
633 : }
634 1 : return s, nil
635 : }
636 :
637 1 : func (d versionEditDecoder) readLevel() (int, error) {
638 1 : u, err := d.readUvarint()
639 1 : if err != nil {
640 0 : return 0, err
641 0 : }
642 1 : if u >= NumLevels {
643 0 : return 0, errCorruptManifest
644 0 : }
645 1 : return int(u), nil
646 : }
647 :
648 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
649 1 : u, err := d.readUvarint()
650 1 : if err != nil {
651 0 : return 0, err
652 0 : }
653 1 : return base.FileNum(u), nil
654 : }
655 :
656 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
657 1 : u, err := binary.ReadUvarint(d)
658 1 : if err != nil {
659 0 : if err == io.EOF {
660 0 : return 0, errCorruptManifest
661 0 : }
662 0 : return 0, err
663 : }
664 1 : return u, nil
665 : }
666 :
667 : type versionEditEncoder struct {
668 : *bytes.Buffer
669 : }
670 :
671 1 : func (e versionEditEncoder) writeBytes(p []byte) {
672 1 : e.writeUvarint(uint64(len(p)))
673 1 : e.Write(p)
674 1 : }
675 :
676 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
677 1 : e.writeUvarint(uint64(k.Size()))
678 1 : e.Write(k.UserKey)
679 1 : buf := k.EncodeTrailer()
680 1 : e.Write(buf[:])
681 1 : }
682 :
683 1 : func (e versionEditEncoder) writeString(s string) {
684 1 : e.writeUvarint(uint64(len(s)))
685 1 : e.WriteString(s)
686 1 : }
687 :
688 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
689 1 : var buf [binary.MaxVarintLen64]byte
690 1 : n := binary.PutUvarint(buf[:], u)
691 1 : e.Write(buf[:n])
692 1 : }
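// For example, writeUvarint(300) appends the two bytes 0xAC 0x02: uvarints
// are written seven bits at a time, least-significant group first, with the
// high bit of each byte acting as a continuation flag (300 = 0b1_0010_1100).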
693 :
694 : // BulkVersionEdit summarizes the files added and deleted from a set of version
695 : // edits.
696 : //
697 : // INVARIANTS:
698 : // No file can be added to a level more than once. This is true globally, and
699 : // also true for all of the calls to Accumulate for a single bulk version edit.
700 : //
701 : // No file can be removed from a level more than once. This is true globally,
702 : // and also true for all of the calls to Accumulate for a single bulk version
703 : // edit.
704 : //
705 : // A file must not be added and removed from a given level in the same version
706 : // edit.
707 : //
708 : // A file that is being removed from a level must have been added to that level
709 : // before (in a prior version edit). Note that a given file can be deleted from
710 : // a level and added to another level in a single version edit.
711 : type BulkVersionEdit struct {
712 : Added [NumLevels]map[base.FileNum]*FileMetadata
713 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
714 :
715 : // AddedFileBacking is a map to support lookup so that we can populate the
716 : // FileBacking of virtual sstables during manifest replay.
717 : AddedFileBacking map[base.DiskFileNum]*FileBacking
718 : RemovedFileBacking []base.DiskFileNum
719 :
720 : // AddedByFileNum maps file number to file metadata for all added files
721 : // from accumulated version edits. AddedByFileNum is only populated if set
722 : // to non-nil by a caller. It must be set to non-nil when replaying
723 : // version edits read from a MANIFEST (as opposed to VersionEdits
724 : // constructed in-memory). While replaying a MANIFEST file,
725 : // VersionEdit.DeletedFiles map entries have nil values, because the
726 : // on-disk deletion record encodes only the file number. Accumulate
727 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
728 : // field with non-nil *FileMetadata.
729 : AddedByFileNum map[base.FileNum]*FileMetadata
730 :
731 : // MarkedForCompactionCountDiff holds the aggregated count of files
732 : // marked for compaction added or removed.
733 : MarkedForCompactionCountDiff int
734 : }
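// A hedged sketch of manifest replay with BulkVersionEdit. The records slice
// (one io.Reader per MANIFEST record) is hypothetical; the point is that
// AddedByFileNum must be non-nil before Accumulate sees on-disk edits:
//
//	bve := BulkVersionEdit{AddedByFileNum: make(map[base.FileNum]*FileMetadata)}
//	for _, r := range records {
//	    var ve VersionEdit
//	    if err := ve.Decode(r); err != nil {
//	        return err
//	    }
//	    if err := bve.Accumulate(&ve); err != nil {
//	        return err
//	    }
//	}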
735 :
736 : // Accumulate adds the file addition and deletions in the specified version
737 : // edit to the bulk edit's internal state.
738 : //
739 : // INVARIANTS:
740 : // If a file is added to a given level in a call to Accumulate and then removed
741 : // from that level in a subsequent call, the file will not be present in the
742 : // resulting BulkVersionEdit.Deleted for that level.
743 : //
744 : // After accumulation of version edits, the bulk version edit may have
745 : // information about a file which has been deleted from a level, but it may
746 : // not have information about the same file added to the same level. The add
747 : // could've occurred as part of a previous bulk version edit. In this case,
748 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
749 : // of the accumulation, because we need to decrease the refcount of the
750 : // deleted file in Apply.
751 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
752 1 : for df, m := range ve.DeletedFiles {
753 1 : dmap := b.Deleted[df.Level]
754 1 : if dmap == nil {
755 1 : dmap = make(map[base.FileNum]*FileMetadata)
756 1 : b.Deleted[df.Level] = dmap
757 1 : }
758 :
759 1 : if m == nil {
760 1 : // m is nil only when replaying a MANIFEST.
761 1 : if b.AddedByFileNum == nil {
762 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
763 0 : }
764 1 : m = b.AddedByFileNum[df.FileNum]
765 1 : if m == nil {
766 1 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
767 1 : }
768 : }
769 1 : if m.MarkedForCompaction {
770 1 : b.MarkedForCompactionCountDiff--
771 1 : }
772 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
773 1 : dmap[df.FileNum] = m
774 1 : } else {
775 1 : // Present in b.Added for the same level.
776 1 : delete(b.Added[df.Level], df.FileNum)
777 1 : }
778 : }
779 :
780 : // Generate state for Added backing files. Note that these must be generated
781 : // before we loop through the NewFiles, because we need to populate the
782 : // FileBackings which might be used by the NewFiles loop.
783 1 : if b.AddedFileBacking == nil {
784 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
785 1 : }
786 1 : for _, fb := range ve.CreatedBackingTables {
787 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
788 0 : // There is already a FileBacking associated with fb.DiskFileNum.
789 0 : // This should never happen. There must always be only one FileBacking
790 0 : // associated with a backing sstable.
791 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
792 : }
793 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
794 : }
795 :
796 1 : for _, nf := range ve.NewFiles {
797 1 : // A new file should not have been deleted in this or a preceding
798 1 : // VersionEdit at the same level (though files can move across levels).
799 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
800 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
801 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
802 0 : }
803 : }
804 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
805 1 : // FileBacking for a virtual sstable must only be nil if we're performing
806 1 : // manifest replay.
807 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
808 1 : if nf.Meta.FileBacking == nil {
809 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
810 0 : }
811 1 : } else if nf.Meta.FileBacking == nil {
812 0 : return errors.Errorf("added file L%d.%s has no FileBacking", nf.Level, nf.Meta.FileNum)
813 0 : }
814 :
815 1 : if b.Added[nf.Level] == nil {
816 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
817 1 : }
818 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
819 1 : if b.AddedByFileNum != nil {
820 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
821 1 : }
822 1 : if nf.Meta.MarkedForCompaction {
823 1 : b.MarkedForCompactionCountDiff++
824 1 : }
825 : }
826 :
827 : // Since a file can be removed from backing files in exactly one version
828 : // edit it is safe to just append without any de-duplication.
829 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
830 1 :
831 1 : return nil
832 : }
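// For intuition, a move compaction that relocates table 7 from L4 to L5 is a
// single edit that deletes the file from one level and adds it to the other;
// Accumulate records both sides (a sketch; meta is the file's existing
// *FileMetadata and is hypothetical here):
//
//	ve := &VersionEdit{
//	    DeletedFiles: map[DeletedFileEntry]*FileMetadata{
//	        {Level: 4, FileNum: 7}: meta,
//	    },
//	    NewFiles: []NewFileEntry{{Level: 5, Meta: meta}},
//	}
//	err := bve.Accumulate(ve)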
833 :
834 : // AccumulateIncompleteAndApplySingleVE should be called if a single version edit
835 : // is to be applied to the provided curr Version and if the caller needs to
836 : // update the versionSet.zombieTables map. This function exists separately from
837 : // BulkVersionEdit.Apply because it is easier to reason about properties
838 : // regarding BulkVersionEdit.Accumulate/Apply and zombie table generation if we
839 : // know that exactly one version edit is being accumulated.
840 : //
841 : // Note that the version edit passed into this function may be incomplete
842 : // because compactions don't have the ref counting information necessary to
843 : // populate VersionEdit.RemovedBackingTables. This function will complete such a
844 : // version edit by populating RemovedBackingTables.
845 : //
846 : // Invariant: Any file being deleted through ve must belong to the curr Version.
847 : // We can't have a delete for some arbitrary file which does not exist in curr.
848 : func AccumulateIncompleteAndApplySingleVE(
849 : ve *VersionEdit,
850 : curr *Version,
851 : cmp Compare,
852 : formatKey base.FormatKey,
853 : flushSplitBytes int64,
854 : readCompactionRate int64,
855 : backingStateMap map[base.DiskFileNum]*FileBacking,
856 : addBackingFunc func(*FileBacking),
857 : removeBackingFunc func(base.DiskFileNum),
858 1 : ) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
859 1 : if len(ve.RemovedBackingTables) != 0 {
860 0 : panic("pebble: invalid incomplete version edit")
861 : }
862 1 : var b BulkVersionEdit
863 1 : err := b.Accumulate(ve)
864 1 : if err != nil {
865 0 : return nil, nil, err
866 0 : }
867 1 : zombies = make(map[base.DiskFileNum]uint64)
868 1 : v, err := b.Apply(
869 1 : curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies,
870 1 : )
871 1 : if err != nil {
872 0 : return nil, nil, err
873 0 : }
874 :
875 1 : for _, s := range b.AddedFileBacking {
876 1 : addBackingFunc(s)
877 1 : }
878 :
879 1 : for fileNum := range zombies {
880 1 : if _, ok := backingStateMap[fileNum]; ok {
881 1 : // This table was backing some virtual sstable in the latest version,
882 1 : // but is now a zombie. We add RemovedBackingTables entries for
883 1 : // these, before the version edit is written to disk.
884 1 : ve.RemovedBackingTables = append(
885 1 : ve.RemovedBackingTables, fileNum,
886 1 : )
887 1 : removeBackingFunc(fileNum)
888 1 : }
889 : }
890 1 : return v, zombies, nil
891 : }
892 :
893 : // Apply applies the delta b to the current version to produce a new
894 : // version. The new version is consistent with respect to the comparer cmp.
895 : //
896 : // curr may be nil, which is equivalent to a pointer to a zero version.
897 : //
898 : // On success, if a non-nil zombies map is provided to Apply, the map is updated
899 : // with file numbers and file sizes of deleted files. These files are
900 : // considered zombies because they are no longer referenced by the returned
901 : // Version, but cannot be deleted from disk as they are still in use by the
902 : // incoming Version.
903 : func (b *BulkVersionEdit) Apply(
904 : curr *Version,
905 : cmp Compare,
906 : formatKey base.FormatKey,
907 : flushSplitBytes int64,
908 : readCompactionRate int64,
909 : zombies map[base.DiskFileNum]uint64,
910 1 : ) (*Version, error) {
911 1 : addZombie := func(state *FileBacking) {
912 1 : if zombies != nil {
913 1 : zombies[state.DiskFileNum] = state.Size
914 1 : }
915 : }
916 1 : removeZombie := func(state *FileBacking) {
917 1 : if zombies != nil {
918 1 : delete(zombies, state.DiskFileNum)
919 1 : }
920 : }
921 :
922 1 : v := new(Version)
923 1 :
924 1 : // Adjust the count of files marked for compaction.
925 1 : if curr != nil {
926 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
927 1 : }
928 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
929 1 : if v.Stats.MarkedForCompaction < 0 {
930 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
931 0 : }
932 :
933 1 : for level := range v.Levels {
934 1 : if curr == nil || curr.Levels[level].tree.root == nil {
935 1 : v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */)
936 1 : } else {
937 1 : v.Levels[level] = curr.Levels[level].clone()
938 1 : }
939 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
940 1 : v.RangeKeyLevels[level] = makeLevelMetadata(cmp, level, nil /* files */)
941 1 : } else {
942 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
943 1 : }
944 :
945 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
946 1 : // There are no edits on this level.
947 1 : if level == 0 {
948 1 : // Initialize L0Sublevels.
949 1 : if curr == nil || curr.L0Sublevels == nil {
950 1 : if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
951 0 : return nil, errors.Wrap(err, "pebble: internal error")
952 0 : }
953 1 : } else {
954 1 : v.L0Sublevels = curr.L0Sublevels
955 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
956 1 : }
957 : }
958 1 : continue
959 : }
960 :
961 : // Some edits on this level.
962 1 : lm := &v.Levels[level]
963 1 : lmRange := &v.RangeKeyLevels[level]
964 1 :
965 1 : addedFilesMap := b.Added[level]
966 1 : deletedFilesMap := b.Deleted[level]
967 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
968 1 : return nil, base.CorruptionErrorf(
969 1 : "pebble: internal error: No current or added files but have deleted files: %d",
970 1 : errors.Safe(len(deletedFilesMap)))
971 1 : }
972 :
973 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
974 : // for a level, it won't be present in deletedFilesMap for the same
975 : // level.
976 :
977 1 : for _, f := range deletedFilesMap {
978 1 : if obsolete := v.Levels[level].remove(f); obsolete {
979 0 : // Deleting a file from the B-Tree may decrement its
980 0 : // reference count. However, because we cloned the
981 0 : // previous level's B-Tree, this should never result in a
982 0 : // file's reference count dropping to zero.
983 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
984 0 : return nil, err
985 0 : }
986 1 : if f.HasRangeKeys {
987 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
988 0 : // Deleting a file from the B-Tree may decrement its
989 0 : // reference count. However, because we cloned the
990 0 : // previous level's B-Tree, this should never result in a
991 0 : // file's reference count dropping to zero.
992 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
993 0 : return nil, err
994 0 : }
995 : }
996 :
997 : // Note that a backing sst will only become a zombie if the
998 : // references to it in the latest version drop to 0. We will remove the
999 : // backing sst from the zombie list in the next loop if one of the
1000 : // addedFiles in any of the levels is referencing the backing sst.
1001 : // This is possible if a physical sstable is virtualized, or if it
1002 : // is moved.
1003 1 : latestRefCount := f.LatestRefs()
1004 1 : if latestRefCount <= 0 {
1005 0 : // If a file is present in deletedFilesMap for a level, then it
1006 0 : // must have already been added to the level previously, which
1007 0 : // means that its latest ref count cannot be 0.
1008 0 : err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file %s", f.FileNum)
1009 0 : return nil, err
1010 1 : } else if f.LatestUnref() == 0 {
1011 1 : addZombie(f.FileBacking)
1012 1 : }
1013 : }
1014 :
1015 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1016 1 : for _, f := range addedFilesMap {
1017 1 : addedFiles = append(addedFiles, f)
1018 1 : }
1019 : // Sort addedFiles by file number. This isn't necessary, but tests which
1020 : // replay invalid manifests check the error output, and the error output
1021 : // depends on the order in which files are added to the btree.
1022 1 : slices.SortFunc(addedFiles, func(a, b *FileMetadata) int {
1023 1 : return stdcmp.Compare(a.FileNum, b.FileNum)
1024 1 : })
1025 :
1026 1 : var sm, la *FileMetadata
1027 1 : for _, f := range addedFiles {
1028 1 : // NB: allowedSeeks is used for read triggered compactions. It is set using
1029 1 : // Options.Experimental.ReadCompactionRate which defaults to 32KB.
1030 1 : var allowedSeeks int64
1031 1 : if readCompactionRate != 0 {
1032 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1033 1 : }
1034 1 : if allowedSeeks < 100 {
1035 1 : allowedSeeks = 100
1036 1 : }
1037 1 : f.AllowedSeeks.Store(allowedSeeks)
1038 1 : f.InitAllowedSeeks = allowedSeeks
1039 1 :
1040 1 : err := lm.insert(f)
1041 1 : // We're adding this file to the new version, so increment the
1042 1 : // latest refs count.
1043 1 : f.LatestRef()
1044 1 : if err != nil {
1045 1 : return nil, errors.Wrap(err, "pebble")
1046 1 : }
1047 1 : if f.HasRangeKeys {
1048 1 : err = lmRange.insert(f)
1049 1 : if err != nil {
1050 0 : return nil, errors.Wrap(err, "pebble")
1051 0 : }
1052 : }
1053 1 : removeZombie(f.FileBacking)
1054 1 : // Track the keys with the smallest and largest keys, so that we can
1055 1 : // check consistency of the modified span.
1056 1 : if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 {
1057 1 : sm = f
1058 1 : }
1059 1 : if la == nil || base.InternalCompare(cmp, la.Largest, f.Largest) < 0 {
1060 1 : la = f
1061 1 : }
1062 : }
1063 :
1064 1 : if level == 0 {
1065 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1066 1 : // Flushes and ingestions that do not delete any L0 files do not require
1067 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1068 1 : // it incrementally.
1069 1 : var err error
1070 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1071 1 : SortBySeqNum(addedFiles)
1072 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1073 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1074 1 : err = v.InitL0Sublevels(cmp, formatKey, flushSplitBytes)
1075 1 : } else if invariants.Enabled && err == nil {
1076 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], cmp, formatKey, flushSplitBytes)
1077 1 : if err != nil {
1078 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1079 : }
1080 1 : s1 := describeSublevels(base.DefaultFormatter, false /* verbose */, copyOfSublevels.Levels)
1081 1 : s2 := describeSublevels(base.DefaultFormatter, false /* verbose */, v.L0Sublevels.Levels)
1082 1 : if s1 != s2 {
1083 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1084 : }
1085 : }
1086 1 : if err != nil {
1087 0 : return nil, errors.Wrap(err, "pebble: internal error")
1088 0 : }
1089 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1090 1 : } else if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
1091 0 : return nil, errors.Wrap(err, "pebble: internal error")
1092 0 : }
1093 1 : if err := CheckOrdering(cmp, formatKey, Level(0), v.Levels[level].Iter()); err != nil {
1094 0 : return nil, errors.Wrap(err, "pebble: internal error")
1095 0 : }
1096 1 : continue
1097 : }
1098 :
1099 : // Check consistency of the level in the vicinity of our edits.
1100 1 : if sm != nil && la != nil {
1101 1 : overlap := overlaps(v.Levels[level].Iter(), cmp, sm.Smallest.UserKey,
1102 1 : la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
1103 1 : // overlap contains all of the added files. We want to ensure that
1104 1 : // the added files are consistent with neighboring existing files
1105 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1106 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1107 1 : if m := start.Prev(); m == nil {
1108 1 : start.Next()
1109 1 : }
1110 1 : if m := end.Next(); m == nil {
1111 1 : end.Prev()
1112 1 : }
1113 : })
1114 1 : if err := CheckOrdering(cmp, formatKey, Level(level), check.Iter()); err != nil {
1115 1 : return nil, errors.Wrap(err, "pebble: internal error")
1116 1 : }
1117 : }
1118 : }
1119 1 : return v, nil
1120 : }
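// A hedged sketch of applying accumulated edits (variable values are
// illustrative):
//
//	zombies := make(map[base.DiskFileNum]uint64)
//	v, err := bve.Apply(curr, cmp, formatKey, flushSplitBytes,
//	    readCompactionRate, zombies)
//	if err != nil {
//	    return err
//	}
//	// zombies now maps disk file numbers of tables dropped from v to their
//	// sizes; they stay on disk while still referenced by older versions.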
|