Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "slices"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/batchrepr"
23 : "github.com/cockroachdb/pebble/internal/arenaskl"
24 : "github.com/cockroachdb/pebble/internal/base"
25 : "github.com/cockroachdb/pebble/internal/cache"
26 : "github.com/cockroachdb/pebble/internal/constants"
27 : "github.com/cockroachdb/pebble/internal/invariants"
28 : "github.com/cockroachdb/pebble/internal/keyspan"
29 : "github.com/cockroachdb/pebble/internal/manifest"
30 : "github.com/cockroachdb/pebble/internal/manual"
31 : "github.com/cockroachdb/pebble/objstorage"
32 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
33 : "github.com/cockroachdb/pebble/objstorage/remote"
34 : "github.com/cockroachdb/pebble/record"
35 : "github.com/cockroachdb/pebble/vfs"
36 : "github.com/cockroachdb/pebble/wal"
37 : "github.com/prometheus/client_golang/prometheus"
38 : )
39 :
40 : const (
41 : initialMemTableSize = 256 << 10 // 256 KB
42 :
43 : // The max batch size is limited by the uint32 offsets stored in
44 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
45 : //
46 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
47 : // end of an allocation fits in uint32.
48 : //
49 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
50 : // 2GB).
51 : maxBatchSize = constants.MaxUint32OrInt
52 :
53 : // The max memtable size is limited by the uint32 offsets stored in
54 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
55 : //
56 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
57 : // end of an allocation fits in uint32.
58 : //
59 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
60 : // 2GB).
61 : maxMemTableSize = constants.MaxUint32OrInt
62 : )
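Both caps above exist because batch and memtable arenas are addressed with uint32 offsets, so sizes top out just under 4 GB (and at MaxInt on 32-bit platforms). As a hedged illustration of respecting the memtable cap when assembling Options, the clampMemTableSize helper and the 1 GB bound below are hypothetical, not part of Pebble:

package main

import "github.com/cockroachdb/pebble"

// clampMemTableSize is a hypothetical helper that keeps a requested memtable
// size comfortably below the ~4 GB ceiling implied by the uint32 arena
// offsets described in the constants above.
func clampMemTableSize(requested uint64) uint64 {
	const safeMax = 1 << 30 // 1 GB; an arbitrary, conservative bound for illustration
	if requested == 0 || requested > safeMax {
		return safeMax
	}
	return requested
}

func main() {
	// A request for 8 GB is clamped to the illustrative 1 GB bound.
	opts := &pebble.Options{MemTableSize: clampMemTableSize(8 << 30)}
	_ = opts
}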
63 :
64 : // FileCacheSize returns the file cache size to use
65 : // for a single db, given the maximum number of open
66 : // files available to a file cache that serves only
67 : // that single db.
68 2 : func FileCacheSize(maxOpenFiles int) int {
69 2 : fileCacheSize := maxOpenFiles - numNonFileCacheFiles
70 2 : if fileCacheSize < minFileCacheSize {
71 1 : fileCacheSize = minFileCacheSize
72 1 : }
73 2 : return fileCacheSize
74 : }
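A hedged usage sketch for FileCacheSize: it sizes a file cache from a single DB's MaxOpenFiles budget, and that cache can then be handed to one or more stores through Options.FileCache. The shard count below (GOMAXPROCS) and the doubling of the size for two stores are illustrative choices, not Pebble recommendations:

package main

import (
	"runtime"

	"github.com/cockroachdb/pebble"
)

func main() {
	const maxOpenFiles = 1000

	// Derive the per-DB file cache size from the open file budget.
	size := pebble.FileCacheSize(maxOpenFiles)

	// Build one file cache and share it between two stores; each store takes
	// its own reference, and we release ours when done.
	fc := pebble.NewFileCache(runtime.GOMAXPROCS(0), 2*size)
	defer fc.Unref()

	opts1 := &pebble.Options{MaxOpenFiles: maxOpenFiles, FileCache: fc}
	opts2 := &pebble.Options{MaxOpenFiles: maxOpenFiles, FileCache: fc}
	_, _ = opts1, opts2
}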
75 :
76 : // Open opens a DB whose files live in the given directory.
77 2 : func Open(dirname string, opts *Options) (db *DB, err error) {
78 2 : // Make a copy of the options so that we don't mutate the passed in options.
79 2 : opts = opts.Clone()
80 2 : opts.EnsureDefaults()
81 2 : if err := opts.Validate(); err != nil {
82 0 : return nil, err
83 0 : }
84 2 : if opts.LoggerAndTracer == nil {
85 2 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
86 2 : } else {
87 1 : opts.Logger = opts.LoggerAndTracer
88 1 : }
89 :
90 2 : if invariants.Sometimes(5) {
91 2 : assertComparer := base.MakeAssertComparer(*opts.Comparer)
92 2 : opts.Comparer = &assertComparer
93 2 : }
94 :
95 : // In all error cases, we return db = nil; this is used by various
96 : // deferred cleanups.
97 :
98 : // Open the database and WAL directories first.
99 2 : walDirname, dataDir, err := prepareAndOpenDirs(dirname, opts)
100 2 : if err != nil {
101 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
102 1 : }
103 2 : defer func() {
104 2 : if db == nil {
105 1 : dataDir.Close()
106 1 : }
107 : }()
108 :
109 : // Lock the database directory.
110 2 : var fileLock *Lock
111 2 : if opts.Lock != nil {
112 1 : // The caller already acquired the database lock. Ensure that the
113 1 : // directory matches.
114 1 : if err := opts.Lock.pathMatches(dirname); err != nil {
115 0 : return nil, err
116 0 : }
117 1 : if err := opts.Lock.refForOpen(); err != nil {
118 1 : return nil, err
119 1 : }
120 1 : fileLock = opts.Lock
121 2 : } else {
122 2 : fileLock, err = LockDirectory(dirname, opts.FS)
123 2 : if err != nil {
124 1 : return nil, err
125 1 : }
126 : }
127 2 : defer func() {
128 2 : if db == nil {
129 1 : fileLock.Close()
130 1 : }
131 : }()
132 :
133 : // List the directory contents. This also happens to include WAL log files, if
134 : // they are in the same dir, but we will ignore those below. The provider is
135 : // also given this list, but it ignores non-sstable files.
136 2 : ls, err := opts.FS.List(dirname)
137 2 : if err != nil {
138 1 : return nil, err
139 1 : }
140 :
141 : // Establish the format major version.
142 2 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
143 2 : if err != nil {
144 1 : return nil, err
145 1 : }
146 2 : defer func() {
147 2 : if db == nil {
148 1 : formatVersionMarker.Close()
149 1 : }
150 : }()
151 :
152 2 : noFormatVersionMarker := formatVersion == FormatDefault
153 2 : if noFormatVersionMarker {
154 2 : // We will initialize the store at the minimum possible format, then upgrade
155 2 : // the format to the desired one. This helps test the format upgrade code.
156 2 : formatVersion = FormatMinSupported
157 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone {
158 2 : formatVersion = FormatMinForSharedObjects
159 2 : }
160 : // There is no format version marker file. There are three cases:
161 : // - we are trying to open an existing store that was created at
162 : // FormatMostCompatible (the only one without a version marker file)
163 : // - we are creating a new store;
164 : // - we are retrying a failed creation.
165 : //
166 : // To error in the first case, we set ErrorIfNotPristine.
167 2 : opts.ErrorIfNotPristine = true
168 2 : defer func() {
169 2 : if err != nil && errors.Is(err, ErrDBNotPristine) {
170 0 : // We must be trying to open an existing store at FormatMostCompatible.
171 0 : // Correct the error in this case: report that the format is no longer supported.
172 0 : err = errors.Newf(
173 0 : "pebble: database %q written in format major version 1 which is no longer supported",
174 0 : dirname)
175 0 : }
176 : }()
177 2 : } else {
178 2 : if opts.Experimental.CreateOnShared != remote.CreateOnSharedNone && formatVersion < FormatMinForSharedObjects {
179 0 : return nil, errors.Newf(
180 0 : "pebble: database %q configured with shared objects but written in too old format major version %d",
181 0 : dirname, formatVersion)
182 0 : }
183 : }
184 :
185 : // Find the currently active manifest, if there is one.
186 2 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
187 2 : if err != nil {
188 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
189 1 : }
190 2 : defer func() {
191 2 : if db == nil {
192 1 : manifestMarker.Close()
193 1 : }
194 : }()
195 :
196 : // Atomic markers may leave behind obsolete files if there's a crash
197 : // mid-update. Clean these up if we're not in read-only mode.
198 2 : if !opts.ReadOnly {
199 2 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
200 0 : return nil, err
201 0 : }
202 2 : if err := manifestMarker.RemoveObsolete(); err != nil {
203 0 : return nil, err
204 0 : }
205 : }
206 :
207 2 : if opts.Cache == nil {
208 2 : opts.Cache = cache.New(opts.CacheSize)
209 2 : } else {
210 1 : opts.Cache.Ref()
211 1 : }
212 :
213 2 : d := &DB{
214 2 : cacheID: opts.Cache.NewID(),
215 2 : dirname: dirname,
216 2 : opts: opts,
217 2 : cmp: opts.Comparer.Compare,
218 2 : equal: opts.Comparer.Equal,
219 2 : merge: opts.Merger.Merge,
220 2 : split: opts.Comparer.Split,
221 2 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
222 2 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
223 2 : fileLock: fileLock,
224 2 : dataDir: dataDir,
225 2 : closed: new(atomic.Value),
226 2 : closedCh: make(chan struct{}),
227 2 : }
228 2 : d.mu.versions = &versionSet{}
229 2 : d.diskAvailBytes.Store(math.MaxUint64)
230 2 :
231 2 : defer func() {
232 2 : // If an error or panic occurs during open, attempt to release the manually
233 2 : // allocated memory resources. Note that rather than look for an error, we
234 2 : // look for the return of a nil DB pointer.
235 2 : if r := recover(); db == nil {
236 1 : // If there's an unused, recycled memtable, we need to release its memory.
237 1 : if obsoleteMemTable := d.memTableRecycle.Swap(nil); obsoleteMemTable != nil {
238 1 : d.freeMemTable(obsoleteMemTable)
239 1 : }
240 :
241 : // Release our references to the Cache. Note that both the DB and
242 : // fileCache have a reference. When we release the reference to
243 : // the fileCache, and if there are no other references to
244 : // the fileCache, then the fileCache will also release its
245 : // reference to the cache.
246 1 : opts.Cache.Unref()
247 1 :
248 1 : if d.fileCache != nil {
249 1 : _ = d.fileCache.Close()
250 1 : }
251 :
252 1 : for _, mem := range d.mu.mem.queue {
253 1 : switch t := mem.flushable.(type) {
254 1 : case *memTable:
255 1 : manual.Free(manual.MemTable, t.arenaBuf)
256 1 : t.arenaBuf = manual.Buf{}
257 : }
258 : }
259 1 : if d.cleanupManager != nil {
260 1 : d.cleanupManager.Close()
261 1 : }
262 1 : if d.objProvider != nil {
263 1 : d.objProvider.Close()
264 1 : }
265 1 : if r != nil {
266 1 : panic(r)
267 : }
268 : }
269 : }()
270 :
271 2 : d.commit = newCommitPipeline(commitEnv{
272 2 : logSeqNum: &d.mu.versions.logSeqNum,
273 2 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
274 2 : apply: d.commitApply,
275 2 : write: d.commitWrite,
276 2 : })
277 2 : d.mu.nextJobID = 1
278 2 : d.mu.mem.nextSize = opts.MemTableSize
279 2 : if d.mu.mem.nextSize > initialMemTableSize {
280 2 : d.mu.mem.nextSize = initialMemTableSize
281 2 : }
282 2 : d.mu.compact.cond.L = &d.mu.Mutex
283 2 : d.mu.compact.inProgress = make(map[*compaction]struct{})
284 2 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
285 2 : d.mu.snapshots.init()
286 2 : // logSeqNum is the next sequence number that will be assigned.
287 2 : // Start assigning sequence numbers from base.SeqNumStart to leave
288 2 : // room for reserved sequence numbers (see comments around
289 2 : // SeqNumStart).
290 2 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
291 2 : d.mu.formatVers.vers.Store(uint64(formatVersion))
292 2 : d.mu.formatVers.marker = formatVersionMarker
293 2 :
294 2 : d.timeNow = time.Now
295 2 : d.openedAt = d.timeNow()
296 2 :
297 2 : d.mu.Lock()
298 2 : defer d.mu.Unlock()
299 2 :
300 2 : jobID := d.newJobIDLocked()
301 2 :
302 2 : providerSettings := objstorageprovider.Settings{
303 2 : Logger: opts.Logger,
304 2 : FS: opts.FS,
305 2 : FSDirName: dirname,
306 2 : FSDirInitialListing: ls,
307 2 : FSCleaner: opts.Cleaner,
308 2 : NoSyncOnClose: opts.NoSyncOnClose,
309 2 : BytesPerSync: opts.BytesPerSync,
310 2 : }
311 2 : providerSettings.Local.ReadaheadConfig = opts.Local.ReadaheadConfig
312 2 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
313 2 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
314 2 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
315 2 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
316 2 :
317 2 : d.objProvider, err = objstorageprovider.Open(providerSettings)
318 2 : if err != nil {
319 1 : return nil, err
320 1 : }
321 :
322 2 : if !manifestExists {
323 2 : // DB does not exist.
324 2 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
325 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
326 1 : }
327 :
328 : // Create the DB.
329 2 : if err := d.mu.versions.create(
330 2 : jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
331 1 : return nil, err
332 1 : }
333 2 : } else {
334 2 : if opts.ErrorIfExists {
335 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
336 1 : }
337 : // Load the version set.
338 2 : if err := d.mu.versions.load(
339 2 : dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
340 1 : return nil, err
341 1 : }
342 2 : if opts.ErrorIfNotPristine {
343 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
344 1 : d.mu.versions.addLiveFileNums(liveFileNums)
345 1 : if len(liveFileNums) != 0 {
346 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
347 1 : }
348 : }
349 : }
350 :
351 : // In read-only mode, we replay directly into the mutable memtable but never
352 : // flush it. We need to delay creation of the memtable until we know the
353 : // sequence number of the first batch that will be inserted.
354 2 : if !d.opts.ReadOnly {
355 2 : var entry *flushableEntry
356 2 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
357 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
358 2 : }
359 :
360 2 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
361 2 : Buckets: FsyncLatencyBuckets,
362 2 : })
363 2 : walOpts := wal.Options{
364 2 : Primary: wal.Dir{FS: opts.FS, Dirname: walDirname},
365 2 : Secondary: wal.Dir{},
366 2 : MinUnflushedWALNum: wal.NumWAL(d.mu.versions.minUnflushedLogNum),
367 2 : MaxNumRecyclableLogs: opts.MemTableStopWritesThreshold + 1,
368 2 : NoSyncOnClose: opts.NoSyncOnClose,
369 2 : BytesPerSync: opts.WALBytesPerSync,
370 2 : PreallocateSize: d.walPreallocateSize,
371 2 : MinSyncInterval: opts.WALMinSyncInterval,
372 2 : FsyncLatency: d.mu.log.metrics.fsyncLatency,
373 2 : QueueSemChan: d.commit.logSyncQSem,
374 2 : Logger: opts.Logger,
375 2 : EventListener: walEventListenerAdaptor{l: opts.EventListener},
376 2 : }
377 2 : if opts.WALFailover != nil {
378 2 : walOpts.Secondary = opts.WALFailover.Secondary
379 2 : walOpts.FailoverOptions = opts.WALFailover.FailoverOptions
380 2 : walOpts.FailoverWriteAndSyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
381 2 : Buckets: FsyncLatencyBuckets,
382 2 : })
383 2 : }
384 2 : walDirs := append(walOpts.Dirs(), opts.WALRecoveryDirs...)
385 2 : wals, err := wal.Scan(walDirs...)
386 2 : if err != nil {
387 1 : return nil, err
388 1 : }
389 2 : walManager, err := wal.Init(walOpts, wals)
390 2 : if err != nil {
391 1 : return nil, err
392 1 : }
393 2 : defer func() {
394 2 : if db == nil {
395 1 : walManager.Close()
396 1 : }
397 : }()
398 :
399 2 : d.mu.log.manager = walManager
400 2 :
401 2 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
402 2 :
403 2 : if manifestExists && !opts.DisableConsistencyCheck {
404 2 : curVersion := d.mu.versions.currentVersion()
405 2 : if err := checkConsistency(curVersion, d.objProvider); err != nil {
406 1 : return nil, err
407 1 : }
408 : }
409 :
410 2 : fileCacheSize := FileCacheSize(opts.MaxOpenFiles)
411 2 : if opts.FileCache == nil {
412 2 : opts.FileCache = NewFileCache(opts.Experimental.FileCacheShards, fileCacheSize)
413 2 : defer opts.FileCache.Unref()
414 2 : }
415 2 : d.fileCache = opts.FileCache.newHandle(d.cacheID, d.objProvider, d.opts)
416 2 : d.newIters = d.fileCache.newIters
417 2 : d.tableNewRangeKeyIter = tableNewRangeKeyIter(d.newIters)
418 2 :
419 2 : d.mu.annotators.totalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
420 1 : return true
421 1 : })
422 2 : d.mu.annotators.remoteSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
423 1 : meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
424 1 : if err != nil {
425 0 : return false
426 0 : }
427 1 : return meta.IsRemote()
428 : })
429 2 : d.mu.annotators.externalSize = d.makeFileSizeAnnotator(func(f *manifest.FileMetadata) bool {
430 1 : meta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
431 1 : if err != nil {
432 0 : return false
433 0 : }
434 1 : return meta.IsRemote() && meta.Remote.CleanupMethod == objstorage.SharedNoCleanup
435 : })
436 :
437 2 : var previousOptionsFileNum base.DiskFileNum
438 2 : var previousOptionsFilename string
439 2 : for _, filename := range ls {
440 2 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
441 2 : if !ok {
442 2 : continue
443 : }
444 :
445 : // Don't reuse any obsolete file numbers to avoid modifying an
446 : // ingested sstable's original external file.
447 2 : d.mu.versions.markFileNumUsed(fn)
448 2 :
449 2 : switch ft {
450 0 : case base.FileTypeLog:
451 : // Ignore.
452 2 : case base.FileTypeOptions:
453 2 : if previousOptionsFileNum < fn {
454 2 : previousOptionsFileNum = fn
455 2 : previousOptionsFilename = filename
456 2 : }
457 1 : case base.FileTypeTemp, base.FileTypeOldTemp:
458 1 : if !d.opts.ReadOnly {
459 1 : // Some codepaths write to a temporary file and then
460 1 : // rename it to its final location when complete. A
461 1 : // temp file is leftover if a process exits before the
462 1 : // rename. Remove it.
463 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
464 1 : if err != nil {
465 0 : return nil, err
466 0 : }
467 : }
468 : }
469 : }
470 2 : if n := len(wals); n > 0 {
471 2 : // Don't reuse any obsolete file numbers to avoid modifying an
472 2 : // ingested sstable's original external file.
473 2 : d.mu.versions.markFileNumUsed(base.DiskFileNum(wals[n-1].Num))
474 2 : }
475 :
476 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
477 : // objProvider. This avoids FileNum collisions with obsolete sstables.
478 2 : objects := d.objProvider.List()
479 2 : for _, obj := range objects {
480 2 : d.mu.versions.markFileNumUsed(obj.DiskFileNum)
481 2 : }
482 :
483 : // Validate the most-recent OPTIONS file, if there is one.
484 2 : if previousOptionsFilename != "" {
485 2 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
486 2 : previousOptions, err := readOptionsFile(opts, path)
487 2 : if err != nil {
488 0 : return nil, err
489 0 : }
490 2 : if err := opts.CheckCompatibility(previousOptions); err != nil {
491 1 : return nil, err
492 1 : }
493 : }
494 :
495 : // Replay any newer log files than the ones named in the manifest.
496 2 : var replayWALs wal.Logs
497 2 : for i, w := range wals {
498 2 : if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
499 2 : replayWALs = wals[i:]
500 2 : break
501 : }
502 : }
503 2 : var flushableIngests []*ingestedFlushable
504 2 : for i, lf := range replayWALs {
505 2 : // WALs other than the last one would have been closed cleanly.
506 2 : //
507 2 : // Note: we used to never require strict WAL tails when reading from older
508 2 : // versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
509 2 : // 20.1 do not guarantee that closed WALs end cleanly. But the earliest
510 2 : // compatible Pebble format is newer and guarantees a clean EOF.
511 2 : strictWALTail := i < len(replayWALs)-1
512 2 : fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
513 2 : if err != nil {
514 1 : return nil, err
515 1 : }
516 2 : if len(fi) > 0 {
517 2 : flushableIngests = append(flushableIngests, fi...)
518 2 : }
519 2 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
520 2 : d.mu.versions.logSeqNum.Store(maxSeqNum)
521 2 : }
522 : }
523 2 : if d.mu.mem.mutable == nil {
524 2 : // Recreate the mutable memtable if replayWAL got rid of it.
525 2 : var entry *flushableEntry
526 2 : d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */)
527 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
528 2 : }
529 2 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
530 2 :
531 2 : if !d.opts.ReadOnly {
532 2 : d.maybeScheduleFlush()
533 2 : for d.mu.compact.flushing {
534 2 : d.mu.compact.cond.Wait()
535 2 : }
536 :
537 : // Create an empty .log file for the mutable memtable.
538 2 : newLogNum := d.mu.versions.getNextDiskFileNum()
539 2 : d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
540 2 : if err != nil {
541 1 : return nil, err
542 1 : }
543 :
544 : // This isn't strictly necessary as we don't use the log number for
545 : // memtables being flushed, only for the next unflushed memtable.
546 2 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
547 : }
548 2 : d.updateReadStateLocked(d.opts.DebugCheck)
549 2 :
550 2 : if !d.opts.ReadOnly {
551 2 : // If the Options specify a format major version higher than the
552 2 : // loaded database's, upgrade it. If this is a new database, this
553 2 : // code path also performs an initial upgrade from the starting
554 2 : // implicit MinSupported version.
555 2 : //
556 2 : // We ratchet the version this far into Open so that migrations have a read
557 2 : // state available. Note that this also results in creating/updating the
558 2 : // format version marker file.
559 2 : if opts.FormatMajorVersion > d.FormatMajorVersion() {
560 2 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
561 0 : return nil, err
562 0 : }
563 2 : } else if noFormatVersionMarker {
564 2 : // We are creating a new store. Create the format version marker file.
565 2 : if err := d.writeFormatVersionMarker(d.FormatMajorVersion()); err != nil {
566 1 : return nil, err
567 1 : }
568 : }
569 :
570 : // Write the current options to disk.
571 2 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
572 2 : tmpPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeTemp, d.optionsFileNum)
573 2 : optionsPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeOptions, d.optionsFileNum)
574 2 :
575 2 : // Write them to a temporary file first, in case we crash before
576 2 : // we're done. A corrupt options file prevents opening the
577 2 : // database.
578 2 : optionsFile, err := opts.FS.Create(tmpPath, vfs.WriteCategoryUnspecified)
579 2 : if err != nil {
580 1 : return nil, err
581 1 : }
582 2 : serializedOpts := []byte(opts.String())
583 2 : if _, err := optionsFile.Write(serializedOpts); err != nil {
584 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
585 1 : }
586 2 : d.optionsFileSize = uint64(len(serializedOpts))
587 2 : if err := optionsFile.Sync(); err != nil {
588 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
589 1 : }
590 2 : if err := optionsFile.Close(); err != nil {
591 0 : return nil, err
592 0 : }
593 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
594 : // guaranteed to be atomic because the destination path does not
595 : // exist.
596 2 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
597 1 : return nil, err
598 1 : }
599 2 : if err := d.dataDir.Sync(); err != nil {
600 1 : return nil, err
601 1 : }
602 : }
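The OPTIONS write just above follows the write-to-temp, sync, rename, sync-directory recipe. Below is a standalone sketch of that recipe against the vfs interface; writeAtomically and the file names are hypothetical, not what Open itself uses:

package main

import (
	"log"

	"github.com/cockroachdb/pebble/vfs"
)

// writeAtomically is a hypothetical helper showing the temp-file recipe: write
// and sync a temporary file, rename it into place, then sync the directory so
// the rename itself is durable.
func writeAtomically(fs vfs.FS, dir, tmpName, finalName string, data []byte) error {
	tmpPath := fs.PathJoin(dir, tmpName)
	f, err := fs.Create(tmpPath, vfs.WriteCategoryUnspecified)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// The rename is atomic because the destination does not exist yet.
	if err := fs.Rename(tmpPath, fs.PathJoin(dir, finalName)); err != nil {
		return err
	}
	dirFile, err := fs.OpenDir(dir)
	if err != nil {
		return err
	}
	if err := dirFile.Sync(); err != nil {
		dirFile.Close()
		return err
	}
	return dirFile.Close()
}

func main() {
	if err := writeAtomically(vfs.Default, ".", "config.tmp", "config", []byte("example")); err != nil {
		log.Fatal(err)
	}
}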
603 :
604 2 : if !d.opts.ReadOnly {
605 2 : // Get a fresh list of files, in case some of the earlier flushes/compactions
606 2 : // have deleted some files.
607 2 : ls, err := opts.FS.List(dirname)
608 2 : if err != nil {
609 1 : return nil, err
610 1 : }
611 2 : d.scanObsoleteFiles(ls, flushableIngests)
612 2 : d.deleteObsoleteFiles(jobID)
613 : }
614 : // Else, nothing is obsolete.
615 :
616 2 : d.mu.tableStats.cond.L = &d.mu.Mutex
617 2 : d.mu.tableValidation.cond.L = &d.mu.Mutex
618 2 : if !d.opts.ReadOnly {
619 2 : d.maybeCollectTableStatsLocked()
620 2 : }
621 2 : d.calculateDiskAvailableBytes()
622 2 :
623 2 : d.maybeScheduleFlush()
624 2 : d.maybeScheduleCompaction()
625 2 :
626 2 : // Note: this is a no-op if invariants are disabled or race is enabled.
627 2 : //
628 2 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
629 2 : // finalizer to never be run. The problem is due to this limitation of
630 2 : // finalizers mentioned in the SetFinalizer docs:
631 2 : //
632 2 : // If a cyclic structure includes a block with a finalizer, that cycle is
633 2 : // not guaranteed to be garbage collected and the finalizer is not
634 2 : // guaranteed to run, because there is no ordering that respects the
635 2 : // dependencies.
636 2 : //
637 2 : // DB has cycles with several of its internal structures: readState,
638 2 : // newIters, fileCache, versions, etc. Each of these individually causes a
639 2 : // cycle and prevents the finalizer from being run. But we can work around this
640 2 : // finalizer limitation by setting a finalizer on another object that is
641 2 : // tied to the lifetime of DB: the DB.closed atomic.Value.
642 2 : dPtr := fmt.Sprintf("%p", d)
643 2 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
644 0 : v := obj.(*atomic.Value)
645 0 : if err := v.Load(); err == nil {
646 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
647 0 : os.Exit(1)
648 0 : }
649 : })
650 :
651 2 : return d, nil
652 : }
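For reference, a minimal round trip through this entry point, mirroring Pebble's standard usage (the "demo" directory name is a placeholder):

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Open (and create if necessary) a store in the "demo" directory.
	db, err := pebble.Open("demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
		log.Fatal(err)
	}
	value, closer, err := db.Get([]byte("hello"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("hello %s\n", value)
	if err := closer.Close(); err != nil {
		log.Fatal(err)
	}
}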
653 :
654 : // prepareAndOpenDirs opens the directories for the store (and creates them if
655 : // necessary).
656 : //
657 : // Returns an error if ReadOnly is set and the directories don't exist.
658 : func prepareAndOpenDirs(
659 : dirname string, opts *Options,
660 2 : ) (walDirname string, dataDir vfs.File, err error) {
661 2 : walDirname = opts.WALDir
662 2 : if opts.WALDir == "" {
663 2 : walDirname = dirname
664 2 : }
665 :
666 : // Create directories if needed.
667 2 : if !opts.ReadOnly {
668 2 : f, err := mkdirAllAndSyncParents(opts.FS, dirname)
669 2 : if err != nil {
670 1 : return "", nil, err
671 1 : }
672 2 : f.Close()
673 2 : if walDirname != dirname {
674 2 : f, err := mkdirAllAndSyncParents(opts.FS, walDirname)
675 2 : if err != nil {
676 0 : return "", nil, err
677 0 : }
678 2 : f.Close()
679 : }
680 2 : if opts.WALFailover != nil {
681 2 : secondary := opts.WALFailover.Secondary
682 2 : f, err := mkdirAllAndSyncParents(secondary.FS, secondary.Dirname)
683 2 : if err != nil {
684 0 : return "", nil, err
685 0 : }
686 2 : f.Close()
687 : }
688 : }
689 :
690 2 : dataDir, err = opts.FS.OpenDir(dirname)
691 2 : if err != nil {
692 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
693 1 : return "", nil, errors.Errorf("pebble: database %q does not exist", dirname)
694 1 : }
695 1 : return "", nil, err
696 : }
697 2 : if opts.ReadOnly && walDirname != dirname {
698 1 : // Check that the wal dir exists.
699 1 : walDir, err := opts.FS.OpenDir(walDirname)
700 1 : if err != nil {
701 1 : dataDir.Close()
702 1 : return "", nil, err
703 1 : }
704 1 : walDir.Close()
705 : }
706 :
707 2 : return walDirname, dataDir, nil
708 : }
709 :
710 : // GetVersion returns the engine version string from the latest options
711 : // file present in dir. Used to check what Pebble or RocksDB version was last
712 : // used to write to the database stored in this directory. An empty string is
713 : // returned if no valid OPTIONS file with a version key was found.
714 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
715 1 : ls, err := fs.List(dir)
716 1 : if err != nil {
717 0 : return "", err
718 0 : }
719 1 : var version string
720 1 : lastOptionsSeen := base.DiskFileNum(0)
721 1 : for _, filename := range ls {
722 1 : ft, fn, ok := base.ParseFilename(fs, filename)
723 1 : if !ok {
724 1 : continue
725 : }
726 1 : switch ft {
727 1 : case base.FileTypeOptions:
728 1 : // If this file has a higher number than the last options file
729 1 : // processed, reset version. This is because rocksdb often
730 1 : // writes multiple options files without deleting previous ones.
731 1 : // Otherwise, skip parsing this options file.
732 1 : if fn > lastOptionsSeen {
733 1 : version = ""
734 1 : lastOptionsSeen = fn
735 1 : } else {
736 0 : continue
737 : }
738 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
739 1 : if err != nil {
740 0 : return "", err
741 0 : }
742 1 : data, err := io.ReadAll(f)
743 1 : f.Close()
744 1 :
745 1 : if err != nil {
746 0 : return "", err
747 0 : }
748 1 : err = parseOptions(string(data), parseOptionsFuncs{
749 1 : visitKeyValue: func(i, j int, section, key, value string) error {
750 1 : switch {
751 1 : case section == "Version":
752 1 : switch key {
753 1 : case "pebble_version":
754 1 : version = value
755 1 : case "rocksdb_version":
756 1 : version = fmt.Sprintf("rocksdb v%s", value)
757 : }
758 : }
759 1 : return nil
760 : },
761 : })
762 1 : if err != nil {
763 0 : return "", err
764 0 : }
765 : }
766 : }
767 1 : return version, nil
768 : }
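A short usage sketch for GetVersion ("demo" is a placeholder path): it only lists the directory and parses the newest OPTIONS file, so it never opens or locks the store:

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	version, err := pebble.GetVersion("demo", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	if version == "" {
		fmt.Println("no OPTIONS file with a version key found")
		return
	}
	fmt.Println("last written by:", version)
}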
769 :
770 : func (d *DB) replayIngestedFlushable(
771 : b *Batch, logNum base.DiskFileNum,
772 2 : ) (entry *flushableEntry, err error) {
773 2 : br := b.Reader()
774 2 : seqNum := b.SeqNum()
775 2 :
776 2 : fileNums := make([]base.DiskFileNum, 0, b.Count())
777 2 : var exciseSpan KeyRange
778 2 : addFileNum := func(encodedFileNum []byte) {
779 2 : fileNum, n := binary.Uvarint(encodedFileNum)
780 2 : if n <= 0 {
781 0 : panic("pebble: ingest sstable file num is invalid")
782 : }
783 2 : fileNums = append(fileNums, base.DiskFileNum(fileNum))
784 : }
785 :
786 2 : for i := 0; i < int(b.Count()); i++ {
787 2 : kind, key, val, ok, err := br.Next()
788 2 : if err != nil {
789 0 : return nil, err
790 0 : }
791 2 : if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise {
792 0 : panic("pebble: invalid batch key kind")
793 : }
794 2 : if !ok {
795 0 : panic("pebble: invalid batch count")
796 : }
797 2 : if kind == base.InternalKeyKindExcise {
798 2 : if exciseSpan.Valid() {
799 0 : panic("pebble: multiple excise spans in a single batch")
800 : }
801 2 : exciseSpan.Start = slices.Clone(key)
802 2 : exciseSpan.End = slices.Clone(val)
803 2 : continue
804 : }
805 2 : addFileNum(key)
806 : }
807 :
808 2 : if _, _, _, ok, err := br.Next(); err != nil {
809 0 : return nil, err
810 2 : } else if ok {
811 0 : panic("pebble: invalid number of entries in batch")
812 : }
813 :
814 2 : meta := make([]*fileMetadata, len(fileNums))
815 2 : var lastRangeKey keyspan.Span
816 2 : for i, n := range fileNums {
817 2 : readable, err := d.objProvider.OpenForReading(context.TODO(), base.FileTypeTable, n,
818 2 : objstorage.OpenOptions{MustExist: true})
819 2 : if err != nil {
820 0 : return nil, errors.Wrap(err, "pebble: error when opening flushable ingest files")
821 0 : }
822 : // NB: ingestLoad1 will close readable.
823 2 : meta[i], lastRangeKey, err = ingestLoad1(context.TODO(), d.opts, d.FormatMajorVersion(),
824 2 : readable, d.cacheID, base.PhysicalTableFileNum(n), disableRangeKeyChecks())
825 2 : if err != nil {
826 0 : return nil, errors.Wrap(err, "pebble: error when loading flushable ingest files")
827 0 : }
828 : }
829 2 : if lastRangeKey.Valid() && d.opts.Comparer.Split.HasSuffix(lastRangeKey.End) {
830 0 : return nil, errors.AssertionFailedf("pebble: last ingest sstable has suffixed range key end %s",
831 0 : d.opts.Comparer.FormatKey(lastRangeKey.End))
832 0 : }
833 :
834 2 : numFiles := len(meta)
835 2 : if exciseSpan.Valid() {
836 2 : numFiles++
837 2 : }
838 2 : if uint32(numFiles) != b.Count() {
839 0 : panic("pebble: couldn't load all files in WAL entry")
840 : }
841 :
842 2 : return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
843 : }
844 :
845 : // replayWAL replays the edits in the specified WAL. If the DB is in read
846 : // only mode, then the WALs are replayed into memtables and not flushed. If
847 : // the DB is not in read only mode, then the contents of the WAL are
848 : // guaranteed to be flushed when a flush is scheduled after this method is run.
849 : // Note that this flushing is very important for guaranteeing durability:
850 : // the application may have had a number of pending
851 : // fsyncs to the WAL before the process crashed, and those fsyncs may not have
852 : // happened but the corresponding data may now be readable from the WAL (while
853 : // sitting in write-back caches in the kernel or the storage device). By
854 : // reading the WAL (including the non-fsynced data) and then flushing all
855 : // these changes (flush does fsyncs), we are able to guarantee that the
856 : // initial state of the DB is durable.
857 : //
858 : // This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
859 : // WALs into the flushable queue. Flushing of the queue is expected to be handled
860 : // by callers. A list of flushable ingests (but not memtables) replayed is returned.
861 : //
862 : // d.mu must be held when calling this, but the mutex may be dropped and
863 : // re-acquired during the course of this method.
864 : func (d *DB) replayWAL(
865 : jobID JobID, ll wal.LogicalLog, strictWALTail bool,
866 2 : ) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
867 2 : rr := ll.OpenForRead()
868 2 : defer rr.Close()
869 2 : var (
870 2 : b Batch
871 2 : buf bytes.Buffer
872 2 : mem *memTable
873 2 : entry *flushableEntry
874 2 : offset wal.Offset
875 2 : lastFlushOffset int64
876 2 : keysReplayed int64 // number of keys replayed
877 2 : batchesReplayed int64 // number of batches replayed
878 2 : )
879 2 :
880 2 : // TODO(jackson): This function is interspersed with panics, in addition to
881 2 : // corruption error propagation. Audit them to ensure we're truly only
882 2 : // panicking where the error points to Pebble bug and not user or
883 2 : // hardware-induced corruption.
884 2 :
885 2 : // "Flushes" (i.e. closes off) the current memtable, if not nil.
886 2 : flushMem := func() {
887 2 : if mem == nil {
888 2 : return
889 2 : }
890 2 : mem.writerUnref()
891 2 : if d.mu.mem.mutable == mem {
892 2 : d.mu.mem.mutable = nil
893 2 : }
894 2 : entry.flushForced = !d.opts.ReadOnly
895 2 : var logSize uint64
896 2 : mergedOffset := offset.Physical + offset.PreviousFilesBytes
897 2 : if mergedOffset >= lastFlushOffset {
898 2 : logSize = uint64(mergedOffset - lastFlushOffset)
899 2 : }
900 : // Else, this was the initial memtable in the read-only case which must have
901 : // been empty, but we need to flush it since we don't want to add to it later.
902 2 : lastFlushOffset = mergedOffset
903 2 : entry.logSize = logSize
904 2 : mem, entry = nil, nil
905 : }
906 :
907 2 : mem = d.mu.mem.mutable
908 2 : if mem != nil {
909 2 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
910 2 : if !d.opts.ReadOnly {
911 2 : flushMem()
912 2 : }
913 : }
914 :
915 : // Creates a new memtable if there is no current memtable.
916 2 : ensureMem := func(seqNum base.SeqNum) {
917 2 : if mem != nil {
918 2 : return
919 2 : }
920 2 : mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
921 2 : d.mu.mem.mutable = mem
922 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
923 : }
924 :
925 2 : defer func() {
926 2 : if err != nil {
927 1 : err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
928 1 : }
929 : }()
930 :
931 2 : for {
932 2 : var r io.Reader
933 2 : var err error
934 2 : r, offset, err = rr.NextRecord()
935 2 : if err == nil {
936 2 : _, err = io.Copy(&buf, r)
937 2 : }
938 2 : if err != nil {
939 2 : // It is common to encounter a zeroed or invalid chunk due to WAL
940 2 : // preallocation and WAL recycling. We need to distinguish these
941 2 : // errors from EOF in order to recognize that the record was
942 2 : // truncated and to avoid replaying subsequent WALs, but want
943 2 : // to otherwise treat them like EOF.
944 2 : if err == io.EOF {
945 2 : break
946 1 : } else if record.IsInvalidRecord(err) {
947 1 : if !strictWALTail {
948 1 : break
949 : }
950 1 : err = errors.Mark(err, ErrCorruption)
951 : }
952 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
953 : }
954 :
955 2 : if buf.Len() < batchrepr.HeaderLen {
956 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
957 0 : errors.Safe(base.DiskFileNum(ll.Num)), offset)
958 0 : }
959 :
960 2 : if d.opts.ErrorIfNotPristine {
961 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
962 1 : }
963 :
964 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
965 : // which is used below.
966 2 : b = Batch{}
967 2 : b.db = d
968 2 : b.SetRepr(buf.Bytes())
969 2 : seqNum := b.SeqNum()
970 2 : maxSeqNum = seqNum + base.SeqNum(b.Count())
971 2 : keysReplayed += int64(b.Count())
972 2 : batchesReplayed++
973 2 : {
974 2 : br := b.Reader()
975 2 : if kind, _, _, ok, err := br.Next(); err != nil {
976 0 : return nil, 0, err
977 2 : } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
978 2 : // We're in the flushable ingests (+ possibly excises) case.
979 2 : //
980 2 : // Ingests require an up-to-date view of the LSM to determine the target
981 2 : // level of ingested sstables, and to accurately compute excises. Instead of
982 2 : // doing an ingest in this function, we just enqueue a flushable ingest
983 2 : // in the flushables queue and run a regular flush.
984 2 : flushMem()
985 2 : // mem is nil here.
986 2 : entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
987 2 : if err != nil {
988 0 : return nil, 0, err
989 0 : }
990 2 : fi := entry.flushable.(*ingestedFlushable)
991 2 : flushableIngests = append(flushableIngests, fi)
992 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
993 2 : // A flushable ingest is always followed by a WAL rotation.
994 2 : break
995 : }
996 : }
997 :
998 2 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
999 2 : flushMem()
1000 2 : // Make a copy of the data slice since it is currently owned by buf and will
1001 2 : // be reused in the next iteration.
1002 2 : b.data = slices.Clone(b.data)
1003 2 : b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
1004 2 : if err != nil {
1005 0 : return nil, 0, err
1006 0 : }
1007 2 : entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
1008 2 : // Disable memory accounting by adding a reader ref that will never be
1009 2 : // removed.
1010 2 : entry.readerRefs.Add(1)
1011 2 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1012 2 : } else {
1013 2 : ensureMem(seqNum)
1014 2 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
1015 0 : return nil, 0, err
1016 0 : }
1017 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
1018 : // batch may not initially fit, but will eventually fit (since it is smaller than
1019 : // largeBatchThreshold).
1020 2 : for err == arenaskl.ErrArenaFull {
1021 2 : flushMem()
1022 2 : ensureMem(seqNum)
1023 2 : err = mem.prepare(&b)
1024 2 : if err != nil && err != arenaskl.ErrArenaFull {
1025 0 : return nil, 0, err
1026 0 : }
1027 : }
1028 2 : if err = mem.apply(&b, seqNum); err != nil {
1029 0 : return nil, 0, err
1030 0 : }
1031 2 : mem.writerUnref()
1032 : }
1033 2 : buf.Reset()
1034 : }
1035 :
1036 2 : d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
1037 2 : jobID, ll.String(), offset, keysReplayed, batchesReplayed)
1038 2 : if !d.opts.ReadOnly {
1039 2 : flushMem()
1040 2 : }
1041 :
1042 : // mem is nil here, if !ReadOnly.
1043 2 : return flushableIngests, maxSeqNum, err
1044 : }
1045 :
1046 2 : func readOptionsFile(opts *Options, path string) (string, error) {
1047 2 : f, err := opts.FS.Open(path)
1048 2 : if err != nil {
1049 0 : return "", err
1050 0 : }
1051 2 : defer f.Close()
1052 2 :
1053 2 : data, err := io.ReadAll(f)
1054 2 : if err != nil {
1055 0 : return "", err
1056 0 : }
1057 2 : return string(data), nil
1058 : }
1059 :
1060 : // DBDesc briefly describes high-level state about a database.
1061 : type DBDesc struct {
1062 : // Exists is true if an existing database was found.
1063 : Exists bool
1064 : // FormatMajorVersion indicates the database's current format
1065 : // version.
1066 : FormatMajorVersion FormatMajorVersion
1067 : // ManifestFilename is the filename of the current active manifest,
1068 : // if the database exists.
1069 : ManifestFilename string
1070 : // OptionsFilename is the filename of the most recent OPTIONS file, if it
1071 : // exists.
1072 : OptionsFilename string
1073 : }
1074 :
1075 : // String implements fmt.Stringer.
1076 1 : func (d *DBDesc) String() string {
1077 1 : if !d.Exists {
1078 1 : return "uninitialized"
1079 1 : }
1080 1 : var buf bytes.Buffer
1081 1 : fmt.Fprintf(&buf, "initialized at format major version %s\n", d.FormatMajorVersion)
1082 1 : fmt.Fprintf(&buf, "manifest: %s\n", d.ManifestFilename)
1083 1 : fmt.Fprintf(&buf, "options: %s", d.OptionsFilename)
1084 1 : return buf.String()
1085 : }
1086 :
1087 : // Peek looks for an existing database in dirname on the provided FS. It
1088 : // returns a brief description of the database. Peek is read-only and
1089 : // does not open the database.
1090 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1091 1 : ls, err := fs.List(dirname)
1092 1 : if err != nil {
1093 1 : return nil, err
1094 1 : }
1095 :
1096 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname, ls)
1097 1 : if err != nil {
1098 0 : return nil, err
1099 0 : }
1100 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1101 : // PeekMarker variant that avoids opening the directory.
1102 1 : if err := versMarker.Close(); err != nil {
1103 0 : return nil, err
1104 0 : }
1105 :
1106 : // Find the currently active manifest, if there is one.
1107 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(fs, dirname, ls)
1108 1 : if err != nil {
1109 0 : return nil, err
1110 0 : }
1111 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1112 : // PeekMarker variant that avoids opening the directory.
1113 1 : if err := manifestMarker.Close(); err != nil {
1114 0 : return nil, err
1115 0 : }
1116 :
1117 1 : desc := &DBDesc{
1118 1 : Exists: exists,
1119 1 : FormatMajorVersion: vers,
1120 1 : }
1121 1 :
1122 1 : // Find the OPTIONS file with the highest file number within the list of
1123 1 : // directory entries.
1124 1 : var previousOptionsFileNum base.DiskFileNum
1125 1 : for _, filename := range ls {
1126 1 : ft, fn, ok := base.ParseFilename(fs, filename)
1127 1 : if !ok || ft != base.FileTypeOptions || fn < previousOptionsFileNum {
1128 1 : continue
1129 : }
1130 1 : previousOptionsFileNum = fn
1131 1 : desc.OptionsFilename = fs.PathJoin(dirname, filename)
1132 : }
1133 :
1134 1 : if exists {
1135 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, base.FileTypeManifest, manifestFileNum)
1136 1 : }
1137 1 : return desc, nil
1138 : }
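A usage sketch for Peek ("demo" is a placeholder path); DBDesc's String method above makes the result printable directly:

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Inspect a store without opening it.
	desc, err := pebble.Peek("demo", vfs.Default)
	if err != nil {
		log.Fatal(err)
	}
	// Prints the format major version plus the manifest and OPTIONS filenames
	// when the store exists, or "uninitialized" otherwise.
	fmt.Println(desc)
}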
1139 :
1140 : // LockDirectory acquires the database directory lock in the named directory,
1141 : // preventing another process from opening the database. LockDirectory returns a
1142 : // handle to the held lock that may be passed to Open through Options.Lock to
1143 : // subsequently open the database, skipping lock acquisition during Open.
1144 : //
1145 : // LockDirectory may be used to expand the critical section protected by the
1146 : // database lock to include setup before the call to Open.
1147 2 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1148 2 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, base.FileTypeLock, base.DiskFileNum(0)))
1149 2 : if err != nil {
1150 1 : return nil, err
1151 1 : }
1152 2 : l := &Lock{dirname: dirname, fileLock: fileLock}
1153 2 : l.refs.Store(1)
1154 2 : invariants.SetFinalizer(l, func(obj interface{}) {
1155 2 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1156 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1157 : }
1158 : })
1159 2 : return l, nil
1160 : }
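A hedged sketch of the expanded-critical-section pattern described above: acquire the lock, do setup, pass the lock to Open via Options.Lock, and only Close the lock after the DB itself is closed. Error handling is abbreviated and the directory name is a placeholder:

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	const dirname = "demo" // placeholder

	// Take the directory lock before Open so setup between the two calls is
	// protected from concurrent opens by other processes.
	lock, err := pebble.LockDirectory(dirname, vfs.Default)
	if err != nil {
		log.Fatal(err)
	}

	// ... setup that must happen while holding the lock ...

	db, err := pebble.Open(dirname, &pebble.Options{Lock: lock})
	if err != nil {
		// Open did not take ownership of our reference; release it ourselves.
		_ = lock.Close()
		log.Fatal(err)
	}
	// Deferred calls run LIFO: the DB closes first, then the lock is released,
	// matching the contract that Lock.Close follows DB.Close.
	defer lock.Close()
	defer db.Close()
}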
1161 :
1162 : // Lock represents a file lock on a directory. It may be passed to Open through
1163 : // Options.Lock to elide lock acquisition during Open.
1164 : type Lock struct {
1165 : dirname string
1166 : fileLock io.Closer
1167 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1168 : // or 2.
1169 : //
1170 : // When acquired by the client and passed to Open, refs = 1 and the Open
1171 : // call increments it to 2. When the database is closed, it's decremented to
1172 : // 1. Finally, when the original caller calls Close on the Lock, it's
1173 : // decremented to zero and the underlying file lock is released.
1174 : //
1175 : // When Open acquires the file lock, refs remains at 1 until the database is
1176 : // closed.
1177 : refs atomic.Int32
1178 : }
1179 :
1180 1 : func (l *Lock) refForOpen() error {
1181 1 : // During Open, when a user passed in a lock, the reference count must be
1182 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1183 1 : // it's 2, the lock is already in use by another database within the
1184 1 : // process.
1185 1 : if !l.refs.CompareAndSwap(1, 2) {
1186 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1187 1 : }
1188 1 : return nil
1189 : }
1190 :
1191 : // Close releases the lock, permitting another process to lock and open the
1192 : // database. Close must not be called until after a database using the Lock has
1193 : // been closed.
1194 2 : func (l *Lock) Close() error {
1195 2 : if l.refs.Add(-1) > 0 {
1196 1 : return nil
1197 1 : }
1198 2 : defer func() { l.fileLock = nil }()
1199 2 : return l.fileLock.Close()
1200 : }
1201 :
1202 1 : func (l *Lock) pathMatches(dirname string) error {
1203 1 : if dirname == l.dirname {
1204 1 : return nil
1205 1 : }
1206 : // Check for relative paths, symlinks, etc. This isn't ideal because we're
1207 : // circumventing the vfs.FS interface here.
1208 : //
1209 : // TODO(jackson): We could add support for retrieving file inodes through Stat
1210 : // calls in the VFS interface on platforms where it's available and use that
1211 : // to differentiate.
1212 1 : dirStat, err1 := os.Stat(dirname)
1213 1 : lockDirStat, err2 := os.Stat(l.dirname)
1214 1 : if err1 == nil && err2 == nil && os.SameFile(dirStat, lockDirStat) {
1215 1 : return nil
1216 1 : }
1217 0 : return errors.Join(
1218 0 : errors.Newf("pebble: opts.Lock acquired in %q not %q", l.dirname, dirname),
1219 0 : err1, err2)
1220 : }
1221 :
1222 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1223 : // does not exist.
1224 : //
1225 : // Note that errors can be wrapped with more details; use errors.Is().
1226 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1227 :
1228 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1229 : // already exists.
1230 : //
1231 : // Note that errors can be wrapped with more details; use errors.Is().
1232 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1233 :
1234 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1235 : // already exists and is not pristine.
1236 : //
1237 : // Note that errors can be wrapped with more details; use errors.Is().
1238 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
1239 :
1240 : // IsCorruptionError returns true if the given error indicates database
1241 : // corruption.
1242 1 : func IsCorruptionError(err error) bool {
1243 1 : return errors.Is(err, base.ErrCorruption)
1244 1 : }
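A sketch of classifying an Open failure using the sentinel errors and helper above. errors.Is from cockroachdb/errors is used to match the rest of this file; the standard library's errors.Is behaves the same for this purpose, and the "demo" directory is a placeholder:

package main

import (
	"log"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble"
)

func main() {
	// Require that the store already exists, then classify the failure modes.
	db, err := pebble.Open("demo", &pebble.Options{ErrorIfNotExists: true})
	switch {
	case err == nil:
		defer db.Close()
	case errors.Is(err, pebble.ErrDBDoesNotExist):
		log.Fatal("store has not been created yet")
	case pebble.IsCorruptionError(err):
		log.Fatal("store is corrupted: ", err)
	default:
		log.Fatal(err)
	}
}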
1245 :
1246 2 : func checkConsistency(v *manifest.Version, objProvider objstorage.Provider) error {
1247 2 : var errs []error
1248 2 : dedup := make(map[base.DiskFileNum]struct{})
1249 2 : for level, files := range v.Levels {
1250 2 : iter := files.Iter()
1251 2 : for f := iter.First(); f != nil; f = iter.Next() {
1252 2 : backingState := f.FileBacking
1253 2 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1254 2 : continue
1255 : }
1256 2 : dedup[backingState.DiskFileNum] = struct{}{}
1257 2 : fileNum := backingState.DiskFileNum
1258 2 : fileSize := backingState.Size
1259 2 : // We skip over remote objects; those are instead checked asynchronously
1260 2 : // by the table stats loading job.
1261 2 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1262 2 : var size int64
1263 2 : if err == nil {
1264 2 : if meta.IsRemote() {
1265 2 : continue
1266 : }
1267 2 : size, err = objProvider.Size(meta)
1268 : }
1269 2 : if err != nil {
1270 1 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1271 1 : continue
1272 : }
1273 :
1274 2 : if size != int64(fileSize) {
1275 0 : errs = append(errs, errors.Errorf(
1276 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1277 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1278 0 : errors.Safe(size), errors.Safe(fileSize)))
1279 0 : continue
1280 : }
1281 : }
1282 : }
1283 2 : return errors.Join(errs...)
1284 : }
1285 :
1286 : type walEventListenerAdaptor struct {
1287 : l *EventListener
1288 : }
1289 :
1290 2 : func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
1291 2 : // TODO(sumeer): extend WALCreateInfo for the failover case in case the path
1292 2 : // is insufficient to infer whether primary or secondary.
1293 2 : wci := WALCreateInfo{
1294 2 : JobID: ci.JobID,
1295 2 : Path: ci.Path,
1296 2 : FileNum: base.DiskFileNum(ci.Num),
1297 2 : RecycledFileNum: ci.RecycledFileNum,
1298 2 : Err: ci.Err,
1299 2 : }
1300 2 : l.l.WALCreated(wci)
1301 2 : }