// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"context"
	"encoding/binary"
	"io"
	"slices"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/batchrepr"
	"github.com/cockroachdb/pebble/internal/arenaskl"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/record"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/atomicfs"
	"github.com/cockroachdb/pebble/wal"
)

// recoverState reads the named database directory and recovers the set of files
// encoding the database state at the moment the previous process exited.
// recoverState is read only and does not mutate the on-disk state.
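// The returned recoveredState holds open resources (the locked database
// directories, atomic markers, and the object storage provider); callers are
// responsible for releasing them via Close once they are no longer needed.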
func recoverState(opts *Options, dirname string) (s *recoveredState, err error) {
	rs := &recoveredState{
		dirname: dirname,
		fs:      opts.FS,
	}
	if err := rs.init(opts, dirname); err != nil {
		return nil, errors.CombineErrors(err, rs.Close())
	}
	return rs, nil
}

func (rs *recoveredState) init(opts *Options, dirname string) error {
	dirs, err := prepareOpenAndLockDirs(dirname, opts)
	if err != nil {
		err = errors.Wrapf(err, "error opening database at %q", dirname)
		err = errors.CombineErrors(err, dirs.Close())
		return err
	}
	rs.dirs = dirs

	// List the directory contents. This also happens to include WAL log files,
	// if they are in the same dir.
	if rs.ls, err = opts.FS.List(dirname); err != nil {
		return errors.Wrapf(err, "pebble: database %q", dirname)
	}
	// Find the current format major version.
	rs.fmv, rs.fmvMarker, err = lookupFormatMajorVersion(opts.FS, dirname, rs.ls)
	if err != nil {
		return errors.Wrapf(err, "pebble: database %q", dirname)
	}

	// Open the object storage provider.
	providerSettings := opts.MakeObjStorageProviderSettings(dirname)
	providerSettings.Local.FSDirInitialListing = rs.ls
	rs.objProvider, err = objstorageprovider.Open(providerSettings)
	if err != nil {
		return errors.Wrapf(err, "pebble: database %q", dirname)
	}

	// Determine which manifest is current, and if one exists, replay it to
	// recover the current Version of the LSM.
	var manifestExists bool
	rs.manifestMarker, rs.manifestFileNum, manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
	if err != nil {
		return errors.Wrapf(err, "pebble: database %q", dirname)
	}
	if manifestExists {
		recoveredVersion, err := recoverVersion(opts, dirname, rs.objProvider, rs.manifestFileNum)
		if err != nil {
			return err
		}
		if !opts.DisableConsistencyCheck {
			if err := checkConsistency(recoveredVersion.version, rs.objProvider); err != nil {
				return err
			}
		}
		rs.recoveredVersion = recoveredVersion
	}

	// Identify the maximum file number in the directory. We do not want to
	// reuse any existing file numbers, even obsolete ones, to avoid modifying
	// an ingested sstable's original external file.
	//
	// We also identify the most recent OPTIONS file, so we can validate our
	// configured Options against the previous options, and we collect any
	// orphaned temporary files that should be removed.
	var previousOptionsFileNum base.DiskFileNum
	for _, filename := range rs.ls {
		ft, fn, ok := base.ParseFilename(opts.FS, filename)
		if !ok {
			continue
		}
		rs.maxFilenumUsed = max(rs.maxFilenumUsed, fn)
		switch ft {
		case base.FileTypeLog:
			// Ignore.
		case base.FileTypeOptions:
			if previousOptionsFileNum < fn {
				previousOptionsFileNum = fn
				rs.previousOptionsFilename = filename
			}
		case base.FileTypeTemp, base.FileTypeOldTemp:
			rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
		}
	}

	// Validate the most-recent OPTIONS file, if there is one.
	if rs.previousOptionsFilename != "" {
		path := opts.FS.PathJoin(dirname, rs.previousOptionsFilename)
		previousOptions, err := readOptionsFile(opts, path)
		if err != nil {
			return err
		}
		if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
			return err
		}
	}

	// Ratchet rs.maxFilenumUsed ahead of all known objects in the objProvider.
	// This avoids FileNum collisions with obsolete sstables.
	objects := rs.objProvider.List()
	for _, obj := range objects {
		rs.maxFilenumUsed = max(rs.maxFilenumUsed, obj.DiskFileNum)
	}

	// Find all the WAL files across the various WAL directories.
	wals, err := wal.Scan(rs.dirs.WALDirs()...)
	if err != nil {
		return err
	}
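	// Partition the WALs into those that must be replayed and those that are
	// already obsolete. Any WAL numbered at or above the recovered version's
	// minUnflushedLogNum may contain writes that have not been flushed to an
	// sstable and must be replayed; older WALs are already fully represented
	// in the LSM. If no version was recovered, conservatively replay every WAL.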
	for _, w := range wals {
		// Don't reuse any obsolete file numbers to avoid modifying an ingested
		// sstable's original external file.
		rs.maxFilenumUsed = max(rs.maxFilenumUsed, base.DiskFileNum(w.Num))
		if rs.recoveredVersion == nil || base.DiskFileNum(w.Num) >= rs.recoveredVersion.minUnflushedLogNum {
			rs.walsReplay = append(rs.walsReplay, w)
		} else {
			rs.walsObsolete = append(rs.walsObsolete, w)
		}
	}
	return nil
}

// recoveredState encapsulates state recovered from reading the database
// directory.
type recoveredState struct {
	dirname                 string
	dirs                    *resolvedDirs
	fmv                     FormatMajorVersion
	fmvMarker               *atomicfs.Marker
	fs                      vfs.FS
	ls                      []string
	manifestMarker          *atomicfs.Marker
	manifestFileNum         base.DiskFileNum
	maxFilenumUsed          base.DiskFileNum
	obsoleteTempFilenames   []string
	objProvider             objstorage.Provider
	previousOptionsFilename string
	recoveredVersion        *recoveredVersion
	walsObsolete            wal.Logs
	walsReplay              wal.Logs
}

// RemoveObsolete removes obsolete files uncovered during recovery.
func (rs *recoveredState) RemoveObsolete(opts *Options) error {
	var err error
	// Atomic markers may leave behind obsolete files if there's a crash
	// mid-update.
	if rs.fmvMarker != nil {
		err = errors.CombineErrors(err, rs.fmvMarker.RemoveObsolete())
	}
	if rs.manifestMarker != nil {
		err = errors.CombineErrors(err, rs.manifestMarker.RemoveObsolete())
	}
	// Some codepaths write to a temporary file and then rename it to its final
	// location when complete. A temp file is left over if a process exits
	// before the rename. Remove any that were found.
	for _, filename := range rs.obsoleteTempFilenames {
		err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
	}
	// Remove any WAL files that are already obsolete. Pebble keeps some old WAL
	// files around for recycling.
	for _, w := range rs.walsObsolete {
		for i := range w.NumSegments() {
			fs, path := w.SegmentLocation(i)
			rmErr := fs.Remove(path)
			opts.EventListener.WALDeleted(WALDeleteInfo{
				JobID:   0,
				Path:    path,
				FileNum: base.DiskFileNum(w.Num),
				Err:     rmErr,
			})
		}
	}
	return err
}

// Close closes resources held by the recoveredState, including open file
// descriptors.
func (rs *recoveredState) Close() error {
	var err error
	if rs.fmvMarker != nil {
		err = errors.CombineErrors(err, rs.fmvMarker.Close())
	}
	if rs.manifestMarker != nil {
		err = errors.CombineErrors(err, rs.manifestMarker.Close())
	}
	if rs.objProvider != nil {
		err = errors.CombineErrors(err, rs.objProvider.Close())
	}
	if rs.dirs != nil {
		err = errors.CombineErrors(err, rs.dirs.Close())
	}
	return err
}

// recoveredVersion describes the latest Version of the LSM recovered by
// replaying a manifest file.
type recoveredVersion struct {
	manifestFileNum    base.DiskFileNum
	minUnflushedLogNum base.DiskFileNum
	nextFileNum        base.DiskFileNum
	logSeqNum          base.SeqNum
	latest             *latestVersionState
	version            *manifest.Version
}

// recoverVersion replays the named manifest file to recover the latest version
// of the LSM from persisted state.
func recoverVersion(
	opts *Options, dirname string, provider objstorage.Provider, manifestFileNum base.DiskFileNum,
) (*recoveredVersion, error) {
	rv := &recoveredVersion{
		manifestFileNum: manifestFileNum,
		nextFileNum:     1,
		logSeqNum:       base.SeqNumStart,
		latest: &latestVersionState{
			l0Organizer:     manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
			virtualBackings: manifest.MakeVirtualBackings(),
		},
	}
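	// nextFileNum and logSeqNum above are defaults for an otherwise empty
	// manifest; they are overwritten below by the values recorded in the
	// replayed version edits.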
	manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, rv.manifestFileNum)
	manifestFilename := opts.FS.PathBase(manifestPath)

	// Read the versionEdits in the manifest file.
	var bve manifest.BulkVersionEdit
	bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
	manifestFile, err := opts.FS.Open(manifestPath)
	if err != nil {
		return nil, errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
			errors.Safe(manifestFilename), dirname)
	}
	defer manifestFile.Close()
	rr := record.NewReader(manifestFile, 0 /* logNum */)
	for {
		r, err := rr.Next()
		if err == io.EOF || record.IsInvalidRecord(err) {
			break
		}
		if err != nil {
			return nil, errors.Wrapf(err, "pebble: error when loading manifest file %q",
				errors.Safe(manifestFilename))
		}
		var ve manifest.VersionEdit
		err = ve.Decode(r)
		if err != nil {
			// Break instead of returning an error if the record is corrupted
			// or invalid.
			if err == io.EOF || record.IsInvalidRecord(err) {
				break
			}
			return nil, err
		}
		if ve.ComparerName != "" {
			if ve.ComparerName != opts.Comparer.Name {
				return nil, errors.Errorf("pebble: manifest file %q for DB %q: "+
					"comparer name from file %q != comparer name from Options %q",
					errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(opts.Comparer.Name))
			}
		}
		if err := bve.Accumulate(&ve); err != nil {
			return nil, err
		}
		if ve.MinUnflushedLogNum != 0 {
			rv.minUnflushedLogNum = ve.MinUnflushedLogNum
		}
		if ve.NextFileNum != 0 {
			rv.nextFileNum = base.DiskFileNum(ve.NextFileNum)
		}
		if ve.LastSeqNum != 0 {
			// logSeqNum is the _next_ sequence number that will be assigned,
			// while LastSeqNum is the last assigned sequence number. Note that
			// this behaviour mimics that in RocksDB; the first sequence number
			// assigned is one greater than the one present in the manifest
			// (assuming no WALs contain higher sequence numbers than the
			// manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
			// next sequence number that will be assigned.
			//
			// If LastSeqNum is less than SeqNumStart, increase it to at least
			// SeqNumStart to leave ample room for reserved sequence numbers.
			rv.logSeqNum = max(ve.LastSeqNum+1, base.SeqNumStart)
		}
	}

	// We have already set rv.nextFileNum=1 at the beginning of the function and
	// could have only updated it to some other non-zero value, so it cannot be
	// 0 here.
	if rv.minUnflushedLogNum == 0 {
		if rv.nextFileNum >= 2 {
			// We either have a freshly created DB, or a DB created by RocksDB
			// that has not had a single flushed SSTable yet. This is because
			// RocksDB bumps up nextFileNum in this case without bumping up
			// minUnflushedLogNum, even if WALs with non-zero file numbers are
			// present in the directory.
		} else {
			return nil, base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
				errors.Safe(manifestFilename), dirname)
		}
	}
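	// Ensure the next file number is beyond the minimum unflushed log number,
	// so that file numbers handed out later cannot collide with the WAL that
	// still holds unflushed data.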
	rv.nextFileNum = max(rv.nextFileNum, rv.minUnflushedLogNum+1)

	// Populate the virtual backings for virtual sstables since we have finished
	// version edit accumulation.
	for _, b := range bve.AddedFileBacking {
		placement := objstorage.Placement(provider, base.FileTypeTable, b.DiskFileNum)
		rv.latest.virtualBackings.AddAndRef(b, placement)
	}
	for l, addedLevel := range bve.AddedTables {
		for _, m := range addedLevel {
			if m.Virtual {
				rv.latest.virtualBackings.AddTable(m, l)
			}
		}
	}

	if invariants.Enabled {
		// There should be no deleted tables or backings, since we're starting
		// from an empty state.
		for _, deletedLevel := range bve.DeletedTables {
			if len(deletedLevel) != 0 {
				panic("deleted files after manifest replay")
			}
		}
		if len(bve.RemovedFileBacking) > 0 {
			panic("deleted backings after manifest replay")
		}
	}

	emptyVersion := manifest.NewInitialVersion(opts.Comparer)
	newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
	if err != nil {
		return nil, err
	}
	rv.latest.l0Organizer.PerformUpdate(rv.latest.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
	rv.latest.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
	rv.latest.blobFiles.Init(&bve, manifest.BlobRewriteHeuristic{
		CurrentTime: opts.private.timeNow,
		MinimumAge:  opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
	})
	rv.version = newVersion
	return rv, nil
}

// replayWAL replays the edits in the specified WAL. If the DB is in read-only
// mode, then the WALs are replayed into memtables and not flushed. If the DB
// is not in read-only mode, then the contents of the WAL are guaranteed to be
// flushed when a flush is scheduled after this method is run. Note that this
// flushing is very important for guaranteeing durability: the application may
// have had a number of pending fsyncs to the WAL before the process crashed,
// and those fsyncs may not have happened but the corresponding data may now be
// readable from the WAL (while sitting in write-back caches in the kernel or
// the storage device). By reading the WAL (including the non-fsynced data) and
// then flushing all these changes (flush does fsyncs), we are able to
// guarantee that the initial state of the DB is durable.
//
// This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
// WALs into the flushable queue. Flushing of the queue is expected to be
// handled by callers. A list of flushable ingests (but not memtables) replayed
// is returned.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayWAL(
	jobID JobID, ll wal.LogicalLog, strictWALTail bool,
) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
	rr := ll.OpenForRead()
	defer func() { _ = rr.Close() }()
	var (
		b               Batch
		buf             bytes.Buffer
		mem             *memTable
		entry           *flushableEntry
		offset          wal.Offset
		lastFlushOffset int64
		keysReplayed    int64 // number of keys replayed
		batchesReplayed int64 // number of batches replayed
	)

	// TODO(jackson): This function is interspersed with panics, in addition to
	// corruption error propagation. Audit them to ensure we're truly only
	// panicking where the error points to a Pebble bug and not user or
	// hardware-induced corruption.

	// "Flushes" (i.e. closes off) the current memtable, if not nil.
	flushMem := func() {
		if mem == nil {
			return
		}
		mem.writerUnref()
		if d.mu.mem.mutable == mem {
			d.mu.mem.mutable = nil
		}
		entry.flushForced = !d.opts.ReadOnly
		var logSize uint64
		mergedOffset := offset.Physical + offset.PreviousFilesBytes
		if mergedOffset >= lastFlushOffset {
			logSize = uint64(mergedOffset - lastFlushOffset)
		}
		// Else, this was the initial memtable in the read-only case which must
		// have been empty, but we need to flush it since we don't want to add
		// to it later.
		lastFlushOffset = mergedOffset
		entry.logSize = logSize
		mem, entry = nil, nil
	}

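	// If a mutable memtable already exists (typically one left over from
	// replaying an earlier WAL), close it off before replaying this WAL unless
	// the DB is read-only, so that newly created memtables are associated with
	// this WAL's log number.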
	mem = d.mu.mem.mutable
	if mem != nil {
		entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
		if !d.opts.ReadOnly {
			flushMem()
		}
	}

	// Creates a new memtable if there is no current memtable.
	ensureMem := func(seqNum base.SeqNum) {
		if mem != nil {
			return
		}
		mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
		d.mu.mem.mutable = mem
		d.mu.mem.queue = append(d.mu.mem.queue, entry)
	}

	defer func() {
		if err != nil {
			err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
		}
	}()

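	// Read records from the WAL one at a time. Each record is a serialized
	// Batch. Regular batches are applied to the current memtable; large
	// batches and flushable ingests are appended to the flushable queue as
	// their own entries.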
	for {
		var r io.Reader
		var err error
		r, offset, err = rr.NextRecord()
		if err == nil {
			_, err = io.Copy(&buf, r)
		}
		if err != nil {
			// It is common to encounter a zeroed or invalid chunk due to WAL
			// preallocation and WAL recycling. However, zeroed or invalid
			// chunks can also be a consequence of corruption / disk rot. When
			// the log reader encounters one of these cases, it attempts to
			// disambiguate by reading ahead looking for a future record. If a
			// future chunk indicates the chunk at the original offset should've
			// been valid, it surfaces record.ErrInvalidChunk or
			// record.ErrZeroedChunk. These errors are always indicative of
			// corruption and data loss.
			//
			// Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
			// that the WAL terminated uncleanly and ambiguously. If the WAL is
			// the most recent logical WAL, the caller passes in
			// (strictWALTail=false), indicating we should tolerate the unclean
			// ending. If the WAL is an older WAL, the caller passes in
			// (strictWALTail=true), indicating that the WAL should have been
			// closed cleanly, and we should interpret the
			// `record.ErrUnexpectedEOF` as corruption and stop recovery.
			if errors.Is(err, io.EOF) {
				break
			} else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
				break
			} else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
				errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
				// If a read-ahead returns record.ErrInvalidChunk or
				// record.ErrZeroedChunk, then there's definitively corruption.
				//
				// If strictWALTail=true, then record.ErrUnexpectedEOF should
				// also be considered corruption because strictWALTail
				// indicates we expect a clean end to the WAL.
				//
				// Other I/O related errors should not be marked with corruption
				// and simply returned.
				err = errors.Mark(err, ErrCorruption)
			}

			return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
		}

		if buf.Len() < batchrepr.HeaderLen {
			return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
				errors.Safe(base.DiskFileNum(ll.Num)), offset)
		}

		if d.opts.ErrorIfNotPristine {
			return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
		}

		// Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize,
		// which is used below.
		b = Batch{}
		b.db = d
		if err := b.SetRepr(buf.Bytes()); err != nil {
			return nil, 0, err
		}
		seqNum := b.SeqNum()
		maxSeqNum = seqNum + base.SeqNum(b.Count())
		keysReplayed += int64(b.Count())
		batchesReplayed++
		{
			br := b.Reader()
			if kind, _, _, ok, err := br.Next(); err != nil {
				return nil, 0, err
			} else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
				// We're in the flushable ingests (+ possibly excises) case.
				//
				// Ingests require an up-to-date view of the LSM to determine
				// the target level of ingested sstables, and to accurately
				// compute excises. Instead of doing an ingest in this function,
				// we just enqueue a flushable ingest in the flushables queue
				// and run a regular flush.
				flushMem()
				// mem is nil here.
				entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
				if err != nil {
					return nil, 0, err
				}
				fi := entry.flushable.(*ingestedFlushable)
				flushableIngests = append(flushableIngests, fi)
				d.mu.mem.queue = append(d.mu.mem.queue, entry)
				// A flushable ingest is always followed by a WAL rotation.
				break
			}
		}

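		// Batches at or above largeBatchThreshold are not copied into a
		// memtable; they are wrapped in a flushableBatch and appended to the
		// flushable queue as their own entry.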
		if b.memTableSize >= uint64(d.largeBatchThreshold) {
			flushMem()
			// Make a copy of the data slice since it is currently owned by buf
			// and will be reused in the next iteration.
			b.data = slices.Clone(b.data)
			b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
			if err != nil {
				return nil, 0, err
			}
			entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
			// Disable memory accounting by adding a reader ref that will never
			// be removed.
			entry.readerRefs.Add(1)
			d.mu.mem.queue = append(d.mu.mem.queue, entry)
		} else {
			ensureMem(seqNum)
			if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
				return nil, 0, err
			}
			// We loop since DB.newMemTable() slowly grows the size of allocated
			// memtables, so the batch may not initially fit, but will
			// eventually fit (since it is smaller than largeBatchThreshold).
			for err == arenaskl.ErrArenaFull {
				flushMem()
				ensureMem(seqNum)
				err = mem.prepare(&b)
				if err != nil && err != arenaskl.ErrArenaFull {
					return nil, 0, err
				}
			}
			if err = mem.apply(&b, seqNum); err != nil {
				return nil, 0, err
			}
			mem.writerUnref()
		}
		buf.Reset()
	}

	d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
		jobID, ll.String(), offset, keysReplayed, batchesReplayed)
	if !d.opts.ReadOnly {
		flushMem()
	}

	// mem is nil here, if !ReadOnly.
	return flushableIngests, maxSeqNum, err
}

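// replayIngestedFlushable reconstructs an ingestedFlushable entry from a WAL
// batch containing IngestSST (and optionally Excise) entries, reloading the
// metadata of each ingested sstable from the object provider. The returned
// entry is appended to the flushable queue by the caller.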
func (d *DB) replayIngestedFlushable(
	b *Batch, logNum base.DiskFileNum,
) (entry *flushableEntry, err error) {
	br := b.Reader()
	seqNum := b.SeqNum()

	fileNums := make([]base.DiskFileNum, 0, b.Count())
	var exciseSpan KeyRange
	addFileNum := func(encodedFileNum []byte) {
		fileNum, n := binary.Uvarint(encodedFileNum)
		if n <= 0 {
			panic("pebble: ingest sstable file num is invalid")
		}
		fileNums = append(fileNums, base.DiskFileNum(fileNum))
	}

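	// Each batch entry is either an IngestSST record, whose key holds the
	// uvarint-encoded file number of an ingested sstable, or an Excise record,
	// whose key and value hold the start and end of the excised span.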
	for i := 0; i < int(b.Count()); i++ {
		kind, key, val, ok, err := br.Next()
		if err != nil {
			return nil, err
		}
		if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
			panic("pebble: invalid batch key kind")
		}
		if !ok {
			panic("pebble: invalid batch count")
		}
		if kind == base.InternalKeyKindExcise {
			if exciseSpan.Valid() {
				panic("pebble: multiple excise spans in a single batch")
			}
			exciseSpan.Start = slices.Clone(key)
			exciseSpan.End = slices.Clone(val)
			continue
		}
		addFileNum(key)
	}

	if _, _, _, ok, err := br.Next(); err != nil {
		return nil, err
	} else if ok {
		panic("pebble: invalid number of entries in batch")
	}

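	// Reload the table metadata for each ingested sstable by reading the files
	// back from the object provider, the same way the original ingestion
	// loaded them.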
	meta := make([]*manifest.TableMetadata, len(fileNums))
	var lastRangeKey keyspan.Span
	for i, n := range fileNums {
		readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
			objstorage.OpenOptions{MustExist: true})
		if err != nil {
			return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
		}
		// NB: ingestLoad1 will close readable.
		meta[i], lastRangeKey, _, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
			readable, d.cacheHandle, &d.compressionCounters, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
		if err != nil {
			return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
		}
	}
	if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
		return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
			d.opts.Comparer.FormatKey(lastRangeKey.End))
	}

	numFiles := len(meta)
	if exciseSpan.Valid() {
		numFiles++
	}
	if uint32(numFiles) != b.Count() {
		panic("pebble: couldn't load all files in WAL entry")
	}

	return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
}