Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "math"
15 : "os"
16 : "slices"
17 : "sync/atomic"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/errors/oserror"
22 : "github.com/cockroachdb/pebble/internal/arenaskl"
23 : "github.com/cockroachdb/pebble/internal/base"
24 : "github.com/cockroachdb/pebble/internal/cache"
25 : "github.com/cockroachdb/pebble/internal/constants"
26 : "github.com/cockroachdb/pebble/internal/invariants"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/manual"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
31 : "github.com/cockroachdb/pebble/record"
32 : "github.com/cockroachdb/pebble/sstable"
33 : "github.com/cockroachdb/pebble/vfs"
34 : "github.com/prometheus/client_golang/prometheus"
35 : )
36 :
37 : const (
38 : initialMemTableSize = 256 << 10 // 256 KB
39 :
40 : // The max batch size is limited by the uint32 offsets stored in
41 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
42 : //
43 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
44 : // end of an allocation fits in uint32.
45 : //
46 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
47 : // 2GB).
48 : maxBatchSize = constants.MaxUint32OrInt
49 :
50 : // The max memtable size is limited by the uint32 offsets stored in
51 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
52 : //
53 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
54 : // end of an allocation fits in uint32.
55 : //
56 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
57 : // 2GB).
58 : maxMemTableSize = constants.MaxUint32OrInt
59 : )
60 :
61 : // TableCacheSize returns the table cache size to use for a single DB,
62 : // given the maximum number of open files. It reserves room for the
63 : // DB's non-table-cache file handles and enforces a minimum table
64 : // cache size.
65 1 : func TableCacheSize(maxOpenFiles int) int {
66 1 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
67 1 : if tableCacheSize < minTableCacheSize {
68 1 : tableCacheSize = minTableCacheSize
69 1 : }
70 1 : return tableCacheSize
71 : }
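// An illustrative, non-authoritative sketch of how a caller might relate
// Options.MaxOpenFiles to the table cache size; the value 1000 is an
// assumption for the example, and the snippet assumes the pebble and fmt
// packages are imported.
//
//	opts := &pebble.Options{MaxOpenFiles: 1000}
//	// MaxOpenFiles minus the DB's non-table-cache file handles, clamped to
//	// the minimum table cache size.
//	fmt.Println(pebble.TableCacheSize(opts.MaxOpenFiles))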
72 :
73 : // Open opens a DB whose files live in the given directory.
74 1 : func Open(dirname string, opts *Options) (db *DB, _ error) {
75 1 : // Make a copy of the options so that we don't mutate the passed in options.
76 1 : opts = opts.Clone()
77 1 : opts = opts.EnsureDefaults()
78 1 : if err := opts.Validate(); err != nil {
79 0 : return nil, err
80 0 : }
81 1 : if opts.LoggerAndTracer == nil {
82 1 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
83 1 : } else {
84 0 : opts.Logger = opts.LoggerAndTracer
85 0 : }
86 :
87 : // In all error cases, we return db = nil; this is used by various
88 : // deferred cleanups.
89 :
90 : // Open the database and WAL directories first.
91 1 : walDirname, dataDir, walDir, err := prepareAndOpenDirs(dirname, opts)
92 1 : if err != nil {
93 0 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
94 0 : }
95 1 : defer func() {
96 1 : if db == nil {
97 0 : if walDir != dataDir {
98 0 : walDir.Close()
99 0 : }
100 0 : dataDir.Close()
101 : }
102 : }()
103 :
104 : // Lock the database directory.
105 1 : var fileLock *Lock
106 1 : if opts.Lock != nil {
107 0 : // The caller already acquired the database lock. Ensure that the
108 0 : // directory matches.
109 0 : if dirname != opts.Lock.dirname {
110 0 : return nil, errors.Newf("pebble: opts.Lock acquired in %q not %q", opts.Lock.dirname, dirname)
111 0 : }
112 0 : if err := opts.Lock.refForOpen(); err != nil {
113 0 : return nil, err
114 0 : }
115 0 : fileLock = opts.Lock
116 1 : } else {
117 1 : fileLock, err = LockDirectory(dirname, opts.FS)
118 1 : if err != nil {
119 0 : return nil, err
120 0 : }
121 : }
122 1 : defer func() {
123 1 : if db == nil {
124 0 : fileLock.Close()
125 0 : }
126 : }()
127 :
128 : // Establish the format major version.
129 1 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname)
130 1 : if err != nil {
131 0 : return nil, err
132 0 : }
133 1 : defer func() {
134 1 : if db == nil {
135 0 : formatVersionMarker.Close()
136 0 : }
137 : }()
138 :
139 : // Find the currently active manifest, if there is one.
140 1 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(formatVersion, opts.FS, dirname)
141 1 : if err != nil {
142 0 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
143 0 : }
144 1 : defer func() {
145 1 : if db == nil {
146 0 : manifestMarker.Close()
147 0 : }
148 : }()
149 :
150 : // Atomic markers may leave behind obsolete files if there's a crash
151 : // mid-update. Clean these up if we're not in read-only mode.
152 1 : if !opts.ReadOnly {
153 1 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
154 0 : return nil, err
155 0 : }
156 1 : if err := manifestMarker.RemoveObsolete(); err != nil {
157 0 : return nil, err
158 0 : }
159 : }
160 :
161 1 : if opts.Cache == nil {
162 0 : opts.Cache = cache.New(cacheDefaultSize)
163 1 : } else {
164 1 : opts.Cache.Ref()
165 1 : }
166 :
167 1 : d := &DB{
168 1 : cacheID: opts.Cache.NewID(),
169 1 : dirname: dirname,
170 1 : walDirname: walDirname,
171 1 : opts: opts,
172 1 : cmp: opts.Comparer.Compare,
173 1 : equal: opts.equal(),
174 1 : merge: opts.Merger.Merge,
175 1 : split: opts.Comparer.Split,
176 1 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
177 1 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
178 1 : fileLock: fileLock,
179 1 : dataDir: dataDir,
180 1 : walDir: walDir,
181 1 : logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
182 1 : closed: new(atomic.Value),
183 1 : closedCh: make(chan struct{}),
184 1 : }
185 1 : d.mu.versions = &versionSet{}
186 1 : d.diskAvailBytes.Store(math.MaxUint64)
187 1 :
188 1 : defer func() {
189 1 : // If an error or panic occurs during open, attempt to release the manually
190 1 : // allocated memory resources. Note that rather than look for an error, we
191 1 : // look for the return of a nil DB pointer.
192 1 : if r := recover(); db == nil {
193 0 : // Release our references to the Cache. Note that both the DB and the
194 0 : // tableCache have a reference. When we release the reference to
195 0 : // the tableCache, and if there are no other references to
196 0 : // the tableCache, then the tableCache will also release its
197 0 : // reference to the cache.
198 0 : opts.Cache.Unref()
199 0 :
200 0 : if d.tableCache != nil {
201 0 : _ = d.tableCache.close()
202 0 : }
203 :
204 0 : for _, mem := range d.mu.mem.queue {
205 0 : switch t := mem.flushable.(type) {
206 0 : case *memTable:
207 0 : manual.Free(t.arenaBuf)
208 0 : t.arenaBuf = nil
209 : }
210 : }
211 0 : if d.cleanupManager != nil {
212 0 : d.cleanupManager.Close()
213 0 : }
214 0 : if d.objProvider != nil {
215 0 : d.objProvider.Close()
216 0 : }
217 0 : if r != nil {
218 0 : panic(r)
219 : }
220 : }
221 : }()
222 :
223 1 : d.commit = newCommitPipeline(commitEnv{
224 1 : logSeqNum: &d.mu.versions.logSeqNum,
225 1 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
226 1 : apply: d.commitApply,
227 1 : write: d.commitWrite,
228 1 : })
229 1 : d.mu.nextJobID = 1
230 1 : d.mu.mem.nextSize = opts.MemTableSize
231 1 : if d.mu.mem.nextSize > initialMemTableSize {
232 1 : d.mu.mem.nextSize = initialMemTableSize
233 1 : }
234 1 : d.mu.compact.cond.L = &d.mu.Mutex
235 1 : d.mu.compact.inProgress = make(map[*compaction]struct{})
236 1 : d.mu.compact.noOngoingFlushStartTime = time.Now()
237 1 : d.mu.snapshots.init()
238 1 : // logSeqNum is the next sequence number that will be assigned.
239 1 : // Start assigning sequence numbers from base.SeqNumStart to leave
240 1 : // room for reserved sequence numbers (see comments around
241 1 : // SeqNumStart).
242 1 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
243 1 : d.mu.formatVers.vers.Store(uint64(formatVersion))
244 1 : d.mu.formatVers.marker = formatVersionMarker
245 1 :
246 1 : d.timeNow = time.Now
247 1 : d.openedAt = d.timeNow()
248 1 :
249 1 : d.mu.Lock()
250 1 : defer d.mu.Unlock()
251 1 :
252 1 : jobID := d.mu.nextJobID
253 1 : d.mu.nextJobID++
254 1 :
255 1 : setCurrent := setCurrentFunc(d.FormatMajorVersion(), manifestMarker, opts.FS, dirname, d.dataDir)
256 1 :
257 1 : if !manifestExists {
258 1 : // DB does not exist.
259 1 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
260 0 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
261 0 : }
262 :
263 : // Create the DB.
264 1 : if err := d.mu.versions.create(jobID, dirname, opts, manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
265 0 : return nil, err
266 0 : }
267 1 : } else {
268 1 : if opts.ErrorIfExists {
269 0 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
270 0 : }
271 : // Load the version set.
272 1 : if err := d.mu.versions.load(dirname, opts, manifestFileNum, manifestMarker, setCurrent, d.FormatMajorVersion, &d.mu.Mutex); err != nil {
273 0 : return nil, err
274 0 : }
275 1 : if opts.ErrorIfNotPristine {
276 0 : liveFileNums := make(map[base.DiskFileNum]struct{})
277 0 : d.mu.versions.addLiveFileNums(liveFileNums)
278 0 : if len(liveFileNums) != 0 {
279 0 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
280 0 : }
281 : }
282 : }
283 :
284 : // In read-only mode, we replay directly into the mutable memtable but never
285 : // flush it. We need to delay creation of the memtable until we know the
286 : // sequence number of the first batch that will be inserted.
287 1 : if !d.opts.ReadOnly {
288 1 : var entry *flushableEntry
289 1 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load())
290 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
291 1 : }
292 :
293 : // List the objects in the WAL directory and, if distinct, the data directory.
294 1 : ls, err := opts.FS.List(d.walDirname)
295 1 : if err != nil {
296 0 : return nil, err
297 0 : }
298 1 : if d.dirname != d.walDirname {
299 1 : ls2, err := opts.FS.List(d.dirname)
300 1 : if err != nil {
301 0 : return nil, err
302 0 : }
303 1 : ls = append(ls, ls2...)
304 : }
305 1 : providerSettings := objstorageprovider.Settings{
306 1 : Logger: opts.Logger,
307 1 : FS: opts.FS,
308 1 : FSDirName: dirname,
309 1 : FSDirInitialListing: ls,
310 1 : FSCleaner: opts.Cleaner,
311 1 : NoSyncOnClose: opts.NoSyncOnClose,
312 1 : BytesPerSync: opts.BytesPerSync,
313 1 : }
314 1 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
315 1 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
316 1 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
317 1 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
318 1 :
319 1 : d.objProvider, err = objstorageprovider.Open(providerSettings)
320 1 : if err != nil {
321 0 : return nil, err
322 0 : }
323 :
324 1 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
325 1 :
326 1 : if manifestExists {
327 1 : curVersion := d.mu.versions.currentVersion()
328 1 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
329 0 : return nil, err
330 0 : }
331 : }
332 :
333 1 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
334 1 : d.tableCache = newTableCacheContainer(
335 1 : opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize,
336 1 : &sstable.CategoryStatsCollector{})
337 1 : d.newIters = d.tableCache.newIters
338 1 : d.tableNewRangeKeyIter = d.tableCache.newRangeKeyIter
339 1 :
340 1 : // Replay any newer log files than the ones named in the manifest.
341 1 : type fileNumAndName struct {
342 1 : num base.DiskFileNum
343 1 : name string
344 1 : }
345 1 : var logFiles []fileNumAndName
346 1 : var previousOptionsFileNum FileNum
347 1 : var previousOptionsFilename string
348 1 : for _, filename := range ls {
349 1 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
350 1 : if !ok {
351 1 : continue
352 : }
353 :
354 : // Don't reuse any obsolete file numbers to avoid modifying an
355 : // ingested sstable's original external file.
356 1 : if d.mu.versions.nextFileNum <= uint64(fn.FileNum()) {
357 1 : d.mu.versions.nextFileNum = uint64(fn.FileNum()) + 1
358 1 : }
359 :
360 1 : switch ft {
361 1 : case fileTypeLog:
362 1 : if fn >= d.mu.versions.minUnflushedLogNum {
363 1 : logFiles = append(logFiles, fileNumAndName{fn, filename})
364 1 : }
365 1 : if d.logRecycler.minRecycleLogNum <= fn.FileNum() {
366 1 : d.logRecycler.minRecycleLogNum = fn.FileNum() + 1
367 1 : }
368 1 : case fileTypeOptions:
369 1 : if previousOptionsFileNum < fn.FileNum() {
370 1 : previousOptionsFileNum = fn.FileNum()
371 1 : previousOptionsFilename = filename
372 1 : }
373 0 : case fileTypeTemp, fileTypeOldTemp:
374 0 : if !d.opts.ReadOnly {
375 0 : // Some codepaths write to a temporary file and then
376 0 : // rename it to its final location when complete. A
377 0 : // temp file is leftover if a process exits before the
378 0 : // rename. Remove it.
379 0 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
380 0 : if err != nil {
381 0 : return nil, err
382 0 : }
383 : }
384 : }
385 : }
386 :
387 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
388 : // objProvider. This avoids FileNum collisions with obsolete sstables.
389 1 : objects := d.objProvider.List()
390 1 : for _, obj := range objects {
391 1 : if d.mu.versions.nextFileNum <= uint64(obj.DiskFileNum) {
392 0 : d.mu.versions.nextFileNum = uint64(obj.DiskFileNum) + 1
393 0 : }
394 : }
395 :
396 : // Validate the most-recent OPTIONS file, if there is one.
397 1 : var strictWALTail bool
398 1 : if previousOptionsFilename != "" {
399 1 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
400 1 : strictWALTail, err = checkOptions(opts, path)
401 1 : if err != nil {
402 0 : return nil, err
403 0 : }
404 : }
405 :
406 1 : slices.SortFunc(logFiles, func(a, b fileNumAndName) int {
407 1 : return cmp.Compare(a.num, b.num)
408 1 : })
409 :
410 1 : var ve versionEdit
411 1 : var toFlush flushableList
412 1 : for i, lf := range logFiles {
413 1 : lastWAL := i == len(logFiles)-1
414 1 : flush, maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS,
415 1 : opts.FS.PathJoin(d.walDirname, lf.name), lf.num, strictWALTail && !lastWAL)
416 1 : if err != nil {
417 0 : return nil, err
418 0 : }
419 1 : toFlush = append(toFlush, flush...)
420 1 : d.mu.versions.markFileNumUsed(lf.num)
421 1 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
422 1 : d.mu.versions.logSeqNum.Store(maxSeqNum)
423 1 : }
424 : }
425 1 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
426 1 :
427 1 : if !d.opts.ReadOnly {
428 1 : // Create an empty .log file.
429 1 : newLogNum := d.mu.versions.getNextDiskFileNum()
430 1 :
431 1 : // This logic is slightly different than RocksDB's. Specifically, RocksDB
432 1 : // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
433 1 : // newLogNum. There should be no difference in using either value.
434 1 : ve.MinUnflushedLogNum = newLogNum
435 1 :
436 1 : // Create the manifest with the updated MinUnflushedLogNum before
437 1 : // creating the new log file. If we created the log file first, a
438 1 : // crash before the manifest is synced could leave two WALs with
439 1 : // unclean tails.
440 1 : d.mu.versions.logLock()
441 1 : if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
442 1 : return nil
443 1 : }); err != nil {
444 0 : return nil, err
445 0 : }
446 :
447 1 : for _, entry := range toFlush {
448 1 : entry.readerUnrefLocked(true)
449 1 : }
450 :
451 1 : newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum)
452 1 : d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: 0})
453 1 : logFile, err := opts.FS.Create(newLogName)
454 1 : if err != nil {
455 0 : return nil, err
456 0 : }
457 1 : if err := d.walDir.Sync(); err != nil {
458 0 : return nil, err
459 0 : }
460 1 : d.opts.EventListener.WALCreated(WALCreateInfo{
461 1 : JobID: jobID,
462 1 : Path: newLogName,
463 1 : FileNum: newLogNum,
464 1 : })
465 1 : // This isn't strictly necessary as we don't use the log number for
466 1 : // memtables being flushed, only for the next unflushed memtable.
467 1 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
468 1 :
469 1 : logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
470 1 : NoSyncOnClose: d.opts.NoSyncOnClose,
471 1 : BytesPerSync: d.opts.WALBytesPerSync,
472 1 : PreallocateSize: d.walPreallocateSize(),
473 1 : })
474 1 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
475 1 : Buckets: FsyncLatencyBuckets,
476 1 : })
477 1 :
478 1 : logWriterConfig := record.LogWriterConfig{
479 1 : WALMinSyncInterval: d.opts.WALMinSyncInterval,
480 1 : WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
481 1 : QueueSemChan: d.commit.logSyncQSem,
482 1 : }
483 1 : d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
484 1 : d.mu.versions.metrics.WAL.Files++
485 : }
486 1 : d.updateReadStateLocked(d.opts.DebugCheck)
487 1 :
488 1 : // If the Options specify a format major version higher than the
489 1 : // loaded database's, upgrade it. If this is a new database, this
490 1 : // code path also performs an initial upgrade from the starting
491 1 : // implicit MostCompatible version.
492 1 : //
493 1 : // We ratchet the version this far into Open so that migrations have a read
494 1 : // state available.
495 1 : if !d.opts.ReadOnly && opts.FormatMajorVersion > d.FormatMajorVersion() {
496 1 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
497 0 : return nil, err
498 0 : }
499 : }
500 :
501 1 : if !d.opts.ReadOnly {
502 1 : // Write the current options to disk.
503 1 : d.optionsFileNum = d.mu.versions.getNextDiskFileNum()
504 1 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
505 1 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
506 1 :
507 1 : // Write them to a temporary file first, in case we crash before
508 1 : // we're done. A corrupt options file prevents opening the
509 1 : // database.
510 1 : optionsFile, err := opts.FS.Create(tmpPath)
511 1 : if err != nil {
512 0 : return nil, err
513 0 : }
514 1 : serializedOpts := []byte(opts.String())
515 1 : if _, err := optionsFile.Write(serializedOpts); err != nil {
516 0 : return nil, errors.CombineErrors(err, optionsFile.Close())
517 0 : }
518 1 : d.optionsFileSize = uint64(len(serializedOpts))
519 1 : if err := optionsFile.Sync(); err != nil {
520 0 : return nil, errors.CombineErrors(err, optionsFile.Close())
521 0 : }
522 1 : if err := optionsFile.Close(); err != nil {
523 0 : return nil, err
524 0 : }
525 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
526 : // guaranteed to be atomic because the destination path does not
527 : // exist.
528 1 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
529 0 : return nil, err
530 0 : }
531 1 : if err := d.dataDir.Sync(); err != nil {
532 0 : return nil, err
533 0 : }
534 : }
535 :
536 1 : if !d.opts.ReadOnly {
537 1 : d.scanObsoleteFiles(ls)
538 1 : d.deleteObsoleteFiles(jobID)
539 1 : } else {
540 0 : // All the log files are obsolete.
541 0 : d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
542 0 : }
543 1 : d.mu.tableStats.cond.L = &d.mu.Mutex
544 1 : d.mu.tableValidation.cond.L = &d.mu.Mutex
545 1 : if !d.opts.ReadOnly {
546 1 : d.maybeCollectTableStatsLocked()
547 1 : }
548 1 : d.calculateDiskAvailableBytes()
549 1 :
550 1 : d.maybeScheduleFlush()
551 1 : d.maybeScheduleCompaction()
552 1 :
553 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
554 1 : //
555 1 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
556 1 : // finalizer to never be run. The problem is due to this limitation of
557 1 : // finalizers mentioned in the SetFinalizer docs:
558 1 : //
559 1 : // If a cyclic structure includes a block with a finalizer, that cycle is
560 1 : // not guaranteed to be garbage collected and the finalizer is not
561 1 : // guaranteed to run, because there is no ordering that respects the
562 1 : // dependencies.
563 1 : //
564 1 : // DB has cycles with several of its internal structures: readState,
565 1 : // newIters, tableCache, versions, etc. Each of these individually causes a
566 1 : // cycle and prevents the finalizer from being run. But we can work around
567 1 : // this finalizer limitation by setting a finalizer on another object that is
568 1 : // tied to the lifetime of DB: the DB.closed atomic.Value.
569 1 : dPtr := fmt.Sprintf("%p", d)
570 1 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
571 0 : v := obj.(*atomic.Value)
572 0 : if err := v.Load(); err == nil {
573 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
574 0 : os.Exit(1)
575 0 : }
576 : })
577 :
578 1 : return d, nil
579 : }
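// A minimal, self-contained usage sketch for Open (the directory name and
// key/value contents are assumptions for the example): open a store with
// default options, write a key, read it back, and close everything.
//
//	package main
//
//	import (
//		"fmt"
//		"log"
//
//		"github.com/cockroachdb/pebble"
//	)
//
//	func main() {
//		// Open creates the directory if needed, acquires the directory
//		// lock, replays any WALs, and writes a fresh OPTIONS file.
//		db, err := pebble.Open("demo-db", &pebble.Options{})
//		if err != nil {
//			log.Fatal(err)
//		}
//		if err := db.Set([]byte("hello"), []byte("world"), pebble.Sync); err != nil {
//			log.Fatal(err)
//		}
//		value, closer, err := db.Get([]byte("hello"))
//		if err != nil {
//			log.Fatal(err)
//		}
//		fmt.Printf("hello=%s\n", value)
//		if err := closer.Close(); err != nil {
//			log.Fatal(err)
//		}
//		if err := db.Close(); err != nil {
//			log.Fatal(err)
//		}
//	}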
580 :
581 : // prepareAndOpenDirs opens the directories for the store (and creates them if
582 : // necessary).
583 : //
584 : // Returns an error if ReadOnly is set and the directories don't exist.
585 : func prepareAndOpenDirs(
586 : dirname string, opts *Options,
587 1 : ) (walDirname string, dataDir vfs.File, walDir vfs.File, err error) {
588 1 : walDirname = opts.WALDir
589 1 : if opts.WALDir == "" {
590 1 : walDirname = dirname
591 1 : }
592 :
593 : // Create directories if needed.
594 1 : if !opts.ReadOnly {
595 1 : if err := opts.FS.MkdirAll(dirname, 0755); err != nil {
596 0 : return "", nil, nil, err
597 0 : }
598 1 : if walDirname != dirname {
599 1 : if err := opts.FS.MkdirAll(walDirname, 0755); err != nil {
600 0 : return "", nil, nil, err
601 0 : }
602 : }
603 : }
604 :
605 1 : dataDir, err = opts.FS.OpenDir(dirname)
606 1 : if err != nil {
607 0 : if opts.ReadOnly && oserror.IsNotExist(err) {
608 0 : return "", nil, nil, errors.Errorf("pebble: database %q does not exist", dirname)
609 0 : }
610 0 : return "", nil, nil, err
611 : }
612 :
613 1 : if walDirname == dirname {
614 1 : walDir = dataDir
615 1 : } else {
616 1 : walDir, err = opts.FS.OpenDir(walDirname)
617 1 : if err != nil {
618 0 : dataDir.Close()
619 0 : return "", nil, nil, err
620 0 : }
621 : }
622 1 : return walDirname, dataDir, walDir, nil
623 : }
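// An illustrative sketch of placing the WAL in a separate directory via
// Options.WALDir, which prepareAndOpenDirs creates and opens alongside the
// data directory; the paths are assumptions for the example, and the snippet
// assumes the pebble and log packages are imported.
//
//	opts := &pebble.Options{
//		// Keep write-ahead logs on a different device than the sstables.
//		WALDir: "/mnt/fast-disk/demo-wal",
//	}
//	db, err := pebble.Open("/mnt/big-disk/demo-db", opts)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()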
624 :
625 : // GetVersion returns the engine version string from the latest options
626 : // file present in dir. Used to check what Pebble or RocksDB version was last
627 : // used to write to the database stored in this directory. An empty string is
628 : // returned if no valid OPTIONS file with a version key was found.
629 0 : func GetVersion(dir string, fs vfs.FS) (string, error) {
630 0 : ls, err := fs.List(dir)
631 0 : if err != nil {
632 0 : return "", err
633 0 : }
634 0 : var version string
635 0 : lastOptionsSeen := FileNum(0)
636 0 : for _, filename := range ls {
637 0 : ft, fn, ok := base.ParseFilename(fs, filename)
638 0 : if !ok {
639 0 : continue
640 : }
641 0 : switch ft {
642 0 : case fileTypeOptions:
643 0 : // If this file has a higher number than the last options file
644 0 : // processed, reset version. This is because rocksdb often
645 0 : // writes multiple options files without deleting previous ones.
646 0 : // Otherwise, skip parsing this options file.
647 0 : if fn.FileNum() > lastOptionsSeen {
648 0 : version = ""
649 0 : lastOptionsSeen = fn.FileNum()
650 0 : } else {
651 0 : continue
652 : }
653 0 : f, err := fs.Open(fs.PathJoin(dir, filename))
654 0 : if err != nil {
655 0 : return "", err
656 0 : }
657 0 : data, err := io.ReadAll(f)
658 0 : f.Close()
659 0 :
660 0 : if err != nil {
661 0 : return "", err
662 0 : }
663 0 : err = parseOptions(string(data), func(section, key, value string) error {
664 0 : switch {
665 0 : case section == "Version":
666 0 : switch key {
667 0 : case "pebble_version":
668 0 : version = value
669 0 : case "rocksdb_version":
670 0 : version = fmt.Sprintf("rocksdb v%s", value)
671 : }
672 : }
673 0 : return nil
674 : })
675 0 : if err != nil {
676 0 : return "", err
677 0 : }
678 : }
679 : }
680 0 : return version, nil
681 : }
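// An illustrative sketch of calling GetVersion before opening a store; the
// directory name is an assumption for the example, and the snippet assumes
// the pebble, pebble/vfs, fmt, and log packages are imported.
//
//	version, err := pebble.GetVersion("demo-db", vfs.Default)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if version == "" {
//		fmt.Println("no OPTIONS file with a version key was found")
//	} else {
//		fmt.Println("last written by:", version)
//	}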
682 :
683 : // replayWAL replays the edits in the specified log file. If the DB is in
684 : // read-only mode, then the WALs are replayed into memtables and not flushed. If
685 : // the DB is not in read-only mode, then the contents of the WAL are guaranteed
686 : // to be flushed.
687 : //
688 : // The toFlush return value is a list of flushables associated with the WAL
689 : // being replayed which will be flushed. Once the version edit has been applied
690 : // to the manifest, it is up to the caller of replayWAL to unreference the
691 : // toFlush flushables returned by replayWAL.
692 : //
693 : // d.mu must be held when calling this, but the mutex may be dropped and
694 : // re-acquired during the course of this method.
695 : func (d *DB) replayWAL(
696 : jobID int,
697 : ve *versionEdit,
698 : fs vfs.FS,
699 : filename string,
700 : logNum base.DiskFileNum,
701 : strictWALTail bool,
702 1 : ) (toFlush flushableList, maxSeqNum uint64, err error) {
703 1 : file, err := fs.Open(filename)
704 1 : if err != nil {
705 0 : return nil, 0, err
706 0 : }
707 1 : defer file.Close()
708 1 : var (
709 1 : b Batch
710 1 : buf bytes.Buffer
711 1 : mem *memTable
712 1 : entry *flushableEntry
713 1 : rr = record.NewReader(file, logNum)
714 1 : offset int64 // byte offset in rr
715 1 : lastFlushOffset int64
716 1 : keysReplayed int64 // number of keys replayed
717 1 : batchesReplayed int64 // number of batches replayed
718 1 : )
719 1 :
720 1 : if d.opts.ReadOnly {
721 0 : // In read-only mode, we replay directly into the mutable memtable which will
722 0 : // never be flushed.
723 0 : mem = d.mu.mem.mutable
724 0 : if mem != nil {
725 0 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
726 0 : }
727 : }
728 :
729 : // Flushes the current memtable, if not nil.
730 1 : flushMem := func() {
731 1 : if mem == nil {
732 1 : return
733 1 : }
734 1 : var logSize uint64
735 1 : if offset >= lastFlushOffset {
736 1 : logSize = uint64(offset - lastFlushOffset)
737 1 : }
738 : // Else, this was the initial memtable in the read-only case which must have
739 : // been empty, but we need to flush it since we don't want to add to it later.
740 1 : lastFlushOffset = offset
741 1 : entry.logSize = logSize
742 1 : if !d.opts.ReadOnly {
743 1 : toFlush = append(toFlush, entry)
744 1 : }
745 1 : mem, entry = nil, nil
746 : }
747 : // Creates a new memtable if there is no current memtable.
748 1 : ensureMem := func(seqNum uint64) {
749 1 : if mem != nil {
750 1 : return
751 1 : }
752 1 : mem, entry = d.newMemTable(logNum, seqNum)
753 1 : if d.opts.ReadOnly {
754 0 : d.mu.mem.mutable = mem
755 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
756 0 : }
757 : }
758 :
759 : // updateVE is used to update ve with information about new files created
760 : // during the flush of any flushable not of type ingestedFlushable. For the
761 : // flushable of type ingestedFlushable we use custom handling below.
762 1 : updateVE := func() error {
763 1 : // TODO(bananabrick): See if we can use the actual base level here,
764 1 : // instead of using 1.
765 1 : c := newFlush(d.opts, d.mu.versions.currentVersion(),
766 1 : 1 /* base level */, toFlush, d.timeNow())
767 1 : newVE, _, _, err := d.runCompaction(jobID, c)
768 1 : if err != nil {
769 0 : return errors.Wrapf(err, "running compaction during WAL replay")
770 0 : }
771 1 : ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
772 1 : return nil
773 : }
774 :
775 1 : for {
776 1 : offset = rr.Offset()
777 1 : r, err := rr.Next()
778 1 : if err == nil {
779 1 : _, err = io.Copy(&buf, r)
780 1 : }
781 1 : if err != nil {
782 1 : // It is common to encounter a zeroed or invalid chunk due to WAL
783 1 : // preallocation and WAL recycling. We need to distinguish these
784 1 : // errors from EOF in order to recognize that the record was
785 1 : // truncated and to avoid replaying subsequent WALs, but want
786 1 : // to otherwise treat them like EOF.
787 1 : if err == io.EOF {
788 1 : break
789 0 : } else if record.IsInvalidRecord(err) && !strictWALTail {
790 0 : break
791 : }
792 0 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
793 : }
794 :
795 1 : if buf.Len() < batchHeaderLen {
796 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)",
797 0 : filename, errors.Safe(logNum))
798 0 : }
799 :
800 1 : if d.opts.ErrorIfNotPristine {
801 0 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
802 0 : }
803 :
804 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
805 : // which is used below.
806 1 : b = Batch{}
807 1 : b.db = d
808 1 : b.SetRepr(buf.Bytes())
809 1 : seqNum := b.SeqNum()
810 1 : maxSeqNum = seqNum + uint64(b.Count())
811 1 : keysReplayed += int64(b.Count())
812 1 : batchesReplayed++
813 1 : {
814 1 : br := b.Reader()
815 1 : if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
816 1 : fileNums := make([]base.DiskFileNum, 0, b.Count())
817 1 : addFileNum := func(encodedFileNum []byte) {
818 1 : fileNum, n := binary.Uvarint(encodedFileNum)
819 1 : if n <= 0 {
820 0 : panic("pebble: ingest sstable file num is invalid.")
821 : }
822 1 : fileNums = append(fileNums, base.FileNum(fileNum).DiskFileNum())
823 : }
824 1 : addFileNum(encodedFileNum)
825 1 :
826 1 : for i := 1; i < int(b.Count()); i++ {
827 0 : kind, encodedFileNum, _, ok := br.Next()
828 0 : if kind != InternalKeyKindIngestSST {
829 0 : panic("pebble: invalid batch key kind.")
830 : }
831 0 : if !ok {
832 0 : panic("pebble: invalid batch count.")
833 : }
834 0 : addFileNum(encodedFileNum)
835 : }
836 :
837 1 : if _, _, _, ok := br.Next(); ok {
838 0 : panic("pebble: invalid number of entries in batch.")
839 : }
840 :
841 1 : meta := make([]*fileMetadata, len(fileNums))
842 1 : for i, n := range fileNums {
843 1 : var readable objstorage.Readable
844 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
845 1 : if err != nil {
846 0 : return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
847 0 : }
848 1 : if objMeta.IsRemote() {
849 1 : readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
850 1 : if err != nil {
851 0 : return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
852 0 : }
853 1 : } else {
854 1 : path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
855 1 : f, err := d.opts.FS.Open(path)
856 1 : if err != nil {
857 0 : return nil, 0, err
858 0 : }
859 :
860 1 : readable, err = sstable.NewSimpleReadable(f)
861 1 : if err != nil {
862 0 : return nil, 0, err
863 0 : }
864 : }
865 : // NB: ingestLoad1 will close readable.
866 1 : meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, n)
867 1 : if err != nil {
868 0 : return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
869 0 : }
870 : }
871 :
872 1 : if uint32(len(meta)) != b.Count() {
873 0 : panic("pebble: couldn't load all files in WAL entry.")
874 : }
875 :
876 1 : entry, err = d.newIngestedFlushableEntry(
877 1 : meta, seqNum, logNum,
878 1 : )
879 1 : if err != nil {
880 0 : return nil, 0, err
881 0 : }
882 :
883 1 : if d.opts.ReadOnly {
884 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
885 0 : // We added the IngestSST flushable to the queue. But there
886 0 : // must be at least one WAL entry waiting to be replayed. We
887 0 : // have to ensure this newer WAL entry isn't replayed into
888 0 : // the current value of d.mu.mem.mutable because the current
889 0 : // mutable memtable exists before this flushable entry in
890 0 : // the memtable queue. To ensure this, we just need to unset
891 0 : // d.mu.mem.mutable. When a newer WAL is replayed, we will
892 0 : // set d.mu.mem.mutable to a newer value.
893 0 : d.mu.mem.mutable = nil
894 1 : } else {
895 1 : toFlush = append(toFlush, entry)
896 1 : // During WAL replay, the LSM only has L0, hence the
897 1 : // baseLevel is 1. For the sake of simplicity, we place the
898 1 : // ingested files in L0 here, instead of finding their
899 1 : // target levels. It is expected that WAL replay should be
900 1 : // rare, and that flushables of type ingestedFlushable
901 1 : // should be rarer still, so placing the ingested files in
902 1 : // L0 is acceptable and avoids complicating the replay
903 1 : // path.
904 1 : //
905 1 : // TODO(bananabrick): Maybe refactor this function to allow
906 1 : // us to easily place ingested files in levels as low as
907 1 : // possible during WAL replay. It would require breaking up
908 1 : // the application of ve to the manifest into chunks and is
909 1 : // not pretty w/o a refactor to this function and how it's
910 1 : // used.
911 1 : c := newFlush(
912 1 : d.opts, d.mu.versions.currentVersion(),
913 1 : 1, /* base level */
914 1 : []*flushableEntry{entry},
915 1 : d.timeNow(),
916 1 : )
917 1 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
918 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
919 1 : }
920 : }
921 1 : return toFlush, maxSeqNum, nil
922 : }
923 : }
924 :
925 1 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
926 0 : flushMem()
927 0 : // Make a copy of the data slice since it is currently owned by buf and will
928 0 : // be reused in the next iteration.
929 0 : b.data = slices.Clone(b.data)
930 0 : b.flushable = newFlushableBatch(&b, d.opts.Comparer)
931 0 : entry := d.newFlushableEntry(b.flushable, logNum, b.SeqNum())
932 0 : // Disable memory accounting by adding a reader ref that will never be
933 0 : // removed.
934 0 : entry.readerRefs.Add(1)
935 0 : if d.opts.ReadOnly {
936 0 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
937 0 : // We added the flushable batch to the flushable queue.
938 0 : // But there must be at least one WAL entry waiting to be
939 0 : // replayed. We have to ensure this newer WAL entry isn't
940 0 : // replayed into the current value of d.mu.mem.mutable because
941 0 : // the current mutable memtable exists before this flushable
942 0 : // entry in the memtable queue. To ensure this, we just need to
943 0 : // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
944 0 : // set d.mu.mem.mutable to a newer value.
945 0 : d.mu.mem.mutable = nil
946 0 : } else {
947 0 : toFlush = append(toFlush, entry)
948 0 : }
949 1 : } else {
950 1 : ensureMem(seqNum)
951 1 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
952 0 : return nil, 0, err
953 0 : }
954 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
955 : // batch may not initially fit, but will eventually fit (since it is smaller than
956 : // largeBatchThreshold).
957 1 : for err == arenaskl.ErrArenaFull {
958 1 : flushMem()
959 1 : ensureMem(seqNum)
960 1 : err = mem.prepare(&b)
961 1 : if err != nil && err != arenaskl.ErrArenaFull {
962 0 : return nil, 0, err
963 0 : }
964 : }
965 1 : if err = mem.apply(&b, seqNum); err != nil {
966 0 : return nil, 0, err
967 0 : }
968 1 : mem.writerUnref()
969 : }
970 1 : buf.Reset()
971 : }
972 :
973 1 : d.opts.Logger.Infof("[JOB %d] WAL file %s with log number %s stopped reading at offset: %d; replayed %d keys in %d batches", jobID, filename, logNum.String(), offset, keysReplayed, batchesReplayed)
974 1 : flushMem()
975 1 :
976 1 : // mem is nil here.
977 1 : if !d.opts.ReadOnly {
978 1 : err = updateVE()
979 1 : if err != nil {
980 0 : return nil, 0, err
981 0 : }
982 : }
983 1 : return toFlush, maxSeqNum, err
984 : }
985 :
986 1 : func checkOptions(opts *Options, path string) (strictWALTail bool, err error) {
987 1 : f, err := opts.FS.Open(path)
988 1 : if err != nil {
989 0 : return false, err
990 0 : }
991 1 : defer f.Close()
992 1 :
993 1 : data, err := io.ReadAll(f)
994 1 : if err != nil {
995 0 : return false, err
996 0 : }
997 1 : return opts.checkOptions(string(data))
998 : }
999 :
1000 : // DBDesc briefly describes high-level state about a database.
1001 : type DBDesc struct {
1002 : // Exists is true if an existing database was found.
1003 : Exists bool
1004 : // FormatMajorVersion indicates the database's current format
1005 : // version.
1006 : FormatMajorVersion FormatMajorVersion
1007 : // ManifestFilename is the filename of the current active manifest,
1008 : // if the database exists.
1009 : ManifestFilename string
1010 : }
1011 :
1012 : // Peek looks for an existing database in dirname on the provided FS. It
1013 : // returns a brief description of the database. Peek is read-only and
1014 : // does not open the database.
1015 0 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1016 0 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname)
1017 0 : if err != nil {
1018 0 : return nil, err
1019 0 : }
1020 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1021 : // PeekMarker variant that avoids opening the directory.
1022 0 : if err := versMarker.Close(); err != nil {
1023 0 : return nil, err
1024 0 : }
1025 :
1026 : // Find the currently active manifest, if there is one.
1027 0 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(vers, fs, dirname)
1028 0 : if err != nil {
1029 0 : return nil, err
1030 0 : }
1031 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1032 : // PeekMarker variant that avoids opening the directory.
1033 0 : if err := manifestMarker.Close(); err != nil {
1034 0 : return nil, err
1035 0 : }
1036 :
1037 0 : desc := &DBDesc{
1038 0 : Exists: exists,
1039 0 : FormatMajorVersion: vers,
1040 0 : }
1041 0 : if exists {
1042 0 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1043 0 : }
1044 0 : return desc, nil
1045 : }
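// An illustrative sketch of using Peek to inspect a directory without opening
// the database; the directory name is an assumption for the example, and the
// snippet assumes the pebble, pebble/vfs, fmt, and log packages are imported.
//
//	desc, err := pebble.Peek("demo-db", vfs.Default)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if desc.Exists {
//		fmt.Println("format major version:", desc.FormatMajorVersion)
//		fmt.Println("active manifest:", desc.ManifestFilename)
//	} else {
//		fmt.Println("no existing database found")
//	}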
1046 :
1047 : // LockDirectory acquires the database directory lock in the named directory,
1048 : // preventing another process from opening the database. LockDirectory returns a
1049 : // handle to the held lock that may be passed to Open through Options.Lock to
1050 : // subsequently open the database, skipping lock acquisition during Open.
1051 : //
1052 : // LockDirectory may be used to expand the critical section protected by the
1053 : // database lock to include setup before the call to Open.
1054 1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1055 1 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.FileNum(0).DiskFileNum()))
1056 1 : if err != nil {
1057 0 : return nil, err
1058 0 : }
1059 1 : l := &Lock{dirname: dirname, fileLock: fileLock}
1060 1 : l.refs.Store(1)
1061 1 : invariants.SetFinalizer(l, func(obj interface{}) {
1062 1 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1063 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1064 : }
1065 : })
1066 1 : return l, nil
1067 : }
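// An illustrative sketch of expanding the critical section protected by the
// database lock to include setup performed before Open; the directory name is
// an assumption for the example, and the snippet assumes the pebble,
// pebble/vfs, and log packages are imported.
//
//	lock, err := pebble.LockDirectory("demo-db", vfs.Default)
//	if err != nil {
//		log.Fatal(err)
//	}
//	// ... setup that must happen while no other process can open the
//	// database ...
//	db, err := pebble.Open("demo-db", &pebble.Options{Lock: lock})
//	if err != nil {
//		log.Fatal(err)
//	}
//	// Close the DB before releasing the caller-held lock handle.
//	if err := db.Close(); err != nil {
//		log.Fatal(err)
//	}
//	if err := lock.Close(); err != nil {
//		log.Fatal(err)
//	}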
1068 :
1069 : // Lock represents a file lock on a directory. It may be passed to Open through
1070 : // Options.Lock to elide lock acquisition during Open.
1071 : type Lock struct {
1072 : dirname string
1073 : fileLock io.Closer
1074 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1075 : // or 2.
1076 : //
1077 : // When acquired by the client and passed to Open, refs = 1 and the Open
1078 : // call increments it to 2. When the database is closed, it's decremented to
1079 : // 1. Finally, when the original caller calls Close on the Lock, it's
1080 : // decremented to zero and the underlying file lock is released.
1081 : //
1082 : // When Open acquires the file lock, refs remains at 1 until the database is
1083 : // closed.
1084 : refs atomic.Int32
1085 : }
1086 :
1087 0 : func (l *Lock) refForOpen() error {
1088 0 : // During Open, when a user passed in a lock, the reference count must be
1089 0 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1090 0 : // it's 2, the lock is already in use by another database within the
1091 0 : // process.
1092 0 : if !l.refs.CompareAndSwap(1, 2) {
1093 0 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1094 0 : }
1095 0 : return nil
1096 : }
1097 :
1098 : // Close releases the lock, permitting another process to lock and open the
1099 : // database. Close must not be called until after a database using the Lock has
1100 : // been closed.
1101 1 : func (l *Lock) Close() error {
1102 1 : if l.refs.Add(-1) > 0 {
1103 0 : return nil
1104 0 : }
1105 1 : defer func() { l.fileLock = nil }()
1106 1 : return l.fileLock.Close()
1107 : }
1108 :
1109 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1110 : // does not exist.
1111 : //
1112 : // Note that errors can be wrapped with more details; use errors.Is().
1113 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1114 :
1115 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1116 : // already exists.
1117 : //
1118 : // Note that errors can be wrapped with more details; use errors.Is().
1119 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1120 :
1121 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1122 : // already exists and is not pristine.
1123 : //
1124 : // Note that errors can be wrapped with more details; use errors.Is().
1125 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
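// An illustrative sketch of testing these sentinel errors with errors.Is,
// since they may be returned wrapped with additional detail; the directory
// name and the ErrorIfNotExists setting are assumptions for the example.
//
//	db, err := pebble.Open("demo-db", &pebble.Options{ErrorIfNotExists: true})
//	if errors.Is(err, pebble.ErrDBDoesNotExist) {
//		// The store has not been created yet; handle accordingly.
//		return
//	}
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()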
1126 :
1127 : // IsCorruptionError returns true if the given error indicates database
1128 : // corruption.
1129 0 : func IsCorruptionError(err error) bool {
1130 0 : return errors.Is(err, base.ErrCorruption)
1131 0 : }
1132 :
1133 1 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1134 1 : var errs []error
1135 1 : dedup := make(map[base.DiskFileNum]struct{})
1136 1 : for level, files := range v.Levels {
1137 1 : iter := files.Iter()
1138 1 : for f := iter.First(); f != nil; f = iter.Next() {
1139 1 : backingState := f.FileBacking
1140 1 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1141 1 : continue
1142 : }
1143 1 : dedup[backingState.DiskFileNum] = struct{}{}
1144 1 : fileNum := backingState.DiskFileNum
1145 1 : fileSize := backingState.Size
1146 1 : // We skip over remote objects; those are instead checked asynchronously
1147 1 : // by the table stats loading job.
1148 1 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1149 1 : var size int64
1150 1 : if err == nil {
1151 1 : if meta.IsRemote() {
1152 1 : continue
1153 : }
1154 1 : size, err = objProvider.Size(meta)
1155 : }
1156 1 : if err != nil {
1157 0 : errs = append(errs, errors.Wrapf(err, "L%d: %s", errors.Safe(level), fileNum))
1158 0 : continue
1159 : }
1160 :
1161 1 : if size != int64(fileSize) {
1162 0 : errs = append(errs, errors.Errorf(
1163 0 : "L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)",
1164 0 : errors.Safe(level), fileNum, objProvider.Path(meta),
1165 0 : errors.Safe(size), errors.Safe(fileSize)))
1166 0 : continue
1167 : }
1168 : }
1169 : }
1170 1 : return errors.Join(errs...)
1171 : }
|