Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/arenaskl"
24 : "github.com/cockroachdb/pebble/internal/base"
25 : "github.com/cockroachdb/pebble/internal/cache"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/keyspan"
28 : "github.com/cockroachdb/pebble/internal/manifest"
29 : "github.com/cockroachdb/pebble/internal/manual"
30 : "github.com/cockroachdb/pebble/objstorage"
31 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
32 : "github.com/cockroachdb/pebble/objstorage/remote"
33 : "github.com/cockroachdb/pebble/record"
34 : "github.com/cockroachdb/pebble/vfs"
35 : "github.com/cockroachdb/pebble/wal"
36 : "github.com/prometheus/client_golang/prometheus"
37 : )
38 :
39 : const (
40 : initialMemTableSize = 256 << 10 // 256 KB
41 :
42 : // The max batch size is limited by the uint32 offsets stored in
43 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
44 : //
45 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
46 : // end of an allocation fits in uint32.
47 : //
48 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
49 : // 2GB).
50 : maxBatchSize = min(math.MaxUint32, math.MaxInt)
51 :
52 : // The max memtable size is limited by the uint32 offsets stored in
53 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
54 : //
55 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
56 : // end of an allocation fits in uint32.
57 : //
58 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
59 : // 2GB).
60 : maxMemTableSize = min(math.MaxUint32, math.MaxInt)
61 : )
62 :
63 : // FileCacheSize returns the file cache size to use for a single DB, given
64 : // the maximum number of open files available to a file cache that serves
65 : // only that DB. The result leaves room for non-file-cache files and is
66 : // floored at the minimum file cache size.
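// As a hedged, illustrative sketch (not part of the original file; the value
// 1000 is an arbitrary assumption), the function subtracts the number of
// non-file-cache file descriptors and enforces a floor:
//
//	fcSize := FileCacheSize(1000) // 1000 - numNonFileCacheFiles, but at least minFileCacheSize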
67 2 : func FileCacheSize(maxOpenFiles int) int {
68 2 : fileCacheSize := maxOpenFiles - numNonFileCacheFiles
69 2 : if fileCacheSize < minFileCacheSize {
70 2 : fileCacheSize = minFileCacheSize
71 2 : }
72 2 : return fileCacheSize
73 : }
74 :
75 : // Open opens a DB whose files live in the given directory.
76 : //
77 : // IsCorruptionError() can be used to determine if the error is caused by on-disk
78 : // corruption.
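//
// A minimal, hedged usage sketch (not part of the original file; the path and
// error handling are placeholder assumptions):
//
//	db, err := Open("/tmp/demo-db", &Options{})
//	if err != nil {
//		// IsCorruptionError(err) distinguishes on-disk corruption from other
//		// failures; sentinel errors such as ErrDBDoesNotExist can be matched
//		// with errors.Is.
//		return err
//	}
//	defer db.Close()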
79 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
80 2 : // Make a copy of the options so that we don't mutate the passed in options.
81 2 : opts = opts.Clone()
82 2 : opts.EnsureDefaults()
83 2 : if err := opts.Validate(); err != nil {
84 0 : return nil, err
85 0 : }
86 2 : if opts.LoggerAndTracer == nil {
87 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
88 2 : } else {
89 1 : opts.Logger = opts.LoggerAndTracer
90 1 : }
91 :
92 2 : if invariants.Sometimes(5) {
93 2 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
94 2 : opts.Comparer = &assertComparer
95 2 : }
96 :
97 : // In all error cases, we return db = nil; this is used by various
98 : // deferred cleanups.
99 2 : maybeCleanUp := func(fn func() error) {
100 2 : if db == nil {
101 1 : err = errors.CombineErrors(err, fn())
102 1 : }
103 : }
104 :
105 : // Open the database and WAL directories first.
106 2 : walDirname, secondaryWalDirName, dataDir, err := prepareAndOpenDirs(dirname, opts)
107 2 : if err != nil {
108 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
109 1 : }
110 2 : defer maybeCleanUp(dataDir.Close)
111 2 :
112 2 : // Lock the database directory.
113 2 : fileLock, err := base.AcquireOrValidateDirectoryLock(opts.Lock, dirname, opts.FS)
114 2 : if err != nil {
115 1 : return nil, err
116 1 : }
117 2 : defer maybeCleanUp(fileLock.Close)
118 2 :
119 2 : // List the directory contents. This also happens to include WAL log files, if
120 2 : // they are in the same dir, but we will ignore those below. The provider is
121 2 : // also given this list, but it ignores non-sstable files.
122 2 : ls, err := opts.FS.List(dirname)
123 2 : if err != nil {
124 1 : return nil, err
125 1 : }
126 :
127 : // Establish the format major version.
128 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
129 2 : if err != nil {
130 1 : return nil, err
131 1 : }
132 2 : defer maybeCleanUp(formatVersionMarker.Close)
133 2 :
134 2 : noFormatVersionMarker := formatVersion == FormatDefault
135 2 : if noFormatVersionMarker {
136 2 : // We will initialize the store at the minimum possible format, then upgrade
137 2 : // the format to the desired one. This helps test the format upgrade code.
138 2 : formatVersion = FormatMinSupported
139 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
140 2 : formatVersion = FormatMinForSharedObjects
141 2 : }
142 : // There is no format version marker file. There are three cases:
143 : // - we are trying to open an existing store that was created at
144 : // FormatMostCompatible (the only one without a version marker file)
145 : // - we are creating a new store;
146 : // - we are retrying a failed creation.
147 : //
148 : // To error in the first case, we set ErrorIfNotPristine.
149 2 : opts.ErrorIfNotPristine = true
150 2 : defer func() {
151 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
152 0 : // We must be trying to open an existing store at FormatMostCompatible.
153 0 : // Correct the error in this case with a more specific message.
154 0 : err = errors.Newf(
155 0 : "pebble: database %q written in format major version 1 which is no longer supported",
156 0 : dirname)
157 0 : }
158 : }()
159 : }
160 :
161 : // Find the currently active manifest, if there is one.
162 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
163 2 : if err != nil {
164 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
165 1 : }
166 2 : defer maybeCleanUp(manifestMarker.Close)
167 2 :
168 2 : // Atomic markers may leave behind obsolete files if there's a crash
169 2 : // mid-update. Clean these up if we're not in read-only mode.
170 2 : if !opts.ReadOnly {
171 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
172 0 : return nil, err
173 0 : }
174 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
175 0 : return nil, err
176 0 : }
177 : }
178 :
179 2 : if opts.Cache == nil {
180 2 : opts.Cache = cache.New(opts.CacheSize)
181 2 : defer opts.Cache.Unref()
182 2 : }
183 :
184 2 : d := &DB{
185 2 : cacheHandle: opts.Cache.NewHandle(),
186 2 : dirname: dirname,
187 2 : opts: opts,
188 2 : cmp: opts.Comparer.Compare,
189 2 : equal: opts.Comparer.Equal,
190 2 : merge: opts.Merger.Merge,
191 2 : split: opts.Comparer.Split,
192 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
193 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
194 2 : dataDirLock: fileLock,
195 2 : dataDir: dataDir,
196 2 : closed: new(atomic.Value),
197 2 : closedCh: make(chan struct{}),
198 2 : }
199 2 : d.mu.versions = &versionSet{}
200 2 : d.diskAvailBytes.Store(math.MaxUint64)
201 2 : d.problemSpans.Init(manifest.NumLevels, opts.Comparer.Compare)
202 2 : if opts.Experimental.CompactionScheduler != nil {
203 2 : d.compactionScheduler = opts.Experimental.CompactionScheduler()
204 2 : } else {
205 1 : d.compactionScheduler = newConcurrencyLimitScheduler(defaultTimeSource{})
206 1 : }
207 :
208 2 : defer func() {
209 2 : // If an error or panic occurs during open, attempt to release the manually
210 2 : // allocated memory resources. Note that rather than look for an error, we
211 2 : // look for the return of a nil DB pointer.
212 2 : if r := recover(); db == nil {
213 1 : // If there's an unused, recycled memtable, we need to release its memory.
214 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
215 1 : d.freeMemTable(obsoleteMemTable)
216 1 : }
217 :
218 1 : if d.fileCache != nil {
219 1 : _ = d.fileCache.Close()
220 1 : }
221 1 : d.cacheHandle.Close()
222 1 :
223 1 : for _, mem := range d.mu.mem.queue {
224 1 : switch t := mem.flushable.(type) {
225 1 : case *memTable:
226 1 : manual.Free(manual.MemTable, t.arenaBuf)
227 1 : t.arenaBuf = manual.Buf{}
228 : }
229 : }
230 1 : if d.cleanupManager != nil {
231 1 : d.cleanupManager.Close()
232 1 : }
233 1 : if d.objProvider != nil {
234 1 : _ = d.objProvider.Close()
235 1 : }
236 1 : if d.mu.versions.manifestFile != nil {
237 1 : _ = d.mu.versions.manifestFile.Close()
238 1 : }
239 1 : if r != nil {
240 1 : panic(r)
241 : }
242 : }
243 : }()
244 :
245 2 : d.commit = newCommitPipeline(commitEnv{
246 2 : logSeqNum: &d.mu.versions.logSeqNum,
247 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
248 2 : apply: d.commitApply,
249 2 : write: d.commitWrite,
250 2 : })
251 2 : d.mu.nextJobID = 1
252 2 : d.mu.mem.nextSize = opts.MemTableSize
253 2 : if d.mu.mem.nextSize > initialMemTableSize {
254 2 : d.mu.mem.nextSize = initialMemTableSize
255 2 : }
256 2 : d.mu.compact.cond.L = &d.mu.Mutex
257 2 : d.mu.compact.inProgress = make(map[compaction]struct{})
258 2 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
259 2 : d.mu.snapshots.init()
260 2 : // logSeqNum is the next sequence number that will be assigned.
261 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
262 2 : // room for reserved sequence numbers (see comments around
263 2 : // SeqNumStart).
264 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
265 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
266 2 : d.mu.formatVers.marker = formatVersionMarker
267 2 :
268 2 : d.timeNow = time.Now
269 2 : d.openedAt = d.timeNow()
270 2 :
271 2 : d.mu.Lock()
272 2 : defer d.mu.Unlock()
273 2 :
274 2 : jobID := d.newJobIDLocked()
275 2 :
276 2 : providerSettings := opts.MakeObjStorageProviderSettings(dirname)
277 2 : providerSettings.FSDirInitialListing = ls
278 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
279 2 : if err != nil {
280 1 : return nil, err
281 1 : }
282 :
283 2 : blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
284 2 : CurrentTime: d.timeNow,
285 2 : MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
286 2 : }
287 2 :
288 2 : if !manifestExists {
289 2 : // DB does not exist.
290 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
291 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
292 1 : }
293 :
294 : // Create the DB.
295 2 : if err := d.mu.versions.create(
296 2 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
297 1 : return nil, err
298 1 : }
299 2 : } else {
300 2 : if opts.ErrorIfExists {
301 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
302 1 : }
303 : // Load the version set.
304 2 : if err := d.mu.versions.load(
305 2 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
306 1 : return nil, err
307 1 : }
308 2 : if opts.ErrorIfNotPristine {
309 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
310 1 : d.mu.versions.addLiveFileNums(liveFileNums)
311 1 : if len(liveFileNums) != 0 {
312 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
313 1 : }
314 : }
315 : }
316 :
317 : // In read-only mode, we replay directly into the mutable memtable but never
318 : // flush it. We need to delay creation of the memtable until we know the
319 : // sequence number of the first batch that will be inserted.
320 2 : if !d.opts.ReadOnly {
321 2 : var entry *flushableEntry
322 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
323 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
324 2 : }
325 :
326 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
327 2 : Buckets: FsyncLatencyBuckets,
328 2 : })
329 2 :
330 2 : walOpts := wal.Options{
331 2 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
332 2 : Secondary: wal.Dir{},
333 2 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
334 2 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
335 2 : NoSyncOnClose: opts.NoSyncOnClose,
336 2 : BytesPerSync: opts.WALBytesPerSync,
337 2 : PreallocateSize: d.walPreallocateSize,
338 2 : MinSyncInterval: opts.WALMinSyncInterval,
339 2 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
340 2 : QueueSemChan: d.commit.logSyncQSem,
341 2 : Logger: opts.Logger,
342 2 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
343 2 : WriteWALSyncOffsets: func() bool { return d.FormatMajorVersion() >= FormatWALSyncChunks },
344 : }
345 : // Ensure we release the WAL directory locks if we fail to open the
346 : // database. If we fail before initializing the WAL manager, this defer is
347 : // responsible for releasing the locks. If we fail after initializing the
348 : // WAL manager, closing the WAL manager will release the locks.
349 : //
350 : // TODO(jackson): Open's cleanup error handling logic is convoluted; can we
351 : // simplify it?
352 2 : defer maybeCleanUp(func() (err error) {
353 1 : if d.mu.log.manager == nil {
354 1 : if walOpts.Primary.Lock != nil {
355 0 : err = errors.CombineErrors(err, walOpts.Primary.Lock.Close())
356 0 : }
357 1 : if walOpts.Secondary.Lock != nil {
358 0 : err = errors.CombineErrors(err, walOpts.Secondary.Lock.Close())
359 0 : }
360 1 : return err
361 : }
362 1 : return nil
363 : })
364 :
365 : // Lock the dedicated WAL directory, if configured.
366 2 : if walDirname != dirname {
367 2 : walOpts.Primary.Lock, err = base.AcquireOrValidateDirectoryLock(opts.WALDirLock, walDirname, opts.FS)
368 2 : if err != nil {
369 1 : return nil, err
370 1 : }
371 : }
372 2 : if opts.WALFailover != nil {
373 2 : walOpts.Secondary = opts.WALFailover.Secondary
374 2 : // Lock the secondary WAL directory, if distinct from the data directory
375 2 : // and primary WAL directory.
376 2 : if secondaryWalDirName != dirname && secondaryWalDirName != walDirname {
377 2 : walOpts.Secondary.Lock, err = base.AcquireOrValidateDirectoryLock(
378 2 : opts.WALFailover.Secondary.Lock, secondaryWalDirName, opts.WALFailover.Secondary.FS)
379 2 : if err != nil {
380 1 : return nil, err
381 1 : }
382 : }
383 2 : walOpts.Secondary.Dirname = secondaryWalDirName
384 2 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
385 2 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
386 2 : Buckets: FsyncLatencyBuckets,
387 2 : })
388 : }
389 2 : walDirs := walOpts.Dirs()
390 2 : walRecoveryLocks := make([]*base.DirLock, len(opts.WALRecoveryDirs))
391 2 : defer func() {
392 2 : // We only need the recovery WALs during Open, so we can release
393 2 : // the locks after the WALs have been scanned.
394 2 : for _, l := range walRecoveryLocks {
395 1 : if l != nil {
396 1 : _ = l.Close()
397 1 : }
398 : }
399 : }()
400 2 : for i, dir := range opts.WALRecoveryDirs {
401 1 : dir.Dirname = resolveStorePath(dirname, dir.Dirname)
402 1 : if dir.Dirname != dirname {
403 1 : // Acquire a lock on the WAL recovery directory.
404 1 : walRecoveryLocks[i], err = base.AcquireOrValidateDirectoryLock(dir.Lock, dir.Dirname, dir.FS)
405 1 : if err != nil {
406 0 : return nil, errors.Wrapf(err, "error acquiring lock on WAL recovery directory %q", dir.Dirname)
407 0 : }
408 : }
409 1 : walDirs = append(walDirs, dir)
410 : }
411 2 : wals, err := wal.Scan(walDirs...)
412 2 : if err != nil {
413 1 : return nil, err
414 1 : }
415 :
416 : // Remove obsolete WAL files now (as opposed to relying on asynchronous cleanup)
417 : // to prevent crash loops due to no disk space (ENOSPC).
418 2 : var retainedWALs wal.Logs
419 2 : for _, w := range wals {
420 2 : if base.DiskFileNum(w.Num) < d.mu.versions.minUnflushedLogNum {
421 2 : // Log obsolete WALs that will be removed.
422 2 : for i := range w.NumSegments() {
423 2 : fs, path := w.SegmentLocation(i)
424 2 : if err := fs.Remove(path); err != nil {
425 1 : // It's not a big deal if we can't delete the file now.
426 1 : // We'll try to remove it later in the cleanup process.
427 1 : d.opts.EventListener.WALDeleted(WALDeleteInfo{
428 1 : JobID: 0,
429 1 : Path: path,
430 1 : FileNum: base.DiskFileNum(w.Num),
431 1 : Err: err,
432 1 : })
433 1 : retainedWALs = append(retainedWALs, w)
434 2 : } else {
435 2 : d.opts.EventListener.WALDeleted(WALDeleteInfo{
436 2 : JobID: 0,
437 2 : Path: path,
438 2 : FileNum: base.DiskFileNum(w.Num),
439 2 : Err: nil,
440 2 : })
441 2 : }
442 : }
443 2 : } else {
444 2 : retainedWALs = append(retainedWALs, w)
445 2 : }
446 : }
447 :
448 2 : walManager, err := wal.Init(walOpts, retainedWALs)
449 2 : if err != nil {
450 1 : return nil, err
451 1 : }
452 2 : defer maybeCleanUp(walManager.Close)
453 2 : d.mu.log.manager = walManager
454 2 :
455 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
456 2 :
457 2 : if manifestExists && !opts.DisableConsistencyCheck {
458 2 : curVersion := d.mu.versions.currentVersion()
459 2 : if err := checkConsistency(curVersion, d.objProvider); err != nil {
460 1 : return nil, err
461 1 : }
462 : }
463 :
464 2 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
465 2 : if opts.FileCache == nil {
466 2 : opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
467 2 : defer opts.FileCache.Unref()
468 2 : }
469 2 : d.fileCache = opts.FileCache.newHandle(d.cacheHandle, d.objProvider, d.opts.LoggerAndTracer, d.opts.MakeReaderOptions(), d.reportCorruption)
470 2 : d.newIters = d.fileCache.newIters
471 2 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
472 2 :
473 2 : d.fileSizeAnnotator = d.makeFileSizeAnnotator()
474 2 :
475 2 : var previousOptionsFileNum base.DiskFileNum
476 2 : var previousOptionsFilename string
477 2 : for _, filename := range ls {
478 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
479 2 : if !ok {
480 2 : continue
481 : }
482 :
483 : // Don't reuse any obsolete file numbers to avoid modifying an
484 : // ingested sstable's original external file.
485 2 : d.mu.versions.markFileNumUsed(fn)
486 2 :
487 2 : switch ft {
488 0 : case base.FileTypeLog:
489 : // Ignore.
490 2 : case base.FileTypeOptions:
491 2 : if previousOptionsFileNum < fn {
492 2 : previousOptionsFileNum = fn
493 2 : previousOptionsFilename = filename
494 2 : }
495 1 : case base.FileTypeTemp, base.FileTypeOldTemp:
496 1 : if !d.opts.ReadOnly {
497 1 : // Some codepaths write to a temporary file and then
498 1 : // rename it to its final location when complete. A
499 1 : // temp file is leftover if a process exits before the
500 1 : // rename. Remove it.
501 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
502 1 : if err != nil {
503 0 : return nil, err
504 0 : }
505 : }
506 : }
507 : }
508 2 : if n := len(wals); n > 0 {
509 2 : // Don't reuse any obsolete file numbers to avoid modifying an
510 2 : // ingested sstable's original external file.
511 2 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
512 2 : }
513 :
514 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
515 : // objProvider. This avoids FileNum collisions with obsolete sstables.
516 2 : objects := d.objProvider.List()
517 2 : for _, obj := range objects {
518 2 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
519 2 : }
520 :
521 : // Validate the most-recent OPTIONS file, if there is one.
522 2 : if previousOptionsFilename != "" {
523 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
524 2 : previousOptions, err := readOptionsFile(opts, path)
525 2 : if err != nil {
526 0 : return nil, err
527 0 : }
528 2 : if err := opts.CheckCompatibility(dirname, previousOptions); err != nil {
529 1 : return nil, err
530 1 : }
531 : }
532 :
533 : // Replay any log files newer than the ones named in the manifest.
534 2 : var replayWALs wal.Logs
535 2 : for i, w := range wals {
536 2 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
537 2 : replayWALs = wals[i:]
538 2 : break
539 : }
540 : }
541 2 : var flushableIngests []*ingestedFlushable
542 2 : for i, lf := range replayWALs {
543 2 : // WALs other than the last one would have been closed cleanly.
544 2 : //
545 2 : // Note: we used to never require strict WAL tails when reading from older
546 2 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
547 2 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
548 2 : // compatible Pebble format is newer and guarantees a clean EOF.
549 2 : strictWALTail := i < len(replayWALs)-1
550 2 : fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
551 2 : if err != nil {
552 1 : return nil, err
553 1 : }
554 2 : if len(fi) > 0 {
555 2 : flushableIngests = append(flushableIngests, fi...)
556 2 : }
557 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
558 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
559 2 : }
560 : }
561 2 : if d.mu.mem.mutable == nil {
562 2 : // Recreate the mutable memtable if replayWAL got rid of it.
563 2 : var entry *flushableEntry
564 2 : d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
565 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
566 2 : }
567 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
568 2 :
569 2 : // Register with the CompactionScheduler before calling
570 2 : // d.maybeScheduleFlush, since completion of the flush can trigger
571 2 : // compactions.
572 2 : d.compactionScheduler.Register(2, d)
573 2 : if !d.opts.ReadOnly {
574 2 : d.maybeScheduleFlush()
575 2 : for d.mu.compact.flushing {
576 2 : d.mu.compact.cond.Wait()
577 2 : }
578 :
579 : // Create an empty .log file for the mutable memtable.
580 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
581 2 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
582 2 : if err != nil {
583 1 : return nil, err
584 1 : }
585 :
586 : // This isn't strictly necessary as we don't use the log number for
587 : // memtables being flushed, only for the next unflushed memtable.
588 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
589 : }
590 2 : d.updateReadStateLocked(d.opts.DebugCheck)
591 2 :
592 2 : if !d.opts.ReadOnly {
593 2 : // If the Options specify a format major version higher than the
594 2 : // loaded database's, upgrade it. If this is a new database, this
595 2 : // code path also performs an initial upgrade from the starting
596 2 : // implicit MinSupported version.
597 2 : //
598 2 : // We ratchet the version this far into Open so that migrations have a read
599 2 : // state available. Note that this also results in creating/updating the
600 2 : // format version marker file.
601 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
602 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
603 0 : return nil, err
604 0 : }
605 2 : } else if noFormatVersionMarker {
606 2 : // We are creating a new store. Create the format version marker file.
607 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
608 1 : return nil, err
609 1 : }
610 : }
611 :
612 : // Write the current options to disk.
613 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
614 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
615 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
616 2 :
617 2 : // Write them to a temporary file first, in case we crash before
618 2 : // we're done. A corrupt options file prevents opening the
619 2 : // database.
620 2 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
621 2 : if err != nil {
622 1 : return nil, err
623 1 : }
624 2 : serializedOpts := []byte(opts.String())
625 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
626 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
627 1 : }
628 2 : d.optionsFileSize = uint64(len(serializedOpts))
629 2 : if err := optionsFile.Sync(); err != nil {
630 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
631 1 : }
632 2 : if err := optionsFile.Close(); err != nil {
633 0 : return nil, err
634 0 : }
635 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
636 : // guaranteed to be atomic because the destination path does not
637 : // exist.
638 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
639 1 : return nil, err
640 1 : }
641 2 : if err := d.dataDir.Sync(); err != nil {
642 1 : return nil, err
643 1 : }
644 : }
645 :
646 2 : if !d.opts.ReadOnly {
647 2 : // Get a fresh list of files, in case some of the earlier flushes/compactions
648 2 : // have deleted some files.
649 2 : ls, err := opts.FS.List(dirname)
650 2 : if err != nil {
651 1 : return nil, err
652 1 : }
653 2 : d.scanObsoleteFiles(ls, flushableIngests)
654 2 : d.deleteObsoleteFiles(jobID)
655 : }
656 : // Else, nothing is obsolete.
657 :
658 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
659 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
660 2 : if !d.opts.ReadOnly {
661 2 : d.maybeCollectTableStatsLocked()
662 2 : }
663 2 : d.calculateDiskAvailableBytes()
664 2 :
665 2 : d.maybeScheduleFlush()
666 2 : d.maybeScheduleCompaction()
667 2 :
668 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
669 2 : //
670 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
671 2 : // finalizer to never be run. The problem is due to this limitation of
672 2 : // finalizers mentioned in the SetFinalizer docs:
673 2 : //
674 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
675 2 : // not guaranteed to be garbage collected and the finalizer is not
676 2 : // guaranteed to run, because there is no ordering that respects the
677 2 : // dependencies.
678 2 : //
679 2 : // DB has cycles with several of its internal structures: readState,
680 2 : // newIters, fileCache, versions, etc. Each of these individually causes a
681 2 : // cycle and prevents the finalizer from being run. But we can work around
682 2 : // this finalizer limitation by setting a finalizer on another object that
683 2 : // is tied to the lifetime of DB: the DB.closed atomic.Value.
684 2 : dPtr := fmt.Sprintf("%p", d)
685 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
686 0 : v := obj.(*atomic.Value)
687 0 : if err := v.Load(); err == nil {
688 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
689 0 : os.Exit(1)
690 0 : }
691 : })
692 :
693 2 : return d, nil
694 : }
695 :
696 : // prepareAndOpenDirs opens the directories for the store (and creates them if
697 : // necessary).
698 : //
699 : // Returns an error if ReadOnly is set and the directories don't exist.
700 : func prepareAndOpenDirs(
701 : dirname string, opts *Options,
702 2 : ) (walDirname string, secondaryWalDirName string, dataDir vfs.File, err error) {
703 2 : walDirname = dirname
704 2 : if opts.WALDir != "" {
705 2 : walDirname = resolveStorePath(dirname, opts.WALDir)
706 2 : }
707 2 : if opts.WALFailover != nil {
708 2 : secondaryWalDirName = resolveStorePath(dirname, opts.WALFailover.Secondary.Dirname)
709 2 : }
710 :
711 : // Create directories if needed.
712 2 : if !opts.ReadOnly {
713 2 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
714 2 : if err != nil {
715 1 : return "", "", nil, err
716 1 : }
717 2 : f.Close()
718 2 : if walDirname != dirname {
719 2 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
720 2 : if err != nil {
721 0 : return "", "", nil, err
722 0 : }
723 2 : f.Close()
724 : }
725 2 : if opts.WALFailover != nil {
726 2 : f, err := mkdirAllAndSyncParents(opts.WALFailover.Secondary.FS, secondaryWalDirName)
727 2 : if err != nil {
728 0 : return "", "", nil, err
729 0 : }
730 2 : f.Close()
731 : }
732 : }
733 :
734 2 : dataDir, err = opts.FS.OpenDir(dirname)
735 2 : if err != nil {
736 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
737 1 : return "", "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
738 1 : }
739 1 : return "", "", nil, err
740 : }
741 2 : if opts.ReadOnly && walDirname != dirname {
742 1 : // Check that the wal dir exists.
743 1 : walDir, err := opts.FS.OpenDir(walDirname)
744 1 : if err != nil {
745 1 : dataDir.Close()
746 1 : return "", "", nil, err
747 1 : }
748 1 : walDir.Close()
749 : }
750 :
751 2 : return walDirname, secondaryWalDirName, dataDir, nil
752 : }
753 :
754 : // GetVersion returns the engine version string from the latest options
755 : // file present in dir. Used to check what Pebble or RocksDB version was last
756 : // used to write to the database stored in this directory. An empty string is
757 : // returned if no valid OPTIONS file with a version key was found.
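//
// An illustrative sketch only (the directory path is a placeholder assumption):
//
//	v, err := GetVersion("/path/to/store", vfs.Default)
//	if err != nil {
//		return err
//	}
//	if v == "" {
//		// No OPTIONS file with a version key was found.
//	}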
758 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
759 1 : ls, err := fs.List(dir)
760 1 : if err != nil {
761 0 : return "", err
762 0 : }
763 1 : var version string
764 1 : lastOptionsSeen := base.DiskFileNum(0)
765 1 : for _, filename := range ls {
766 1 : ft, fn, ok := base.ParseFilename(fs, filename)
767 1 : if !ok {
768 1 : continue
769 : }
770 1 : switch ft {
771 1 : case base.FileTypeOptions:
772 1 : // If this file has a higher number than the last options file
773 1 : // processed, reset version. This is because rocksdb often
774 1 : // writes multiple options files without deleting previous ones.
775 1 : // Otherwise, skip parsing this options file.
776 1 : if fn > lastOptionsSeen {
777 1 : version = ""
778 1 : lastOptionsSeen = fn
779 1 : } else {
780 1 : continue
781 : }
782 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
783 1 : if err != nil {
784 0 : return "", err
785 0 : }
786 1 : data, err := io.ReadAll(f)
787 1 : f.Close()
788 1 :
789 1 : if err != nil {
790 0 : return "", err
791 0 : }
792 1 : err = parseOptions(string(data), parseOptionsFuncs{
793 1 : visitKeyValue: func(i, j int, section, key, value string) error {
794 1 : switch {
795 1 : case section == "Version":
796 1 : switch key {
797 1 : case "pebble_version":
798 1 : version = value
799 1 : case "rocksdb_version":
800 1 : version = fmt.Sprintf("rocksdb v%s", value)
801 : }
802 : }
803 1 : return nil
804 : },
805 : })
806 1 : if err != nil {
807 0 : return "", err
808 0 : }
809 : }
810 : }
811 1 : return version, nil
812 : }
813 :
814 : func (d *DB) replayIngestedFlushable(
815 : b *Batch, logNum base.DiskFileNum,
816 2 : ) (entry *flushableEntry, err error) {
817 2 : br := b.Reader()
818 2 : seqNum := b.SeqNum()
819 2 :
820 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
821 2 : var exciseSpan KeyRange
822 2 : addFileNum := func(encodedFileNum []byte) {
823 2 : fileNum, n := binary.Uvarint(encodedFileNum)
824 2 : if n <= 0 {
825 0 : panic("pebble: ingest sstable file num is invalid")
826 : }
827 2 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
828 : }
829 :
830 2 : for i := 0; i < int(b.Count()); i++ {
831 2 : kind, key, val, ok, err := br.Next()
832 2 : if err != nil {
833 0 : return nil, err
834 0 : }
835 2 : if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
836 0 : panic("pebble: invalid batch key kind")
837 : }
838 2 : if !ok {
839 0 : panic("pebble: invalid batch count")
840 : }
841 2 : if kind == base.InternalKeyKindExcise {
842 1 : if exciseSpan.Valid() {
843 0 : panic("pebble: multiple excise spans in a single batch")
844 : }
845 1 : exciseSpan.Start = slices.Clone(key)
846 1 : exciseSpan.End = slices.Clone(val)
847 1 : continue
848 : }
849 2 : addFileNum(key)
850 : }
851 :
852 2 : if _, _, _, ok, err := br.Next(); err != nil {
853 0 : return nil, err
854 2 : } else if ok {
855 0 : panic("pebble: invalid number of entries in batch")
856 : }
857 :
858 2 : meta := make([]*manifest.TableMetadata, len(fileNums))
859 2 : var lastRangeKey keyspan.Span
860 2 : for i, n := range fileNums {
861 2 : readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
862 2 : objstorage.OpenOptions{MustExist: true})
863 2 : if err != nil {
864 0 : return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
865 0 : }
866 : // NB: ingestLoad1 will close readable.
867 2 : meta[i], lastRangeKey, _, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
868 2 : readable, d.cacheHandle, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
869 2 : if err != nil {
870 0 : return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
871 0 : }
872 : }
873 2 : if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
874 0 : return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
875 0 : d.opts.Comparer.FormatKey(lastRangeKey.End))
876 0 : }
877 :
878 2 : numFiles := len(meta)
879 2 : if exciseSpan.Valid() {
880 1 : numFiles++
881 1 : }
882 2 : if uint32(numFiles) != b.Count() {
883 0 : panic("pebble: couldn't load all files in WAL entry")
884 : }
885 :
886 2 : return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
887 : }
888 :
889 : // replayWAL replays the edits in the specified WAL. If the DB is in
890 : // read-only mode, the WAL is replayed into memtables and not flushed. If
891 : // the DB is not in read-only mode, then the contents of the WAL are
892 : // guaranteed to be flushed when a flush is scheduled after this method is run.
893 : // Note that this flushing is very important for guaranteeing durability:
894 : // the application may have had a number of pending
895 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
896 : // happened but the corresponding data may now be readable from the WAL (while
897 : // sitting in write-back caches in the kernel or the storage device). By
898 : // reading the WAL (including the non-fsynced data) and then flushing all
899 : // these changes (flush does fsyncs), we are able to guarantee that the
900 : // initial state of the DB is durable.
901 : //
902 : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
903 : // WALs into the flushable queue. Flushing of the queue is expected to be handled
904 : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
905 : //
906 : // d.mu must be held when calling this, but the mutex may be dropped and
907 : // re-acquired during the course of this method.
908 : func (d *DB) replayWAL(
909 : jobID JobID, ll wal.LogicalLog, strictWALTail bool,
910 2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
911 2 : rr := ll.OpenForRead()
912 2 : defer func() { _ = rr.Close() }()
913 2 : var (
914 2 : b Batch
915 2 : buf bytes.Buffer
916 2 : mem *memTable
917 2 : entry *flushableEntry
918 2 : offset wal.Offset
919 2 : lastFlushOffset int64
920 2 : keysReplayed int64 // number of keys replayed
921 2 : batchesReplayed int64 // number of batches replayed
922 2 : )
923 2 :
924 2 : // TODO(jackson): This function is interspersed with panics, in addition to
925 2 : // corruption error propagation. Audit them to ensure we're truly only
926 2 : // panicking where the error points to Pebble bug and not user or
927 2 : // hardware-induced corruption.
928 2 :
929 2 : // "Flushes" (ie. closes off) the current memtable, if not nil.
930 2 : flushMem := func() {
931 2 : if mem == nil {
932 2 : return
933 2 : }
934 2 : mem.writerUnref()
935 2 : if d.mu.mem.mutable == mem {
936 2 : d.mu.mem.mutable = nil
937 2 : }
938 2 : entry.flushForced = !d.opts.ReadOnly
939 2 : var logSize uint64
940 2 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
941 2 : if mergedOffset >= lastFlushOffset {
942 2 : logSize = uint64(mergedOffset - lastFlushOffset)
943 2 : }
944 : // Else, this was the initial memtable in the read-only case which must have
945 : // been empty, but we need to flush it since we don't want to add to it later.
946 2 : lastFlushOffset = mergedOffset
947 2 : entry.logSize = logSize
948 2 : mem, entry = nil, nil
949 : }
950 :
951 2 : mem = d.mu.mem.mutable
952 2 : if mem != nil {
953 2 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
954 2 : if !d.opts.ReadOnly {
955 2 : flushMem()
956 2 : }
957 : }
958 :
959 : // Creates a new memtable if there is no current memtable.
960 2 : ensureMem := func(seqNum base.SeqNum) {
961 2 : if mem != nil {
962 2 : return
963 2 : }
964 2 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
965 2 : d.mu.mem.mutable = mem
966 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
967 : }
968 :
969 2 : defer func() {
970 2 : if err != nil {
971 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
972 1 : }
973 : }()
974 :
975 2 : for {
976 2 : var r io.Reader
977 2 : var err error
978 2 : r, offset, err = rr.NextRecord()
979 2 : if err == nil {
980 2 : _, err = io.Copy(&buf, r)
981 2 : }
982 2 : if err != nil {
983 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
984 2 : // preallocation and WAL recycling. However zeroed or invalid chunks
985 2 : // can also be a consequence of corruption / disk rot. When the log
986 2 : // reader encounters one of these cases, it attempts to disambiguate
987 2 : // by reading ahead looking for a future record. If a future chunk
988 2 : // indicates the chunk at the original offset should've been valid, it
989 2 : // surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
990 2 : // errors are always indicative of corruption and data loss.
991 2 : //
992 2 : // Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
993 2 : // that the WAL terminated uncleanly and ambiguously. If the WAL is
994 2 : // the most recent logical WAL, the caller passes in
995 2 : // (strictWALTail=false), indicating we should tolerate the unclean
996 2 : // ending. If the WAL is an older WAL, the caller passes in
997 2 : // (strictWALTail=true), indicating that the WAL should have been
998 2 : // closed cleanly, and we should interpret the
999 2 : // `record.ErrUnexpectedEOF` as corruption and stop recovery.
1000 2 : if errors.Is(err, io.EOF) {
1001 2 : break
1002 1 : } else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
1003 1 : break
1004 1 : } else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
1005 1 : errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
1006 1 : // If a read-ahead returns record.ErrInvalidChunk or
1007 1 : // record.ErrZeroedChunk, then there's definitively corruption.
1008 1 : //
1009 1 : // If strictWALTail=true, then record.ErrUnexpectedEOF should
1010 1 : // also be considered corruption because the strictWALTail
1011 1 : // indicates we expect a clean end to the WAL.
1012 1 : //
1013 1 : // Other I/O related errors should not be marked with corruption
1014 1 : // and simply returned.
1015 1 : err = errors.Mark(err, ErrCorruption)
1016 1 : }
1017 :
1018 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
1019 : }
1020 :
1021 2 : if buf.Len() < batchrepr.HeaderLen {
1022 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
1023 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
1024 0 : }
1025 :
1026 2 : if d.opts.ErrorIfNotPristine {
1027 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
1028 1 : }
1029 :
1030 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
1031 : // which is used below.
1032 2 : b = Batch{}
1033 2 : b.db = d
1034 2 : if err := b.SetRepr(buf.Bytes()); err != nil {
1035 0 : return nil, 0, err
1036 0 : }
1037 2 : seqNum := b.SeqNum()
1038 2 : maxSeqNum = seqNum + base.SeqNum(b.Count())
1039 2 : keysReplayed += int64(b.Count())
1040 2 : batchesReplayed++
1041 2 : {
1042 2 : br := b.Reader()
1043 2 : if kind, _, _, ok, err := br.Next(); err != nil {
1044 0 : return nil, 0, err
1045 2 : } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
1046 2 : // We're in the flushable ingests (+ possibly excises) case.
1047 2 : //
1048 2 : // Ingests require an up-to-date view of the LSM to determine the target
1049 2 : // level of ingested sstables, and to accurately compute excises. Instead of
1050 2 : // doing an ingest in this function, we just enqueue a flushable ingest
1051 2 : // in the flushables queue and run a regular flush.
1052 2 : flushMem()
1053 2 : // mem is nil here.
1054 2 : entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
1055 2 : if err != nil {
1056 0 : return nil, 0, err
1057 0 : }
1058 2 : fi := entry.flushable.(*ingestedFlushable)
1059 2 : flushableIngests = append(flushableIngests, fi)
1060 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1061 2 : // A flushable ingest is always followed by a WAL rotation.
1062 2 : break
1063 : }
1064 : }
1065 :
1066 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
1067 2 : flushMem()
1068 2 : // Make a copy of the data slice since it is currently owned by buf and will
1069 2 : // be reused in the next iteration.
1070 2 : b.data = slices.Clone(b.data)
1071 2 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
1072 2 : if err != nil {
1073 0 : return nil, 0, err
1074 0 : }
1075 2 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1076 2 : // Disable memory accounting by adding a reader ref that will never be
1077 2 : // removed.
1078 2 : entry.readerRefs.Add(1)
1079 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1080 2 : } else {
1081 2 : ensureMem(seqNum)
1082 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1083 0 : return nil, 0, err
1084 0 : }
1085 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1086 : // batch may not initially fit, but will eventually fit (since it is smaller than
1087 : // largeBatchThreshold).
1088 2 : for err == arenaskl.ErrArenaFull {
1089 2 : flushMem()
1090 2 : ensureMem(seqNum)
1091 2 : err = mem.prepare(&b)
1092 2 : if err != nil && err != arenaskl.ErrArenaFull {
1093 0 : return nil, 0, err
1094 0 : }
1095 : }
1096 2 : if err = mem.apply(&b, seqNum); err != nil {
1097 0 : return nil, 0, err
1098 0 : }
1099 2 : mem.writerUnref()
1100 : }
1101 2 : buf.Reset()
1102 : }
1103 :
1104 2 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1105 2 : jobID, ll.String(), offset, keysReplayed, batchesReplayed)
1106 2 : if !d.opts.ReadOnly {
1107 2 : flushMem()
1108 2 : }
1109 :
1110 : // mem is nil here, if !ReadOnly.
1111 2 : return flushableIngests, maxSeqNum, err
1112 : }
1113 :
1114 2 : func readOptionsFile(opts *Options, path string) (string, error) {
1115 2 : f, err := opts.FS.Open(path)
1116 2 : if err != nil {
1117 0 : return "", err
1118 0 : }
1119 2 : defer f.Close()
1120 2 :
1121 2 : data, err := io.ReadAll(f)
1122 2 : if err != nil {
1123 0 : return "", err
1124 0 : }
1125 2 : return string(data), nil
1126 : }
1127 :
1128 : // DBDesc briefly describes high-level state about a database.
1129 : type DBDesc struct {
1130 : // Exists is true if an existing database was found.
1131 : Exists bool
1132 : // FormatMajorVersion indicates the database's current format
1133 : // version.
1134 : FormatMajorVersion FormatMajorVersion
1135 : // ManifestFilename is the filename of the current active manifest,
1136 : // if the database exists.
1137 : ManifestFilename string
1138 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1139 : // exists.
1140 : OptionsFilename string
1141 : }
1142 :
1143 : // String implements fmt.Stringer.
1144 1 : func (d *DBDesc) String() string {
1145 1 : if !d.Exists {
1146 1 : return "uninitialized"
1147 1 : }
1148 1 : var buf bytes.Buffer
1149 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1150 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1151 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1152 1 : return buf.String()
1153 : }
1154 :
1155 : // Peek looks for an existing database in dirname on the provided FS. It
1156 : // returns a brief description of the database. Peek is read-only and
1157 : // does not open the database.
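//
// A hedged usage sketch (the path is a placeholder; not from the original
// file):
//
//	desc, err := Peek("/path/to/store", vfs.Default)
//	if err != nil {
//		return err
//	}
//	if desc.Exists {
//		fmt.Printf("format: %s, manifest: %s\n", desc.FormatMajorVersion, desc.ManifestFilename)
//	}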
1158 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1159 1 : ls, err := fs.List(dirname)
1160 1 : if err != nil {
1161 1 : return nil, err
1162 1 : }
1163 :
1164 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1165 1 : if err != nil {
1166 0 : return nil, err
1167 0 : }
1168 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1169 : // PeekMarker variant that avoids opening the directory.
1170 1 : if err := versMarker.Close(); err != nil {
1171 0 : return nil, err
1172 0 : }
1173 :
1174 : // Find the currently active manifest, if there is one.
1175 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1176 1 : if err != nil {
1177 0 : return nil, err
1178 0 : }
1179 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1180 : // PeekMarker variant that avoids opening the directory.
1181 1 : if err := manifestMarker.Close(); err != nil {
1182 0 : return nil, err
1183 0 : }
1184 :
1185 1 : desc := &DBDesc{
1186 1 : Exists: exists,
1187 1 : FormatMajorVersion: vers,
1188 1 : }
1189 1 :
1190 1 : // Find the OPTIONS file with the highest file number within the list of
1191 1 : // directory entries.
1192 1 : var previousOptionsFileNum base.DiskFileNum
1193 1 : for _, filename := range ls {
1194 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1195 1 : if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
1196 1 : continue
1197 : }
1198 1 : previousOptionsFileNum = fn
1199 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1200 : }
1201 :
1202 1 : if exists {
1203 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
1204 1 : }
1205 1 : return desc, nil
1206 : }
1207 :
1208 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1209 : // does not exist.
1210 : //
1211 : // Note that errors can be wrapped with more details; use errors.Is().
1212 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1213 :
1214 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1215 : // already exists.
1216 : //
1217 : // Note that errors can be wrapped with more details; use errors.Is().
1218 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1219 :
1220 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1221 : // already exists and is not pristine.
1222 : //
1223 : // Note that errors can be wrapped with more details; use errors.Is().
1224 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
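
// These sentinel errors may be wrapped with additional detail (such as the
// dirname), so callers should match them with errors.Is. A hedged sketch,
// assuming err was returned by Open:
//
//	if errors.Is(err, ErrDBAlreadyExists) {
//		// ErrorIfExists was set and a store already exists at dirname.
//	}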
1225 :
1226 2 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
1227 2 : var errs []error
1228 2 : dedup := make(map[base.DiskFileNum]struct{})
1229 2 : for level, files := range v.Levels {
1230 2 : for f := range files.All() {
1231 2 : backingState := f.TableBacking
1232 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1233 2 : continue
1234 : }
1235 2 : dedup[backingState.DiskFileNum] = struct{}{}
1236 2 : fileNum := backingState.DiskFileNum
1237 2 : fileSize := backingState.Size
1238 2 : // We skip over remote objects; those are instead checked asynchronously
1239 2 : // by the table stats loading job.
1240 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1241 2 : var size int64
1242 2 : if err == nil {
1243 2 : if meta.IsRemote() {
1244 2 : continue
1245 : }
1246 2 : size, err = objProvider.Size(meta)
1247 : }
1248 2 : if err != nil {
1249 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1250 1 : continue
1251 : }
1252 :
1253 2 : if size != int64(fileSize) {
1254 0 : errs = append(errs, errors.Errorf(
1255 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1256 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1257 0 : errors.Safe(size), errors.Safe(fileSize)))
1258 0 : continue
1259 : }
1260 : }
1261 : }
1262 2 : return errors.Join(errs...)
1263 : }
1264 :
1265 : type walEventListenerAdaptor struct {
1266 : l *EventListener
1267 : }
1268 :
1269 2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1270 2 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1271 2 : // is insufficient to infer whether primary or secondary.
1272 2 : wci := WALCreateInfo{
1273 2 : JobID: ci.JobID,
1274 2 : Path: ci.Path,
1275 2 : FileNum: base.DiskFileNum(ci.Num),
1276 2 : RecycledFileNum: ci.RecycledFileNum,
1277 2 : Err: ci.Err,
1278 2 : }
1279 2 : l.l.WALCreated(wci)
1280 2 : }
|