Line data Source code
1 : // Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "math"
14 : "os"
15 : "sort"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/errors/oserror"
21 : "github.com/cockroachdb/pebble/internal/arenaskl"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/cache"
24 : "github.com/cockroachdb/pebble/internal/constants"
25 : "github.com/cockroachdb/pebble/internal/invariants"
26 : "github.com/cockroachdb/pebble/internal/manifest"
27 : "github.com/cockroachdb/pebble/internal/manual"
28 : "github.com/cockroachdb/pebble/objstorage"
29 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
30 : "github.com/cockroachdb/pebble/record"
31 : "github.com/cockroachdb/pebble/sstable"
32 : "github.com/cockroachdb/pebble/vfs"
33 : "github.com/prometheus/client_golang/prometheus"
34 : )
35 :
36 : const (
37 : initialMemTableSize = 256 << 10 // 256 KB
38 :
39 : // The max batch size is limited by the uint32 offsets stored in
40 : // internal/batchskl.node, DeferredBatchOp, and flushableBatchEntry.
41 : //
42 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
43 : // end of an allocation fits in uint32.
44 : //
45 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
46 : // 2GB).
47 : maxBatchSize = constants.MaxUint32OrInt
48 :
49 : // The max memtable size is limited by the uint32 offsets stored in
50 : // internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
51 : //
52 : // We limit the size to MaxUint32 (just short of 4GB) so that the exclusive
53 : // end of an allocation fits in uint32.
54 : //
55 : // On 32-bit systems, slices are naturally limited to MaxInt (just short of
56 : // 2GB).
57 : maxMemTableSize = constants.MaxUint32OrInt
58 : )
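
// exampleInitialMemTableSize is a hedged sketch, not part of the original
// file: it restates how Open (below) bounds the size of the very first
// memtable, starting small at initialMemTableSize and letting later
// memtables grow toward Options.MemTableSize.
func exampleInitialMemTableSize(opts *Options) uint64 {
    size := opts.MemTableSize
    if size > initialMemTableSize {
        size = initialMemTableSize
    }
    return size
}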
59 :
60 : // TableCacheSize can be used to determine the table
61 : // cache size for a single db, given the maximum number of
62 : // open files that can be used by a table cache serving
63 : // only that single db.
64 1 : func TableCacheSize(maxOpenFiles int) int {
65 1 : tableCacheSize := maxOpenFiles - numNonTableCacheFiles
66 1 : if tableCacheSize < minTableCacheSize {
67 0 : tableCacheSize = minTableCacheSize
68 0 : }
69 1 : return tableCacheSize
70 : }
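
// exampleTableCacheSize is a hedged usage sketch, not part of the original
// file: it shows how the table cache size is derived from Options.MaxOpenFiles,
// leaving headroom for the non-table files a DB keeps open. The value 1000 is
// illustrative only.
func exampleTableCacheSize() int {
    opts := &Options{MaxOpenFiles: 1000}
    return TableCacheSize(opts.MaxOpenFiles)
}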
71 :
72 : // Open opens a DB whose files live in the given directory.
73 1 : func Open(dirname string, opts *Options) (db *DB, _ error) {
74 1 : // Make a copy of the options so that we don't mutate the passed in options.
75 1 : opts = opts.Clone()
76 1 : opts = opts.EnsureDefaults()
77 1 : if err := opts.Validate(); err != nil {
78 0 : return nil, err
79 0 : }
80 1 : if opts.LoggerAndTracer == nil {
81 1 : opts.LoggerAndTracer = &base.LoggerWithNoopTracer{Logger: opts.Logger}
82 1 : } else {
83 1 : opts.Logger = opts.LoggerAndTracer
84 1 : }
85 :
86 : // In all error cases, we return db = nil; this is used by various
87 : // deferred cleanups.
88 :
89 : // Open the database and WAL directories first.
90 1 : walDirname, dataDir, walDir, err := prepareAndOpenDirs(dirname, opts)
91 1 : if err != nil {
92 1 : return nil, errors.Wrapf(err, "error opening database at %q", dirname)
93 1 : }
94 1 : defer func() {
95 1 : if db == nil {
96 1 : if walDir != dataDir {
97 0 : walDir.Close()
98 0 : }
99 1 : dataDir.Close()
100 : }
101 : }()
102 :
103 : // Lock the database directory.
104 1 : var fileLock *Lock
105 1 : if opts.Lock != nil {
106 1 : // The caller already acquired the database lock. Ensure that the
107 1 : // directory matches.
108 1 : if dirname != opts.Lock.dirname {
109 0 : return nil, errors.Newf("pebble: opts.Lock acquired in %q not %q", opts.Lock.dirname, dirname)
110 0 : }
111 1 : if err := opts.Lock.refForOpen(); err != nil {
112 1 : return nil, err
113 1 : }
114 1 : fileLock = opts.Lock
115 1 : } else {
116 1 : fileLock, err = LockDirectory(dirname, opts.FS)
117 1 : if err != nil {
118 1 : return nil, err
119 1 : }
120 : }
121 1 : defer func() {
122 1 : if db == nil {
123 1 : fileLock.Close()
124 1 : }
125 : }()
126 :
127 : // Establish the format major version.
128 1 : formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname)
129 1 : if err != nil {
130 1 : return nil, err
131 1 : }
132 1 : defer func() {
133 1 : if db == nil {
134 1 : formatVersionMarker.Close()
135 1 : }
136 : }()
137 :
138 : // Find the currently active manifest, if there is one.
139 1 : manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(formatVersion, opts.FS, dirname)
140 1 : if err != nil {
141 1 : return nil, errors.Wrapf(err, "pebble: database %q", dirname)
142 1 : }
143 1 : defer func() {
144 1 : if db == nil {
145 1 : manifestMarker.Close()
146 1 : }
147 : }()
148 :
149 : // Atomic markers may leave behind obsolete files if there's a crash
150 : // mid-update. Clean these up if we're not in read-only mode.
151 1 : if !opts.ReadOnly {
152 1 : if err := formatVersionMarker.RemoveObsolete(); err != nil {
153 0 : return nil, err
154 0 : }
155 1 : if err := manifestMarker.RemoveObsolete(); err != nil {
156 0 : return nil, err
157 0 : }
158 : }
159 :
160 1 : if opts.Cache == nil {
161 1 : opts.Cache = cache.New(cacheDefaultSize)
162 1 : } else {
163 1 : opts.Cache.Ref()
164 1 : }
165 :
166 1 : d := &DB{
167 1 : cacheID: opts.Cache.NewID(),
168 1 : dirname: dirname,
169 1 : walDirname: walDirname,
170 1 : opts: opts,
171 1 : cmp: opts.Comparer.Compare,
172 1 : equal: opts.equal(),
173 1 : merge: opts.Merger.Merge,
174 1 : split: opts.Comparer.Split,
175 1 : abbreviatedKey: opts.Comparer.AbbreviatedKey,
176 1 : largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
177 1 : fileLock: fileLock,
178 1 : dataDir: dataDir,
179 1 : walDir: walDir,
180 1 : logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
181 1 : closed: new(atomic.Value),
182 1 : closedCh: make(chan struct{}),
183 1 : }
184 1 : d.mu.versions = &versionSet{}
185 1 : d.diskAvailBytes.Store(math.MaxUint64)
186 1 :
187 1 : defer func() {
188 1 : // If an error or panic occurs during open, attempt to release the manually
189 1 : // allocated memory resources. Note that rather than look for an error, we
190 1 : // look for the return of a nil DB pointer.
191 1 : if r := recover(); db == nil {
192 1 : // Release our references to the Cache. Note that both the DB and
193 1 : // tableCache have a reference. When we release the reference to
194 1 : // the tableCache, and if there are no other references to
195 1 : // the tableCache, then the tableCache will also release its
196 1 : // reference to the cache.
197 1 : opts.Cache.Unref()
198 1 :
199 1 : if d.tableCache != nil {
200 1 : _ = d.tableCache.close()
201 1 : }
202 :
203 1 : for _, mem := range d.mu.mem.queue {
204 1 : switch t := mem.flushable.(type) {
205 1 : case *memTable:
206 1 : manual.Free(t.arenaBuf)
207 1 : t.arenaBuf = nil
208 : }
209 : }
210 1 : if d.cleanupManager != nil {
211 1 : d.cleanupManager.Close()
212 1 : }
213 1 : if d.objProvider != nil {
214 1 : d.objProvider.Close()
215 1 : }
216 1 : if r != nil {
217 1 : panic(r)
218 : }
219 : }
220 : }()
221 :
222 1 : d.commit = newCommitPipeline(commitEnv{
223 1 : logSeqNum: &d.mu.versions.logSeqNum,
224 1 : visibleSeqNum: &d.mu.versions.visibleSeqNum,
225 1 : apply: d.commitApply,
226 1 : write: d.commitWrite,
227 1 : })
228 1 : d.mu.nextJobID = 1
229 1 : d.mu.mem.nextSize = opts.MemTableSize
230 1 : if d.mu.mem.nextSize > initialMemTableSize {
231 1 : d.mu.mem.nextSize = initialMemTableSize
232 1 : }
233 1 : d.mu.compact.cond.L = &d.mu.Mutex
234 1 : d.mu.compact.inProgress = make(map[*compaction]struct{})
235 1 : d.mu.compact.noOngoingFlushStartTime = time.Now()
236 1 : d.mu.snapshots.init()
237 1 : // logSeqNum is the next sequence number that will be assigned.
238 1 : // Start assigning sequence numbers from base.SeqNumStart to leave
239 1 : // room for reserved sequence numbers (see comments around
240 1 : // SeqNumStart).
241 1 : d.mu.versions.logSeqNum.Store(base.SeqNumStart)
242 1 : d.mu.formatVers.vers.Store(uint64(formatVersion))
243 1 : d.mu.formatVers.marker = formatVersionMarker
244 1 :
245 1 : d.timeNow = time.Now
246 1 : d.openedAt = d.timeNow()
247 1 :
248 1 : d.mu.Lock()
249 1 : defer d.mu.Unlock()
250 1 :
251 1 : jobID := d.mu.nextJobID
252 1 : d.mu.nextJobID++
253 1 :
254 1 : setCurrent := setCurrentFunc(d.FormatMajorVersion(), manifestMarker, opts.FS, dirname, d.dataDir)
255 1 :
256 1 : if !manifestExists {
257 1 : // DB does not exist.
258 1 : if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
259 1 : return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
260 1 : }
261 :
262 : // Create the DB.
263 1 : if err := d.mu.versions.create(jobID, dirname, opts, manifestMarker, setCurrent, &d.mu.Mutex); err != nil {
264 1 : return nil, err
265 1 : }
266 1 : } else {
267 1 : if opts.ErrorIfExists {
268 1 : return nil, errors.Wrapf(ErrDBAlreadyExists, "dirname=%q", dirname)
269 1 : }
270 : // Load the version set.
271 1 : if err := d.mu.versions.load(dirname, opts, manifestFileNum.FileNum(), manifestMarker, setCurrent, &d.mu.Mutex); err != nil {
272 1 : return nil, err
273 1 : }
274 1 : if opts.ErrorIfNotPristine {
275 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
276 1 : d.mu.versions.addLiveFileNums(liveFileNums)
277 1 : if len(liveFileNums) != 0 {
278 1 : return nil, errors.Wrapf(ErrDBNotPristine, "dirname=%q", dirname)
279 1 : }
280 : }
281 : }
282 :
283 : // In read-only mode, we replay directly into the mutable memtable but never
284 : // flush it. We need to delay creation of the memtable until we know the
285 : // sequence number of the first batch that will be inserted.
286 1 : if !d.opts.ReadOnly {
287 1 : var entry *flushableEntry
288 1 : d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum.Load())
289 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
290 1 : }
291 :
292 : // List the objects in the WAL directory and, if different, the data directory.
293 1 : ls, err := opts.FS.List(d.walDirname)
294 1 : if err != nil {
295 1 : return nil, err
296 1 : }
297 1 : if d.dirname != d.walDirname {
298 1 : ls2, err := opts.FS.List(d.dirname)
299 1 : if err != nil {
300 0 : return nil, err
301 0 : }
302 1 : ls = append(ls, ls2...)
303 : }
304 1 : providerSettings := objstorageprovider.Settings{
305 1 : Logger: opts.Logger,
306 1 : FS: opts.FS,
307 1 : FSDirName: dirname,
308 1 : FSDirInitialListing: ls,
309 1 : FSCleaner: opts.Cleaner,
310 1 : NoSyncOnClose: opts.NoSyncOnClose,
311 1 : BytesPerSync: opts.BytesPerSync,
312 1 : }
313 1 : providerSettings.Remote.StorageFactory = opts.Experimental.RemoteStorage
314 1 : providerSettings.Remote.CreateOnShared = opts.Experimental.CreateOnShared
315 1 : providerSettings.Remote.CreateOnSharedLocator = opts.Experimental.CreateOnSharedLocator
316 1 : providerSettings.Remote.CacheSizeBytes = opts.Experimental.SecondaryCacheSizeBytes
317 1 :
318 1 : d.objProvider, err = objstorageprovider.Open(providerSettings)
319 1 : if err != nil {
320 1 : return nil, err
321 1 : }
322 :
323 1 : d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
324 1 :
325 1 : if manifestExists {
326 1 : curVersion := d.mu.versions.currentVersion()
327 1 : if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil {
328 0 : return nil, err
329 0 : }
330 : }
331 :
332 1 : tableCacheSize := TableCacheSize(opts.MaxOpenFiles)
333 1 : d.tableCache = newTableCacheContainer(opts.TableCache, d.cacheID, d.objProvider, d.opts, tableCacheSize)
334 1 : d.newIters = d.tableCache.newIters
335 1 : d.tableNewRangeKeyIter = d.tableCache.newRangeKeyIter
336 1 :
337 1 : // Replay any log files newer than the ones named in the manifest.
338 1 : type fileNumAndName struct {
339 1 : num FileNum
340 1 : name string
341 1 : }
342 1 : var logFiles []fileNumAndName
343 1 : var previousOptionsFileNum FileNum
344 1 : var previousOptionsFilename string
345 1 : for _, filename := range ls {
346 1 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
347 1 : if !ok {
348 1 : continue
349 : }
350 :
351 : // Don't reuse any obsolete file numbers to avoid modifying an
352 : // ingested sstable's original external file.
353 1 : if d.mu.versions.nextFileNum <= fn.FileNum() {
354 1 : d.mu.versions.nextFileNum = fn.FileNum() + 1
355 1 : }
356 :
357 1 : switch ft {
358 1 : case fileTypeLog:
359 1 : if fn.FileNum() >= d.mu.versions.minUnflushedLogNum {
360 1 : logFiles = append(logFiles, fileNumAndName{fn.FileNum(), filename})
361 1 : }
362 1 : if d.logRecycler.minRecycleLogNum <= fn.FileNum() {
363 1 : d.logRecycler.minRecycleLogNum = fn.FileNum() + 1
364 1 : }
365 1 : case fileTypeOptions:
366 1 : if previousOptionsFileNum < fn.FileNum() {
367 1 : previousOptionsFileNum = fn.FileNum()
368 1 : previousOptionsFilename = filename
369 1 : }
370 1 : case fileTypeTemp, fileTypeOldTemp:
371 1 : if !d.opts.ReadOnly {
372 1 : // Some codepaths write to a temporary file and then
373 1 : // rename it to its final location when complete. A
374 1 : // temp file is leftover if a process exits before the
375 1 : // rename. Remove it.
376 1 : err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
377 1 : if err != nil {
378 0 : return nil, err
379 0 : }
380 : }
381 : }
382 : }
383 :
384 : // Ratchet d.mu.versions.nextFileNum ahead of all known objects in the
385 : // objProvider. This avoids FileNum collisions with obsolete sstables.
386 1 : objects := d.objProvider.List()
387 1 : for _, obj := range objects {
388 1 : if d.mu.versions.nextFileNum <= obj.DiskFileNum.FileNum() {
389 1 : d.mu.versions.nextFileNum = obj.DiskFileNum.FileNum() + 1
390 1 : }
391 : }
392 :
393 : // Validate the most-recent OPTIONS file, if there is one.
394 1 : var strictWALTail bool
395 1 : if previousOptionsFilename != "" {
396 1 : path := opts.FS.PathJoin(dirname, previousOptionsFilename)
397 1 : strictWALTail, err = checkOptions(opts, path)
398 1 : if err != nil {
399 1 : return nil, err
400 1 : }
401 : }
402 :
403 1 : sort.Slice(logFiles, func(i, j int) bool {
404 1 : return logFiles[i].num < logFiles[j].num
405 1 : })
406 :
407 1 : var ve versionEdit
408 1 : var toFlush flushableList
409 1 : for i, lf := range logFiles {
410 1 : lastWAL := i == len(logFiles)-1
411 1 : flush, maxSeqNum, err := d.replayWAL(jobID, &ve, opts.FS,
412 1 : opts.FS.PathJoin(d.walDirname, lf.name), lf.num, strictWALTail && !lastWAL)
413 1 : if err != nil {
414 1 : return nil, err
415 1 : }
416 1 : toFlush = append(toFlush, flush...)
417 1 : d.mu.versions.markFileNumUsed(lf.num)
418 1 : if d.mu.versions.logSeqNum.Load() < maxSeqNum {
419 1 : d.mu.versions.logSeqNum.Store(maxSeqNum)
420 1 : }
421 : }
422 1 : d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load())
423 1 :
424 1 : if !d.opts.ReadOnly {
425 1 : // Create an empty .log file.
426 1 : newLogNum := d.mu.versions.getNextFileNum()
427 1 :
428 1 : // This logic is slightly different than RocksDB's. Specifically, RocksDB
429 1 : // sets MinUnflushedLogNum to max-recovered-log-num + 1. We set it to the
430 1 : // newLogNum. There should be no difference in using either value.
431 1 : ve.MinUnflushedLogNum = newLogNum
432 1 :
433 1 : // Create the manifest with the updated MinUnflushedLogNum before
434 1 : // creating the new log file. If we created the log file first, a
435 1 : // crash before the manifest is synced could leave two WALs with
436 1 : // unclean tails.
437 1 : d.mu.versions.logLock()
438 1 : if err := d.mu.versions.logAndApply(jobID, &ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo {
439 1 : return nil
440 1 : }); err != nil {
441 1 : return nil, err
442 1 : }
443 :
444 1 : for _, entry := range toFlush {
445 1 : entry.readerUnrefLocked(true)
446 1 : }
447 :
448 1 : newLogName := base.MakeFilepath(opts.FS, d.walDirname, fileTypeLog, newLogNum.DiskFileNum())
449 1 : d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum.DiskFileNum(), fileSize: 0})
450 1 : logFile, err := opts.FS.Create(newLogName)
451 1 : if err != nil {
452 1 : return nil, err
453 1 : }
454 1 : if err := d.walDir.Sync(); err != nil {
455 1 : return nil, err
456 1 : }
457 1 : d.opts.EventListener.WALCreated(WALCreateInfo{
458 1 : JobID: jobID,
459 1 : Path: newLogName,
460 1 : FileNum: newLogNum,
461 1 : })
462 1 : // This isn't strictly necessary as we don't use the log number for
463 1 : // memtables being flushed, only for the next unflushed memtable.
464 1 : d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum = newLogNum
465 1 :
466 1 : logFile = vfs.NewSyncingFile(logFile, vfs.SyncingFileOptions{
467 1 : NoSyncOnClose: d.opts.NoSyncOnClose,
468 1 : BytesPerSync: d.opts.WALBytesPerSync,
469 1 : PreallocateSize: d.walPreallocateSize(),
470 1 : })
471 1 : d.mu.log.metrics.fsyncLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
472 1 : Buckets: FsyncLatencyBuckets,
473 1 : })
474 1 :
475 1 : logWriterConfig := record.LogWriterConfig{
476 1 : WALMinSyncInterval: d.opts.WALMinSyncInterval,
477 1 : WALFsyncLatency: d.mu.log.metrics.fsyncLatency,
478 1 : QueueSemChan: d.commit.logSyncQSem,
479 1 : }
480 1 : d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
481 1 : d.mu.versions.metrics.WAL.Files++
482 : }
483 1 : d.updateReadStateLocked(d.opts.DebugCheck)
484 1 :
485 1 : // If the Options specify a format major version higher than the
486 1 : // loaded database's, upgrade it. If this is a new database, this
487 1 : // code path also performs an initial upgrade from the starting
488 1 : // implicit MostCompatible version.
489 1 : //
490 1 : // We ratchet the version this far into Open so that migrations have a read
491 1 : // state available.
492 1 : if !d.opts.ReadOnly && opts.FormatMajorVersion > d.FormatMajorVersion() {
493 1 : if err := d.ratchetFormatMajorVersionLocked(opts.FormatMajorVersion); err != nil {
494 0 : return nil, err
495 0 : }
496 : }
497 :
498 1 : if !d.opts.ReadOnly {
499 1 : // Write the current options to disk.
500 1 : d.optionsFileNum = d.mu.versions.getNextFileNum().DiskFileNum()
501 1 : tmpPath := base.MakeFilepath(opts.FS, dirname, fileTypeTemp, d.optionsFileNum)
502 1 : optionsPath := base.MakeFilepath(opts.FS, dirname, fileTypeOptions, d.optionsFileNum)
503 1 :
504 1 : // Write them to a temporary file first, in case we crash before
505 1 : // we're done. A corrupt options file prevents opening the
506 1 : // database.
507 1 : optionsFile, err := opts.FS.Create(tmpPath)
508 1 : if err != nil {
509 1 : return nil, err
510 1 : }
511 1 : serializedOpts := []byte(opts.String())
512 1 : if _, err := optionsFile.Write(serializedOpts); err != nil {
513 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
514 1 : }
515 1 : d.optionsFileSize = uint64(len(serializedOpts))
516 1 : if err := optionsFile.Sync(); err != nil {
517 1 : return nil, errors.CombineErrors(err, optionsFile.Close())
518 1 : }
519 1 : if err := optionsFile.Close(); err != nil {
520 0 : return nil, err
521 0 : }
522 : // Atomically rename to the OPTIONS-XXXXXX path. This rename is
523 : // guaranteed to be atomic because the destination path does not
524 : // exist.
525 1 : if err := opts.FS.Rename(tmpPath, optionsPath); err != nil {
526 1 : return nil, err
527 1 : }
528 1 : if err := d.dataDir.Sync(); err != nil {
529 1 : return nil, err
530 1 : }
531 : }
532 :
533 1 : if !d.opts.ReadOnly {
534 1 : d.scanObsoleteFiles(ls)
535 1 : d.deleteObsoleteFiles(jobID)
536 1 : } else {
537 1 : // All the log files are obsolete.
538 1 : d.mu.versions.metrics.WAL.Files = int64(len(logFiles))
539 1 : }
540 1 : d.mu.tableStats.cond.L = &d.mu.Mutex
541 1 : d.mu.tableValidation.cond.L = &d.mu.Mutex
542 1 : if !d.opts.ReadOnly {
543 1 : d.maybeCollectTableStatsLocked()
544 1 : }
545 1 : d.calculateDiskAvailableBytes()
546 1 :
547 1 : d.maybeScheduleFlush()
548 1 : d.maybeScheduleCompaction()
549 1 :
550 1 : // Note: this is a no-op if invariants are disabled or race is enabled.
551 1 : //
552 1 : // Setting a finalizer on *DB causes *DB to never be reclaimed and the
553 1 : // finalizer to never be run. The problem is due to this limitation of
554 1 : // finalizers mentioned in the SetFinalizer docs:
555 1 : //
556 1 : // If a cyclic structure includes a block with a finalizer, that cycle is
557 1 : // not guaranteed to be garbage collected and the finalizer is not
558 1 : // guaranteed to run, because there is no ordering that respects the
559 1 : // dependencies.
560 1 : //
561 1 : // DB has cycles with several of its internal structures: readState,
562 1 : // newIters, tableCache, versions, etc. Each of these individually causes a
563 1 : // cycle and prevents the finalizer from being run. But we can work around this
564 1 : // finalizer limitation by setting a finalizer on another object that is
565 1 : // tied to the lifetime of DB: the DB.closed atomic.Value.
566 1 : dPtr := fmt.Sprintf("%p", d)
567 1 : invariants.SetFinalizer(d.closed, func(obj interface{}) {
568 0 : v := obj.(*atomic.Value)
569 0 : if err := v.Load(); err == nil {
570 0 : fmt.Fprintf(os.Stderr, "%s: unreferenced DB not closed\n", dPtr)
571 0 : os.Exit(1)
572 0 : }
573 : })
574 :
575 1 : return d, nil
576 : }
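
// exampleOpen is a hedged usage sketch, not part of the original file: it
// opens (creating if necessary) a database backed by an in-memory filesystem
// and closes it again. The directory name "demo" is illustrative only.
func exampleOpen() error {
    db, err := Open("demo", &Options{FS: vfs.NewMem()})
    if err != nil {
        return err
    }
    return db.Close()
}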
577 :
578 : // prepareAndOpenDirs opens the directories for the store (and creates them if
579 : // necessary).
580 : //
581 : // Returns an error if ReadOnly is set and the directories don't exist.
582 : func prepareAndOpenDirs(
583 : dirname string, opts *Options,
584 1 : ) (walDirname string, dataDir vfs.File, walDir vfs.File, err error) {
585 1 : walDirname = opts.WALDir
586 1 : if opts.WALDir == "" {
587 1 : walDirname = dirname
588 1 : }
589 :
590 : // Create directories if needed.
591 1 : if !opts.ReadOnly {
592 1 : if err := opts.FS.MkdirAll(dirname, 0755); err != nil {
593 1 : return "", nil, nil, err
594 1 : }
595 1 : if walDirname != dirname {
596 1 : if err := opts.FS.MkdirAll(walDirname, 0755); err != nil {
597 0 : return "", nil, nil, err
598 0 : }
599 : }
600 : }
601 :
602 1 : dataDir, err = opts.FS.OpenDir(dirname)
603 1 : if err != nil {
604 1 : if opts.ReadOnly && oserror.IsNotExist(err) {
605 1 : return "", nil, nil, errors.Errorf("pebble: database %q does not exist", dirname)
606 1 : }
607 1 : return "", nil, nil, err
608 : }
609 :
610 1 : if walDirname == dirname {
611 1 : walDir = dataDir
612 1 : } else {
613 1 : walDir, err = opts.FS.OpenDir(walDirname)
614 1 : if err != nil {
615 1 : dataDir.Close()
616 1 : return "", nil, nil, err
617 1 : }
618 : }
619 1 : return walDirname, dataDir, walDir, nil
620 : }
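
// exampleSeparateWALDir is a hedged sketch, not part of the original file: it
// shows the options that drive prepareAndOpenDirs. When Options.WALDir is set,
// the WAL lives in its own directory, which Open creates (unless ReadOnly) and
// syncs separately from the data directory. The paths are illustrative only.
func exampleSeparateWALDir() error {
    opts := &Options{
        FS:     vfs.NewMem(),
        WALDir: "wal", // if empty, the WAL shares the data directory
    }
    db, err := Open("data", opts)
    if err != nil {
        return err
    }
    return db.Close()
}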
621 :
622 : // GetVersion returns the engine version string from the latest options
623 : // file present in dir. Used to check what Pebble or RocksDB version was last
624 : // used to write to the database stored in this directory. An empty string is
625 : // returned if no valid OPTIONS file with a version key was found.
626 1 : func GetVersion(dir string, fs vfs.FS) (string, error) {
627 1 : ls, err := fs.List(dir)
628 1 : if err != nil {
629 0 : return "", err
630 0 : }
631 1 : var version string
632 1 : lastOptionsSeen := FileNum(0)
633 1 : for _, filename := range ls {
634 1 : ft, fn, ok := base.ParseFilename(fs, filename)
635 1 : if !ok {
636 1 : continue
637 : }
638 1 : switch ft {
639 1 : case fileTypeOptions:
640 1 : // If this file has a higher number than the last options file
641 1 : // processed, reset version. This is because rocksdb often
642 1 : // writes multiple options files without deleting previous ones.
643 1 : // Otherwise, skip parsing this options file.
644 1 : if fn.FileNum() > lastOptionsSeen {
645 1 : version = ""
646 1 : lastOptionsSeen = fn.FileNum()
647 1 : } else {
648 1 : continue
649 : }
650 1 : f, err := fs.Open(fs.PathJoin(dir, filename))
651 1 : if err != nil {
652 0 : return "", err
653 0 : }
654 1 : data, err := io.ReadAll(f)
655 1 : f.Close()
656 1 :
657 1 : if err != nil {
658 0 : return "", err
659 0 : }
660 1 : err = parseOptions(string(data), func(section, key, value string) error {
661 1 : switch {
662 1 : case section == "Version":
663 1 : switch key {
664 1 : case "pebble_version":
665 1 : version = value
666 1 : case "rocksdb_version":
667 1 : version = fmt.Sprintf("rocksdb v%s", value)
668 : }
669 : }
670 1 : return nil
671 : })
672 1 : if err != nil {
673 0 : return "", err
674 0 : }
675 : }
676 : }
677 1 : return version, nil
678 : }
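
// exampleGetVersion is a hedged usage sketch, not part of the original file:
// it reports which Pebble (or RocksDB) version last wrote to a database
// directory without opening the database. The path "data" is illustrative only.
func exampleGetVersion(fs vfs.FS) (string, error) {
    version, err := GetVersion("data", fs)
    if err != nil {
        return "", err
    }
    // version is "" if no valid OPTIONS file with a version key was found.
    return version, nil
}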
679 :
680 : // replayWAL replays the edits in the specified log file. If the DB is in
681 : // read only mode, then the WALs are replayed into memtables and not flushed. If
682 : // the DB is not in read only mode, then the contents of the WAL are guaranteed
683 : // to be flushed.
684 : //
685 : // The toFlush return value is a list of flushables associated with the WAL
686 : // being replayed which will be flushed. Once the version edit has been applied
687 : // to the manifest, it is up to the caller of replayWAL to unreference the
688 : // toFlush flushables returned by replayWAL.
689 : //
690 : // d.mu must be held when calling this, but the mutex may be dropped and
691 : // re-acquired during the course of this method.
692 : func (d *DB) replayWAL(
693 : jobID int, ve *versionEdit, fs vfs.FS, filename string, logNum FileNum, strictWALTail bool,
694 1 : ) (toFlush flushableList, maxSeqNum uint64, err error) {
695 1 : file, err := fs.Open(filename)
696 1 : if err != nil {
697 0 : return nil, 0, err
698 0 : }
699 1 : defer file.Close()
700 1 : var (
701 1 : b Batch
702 1 : buf bytes.Buffer
703 1 : mem *memTable
704 1 : entry *flushableEntry
705 1 : rr = record.NewReader(file, logNum)
706 1 : offset int64 // byte offset in rr
707 1 : lastFlushOffset int64
708 1 : keysReplayed int64 // number of keys replayed
709 1 : batchesReplayed int64 // number of batches replayed
710 1 : )
711 1 :
712 1 : if d.opts.ReadOnly {
713 1 : // In read-only mode, we replay directly into the mutable memtable which will
714 1 : // never be flushed.
715 1 : mem = d.mu.mem.mutable
716 1 : if mem != nil {
717 1 : entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
718 1 : }
719 : }
720 :
721 : // Flushes the current memtable, if not nil.
722 1 : flushMem := func() {
723 1 : if mem == nil {
724 1 : return
725 1 : }
726 1 : var logSize uint64
727 1 : if offset >= lastFlushOffset {
728 1 : logSize = uint64(offset - lastFlushOffset)
729 1 : }
730 : // Else, this was the initial memtable in the read-only case which must have
731 : // been empty, but we need to flush it since we don't want to add to it later.
732 1 : lastFlushOffset = offset
733 1 : entry.logSize = logSize
734 1 : if !d.opts.ReadOnly {
735 1 : toFlush = append(toFlush, entry)
736 1 : }
737 1 : mem, entry = nil, nil
738 : }
739 : // Creates a new memtable if there is no current memtable.
740 1 : ensureMem := func(seqNum uint64) {
741 1 : if mem != nil {
742 1 : return
743 1 : }
744 1 : mem, entry = d.newMemTable(logNum, seqNum)
745 1 : if d.opts.ReadOnly {
746 1 : d.mu.mem.mutable = mem
747 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
748 1 : }
749 : }
750 :
751 : // updateVE is used to update ve with information about new files created
752 : // during the flush of any flushable not of type ingestedFlushable. For the
753 : // flushable of type ingestedFlushable we use custom handling below.
754 1 : updateVE := func() error {
755 1 : // TODO(bananabrick): See if we can use the actual base level here,
756 1 : // instead of using 1.
757 1 : c := newFlush(d.opts, d.mu.versions.currentVersion(),
758 1 : 1 /* base level */, toFlush, d.timeNow())
759 1 : newVE, _, _, err := d.runCompaction(jobID, c)
760 1 : if err != nil {
761 0 : return errors.Wrapf(err, "running compaction during WAL replay")
762 0 : }
763 1 : ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
764 1 : return nil
765 : }
766 :
767 1 : for {
768 1 : offset = rr.Offset()
769 1 : r, err := rr.Next()
770 1 : if err == nil {
771 1 : _, err = io.Copy(&buf, r)
772 1 : }
773 1 : if err != nil {
774 1 : // It is common to encounter a zeroed or invalid chunk due to WAL
775 1 : // preallocation and WAL recycling. We need to distinguish these
776 1 : // errors from EOF in order to recognize that the record was
777 1 : // truncated and to avoid replaying subsequent WALs, but want
778 1 : // to otherwise treat them like EOF.
779 1 : if err == io.EOF {
780 1 : break
781 1 : } else if record.IsInvalidRecord(err) && !strictWALTail {
782 1 : break
783 : }
784 1 : return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
785 : }
786 :
787 1 : if buf.Len() < batchHeaderLen {
788 0 : return nil, 0, base.CorruptionErrorf("pebble: corrupt log file %q (num %s)",
789 0 : filename, errors.Safe(logNum))
790 0 : }
791 :
792 1 : if d.opts.ErrorIfNotPristine {
793 1 : return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
794 1 : }
795 :
796 : // Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
797 : // which is used below.
798 1 : b = Batch{db: d}
799 1 : b.SetRepr(buf.Bytes())
800 1 : seqNum := b.SeqNum()
801 1 : maxSeqNum = seqNum + uint64(b.Count())
802 1 : keysReplayed += int64(b.Count())
803 1 : batchesReplayed++
804 1 : {
805 1 : br := b.Reader()
806 1 : if kind, encodedFileNum, _, _ := br.Next(); kind == InternalKeyKindIngestSST {
807 1 : fileNums := make([]base.DiskFileNum, 0, b.Count())
808 1 : addFileNum := func(encodedFileNum []byte) {
809 1 : fileNum, n := binary.Uvarint(encodedFileNum)
810 1 : if n <= 0 {
811 0 : panic("pebble: ingest sstable file num is invalid.")
812 : }
813 1 : fileNums = append(fileNums, base.FileNum(fileNum).DiskFileNum())
814 : }
815 1 : addFileNum(encodedFileNum)
816 1 :
817 1 : for i := 1; i < int(b.Count()); i++ {
818 1 : kind, encodedFileNum, _, ok := br.Next()
819 1 : if kind != InternalKeyKindIngestSST {
820 0 : panic("pebble: invalid batch key kind.")
821 : }
822 1 : if !ok {
823 0 : panic("pebble: invalid batch count.")
824 : }
825 1 : addFileNum(encodedFileNum)
826 : }
827 :
828 1 : if _, _, _, ok := br.Next(); ok {
829 0 : panic("pebble: invalid number of entries in batch.")
830 : }
831 :
832 1 : meta := make([]*fileMetadata, len(fileNums))
833 1 : for i, n := range fileNums {
834 1 : var readable objstorage.Readable
835 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, n)
836 1 : if err != nil {
837 0 : return nil, 0, errors.Wrap(err, "pebble: error when looking up ingested SSTs")
838 0 : }
839 1 : if objMeta.IsRemote() {
840 0 : readable, err = d.objProvider.OpenForReading(context.TODO(), fileTypeTable, n, objstorage.OpenOptions{MustExist: true})
841 0 : if err != nil {
842 0 : return nil, 0, errors.Wrap(err, "pebble: error when opening flushable ingest files")
843 0 : }
844 1 : } else {
845 1 : path := base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n)
846 1 : f, err := d.opts.FS.Open(path)
847 1 : if err != nil {
848 0 : return nil, 0, err
849 0 : }
850 :
851 1 : readable, err = sstable.NewSimpleReadable(f)
852 1 : if err != nil {
853 0 : return nil, 0, err
854 0 : }
855 : }
856 : // NB: ingestLoad1 will close readable.
857 1 : meta[i], err = ingestLoad1(d.opts, d.FormatMajorVersion(), readable, d.cacheID, n)
858 1 : if err != nil {
859 0 : return nil, 0, errors.Wrap(err, "pebble: error when loading flushable ingest files")
860 0 : }
861 : }
862 :
863 1 : if uint32(len(meta)) != b.Count() {
864 0 : panic("pebble: couldn't load all files in WAL entry.")
865 : }
866 :
867 1 : entry, err = d.newIngestedFlushableEntry(
868 1 : meta, seqNum, logNum,
869 1 : )
870 1 : if err != nil {
871 0 : return nil, 0, err
872 0 : }
873 :
874 1 : if d.opts.ReadOnly {
875 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
876 1 : // We added the IngestSST flushable to the queue. But there
877 1 : // must be at least one WAL entry waiting to be replayed. We
878 1 : // have to ensure this newer WAL entry isn't replayed into
879 1 : // the current value of d.mu.mem.mutable because the current
880 1 : // mutable memtable exists before this flushable entry in
881 1 : // the memtable queue. To ensure this, we just need to unset
882 1 : // d.mu.mem.mutable. When a newer WAL is replayed, we will
883 1 : // set d.mu.mem.mutable to a newer value.
884 1 : d.mu.mem.mutable = nil
885 1 : } else {
886 1 : toFlush = append(toFlush, entry)
887 1 : // During WAL replay, the LSM only has L0, hence the
888 1 : // baseLevel is 1. For simplicity, we place the ingested
889 1 : // files in L0 here instead of finding their target
890 1 : // levels. WAL replay is expected to be rare, and
891 1 : // flushables of type ingestedFlushable should be rarer
892 1 : // still, so placing the ingested files in L0 is
893 1 : // acceptable.
895 1 : //
896 1 : // TODO(bananabrick): Maybe refactor this function to allow
897 1 : // us to easily place ingested files in levels as low as
898 1 : // possible during WAL replay. It would require breaking up
899 1 : // the application of ve to the manifest into chunks and is
900 1 : // not pretty w/o a refactor to this function and how it's
901 1 : // used.
902 1 : c := newFlush(
903 1 : d.opts, d.mu.versions.currentVersion(),
904 1 : 1, /* base level */
905 1 : []*flushableEntry{entry},
906 1 : d.timeNow(),
907 1 : )
908 1 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
909 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
910 1 : }
911 : }
912 1 : return toFlush, maxSeqNum, nil
913 : }
914 : }
915 :
916 1 : if b.memTableSize >= uint64(d.largeBatchThreshold) {
917 1 : flushMem()
918 1 : // Make a copy of the data slice since it is currently owned by buf and will
919 1 : // be reused in the next iteration.
920 1 : b.data = append([]byte(nil), b.data...)
921 1 : b.flushable = newFlushableBatch(&b, d.opts.Comparer)
922 1 : entry := d.newFlushableEntry(b.flushable, logNum, b.SeqNum())
923 1 : // Disable memory accounting by adding a reader ref that will never be
924 1 : // removed.
925 1 : entry.readerRefs.Add(1)
926 1 : if d.opts.ReadOnly {
927 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
928 1 : // We added the flushable batch to the flushable queue.
929 1 : // But there must be at least one WAL entry waiting to be
930 1 : // replayed. We have to ensure this newer WAL entry isn't
931 1 : // replayed into the current value of d.mu.mem.mutable because
932 1 : // the current mutable memtable exists before this flushable
933 1 : // entry in the memtable queue. To ensure this, we just need to
934 1 : // unset d.mu.mem.mutable. When a newer WAL is replayed, we will
935 1 : // set d.mu.mem.mutable to a newer value.
936 1 : d.mu.mem.mutable = nil
937 1 : } else {
938 1 : toFlush = append(toFlush, entry)
939 1 : }
940 1 : } else {
941 1 : ensureMem(seqNum)
942 1 : if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
943 0 : return nil, 0, err
944 0 : }
945 : // We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
946 : // batch may not initially fit, but will eventually fit (since it is smaller than
947 : // largeBatchThreshold).
948 1 : for err == arenaskl.ErrArenaFull {
949 1 : flushMem()
950 1 : ensureMem(seqNum)
951 1 : err = mem.prepare(&b)
952 1 : if err != nil && err != arenaskl.ErrArenaFull {
953 0 : return nil, 0, err
954 0 : }
955 : }
956 1 : if err = mem.apply(&b, seqNum); err != nil {
957 0 : return nil, 0, err
958 0 : }
959 1 : mem.writerUnref()
960 : }
961 1 : buf.Reset()
962 : }
963 :
964 1 : d.opts.Logger.Infof("[JOB %d] WAL file %s with log number %s stopped reading at offset: %d; replayed %d keys in %d batches", jobID, filename, logNum.String(), offset, keysReplayed, batchesReplayed)
965 1 : flushMem()
966 1 :
967 1 : // mem is nil here.
968 1 : if !d.opts.ReadOnly {
969 1 : err = updateVE()
970 1 : if err != nil {
971 0 : return nil, 0, err
972 0 : }
973 : }
974 1 : return toFlush, maxSeqNum, err
975 : }
976 :
977 1 : func checkOptions(opts *Options, path string) (strictWALTail bool, err error) {
978 1 : f, err := opts.FS.Open(path)
979 1 : if err != nil {
980 0 : return false, err
981 0 : }
982 1 : defer f.Close()
983 1 :
984 1 : data, err := io.ReadAll(f)
985 1 : if err != nil {
986 0 : return false, err
987 0 : }
988 1 : return opts.checkOptions(string(data))
989 : }
990 :
991 : // DBDesc briefly describes high-level state about a database.
992 : type DBDesc struct {
993 : // Exists is true if an existing database was found.
994 : Exists bool
995 : // FormatMajorVersion indicates the database's current format
996 : // version.
997 : FormatMajorVersion FormatMajorVersion
998 : // ManifestFilename is the filename of the current active manifest,
999 : // if the database exists.
1000 : ManifestFilename string
1001 : }
1002 :
1003 : // Peek looks for an existing database in dirname on the provided FS. It
1004 : // returns a brief description of the database. Peek is read-only and
1005 : // does not open the database.
1006 1 : func Peek(dirname string, fs vfs.FS) (*DBDesc, error) {
1007 1 : vers, versMarker, err := lookupFormatMajorVersion(fs, dirname)
1008 1 : if err != nil {
1009 1 : return nil, err
1010 1 : }
1011 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1012 : // PeekMarker variant that avoids opening the directory.
1013 1 : if err := versMarker.Close(); err != nil {
1014 0 : return nil, err
1015 0 : }
1016 :
1017 : // Find the currently active manifest, if there is one.
1018 1 : manifestMarker, manifestFileNum, exists, err := findCurrentManifest(vers, fs, dirname)
1019 1 : if err != nil {
1020 0 : return nil, err
1021 0 : }
1022 : // TODO(jackson): Immediately closing the marker is clunky. Add a
1023 : // PeekMarker variant that avoids opening the directory.
1024 1 : if err := manifestMarker.Close(); err != nil {
1025 0 : return nil, err
1026 0 : }
1027 :
1028 1 : desc := &DBDesc{
1029 1 : Exists: exists,
1030 1 : FormatMajorVersion: vers,
1031 1 : }
1032 1 : if exists {
1033 1 : desc.ManifestFilename = base.MakeFilepath(fs, dirname, fileTypeManifest, manifestFileNum)
1034 1 : }
1035 1 : return desc, nil
1036 : }
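
// examplePeek is a hedged usage sketch, not part of the original file: it uses
// Peek to check whether a directory already holds a database before committing
// to a full Open. The path "data" is illustrative only.
func examplePeek(fs vfs.FS) (bool, error) {
    desc, err := Peek("data", fs)
    if err != nil {
        return false, err
    }
    // desc.FormatMajorVersion and desc.ManifestFilename are also available
    // for logging or tooling.
    return desc.Exists, nil
}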
1037 :
1038 : // LockDirectory acquires the database directory lock in the named directory,
1039 : // preventing another process from opening the database. LockDirectory returns a
1040 : // handle to the held lock that may be passed to Open through Options.Lock to
1041 : // subsequently open the database, skipping lock acquisition during Open.
1042 : //
1043 : // LockDirectory may be used to expand the critical section protected by the
1044 : // database lock to include setup before the call to Open.
1045 1 : func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) {
1046 1 : fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, base.FileNum(0).DiskFileNum()))
1047 1 : if err != nil {
1048 1 : return nil, err
1049 1 : }
1050 1 : l := &Lock{dirname: dirname, fileLock: fileLock}
1051 1 : l.refs.Store(1)
1052 1 : invariants.SetFinalizer(l, func(obj interface{}) {
1053 1 : if refs := obj.(*Lock).refs.Load(); refs > 0 {
1054 0 : panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs))
1055 : }
1056 : })
1057 1 : return l, nil
1058 : }
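
// exampleLockDirectory is a hedged usage sketch, not part of the original
// file: it expands the critical section protected by the database lock to
// cover setup performed before Open, then hands the lock to Open through
// Options.Lock. The directory name "data" is illustrative only.
func exampleLockDirectory(fs vfs.FS) error {
    lock, err := LockDirectory("data", fs)
    if err != nil {
        return err
    }
    // ... setup that must happen while no other process can open "data" ...
    db, err := Open("data", &Options{FS: fs, Lock: lock})
    if err != nil {
        // The caller still owns its reference to the lock and must release it.
        return errors.CombineErrors(err, lock.Close())
    }
    // Close the database before releasing the caller's lock reference.
    dbErr := db.Close()
    return errors.CombineErrors(dbErr, lock.Close())
}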
1059 :
1060 : // Lock represents a file lock on a directory. It may be passed to Open through
1061 : // Options.Lock to elide lock acquisition during Open.
1062 : type Lock struct {
1063 : dirname string
1064 : fileLock io.Closer
1065 : // refs is a count of the number of handles on the lock. refs must be 0, 1
1066 : // or 2.
1067 : //
1068 : // When acquired by the client and passed to Open, refs = 1 and the Open
1069 : // call increments it to 2. When the database is closed, it's decremented to
1070 : // 1. Finally, when the original caller calls Close on the Lock, it's
1071 : // decremented to zero and the underlying file lock is released.
1072 : //
1073 : // When Open acquires the file lock, refs remains at 1 until the database is
1074 : // closed.
1075 : refs atomic.Int32
1076 : }
1077 :
1078 1 : func (l *Lock) refForOpen() error {
1079 1 : // During Open, when a user passed in a lock, the reference count must be
1080 1 : // exactly 1. If it's zero, the lock is no longer held and is invalid. If
1081 1 : // it's 2, the lock is already in use by another database within the
1082 1 : // process.
1083 1 : if !l.refs.CompareAndSwap(1, 2) {
1084 1 : return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?")
1085 1 : }
1086 1 : return nil
1087 : }
1088 :
1089 : // Close releases the lock, permitting another process to lock and open the
1090 : // database. Close must not be called until after a database using the Lock has
1091 : // been closed.
1092 1 : func (l *Lock) Close() error {
1093 1 : if l.refs.Add(-1) > 0 {
1094 1 : return nil
1095 1 : }
1096 1 : defer func() { l.fileLock = nil }()
1097 1 : return l.fileLock.Close()
1098 : }
1099 :
1100 : // ErrDBDoesNotExist is generated when ErrorIfNotExists is set and the database
1101 : // does not exist.
1102 : //
1103 : // Note that errors can be wrapped with more details; use errors.Is().
1104 : var ErrDBDoesNotExist = errors.New("pebble: database does not exist")
1105 :
1106 : // ErrDBAlreadyExists is generated when ErrorIfExists is set and the database
1107 : // already exists.
1108 : //
1109 : // Note that errors can be wrapped with more details; use errors.Is().
1110 : var ErrDBAlreadyExists = errors.New("pebble: database already exists")
1111 :
1112 : // ErrDBNotPristine is generated when ErrorIfNotPristine is set and the database
1113 : // already exists and is not pristine.
1114 : //
1115 : // Note that errors can be wrapped with more details; use errors.Is().
1116 : var ErrDBNotPristine = errors.New("pebble: database already exists and is not pristine")
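
// exampleSentinelErrors is a hedged sketch, not part of the original file: it
// shows how callers are expected to test for the sentinel errors above. They
// may be wrapped with additional detail, so compare with errors.Is rather
// than with ==.
func exampleSentinelErrors(dirname string, fs vfs.FS) (*DB, error) {
    db, err := Open(dirname, &Options{FS: fs, ErrorIfNotExists: true})
    switch {
    case errors.Is(err, ErrDBDoesNotExist):
        // Fall back to creating the database.
        return Open(dirname, &Options{FS: fs})
    case err != nil:
        return nil, err
    }
    return db, nil
}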
1117 :
1118 : // IsCorruptionError returns true if the given error indicates database
1119 : // corruption.
1120 0 : func IsCorruptionError(err error) bool {
1121 0 : return errors.Is(err, base.ErrCorruption)
1122 0 : }
1123 :
1124 1 : func checkConsistency(v *manifest.Version, dirname string, objProvider objstorage.Provider) error {
1125 1 : var buf bytes.Buffer
1126 1 : var args []interface{}
1127 1 :
1128 1 : dedup := make(map[base.DiskFileNum]struct{})
1129 1 : for level, files := range v.Levels {
1130 1 : iter := files.Iter()
1131 1 : for f := iter.First(); f != nil; f = iter.Next() {
1132 1 : backingState := f.FileBacking
1133 1 : if _, ok := dedup[backingState.DiskFileNum]; ok {
1134 1 : continue
1135 : }
1136 1 : dedup[backingState.DiskFileNum] = struct{}{}
1137 1 : fileNum := backingState.DiskFileNum
1138 1 : fileSize := backingState.Size
1139 1 : // We allow foreign objects to have a mismatch between sizes. This is
1140 1 : // because we might skew the backing size stored by our objprovider
1141 1 : // to prevent us from over-prioritizing this file for compaction.
1142 1 : meta, err := objProvider.Lookup(base.FileTypeTable, fileNum)
1143 1 : var size int64
1144 1 : if err == nil {
1145 1 : if objProvider.IsSharedForeign(meta) {
1146 0 : continue
1147 : }
1148 1 : size, err = objProvider.Size(meta)
1149 : }
1150 1 : if err != nil {
1151 1 : buf.WriteString("L%d: %s: %v\n")
1152 1 : args = append(args, errors.Safe(level), errors.Safe(fileNum), err)
1153 1 : continue
1154 : }
1155 :
1156 1 : if size != int64(fileSize) {
1157 0 : buf.WriteString("L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)\n")
1158 0 : args = append(args, errors.Safe(level), errors.Safe(fileNum), objProvider.Path(meta),
1159 0 : errors.Safe(size), errors.Safe(fileSize))
1160 0 : continue
1161 : }
1162 : }
1163 : }
1164 :
1165 1 : if buf.Len() == 0 {
1166 1 : return nil
1167 1 : }
1168 1 : return errors.Errorf(buf.String(), args...)
1169 : }
|