1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bufio"
9 : "bytes"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "sort"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : )
20 :
21 : // TODO(peter): describe the MANIFEST file format, independently of the C++
22 : // project.
23 :
24 : var errCorruptManifest = base.CorruptionErrorf("pebble: corrupt manifest")
25 :
26 : type byteReader interface {
27 : io.ByteReader
28 : io.Reader
29 : }
30 :
31 : // Tags for the versionEdit disk format.
32 : // Tag 8 is no longer used.
33 : const (
34 : // LevelDB tags.
35 : tagComparator = 1
36 : tagLogNumber = 2
37 : tagNextFileNumber = 3
38 : tagLastSequence = 4
39 : tagCompactPointer = 5
40 : tagDeletedFile = 6
41 : tagNewFile = 7
42 : tagPrevLogNumber = 9
43 :
44 : // RocksDB tags.
45 : tagNewFile2 = 100
46 : tagNewFile3 = 102
47 : tagNewFile4 = 103
48 : tagColumnFamily = 200
49 : tagColumnFamilyAdd = 201
50 : tagColumnFamilyDrop = 202
51 : tagMaxColumnFamily = 203
52 :
53 : // Pebble tags.
54 : tagNewFile5 = 104 // Range keys.
55 : tagCreatedBackingTable = 105
56 : tagRemovedBackingTable = 106
57 :
58 : // The custom tags sub-format used by tagNewFile4 and above.
59 : customTagTerminate = 1
60 : customTagNeedsCompaction = 2
61 : customTagCreationTime = 6
62 : customTagPathID = 65
63 : customTagNonSafeIgnoreMask = 1 << 6
64 : customTagVirtual = 66
65 : )
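
// Each version edit is encoded as a flat sequence of (tag, payload) pairs,
// with every integer written as a uvarint. As a rough sketch, a minimal
// edit that only sets the next file number to 5 would be laid out as the
// two uvarints (tagNextFileNumber, 5):
//
//	var buf [2 * binary.MaxVarintLen64]byte
//	n := binary.PutUvarint(buf[:], tagNextFileNumber)
//	n += binary.PutUvarint(buf[n:], 5)
//	// buf[:n] is now a record that VersionEdit.Decode can parse.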
66 :
67 : // DeletedFileEntry holds the state for a file deletion from a level. The file
68 : // itself might still be referenced by another level.
69 : type DeletedFileEntry struct {
70 : Level int
71 : FileNum base.FileNum
72 : }
73 :
74 : // NewFileEntry holds the state for a new file or one moved from a different
75 : // level.
76 : type NewFileEntry struct {
77 : Level int
78 : Meta *FileMetadata
79 : // BackingFileNum is only set during manifest replay, and only for virtual
80 : // sstables.
81 : BackingFileNum base.DiskFileNum
82 : }
83 :
84 : // VersionEdit holds the state for an edit to a Version along with other
85 : // on-disk state (log numbers, next file number, and the last sequence number).
86 : type VersionEdit struct {
87 : // ComparerName is the value of Options.Comparer.Name. This is only set in
88 : // the first VersionEdit in a manifest (either when the DB is created, or
89 : // when a new manifest is created) and is used to verify that the comparer
90 : // specified at Open matches the comparer that was previously used.
91 : ComparerName string
92 :
93 : // MinUnflushedLogNum is the smallest WAL log file number corresponding to
94 : // mutations that have not been flushed to an sstable.
95 : //
96 : // This is an optional field; 0 means it is not set.
97 : MinUnflushedLogNum base.DiskFileNum
98 :
99 : // ObsoletePrevLogNum is a historic artifact from LevelDB that is not used by
100 : // Pebble, RocksDB, or even LevelDB. Its use in LevelDB was deprecated in
101 : // 6/2011. We keep it around purely for informational purposes when
102 : // displaying MANIFEST contents.
103 : ObsoletePrevLogNum uint64
104 :
105 : // The next file number. A single counter is used to assign file numbers
106 : // for the WAL, MANIFEST, sstable, and OPTIONS files.
107 : NextFileNum uint64
108 :
109 : // LastSeqNum is an upper bound on the sequence numbers that have been
110 : // assigned in flushed WALs. Unflushed WALs (that will be replayed during
111 : // recovery) may contain sequence numbers greater than this value.
112 : LastSeqNum uint64
113 :
114 : // A file num may be present in both deleted files and new files when it
115 : // is moved from a lower level to a higher level (when the compaction
116 : // found that there was no overlapping file at the higher level).
117 : DeletedFiles map[DeletedFileEntry]*FileMetadata
118 : NewFiles []NewFileEntry
119 : // CreatedBackingTables can be used to preserve the FileBacking associated
120 : // with a physical sstable. This is useful when virtual sstables in the
121 : // latest version are reconstructed during manifest replay, and we also need
122 : // to reconstruct the FileBacking which is required by these virtual
123 : // sstables.
124 : //
125 : // INVARIANT: The FileBacking associated with a physical sstable must only
126 : // be added as a backing file in the same version edit where the physical
127 : // sstable is first virtualized. This means that the physical sstable must
128 : // be present in DeletedFiles and that there must be at least one virtual
129 : // sstable with the same FileBacking as the physical sstable in NewFiles. A
130 : // file must be present in CreatedBackingTables in exactly one version edit.
131 : // The physical sstable associated with the FileBacking must also not be
132 : // present in NewFiles.
133 : CreatedBackingTables []*FileBacking
134 : // RemovedBackingTables is used to remove the FileBacking associated with a
135 : // virtual sstable. Note that a backing sstable can be removed as soon as
136 : // there are no virtual sstables in the latest version which are using the
137 : // backing sstable, but the backing sstable doesn't necessarily have to be
138 : // removed atomically with the version edit which removes the last virtual
139 : // sstable associated with the backing sstable. The removal can happen in a
140 : // future version edit.
141 : //
142 : // INVARIANT: A file must only be added to RemovedBackingTables if it was
143 : // added to CreatedBackingTables in a prior version edit. The same version
144 : // edit also cannot have the same file present in both CreatedBackingTables
145 : // and RemovedBackingTables. A file must be present in RemovedBackingTables
146 : // in exactly one version edit.
147 : RemovedBackingTables []base.DiskFileNum
148 : }
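
// As an illustrative sketch, a flush that adds a single new L0 sstable
// would be described by an edit along these lines, where meta stands in
// for the *FileMetadata of the newly written table:
//
//	ve := &VersionEdit{
//		MinUnflushedLogNum: minUnflushedLogNum,
//		NextFileNum:        nextFileNum,
//		LastSeqNum:         lastSeqNum,
//		NewFiles: []NewFileEntry{
//			{Level: 0, Meta: meta},
//		},
//	}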
149 :
150 : // Decode decodes an edit from the specified reader.
151 : //
152 : // Note that Decode will not set the FileBacking for virtual sstables; that
153 : // responsibility is left to the caller. Decode will, however,
154 : // populate the NewFileEntry.BackingFileNum in VersionEdit.NewFiles.
155 1 : func (v *VersionEdit) Decode(r io.Reader) error {
156 1 : br, ok := r.(byteReader)
157 1 : if !ok {
158 1 : br = bufio.NewReader(r)
159 1 : }
160 1 : d := versionEditDecoder{br}
161 1 : for {
162 1 : tag, err := binary.ReadUvarint(br)
163 1 : if err == io.EOF {
164 1 : break
165 : }
166 1 : if err != nil {
167 0 : return err
168 0 : }
169 1 : switch tag {
170 1 : case tagComparator:
171 1 : s, err := d.readBytes()
172 1 : if err != nil {
173 0 : return err
174 0 : }
175 1 : v.ComparerName = string(s)
176 :
177 1 : case tagLogNumber:
178 1 : n, err := d.readUvarint()
179 1 : if err != nil {
180 0 : return err
181 0 : }
182 1 : v.MinUnflushedLogNum = base.DiskFileNum(n)
183 :
184 1 : case tagNextFileNumber:
185 1 : n, err := d.readUvarint()
186 1 : if err != nil {
187 0 : return err
188 0 : }
189 1 : v.NextFileNum = n
190 :
191 1 : case tagLastSequence:
192 1 : n, err := d.readUvarint()
193 1 : if err != nil {
194 0 : return err
195 0 : }
196 1 : v.LastSeqNum = n
197 :
198 0 : case tagCompactPointer:
199 0 : if _, err := d.readLevel(); err != nil {
200 0 : return err
201 0 : }
202 0 : if _, err := d.readBytes(); err != nil {
203 0 : return err
204 0 : }
205 : // NB: RocksDB does not use compaction pointers anymore.
206 :
207 1 : case tagRemovedBackingTable:
208 1 : n, err := d.readUvarint()
209 1 : if err != nil {
210 0 : return err
211 0 : }
212 1 : v.RemovedBackingTables = append(
213 1 : v.RemovedBackingTables, base.FileNum(n).DiskFileNum(),
214 1 : )
215 1 : case tagCreatedBackingTable:
216 1 : dfn, err := d.readUvarint()
217 1 : if err != nil {
218 0 : return err
219 0 : }
220 1 : size, err := d.readUvarint()
221 1 : if err != nil {
222 0 : return err
223 0 : }
224 1 : fileBacking := &FileBacking{
225 1 : DiskFileNum: base.FileNum(dfn).DiskFileNum(),
226 1 : Size: size,
227 1 : }
228 1 : v.CreatedBackingTables = append(v.CreatedBackingTables, fileBacking)
229 1 : case tagDeletedFile:
230 1 : level, err := d.readLevel()
231 1 : if err != nil {
232 0 : return err
233 0 : }
234 1 : fileNum, err := d.readFileNum()
235 1 : if err != nil {
236 0 : return err
237 0 : }
238 1 : if v.DeletedFiles == nil {
239 1 : v.DeletedFiles = make(map[DeletedFileEntry]*FileMetadata)
240 1 : }
241 1 : v.DeletedFiles[DeletedFileEntry{level, fileNum}] = nil
242 :
243 1 : case tagNewFile, tagNewFile2, tagNewFile3, tagNewFile4, tagNewFile5:
244 1 : level, err := d.readLevel()
245 1 : if err != nil {
246 0 : return err
247 0 : }
248 1 : fileNum, err := d.readFileNum()
249 1 : if err != nil {
250 0 : return err
251 0 : }
252 1 : if tag == tagNewFile3 {
253 0 : // The pathID field appears unused in RocksDB.
254 0 : _ /* pathID */, err := d.readUvarint()
255 0 : if err != nil {
256 0 : return err
257 0 : }
258 : }
259 1 : size, err := d.readUvarint()
260 1 : if err != nil {
261 0 : return err
262 0 : }
263 : // We read the smallest / largest key bounds differently depending on
264 : // whether we have point, range or both types of keys present in the
265 : // table.
266 1 : var (
267 1 : smallestPointKey, largestPointKey []byte
268 1 : smallestRangeKey, largestRangeKey []byte
269 1 : parsedPointBounds bool
270 1 : boundsMarker byte
271 1 : )
272 1 : if tag != tagNewFile5 {
273 1 : // Range keys not present in the table. Parse the point key bounds.
274 1 : smallestPointKey, err = d.readBytes()
275 1 : if err != nil {
276 0 : return err
277 0 : }
278 1 : largestPointKey, err = d.readBytes()
279 1 : if err != nil {
280 0 : return err
281 0 : }
282 1 : } else {
283 1 : // Range keys are present in the table. Determine whether we have point
284 1 : // keys to parse, in addition to the bounds.
285 1 : boundsMarker, err = d.ReadByte()
286 1 : if err != nil {
287 0 : return err
288 0 : }
289 : // Parse point key bounds, if present.
290 1 : if boundsMarker&maskContainsPointKeys > 0 {
291 1 : smallestPointKey, err = d.readBytes()
292 1 : if err != nil {
293 0 : return err
294 0 : }
295 1 : largestPointKey, err = d.readBytes()
296 1 : if err != nil {
297 0 : return err
298 0 : }
299 1 : parsedPointBounds = true
300 1 : } else {
301 1 : // The table does not have point keys.
302 1 : // Sanity check: the bounds must be range keys.
303 1 : if boundsMarker&maskSmallest != 0 || boundsMarker&maskLargest != 0 {
304 0 : return base.CorruptionErrorf(
305 0 : "new-file-4-range-keys: table without point keys has point key bounds: marker=%x",
306 0 : boundsMarker,
307 0 : )
308 0 : }
309 : }
310 : // Parse range key bounds.
311 1 : smallestRangeKey, err = d.readBytes()
312 1 : if err != nil {
313 0 : return err
314 0 : }
315 1 : largestRangeKey, err = d.readBytes()
316 1 : if err != nil {
317 0 : return err
318 0 : }
319 : }
320 1 : var smallestSeqNum uint64
321 1 : var largestSeqNum uint64
322 1 : if tag != tagNewFile {
323 1 : smallestSeqNum, err = d.readUvarint()
324 1 : if err != nil {
325 0 : return err
326 0 : }
327 1 : largestSeqNum, err = d.readUvarint()
328 1 : if err != nil {
329 0 : return err
330 0 : }
331 : }
332 1 : var markedForCompaction bool
333 1 : var creationTime uint64
334 1 : virtualState := struct {
335 1 : virtual bool
336 1 : backingFileNum uint64
337 1 : }{}
338 1 : if tag == tagNewFile4 || tag == tagNewFile5 {
339 1 : for {
340 1 : customTag, err := d.readUvarint()
341 1 : if err != nil {
342 0 : return err
343 0 : }
344 1 : if customTag == customTagTerminate {
345 1 : break
346 1 : } else if customTag == customTagVirtual {
347 1 : virtualState.virtual = true
348 1 : n, err := d.readUvarint()
349 1 : if err != nil {
350 0 : return err
351 0 : }
352 1 : virtualState.backingFileNum = n
353 1 : continue
354 : }
355 :
356 1 : field, err := d.readBytes()
357 1 : if err != nil {
358 0 : return err
359 0 : }
360 1 : switch customTag {
361 0 : case customTagNeedsCompaction:
362 0 : if len(field) != 1 {
363 0 : return base.CorruptionErrorf("new-file4: need-compaction field wrong size")
364 0 : }
365 0 : markedForCompaction = (field[0] == 1)
366 :
367 1 : case customTagCreationTime:
368 1 : var n int
369 1 : creationTime, n = binary.Uvarint(field)
370 1 : if n != len(field) {
371 0 : return base.CorruptionErrorf("new-file4: invalid file creation time")
372 0 : }
373 :
374 0 : case customTagPathID:
375 0 : return base.CorruptionErrorf("new-file4: path-id field not supported")
376 :
377 0 : default:
378 0 : if (customTag & customTagNonSafeIgnoreMask) != 0 {
379 0 : return base.CorruptionErrorf("new-file4: custom field not supported: %d", customTag)
380 0 : }
381 : }
382 : }
383 : }
384 1 : m := &FileMetadata{
385 1 : FileNum: fileNum,
386 1 : Size: size,
387 1 : CreationTime: int64(creationTime),
388 1 : SmallestSeqNum: smallestSeqNum,
389 1 : LargestSeqNum: largestSeqNum,
390 1 : MarkedForCompaction: markedForCompaction,
391 1 : Virtual: virtualState.virtual,
392 1 : }
393 1 : if tag != tagNewFile5 { // no range keys present
394 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
395 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
396 1 : m.HasPointKeys = true
397 1 : m.Smallest, m.Largest = m.SmallestPointKey, m.LargestPointKey
398 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypePointKey, boundTypePointKey
399 1 : } else { // range keys present
400 1 : // Set point key bounds, if parsed.
401 1 : if parsedPointBounds {
402 1 : m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey)
403 1 : m.LargestPointKey = base.DecodeInternalKey(largestPointKey)
404 1 : m.HasPointKeys = true
405 1 : }
406 : // Set range key bounds.
407 1 : m.SmallestRangeKey = base.DecodeInternalKey(smallestRangeKey)
408 1 : m.LargestRangeKey = base.DecodeInternalKey(largestRangeKey)
409 1 : m.HasRangeKeys = true
410 1 : // Set overall bounds (by default assume range keys).
411 1 : m.Smallest, m.Largest = m.SmallestRangeKey, m.LargestRangeKey
412 1 : m.boundTypeSmallest, m.boundTypeLargest = boundTypeRangeKey, boundTypeRangeKey
413 1 : if boundsMarker&maskSmallest == maskSmallest {
414 1 : m.Smallest = m.SmallestPointKey
415 1 : m.boundTypeSmallest = boundTypePointKey
416 1 : }
417 1 : if boundsMarker&maskLargest == maskLargest {
418 1 : m.Largest = m.LargestPointKey
419 1 : m.boundTypeLargest = boundTypePointKey
420 1 : }
421 : }
422 1 : m.boundsSet = true
423 1 : if !virtualState.virtual {
424 1 : m.InitPhysicalBacking()
425 1 : }
426 :
427 1 : nfe := NewFileEntry{
428 1 : Level: level,
429 1 : Meta: m,
430 1 : }
431 1 : if virtualState.virtual {
432 1 : nfe.BackingFileNum = base.FileNum(virtualState.backingFileNum).DiskFileNum()
433 1 : }
434 1 : v.NewFiles = append(v.NewFiles, nfe)
435 :
436 0 : case tagPrevLogNumber:
437 0 : n, err := d.readUvarint()
438 0 : if err != nil {
439 0 : return err
440 0 : }
441 0 : v.ObsoletePrevLogNum = n
442 :
443 0 : case tagColumnFamily, tagColumnFamilyAdd, tagColumnFamilyDrop, tagMaxColumnFamily:
444 0 : return base.CorruptionErrorf("column families are not supported")
445 :
446 0 : default:
447 0 : return errCorruptManifest
448 : }
449 : }
450 1 : return nil
451 : }
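
// A minimal sketch of decoding an edit from a raw record payload (in the
// database proper the payload is read out of the MANIFEST via a record
// reader):
//
//	var ve VersionEdit
//	if err := ve.Decode(bytes.NewReader(payload)); err != nil {
//		return err
//	}
//	// For virtual sstables, ve.NewFiles[i].BackingFileNum is populated but
//	// the FileBacking itself must still be resolved by the caller.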
452 :
453 0 : func (v *VersionEdit) string(verbose bool, fmtKey base.FormatKey) string {
454 0 : var buf bytes.Buffer
455 0 : if v.ComparerName != "" {
456 0 : fmt.Fprintf(&buf, " comparer: %s", v.ComparerName)
457 0 : }
458 0 : if v.MinUnflushedLogNum != 0 {
459 0 : fmt.Fprintf(&buf, " log-num: %d\n", v.MinUnflushedLogNum)
460 0 : }
461 0 : if v.ObsoletePrevLogNum != 0 {
462 0 : fmt.Fprintf(&buf, " prev-log-num: %d\n", v.ObsoletePrevLogNum)
463 0 : }
464 0 : if v.NextFileNum != 0 {
465 0 : fmt.Fprintf(&buf, " next-file-num: %d\n", v.NextFileNum)
466 0 : }
467 0 : if v.LastSeqNum != 0 {
468 0 : fmt.Fprintf(&buf, " last-seq-num: %d\n", v.LastSeqNum)
469 0 : }
470 0 : entries := make([]DeletedFileEntry, 0, len(v.DeletedFiles))
471 0 : for df := range v.DeletedFiles {
472 0 : entries = append(entries, df)
473 0 : }
474 0 : sort.Slice(entries, func(i, j int) bool {
475 0 : if entries[i].Level != entries[j].Level {
476 0 : return entries[i].Level < entries[j].Level
477 0 : }
478 0 : return entries[i].FileNum < entries[j].FileNum
479 : })
480 0 : for _, df := range entries {
481 0 : fmt.Fprintf(&buf, " deleted: L%d %s\n", df.Level, df.FileNum)
482 0 : }
483 0 : for _, nf := range v.NewFiles {
484 0 : fmt.Fprintf(&buf, " added: L%d", nf.Level)
485 0 : if verbose {
486 0 : fmt.Fprintf(&buf, " %s", nf.Meta.DebugString(fmtKey, true /* verbose */))
487 0 : } else {
488 0 : fmt.Fprintf(&buf, " %s", nf.Meta.String())
489 0 : }
490 0 : if nf.Meta.CreationTime != 0 {
491 0 : fmt.Fprintf(&buf, " (%s)",
492 0 : time.Unix(nf.Meta.CreationTime, 0).UTC().Format(time.RFC3339))
493 0 : }
494 0 : fmt.Fprintln(&buf)
495 : }
496 0 : return buf.String()
497 : }
498 :
499 : // DebugString is a more verbose version of String(). Use this in tests.
500 0 : func (v *VersionEdit) DebugString(fmtKey base.FormatKey) string {
501 0 : return v.string(true /* verbose */, fmtKey)
502 0 : }
503 :
504 : // String implements fmt.Stringer for a VersionEdit.
505 0 : func (v *VersionEdit) String() string {
506 0 : return v.string(false /* verbose */, base.DefaultFormatter)
507 0 : }
508 :
509 : // Encode encodes an edit to the specified writer.
510 1 : func (v *VersionEdit) Encode(w io.Writer) error {
511 1 : e := versionEditEncoder{new(bytes.Buffer)}
512 1 :
513 1 : if v.ComparerName != "" {
514 1 : e.writeUvarint(tagComparator)
515 1 : e.writeString(v.ComparerName)
516 1 : }
517 1 : if v.MinUnflushedLogNum != 0 {
518 1 : e.writeUvarint(tagLogNumber)
519 1 : e.writeUvarint(uint64(v.MinUnflushedLogNum))
520 1 : }
521 1 : if v.ObsoletePrevLogNum != 0 {
522 0 : e.writeUvarint(tagPrevLogNumber)
523 0 : e.writeUvarint(v.ObsoletePrevLogNum)
524 0 : }
525 1 : if v.NextFileNum != 0 {
526 1 : e.writeUvarint(tagNextFileNumber)
527 1 : e.writeUvarint(uint64(v.NextFileNum))
528 1 : }
529 1 : for _, dfn := range v.RemovedBackingTables {
530 1 : e.writeUvarint(tagRemovedBackingTable)
531 1 : e.writeUvarint(uint64(dfn.FileNum()))
532 1 : }
533 1 : for _, fileBacking := range v.CreatedBackingTables {
534 1 : e.writeUvarint(tagCreatedBackingTable)
535 1 : e.writeUvarint(uint64(fileBacking.DiskFileNum.FileNum()))
536 1 : e.writeUvarint(fileBacking.Size)
537 1 : }
538 : // RocksDB requires LastSeqNum to be encoded for the first MANIFEST entry,
539 : // even if its value is zero. We detect this case by encoding LastSeqNum when
540 : // ComparerName is set.
541 1 : if v.LastSeqNum != 0 || v.ComparerName != "" {
542 1 : e.writeUvarint(tagLastSequence)
543 1 : e.writeUvarint(v.LastSeqNum)
544 1 : }
545 1 : for x := range v.DeletedFiles {
546 1 : e.writeUvarint(tagDeletedFile)
547 1 : e.writeUvarint(uint64(x.Level))
548 1 : e.writeUvarint(uint64(x.FileNum))
549 1 : }
550 1 : for _, x := range v.NewFiles {
551 1 : customFields := x.Meta.MarkedForCompaction || x.Meta.CreationTime != 0 || x.Meta.Virtual
552 1 : var tag uint64
553 1 : switch {
554 1 : case x.Meta.HasRangeKeys:
555 1 : tag = tagNewFile5
556 1 : case customFields:
557 1 : tag = tagNewFile4
558 0 : default:
559 0 : tag = tagNewFile2
560 : }
561 1 : e.writeUvarint(tag)
562 1 : e.writeUvarint(uint64(x.Level))
563 1 : e.writeUvarint(uint64(x.Meta.FileNum))
564 1 : e.writeUvarint(x.Meta.Size)
565 1 : if !x.Meta.HasRangeKeys {
566 1 : // If we have no range keys, preserve the original format and write the
567 1 : // smallest and largest point keys.
568 1 : e.writeKey(x.Meta.SmallestPointKey)
569 1 : e.writeKey(x.Meta.LargestPointKey)
570 1 : } else {
571 1 : // When range keys are present, we first write a marker byte that
572 1 : // indicates if the table also contains point keys, in addition to how the
573 1 : // overall bounds for the table should be reconstructed. This byte is
574 1 : // followed by the keys themselves.
575 1 : b, err := x.Meta.boundsMarker()
576 1 : if err != nil {
577 0 : return err
578 0 : }
579 1 : if err = e.WriteByte(b); err != nil {
580 0 : return err
581 0 : }
582 : // Write point key bounds (if present).
583 1 : if x.Meta.HasPointKeys {
584 1 : e.writeKey(x.Meta.SmallestPointKey)
585 1 : e.writeKey(x.Meta.LargestPointKey)
586 1 : }
587 : // Write range key bounds.
588 1 : e.writeKey(x.Meta.SmallestRangeKey)
589 1 : e.writeKey(x.Meta.LargestRangeKey)
590 : }
591 1 : e.writeUvarint(x.Meta.SmallestSeqNum)
592 1 : e.writeUvarint(x.Meta.LargestSeqNum)
593 1 : if customFields {
594 1 : if x.Meta.CreationTime != 0 {
595 1 : e.writeUvarint(customTagCreationTime)
596 1 : var buf [binary.MaxVarintLen64]byte
597 1 : n := binary.PutUvarint(buf[:], uint64(x.Meta.CreationTime))
598 1 : e.writeBytes(buf[:n])
599 1 : }
600 1 : if x.Meta.MarkedForCompaction {
601 0 : e.writeUvarint(customTagNeedsCompaction)
602 0 : e.writeBytes([]byte{1})
603 0 : }
604 1 : if x.Meta.Virtual {
605 1 : e.writeUvarint(customTagVirtual)
606 1 : e.writeUvarint(uint64(x.Meta.FileBacking.DiskFileNum.FileNum()))
607 1 : }
608 1 : e.writeUvarint(customTagTerminate)
609 : }
610 : }
611 1 : _, err := w.Write(e.Bytes())
612 1 : return err
613 : }
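
// A sketch of a round trip through Encode and Decode, which can be useful
// when inspecting the on-disk format:
//
//	var buf bytes.Buffer
//	if err := ve.Encode(&buf); err != nil {
//		return err
//	}
//	var decoded VersionEdit
//	if err := decoded.Decode(&buf); err != nil {
//		return err
//	}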
614 :
615 : // versionEditDecoder should be used to decode version edits.
616 : type versionEditDecoder struct {
617 : byteReader
618 : }
619 :
620 1 : func (d versionEditDecoder) readBytes() ([]byte, error) {
621 1 : n, err := d.readUvarint()
622 1 : if err != nil {
623 0 : return nil, err
624 0 : }
625 1 : s := make([]byte, n)
626 1 : _, err = io.ReadFull(d, s)
627 1 : if err != nil {
628 0 : if err == io.ErrUnexpectedEOF {
629 0 : return nil, errCorruptManifest
630 0 : }
631 0 : return nil, err
632 : }
633 1 : return s, nil
634 : }
635 :
636 1 : func (d versionEditDecoder) readLevel() (int, error) {
637 1 : u, err := d.readUvarint()
638 1 : if err != nil {
639 0 : return 0, err
640 0 : }
641 1 : if u >= NumLevels {
642 0 : return 0, errCorruptManifest
643 0 : }
644 1 : return int(u), nil
645 : }
646 :
647 1 : func (d versionEditDecoder) readFileNum() (base.FileNum, error) {
648 1 : u, err := d.readUvarint()
649 1 : if err != nil {
650 0 : return 0, err
651 0 : }
652 1 : return base.FileNum(u), nil
653 : }
654 :
655 1 : func (d versionEditDecoder) readUvarint() (uint64, error) {
656 1 : u, err := binary.ReadUvarint(d)
657 1 : if err != nil {
658 0 : if err == io.EOF {
659 0 : return 0, errCorruptManifest
660 0 : }
661 0 : return 0, err
662 : }
663 1 : return u, nil
664 : }
665 :
666 : type versionEditEncoder struct {
667 : *bytes.Buffer
668 : }
669 :
670 1 : func (e versionEditEncoder) writeBytes(p []byte) {
671 1 : e.writeUvarint(uint64(len(p)))
672 1 : e.Write(p)
673 1 : }
674 :
675 1 : func (e versionEditEncoder) writeKey(k InternalKey) {
676 1 : e.writeUvarint(uint64(k.Size()))
677 1 : e.Write(k.UserKey)
678 1 : buf := k.EncodeTrailer()
679 1 : e.Write(buf[:])
680 1 : }
681 :
682 1 : func (e versionEditEncoder) writeString(s string) {
683 1 : e.writeUvarint(uint64(len(s)))
684 1 : e.WriteString(s)
685 1 : }
686 :
687 1 : func (e versionEditEncoder) writeUvarint(u uint64) {
688 1 : var buf [binary.MaxVarintLen64]byte
689 1 : n := binary.PutUvarint(buf[:], u)
690 1 : e.Write(buf[:n])
691 1 : }
692 :
693 : // BulkVersionEdit summarizes the files added and deleted from a set of version
694 : // edits.
695 : //
696 : // INVARIANTS:
697 : // No file can be added to a level more than once. This is true globally, and
698 : // also true for all of the calls to Accumulate for a single bulk version edit.
699 : //
700 : // No file can be removed from a level more than once. This is true globally,
701 : // and also true for all of the calls to Accumulate for a single bulk version
702 : // edit.
703 : //
704 : // A file must not be added and removed from a given level in the same version
705 : // edit.
706 : //
707 : // A file that is being removed from a level must have been added to that level
708 : // before (in a prior version edit). Note that a given file can be deleted from
709 : // a level and added to another level in a single version edit.
710 : type BulkVersionEdit struct {
711 : Added [NumLevels]map[base.FileNum]*FileMetadata
712 : Deleted [NumLevels]map[base.FileNum]*FileMetadata
713 :
714 : // AddedFileBacking is a map to support lookup so that we can populate the
715 : // FileBacking of virtual sstables during manifest replay.
716 : AddedFileBacking map[base.DiskFileNum]*FileBacking
717 : RemovedFileBacking []base.DiskFileNum
718 :
719 : // AddedByFileNum maps file number to file metadata for all added files
720 : // from accumulated version edits. AddedByFileNum is only populated if set
721 : // to non-nil by a caller. It must be set to non-nil when replaying
722 : // version edits read from a MANIFEST (as opposed to VersionEdits
723 : // constructed in-memory). While replaying a MANIFEST file,
724 : // VersionEdit.DeletedFiles map entries have nil values, because the
725 : // on-disk deletion record encodes only the file number. Accumulate
726 : // uses AddedByFileNum to correctly populate the BulkVersionEdit's Deleted
727 : // field with non-nil *FileMetadata.
728 : AddedByFileNum map[base.FileNum]*FileMetadata
729 :
730 : // MarkedForCompactionCountDiff holds the aggregated count of files
731 : // marked for compaction added or removed.
732 : MarkedForCompactionCountDiff int
733 : }
734 :
735 : // Accumulate adds the file addition and deletions in the specified version
736 : // edit to the bulk edit's internal state.
737 : //
738 : // INVARIANTS:
739 : // If a file is added to a given level in a call to Accumulate and then removed
740 : // from that level in a subsequent call, the file will not be present in the
741 : // resulting BulkVersionEdit.Deleted for that level.
742 : //
743 : // After accumulation of version edits, the bulk version edit may have
744 : // information about a file which has been deleted from a level, but it may
745 : // not have information about the same file added to the same level. The add
746 : // could've occurred as part of a previous bulk version edit. In this case,
747 : // the deleted file must be present in BulkVersionEdit.Deleted, at the end
748 : // of the accumulation, because we need to decrease the refcount of the
749 : // deleted file in Apply.
750 1 : func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
751 1 : for df, m := range ve.DeletedFiles {
752 1 : dmap := b.Deleted[df.Level]
753 1 : if dmap == nil {
754 1 : dmap = make(map[base.FileNum]*FileMetadata)
755 1 : b.Deleted[df.Level] = dmap
756 1 : }
757 :
758 1 : if m == nil {
759 1 : // m is nil only when replaying a MANIFEST.
760 1 : if b.AddedByFileNum == nil {
761 0 : return errors.Errorf("deleted file L%d.%s's metadata is absent and bve.AddedByFileNum is nil", df.Level, df.FileNum)
762 0 : }
763 1 : m = b.AddedByFileNum[df.FileNum]
764 1 : if m == nil {
765 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", df.Level, df.FileNum)
766 0 : }
767 : }
768 1 : if m.MarkedForCompaction {
769 0 : b.MarkedForCompactionCountDiff--
770 0 : }
771 1 : if _, ok := b.Added[df.Level][df.FileNum]; !ok {
772 1 : dmap[df.FileNum] = m
773 1 : } else {
774 1 : // Present in b.Added for the same level.
775 1 : delete(b.Added[df.Level], df.FileNum)
776 1 : }
777 : }
778 :
779 : // Generate state for Added backing files. Note that these must be generated
780 : // before we loop through the NewFiles, because we need to populate the
781 : // FileBackings which might be used by the NewFiles loop.
782 1 : if b.AddedFileBacking == nil {
783 1 : b.AddedFileBacking = make(map[base.DiskFileNum]*FileBacking)
784 1 : }
785 1 : for _, fb := range ve.CreatedBackingTables {
786 1 : if _, ok := b.AddedFileBacking[fb.DiskFileNum]; ok {
787 0 : // There is already a FileBacking associated with fb.DiskFileNum.
788 0 : // This should never happen. There must always be only one FileBacking
789 0 : // associated with a backing sstable.
790 0 : panic(fmt.Sprintf("pebble: duplicate file backing %s", fb.DiskFileNum.String()))
791 : }
792 1 : b.AddedFileBacking[fb.DiskFileNum] = fb
793 : }
794 :
795 1 : for _, nf := range ve.NewFiles {
796 1 : // A new file should not have been deleted in this or a preceding
797 1 : // VersionEdit at the same level (though files can move across levels).
798 1 : if dmap := b.Deleted[nf.Level]; dmap != nil {
799 1 : if _, ok := dmap[nf.Meta.FileNum]; ok {
800 0 : return base.CorruptionErrorf("pebble: file deleted L%d.%s before it was inserted", nf.Level, nf.Meta.FileNum)
801 0 : }
802 : }
803 1 : if nf.Meta.Virtual && nf.Meta.FileBacking == nil {
804 1 : // FileBacking for a virtual sstable must only be nil if we're performing
805 1 : // manifest replay.
806 1 : nf.Meta.FileBacking = b.AddedFileBacking[nf.BackingFileNum]
807 1 : if nf.Meta.FileBacking == nil {
808 0 : return errors.Errorf("FileBacking for virtual sstable must not be nil")
809 0 : }
810 1 : } else if nf.Meta.FileBacking == nil {
811 0 : return errors.Errorf("Added file L%d.%s's has no FileBacking", nf.Level, nf.Meta.FileNum)
812 0 : }
813 :
814 1 : if b.Added[nf.Level] == nil {
815 1 : b.Added[nf.Level] = make(map[base.FileNum]*FileMetadata)
816 1 : }
817 1 : b.Added[nf.Level][nf.Meta.FileNum] = nf.Meta
818 1 : if b.AddedByFileNum != nil {
819 1 : b.AddedByFileNum[nf.Meta.FileNum] = nf.Meta
820 1 : }
821 1 : if nf.Meta.MarkedForCompaction {
822 0 : b.MarkedForCompactionCountDiff++
823 0 : }
824 : }
825 :
826 : // Since a file can be removed from backing files in exactly one version
827 : // edit it is safe to just append without any de-duplication.
828 1 : b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
829 1 :
830 1 : return nil
831 : }
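
// A sketch of replay-style accumulation over a sequence of decoded edits.
// AddedByFileNum must be non-nil during replay so that deletion records,
// which carry only file numbers on disk, can be resolved back to their
// metadata:
//
//	var bve BulkVersionEdit
//	bve.AddedByFileNum = make(map[base.FileNum]*FileMetadata)
//	for _, ve := range edits {
//		if err := bve.Accumulate(ve); err != nil {
//			return err
//		}
//	}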
832 :
833 : // AccumulateIncompleteAndApplySingleVE should be called if a single version edit
834 : // is to be applied to the provided curr Version and if the caller needs to
835 : // update the versionSet.zombieTables map. This function exists separately from
836 : // BulkVersionEdit.Apply because it is easier to reason about properties
837 : // regarding BulkVersionEdit.Accumulate/Apply and zombie table generation, if we
838 : // know that exactly one version edit is being accumulated.
839 : //
840 : // Note that the version edit passed into this function may be incomplete
841 : // because compactions don't have the ref counting information necessary to
842 : // populate VersionEdit.RemovedBackingTables. This function will complete such a
843 : // version edit by populating RemovedBackingTables.
844 : //
845 : // Invariant: Any file being deleted through ve must belong to the curr Version.
846 : // We can't have a delete for some arbitrary file which does not exist in curr.
847 : func AccumulateIncompleteAndApplySingleVE(
848 : ve *VersionEdit,
849 : curr *Version,
850 : cmp Compare,
851 : formatKey base.FormatKey,
852 : flushSplitBytes int64,
853 : readCompactionRate int64,
854 : backingStateMap map[base.DiskFileNum]*FileBacking,
855 : addBackingFunc func(*FileBacking),
856 : removeBackingFunc func(base.DiskFileNum),
857 1 : ) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
858 1 : if len(ve.RemovedBackingTables) != 0 {
859 0 : panic("pebble: invalid incomplete version edit")
860 : }
861 1 : var b BulkVersionEdit
862 1 : err := b.Accumulate(ve)
863 1 : if err != nil {
864 0 : return nil, nil, err
865 0 : }
866 1 : zombies = make(map[base.DiskFileNum]uint64)
867 1 : v, err := b.Apply(
868 1 : curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies,
869 1 : )
870 1 : if err != nil {
871 0 : return nil, nil, err
872 0 : }
873 :
874 1 : for _, s := range b.AddedFileBacking {
875 1 : addBackingFunc(s)
876 1 : }
877 :
878 1 : for fileNum := range zombies {
879 1 : if _, ok := backingStateMap[fileNum]; ok {
880 1 : // This table was backing some virtual sstable in the latest version,
881 1 : // but is now a zombie. We add RemovedBackingTables entries for
882 1 : // these, before the version edit is written to disk.
883 1 : ve.RemovedBackingTables = append(
884 1 : ve.RemovedBackingTables, fileNum,
885 1 : )
886 1 : removeBackingFunc(fileNum)
887 1 : }
888 : }
889 1 : return v, zombies, nil
890 : }
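
// A sketch of a call site, with the backing-table state and callbacks
// supplied by the surrounding version set (names here are illustrative):
//
//	v, zombies, err := AccumulateIncompleteAndApplySingleVE(
//		ve, currentVersion, cmp, formatKey, flushSplitBytes,
//		readCompactionRate, backingStateMap, addBacking, removeBacking,
//	)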
891 :
892 : // Apply applies the delta b to the current version to produce a new
893 : // version. The new version is consistent with respect to the comparer cmp.
894 : //
895 : // curr may be nil, which is equivalent to a pointer to a zero version.
896 : //
897 : // On success, if a non-nil zombies map is provided to Apply, the map is updated
898 : // with file numbers and file sizes of deleted files. These files are
899 : // considered zombies because they are no longer referenced by the returned
900 : // Version, but cannot be deleted from disk as they are still in use by the
901 : // incoming Version.
902 : func (b *BulkVersionEdit) Apply(
903 : curr *Version,
904 : cmp Compare,
905 : formatKey base.FormatKey,
906 : flushSplitBytes int64,
907 : readCompactionRate int64,
908 : zombies map[base.DiskFileNum]uint64,
909 1 : ) (*Version, error) {
910 1 : addZombie := func(state *FileBacking) {
911 1 : if zombies != nil {
912 1 : zombies[state.DiskFileNum] = state.Size
913 1 : }
914 : }
915 1 : removeZombie := func(state *FileBacking) {
916 1 : if zombies != nil {
917 1 : delete(zombies, state.DiskFileNum)
918 1 : }
919 : }
920 :
921 1 : v := new(Version)
922 1 :
923 1 : // Adjust the count of files marked for compaction.
924 1 : if curr != nil {
925 1 : v.Stats.MarkedForCompaction = curr.Stats.MarkedForCompaction
926 1 : }
927 1 : v.Stats.MarkedForCompaction += b.MarkedForCompactionCountDiff
928 1 : if v.Stats.MarkedForCompaction < 0 {
929 0 : return nil, base.CorruptionErrorf("pebble: version marked for compaction count negative")
930 0 : }
931 :
932 1 : for level := range v.Levels {
933 1 : if curr == nil || curr.Levels[level].tree.root == nil {
934 1 : v.Levels[level] = makeLevelMetadata(cmp, level, nil /* files */)
935 1 : } else {
936 1 : v.Levels[level] = curr.Levels[level].clone()
937 1 : }
938 1 : if curr == nil || curr.RangeKeyLevels[level].tree.root == nil {
939 1 : v.RangeKeyLevels[level] = makeLevelMetadata(cmp, level, nil /* files */)
940 1 : } else {
941 1 : v.RangeKeyLevels[level] = curr.RangeKeyLevels[level].clone()
942 1 : }
943 :
944 1 : if len(b.Added[level]) == 0 && len(b.Deleted[level]) == 0 {
945 1 : // There are no edits on this level.
946 1 : if level == 0 {
947 1 : // Initialize L0Sublevels.
948 1 : if curr == nil || curr.L0Sublevels == nil {
949 1 : if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
950 0 : return nil, errors.Wrap(err, "pebble: internal error")
951 0 : }
952 1 : } else {
953 1 : v.L0Sublevels = curr.L0Sublevels
954 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
955 1 : }
956 : }
957 1 : continue
958 : }
959 :
960 : // Some edits on this level.
961 1 : lm := &v.Levels[level]
962 1 : lmRange := &v.RangeKeyLevels[level]
963 1 :
964 1 : addedFilesMap := b.Added[level]
965 1 : deletedFilesMap := b.Deleted[level]
966 1 : if n := v.Levels[level].Len() + len(addedFilesMap); n == 0 {
967 0 : return nil, base.CorruptionErrorf(
968 0 : "pebble: internal error: No current or added files but have deleted files: %d",
969 0 : errors.Safe(len(deletedFilesMap)))
970 0 : }
971 :
972 : // NB: addedFilesMap may be empty. If a file is present in addedFilesMap
973 : // for a level, it won't be present in deletedFilesMap for the same
974 : // level.
975 :
976 1 : for _, f := range deletedFilesMap {
977 1 : if obsolete := v.Levels[level].remove(f); obsolete {
978 0 : // Deleting a file from the B-Tree may decrement its
979 0 : // reference count. However, because we cloned the
980 0 : // previous level's B-Tree, this should never result in a
981 0 : // file's reference count dropping to zero.
982 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during B-Tree removal", level, f.FileNum)
983 0 : return nil, err
984 0 : }
985 1 : if f.HasRangeKeys {
986 1 : if obsolete := v.RangeKeyLevels[level].remove(f); obsolete {
987 0 : // Deleting a file from the B-Tree may decrement its
988 0 : // reference count. However, because we cloned the
989 0 : // previous level's B-Tree, this should never result in a
990 0 : // file's reference count dropping to zero.
991 0 : err := errors.Errorf("pebble: internal error: file L%d.%s obsolete during range-key B-Tree removal", level, f.FileNum)
992 0 : return nil, err
993 0 : }
994 : }
995 :
996 : // Note that a backing sst will only become a zombie if the
997 : // references to it in the latest version is 0. We will remove the
998 : // backing sst from the zombie list in the next loop if one of the
999 : // addedFiles in any of the levels is referencing the backing sst.
1000 : // This is possible if a physical sstable is virtualized, or if it
1001 : // is moved.
1002 1 : latestRefCount := f.LatestRefs()
1003 1 : if latestRefCount <= 0 {
1004 0 : // If a file is present in deletedFilesMap for a level, then it
1005 0 : // must have already been added to the level previously, which
1006 0 : // means that its latest ref count cannot be 0.
1007 0 : err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file %s", f.FileNum)
1008 0 : return nil, err
1009 1 : } else if f.LatestUnref() == 0 {
1010 1 : addZombie(f.FileBacking)
1011 1 : }
1012 : }
1013 :
1014 1 : addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
1015 1 : for _, f := range addedFilesMap {
1016 1 : addedFiles = append(addedFiles, f)
1017 1 : }
1018 : // Sort addedFiles by file number. This isn't necessary, but tests which
1019 : // replay invalid manifests check the error output, and the error output
1020 : // depends on the order in which files are added to the btree.
1021 1 : sort.Slice(addedFiles, func(i, j int) bool {
1022 1 : return addedFiles[i].FileNum < addedFiles[j].FileNum
1023 1 : })
1024 :
1025 1 : var sm, la *FileMetadata
1026 1 : for _, f := range addedFiles {
1027 1 : // NB: allowedSeeks is used for read triggered compactions. It is set using
1028 1 : // Options.Experimental.ReadCompactionRate, which defaults to 32KB.
1029 1 : var allowedSeeks int64
1030 1 : if readCompactionRate != 0 {
1031 1 : allowedSeeks = int64(f.Size) / readCompactionRate
1032 1 : }
1033 1 : if allowedSeeks < 100 {
1034 1 : allowedSeeks = 100
1035 1 : }
1036 1 : f.AllowedSeeks.Store(allowedSeeks)
1037 1 : f.InitAllowedSeeks = allowedSeeks
1038 1 :
1039 1 : err := lm.insert(f)
1040 1 : // We're adding this file to the new version, so increment the
1041 1 : // latest refs count.
1042 1 : f.LatestRef()
1043 1 : if err != nil {
1044 0 : return nil, errors.Wrap(err, "pebble")
1045 0 : }
1046 1 : if f.HasRangeKeys {
1047 1 : err = lmRange.insert(f)
1048 1 : if err != nil {
1049 0 : return nil, errors.Wrap(err, "pebble")
1050 0 : }
1051 : }
1052 1 : removeZombie(f.FileBacking)
1053 1 : // Track the files with the smallest and largest keys, so that we can
1054 1 : // check consistency of the modified span.
1055 1 : if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 {
1056 1 : sm = f
1057 1 : }
1058 1 : if la == nil || base.InternalCompare(cmp, la.Largest, f.Largest) < 0 {
1059 1 : la = f
1060 1 : }
1061 : }
1062 :
1063 1 : if level == 0 {
1064 1 : if curr != nil && curr.L0Sublevels != nil && len(deletedFilesMap) == 0 {
1065 1 : // Flushes and ingestions that do not delete any L0 files do not require
1066 1 : // a regeneration of L0Sublevels from scratch. We can instead generate
1067 1 : // it incrementally.
1068 1 : var err error
1069 1 : // AddL0Files requires addedFiles to be sorted in seqnum order.
1070 1 : SortBySeqNum(addedFiles)
1071 1 : v.L0Sublevels, err = curr.L0Sublevels.AddL0Files(addedFiles, flushSplitBytes, &v.Levels[0])
1072 1 : if errors.Is(err, errInvalidL0SublevelsOpt) {
1073 1 : err = v.InitL0Sublevels(cmp, formatKey, flushSplitBytes)
1074 1 : } else if invariants.Enabled && err == nil {
1075 1 : copyOfSublevels, err := NewL0Sublevels(&v.Levels[0], cmp, formatKey, flushSplitBytes)
1076 1 : if err != nil {
1077 0 : panic(fmt.Sprintf("error when regenerating sublevels: %s", err))
1078 : }
1079 1 : s1 := describeSublevels(base.DefaultFormatter, false /* verbose */, copyOfSublevels.Levels)
1080 1 : s2 := describeSublevels(base.DefaultFormatter, false /* verbose */, v.L0Sublevels.Levels)
1081 1 : if s1 != s2 {
1082 0 : panic(fmt.Sprintf("incremental L0 sublevel generation produced different output than regeneration: %s != %s", s1, s2))
1083 : }
1084 : }
1085 1 : if err != nil {
1086 0 : return nil, errors.Wrap(err, "pebble: internal error")
1087 0 : }
1088 1 : v.L0SublevelFiles = v.L0Sublevels.Levels
1089 1 : } else if err := v.InitL0Sublevels(cmp, formatKey, flushSplitBytes); err != nil {
1090 0 : return nil, errors.Wrap(err, "pebble: internal error")
1091 0 : }
1092 1 : if err := CheckOrdering(cmp, formatKey, Level(0), v.Levels[level].Iter()); err != nil {
1093 0 : return nil, errors.Wrap(err, "pebble: internal error")
1094 0 : }
1095 1 : continue
1096 : }
1097 :
1098 : // Check consistency of the level in the vicinity of our edits.
1099 1 : if sm != nil && la != nil {
1100 1 : overlap := overlaps(v.Levels[level].Iter(), cmp, sm.Smallest.UserKey,
1101 1 : la.Largest.UserKey, la.Largest.IsExclusiveSentinel())
1102 1 : // overlap contains all of the added files. We want to ensure that
1103 1 : // the added files are consistent with neighboring existing files
1104 1 : // too, so reslice the overlap to pull in a neighbor on each side.
1105 1 : check := overlap.Reslice(func(start, end *LevelIterator) {
1106 1 : if m := start.Prev(); m == nil {
1107 1 : start.Next()
1108 1 : }
1109 1 : if m := end.Next(); m == nil {
1110 1 : end.Prev()
1111 1 : }
1112 : })
1113 1 : if err := CheckOrdering(cmp, formatKey, Level(level), check.Iter()); err != nil {
1114 0 : return nil, errors.Wrap(err, "pebble: internal error")
1115 0 : }
1116 : }
1117 : }
1118 1 : return v, nil
1119 : }
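
// Continuing the replay sketch from Accumulate: once all edits have been
// accumulated, a single Apply produces the resulting version. curr may be
// nil when constructing the very first version:
//
//	v, err := bve.Apply(nil /* curr */, cmp, formatKey, flushSplitBytes,
//		readCompactionRate, nil /* zombies */)
//	if err != nil {
//		return err
//	}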
|