LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions) Coverage Total Hit
Test: 2025-10-15 08:18Z f213f584 - tests only.lcov Lines: 93.6 % 488 457
Test Date: 2025-10-15 08:19:53 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/errors/oserror"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/pebble/wal"
      23              :         "github.com/cockroachdb/tokenbucket"
      24              : )
      25              : 
      26              : // Cleaner exports the base.Cleaner type.
      27              : type Cleaner = base.Cleaner
      28              : 
      29              : // DeleteCleaner exports the base.DeleteCleaner type.
      30              : type DeleteCleaner = base.DeleteCleaner
      31              : 
      32              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      33              : type ArchiveCleaner = base.ArchiveCleaner
      34              : 
// cleanupManager deletes obsolete files on a background goroutine. Jobs are
// submitted with EnqueueJob, handed over through jobsCh, and processed one at
// a time by mainLoop.
type cleanupManager struct {
	// opts supplies the Cleaner, EventListener and Logger used while deleting.
	// objProvider is used to look up and remove sstable and blob objects.
	// deletePacer is told about enqueued bytes and decides how long each paced
	// deletion should wait.
	opts        *Options
	objProvider objstorage.Provider
	deletePacer *deletionPacer

	// jobsCh is used as the cleanup job queue. It is buffered with
	// jobsQueueDepth slots and closed by Close.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup
	// stopCh is closed by Close. maybePace selects on it so that a pacing wait
	// is abandoned once the cleanup manager is being closed.
	stopCh chan struct{}

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		// completedStats accumulates the stats of every finished job.
		// completedJobs counts finished jobs; completedJobsCond (whose locker is
		// this mutex) is broadcast each time it is incremented.
		// jobsQueueWarningIssued records whether the "falling behind" message has
		// been logged and not yet followed by the "back to normal" message.
		totalJobs              int
		completedStats         obsoleteObjectStats
		completedJobs          int
		completedJobsCond      sync.Cond
		jobsQueueWarningIssued bool
	}
}
      57              : 
      58              : // CompletedStats returns the stats summarizing objects deleted. The returned
      59              : // stats increase monotonically over the lifetime of the DB.
      60            1 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
      61            1 :         m.mu.Lock()
      62            1 :         defer m.mu.Unlock()
      63            1 :         return m.mu.completedStats
      64            1 : }
      65              : 
// jobsQueueDepth is the buffer capacity of cleanupManager.jobsCh. We can queue
// this many jobs before we have to block EnqueueJob. maybeLogLocked derives
// its high and low logging thresholds from this value.
const jobsQueueDepth = 1000
      68              : 
// obsoleteFile holds information about a file that needs to be deleted soon.
//
// fs and path are what the Cleaner is invoked with for non-object files
// (see deleteObsoleteFile); sstables and blob files are instead removed through
// the object provider using fileType and fileNum (see deleteObsoleteObject).
// fileSize is the number of bytes reported to the deletion pacer, and isLocal
// together with fileType decides whether the deletion is paced (see
// needsPacing).
type obsoleteFile struct {
	fileType base.FileType
	fs       vfs.FS
	path     string
	fileNum  base.DiskFileNum
	fileSize uint64 // approx for log files
	isLocal  bool
}
      78              : 
      79            1 : func (of *obsoleteFile) needsPacing() bool {
      80            1 :         // We only need to pace local objects--sstables and blob files.
      81            1 :         return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
      82            1 : }
      83              : 
// cleanupJob is one unit of work for the cleanup manager: the files to delete
// under a given job ID, plus the stats that are added to the manager's
// completed stats once the job has been processed.
type cleanupJob struct {
	jobID         JobID
	obsoleteFiles []obsoleteFile
	stats         obsoleteObjectStats
}
      89              : 
      90              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      91              : // The cleanupManager must be Close()d.
      92              : func openCleanupManager(
      93              :         opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
      94            1 : ) *cleanupManager {
      95            1 :         cm := &cleanupManager{
      96            1 :                 opts:        opts,
      97            1 :                 objProvider: objProvider,
      98            1 :                 deletePacer: newDeletionPacer(
      99            1 :                         crtime.NowMono(),
     100            1 :                         opts.FreeSpaceThresholdBytes,
     101            1 :                         opts.TargetByteDeletionRate,
     102            1 :                         opts.FreeSpaceTimeframe,
     103            1 :                         opts.ObsoleteBytesMaxRatio,
     104            1 :                         opts.ObsoleteBytesTimeframe,
     105            1 :                         getDeletePacerInfo,
     106            1 :                 ),
     107            1 :                 jobsCh: make(chan *cleanupJob, jobsQueueDepth),
     108            1 :                 stopCh: make(chan struct{}),
     109            1 :         }
     110            1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
     111            1 :         cm.waitGroup.Add(1)
     112            1 : 
     113            1 :         go func() {
     114            1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
     115            1 :                         cm.mainLoop()
     116            1 :                 })
     117              :         }()
     118              : 
     119            1 :         return cm
     120              : }
     121              : 
// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
//
// EnqueueJob must not be called after Close: it would send on the closed
// jobsCh.
func (cm *cleanupManager) Close() {
	// Closing jobsCh lets mainLoop's range loop finish once the queue drains.
	close(cm.jobsCh)
	// Closing stopCh makes any pacing wait in maybePace return immediately.
	close(cm.stopCh)
	// Block until mainLoop has returned.
	cm.waitGroup.Wait()
}
     129              : 
     130              : // EnqueueJob adds a cleanup job to the manager's queue.
     131              : func (cm *cleanupManager) EnqueueJob(
     132              :         jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
     133            1 : ) {
     134            1 :         job := &cleanupJob{
     135            1 :                 jobID:         jobID,
     136            1 :                 obsoleteFiles: obsoleteFiles,
     137            1 :                 stats:         stats,
     138            1 :         }
     139            1 : 
     140            1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     141            1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     142            1 :         // rather than when we get to the job, otherwise the reported bytes will be
     143            1 :         // subject to the throttling rate which defeats the purpose.
     144            1 :         var pacingBytes uint64
     145            1 :         for _, of := range obsoleteFiles {
     146            1 :                 if of.needsPacing() {
     147            1 :                         pacingBytes += of.fileSize
     148            1 :                 }
     149              :         }
     150            1 :         if pacingBytes > 0 {
     151            1 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     152            1 :         }
     153              : 
     154            1 :         cm.mu.Lock()
     155            1 :         cm.mu.totalJobs++
     156            1 :         cm.maybeLogLocked()
     157            1 :         cm.mu.Unlock()
     158            1 : 
     159            1 :         cm.jobsCh <- job
     160              : }
     161              : 
     162              : // Wait until the completion of all jobs that were already queued.
     163              : //
     164              : // Does not wait for jobs that are enqueued during the call.
     165              : //
     166              : // Note that DB.mu should not be held while calling this method; the background
     167              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     168            1 : func (cm *cleanupManager) Wait() {
     169            1 :         cm.mu.Lock()
     170            1 :         defer cm.mu.Unlock()
     171            1 :         n := cm.mu.totalJobs
     172            1 :         for cm.mu.completedJobs < n {
     173            1 :                 cm.mu.completedJobsCond.Wait()
     174            1 :         }
     175              : }
     176              : 
     177              : // mainLoop runs the manager's background goroutine.
     178            1 : func (cm *cleanupManager) mainLoop() {
     179            1 :         defer cm.waitGroup.Done()
     180            1 : 
     181            1 :         paceTimer := time.NewTimer(time.Duration(0))
     182            1 :         defer paceTimer.Stop()
     183            1 : 
     184            1 :         var tb tokenbucket.TokenBucket
     185            1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     186            1 :         tb.Init(1.0, 1.0)
     187            1 :         for job := range cm.jobsCh {
     188            1 :                 cm.deleteObsoleteFilesInJob(job, &tb, paceTimer)
     189            1 :                 cm.mu.Lock()
     190            1 :                 cm.mu.completedJobs++
     191            1 :                 cm.mu.completedStats.Add(job.stats)
     192            1 :                 cm.mu.completedJobsCond.Broadcast()
     193            1 :                 cm.maybeLogLocked()
     194            1 :                 cm.mu.Unlock()
     195            1 :         }
     196              : }
     197              : 
     198              : // deleteObsoleteFilesInJob deletes all obsolete files in the given job. If tb
     199              : // and paceTimer are provided, files that need pacing will be throttled
     200              : // according to the deletion rate. If tb is nil, files are deleted immediately
     201              : // without pacing (used when the cleanup manager is being closed).
     202              : func (cm *cleanupManager) deleteObsoleteFilesInJob(
     203              :         job *cleanupJob, tb *tokenbucket.TokenBucket, paceTimer *time.Timer,
     204            1 : ) {
     205            1 :         for _, of := range job.obsoleteFiles {
     206            1 :                 switch of.fileType {
     207            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     208            1 :                         if tb != nil {
     209            1 :                                 cm.maybePace(tb, &of, paceTimer)
     210            1 :                         }
     211            1 :                         cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     212            1 :                 default:
     213            1 :                         cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
     214              :                 }
     215              :         }
     216              : }
     217              : 
     218              : // maybePace sleeps before deleting an object if appropriate. It is always
     219              : // called from the background goroutine.
     220              : func (cm *cleanupManager) maybePace(
     221              :         tb *tokenbucket.TokenBucket, of *obsoleteFile, paceTimer *time.Timer,
     222            1 : ) {
     223            1 :         if !of.needsPacing() {
     224            1 :                 return
     225            1 :         }
     226              : 
     227            1 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
     228            1 :         if tokens == 0.0 {
     229            1 :                 // The token bucket might be in debt; it could make us wait even for 0
     230            1 :                 // tokens. We don't want that if the pacer decided throttling should be
     231            1 :                 // disabled.
     232            1 :                 return
     233            1 :         }
     234              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     235              :         // the token bucket accumulates up to one second of unused tokens.
     236            1 :         for {
     237            1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     238            1 :                 if ok {
     239            1 :                         break
     240              :                 }
     241            1 :                 paceTimer.Reset(d)
     242            1 :                 select {
     243            0 :                 case <-paceTimer.C:
     244            1 :                 case <-cm.stopCh:
     245            1 :                         // The cleanup manager is being closed. Delete without pacing.
     246            1 :                         return
     247              :                 }
     248              :         }
     249              : }
     250              : 
     251              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     252              : func (cm *cleanupManager) deleteObsoleteFile(
     253              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     254            1 : ) {
     255            1 :         // TODO(peter): need to handle this error, probably by re-adding the
     256            1 :         // file that couldn't be deleted to one of the obsolete slices map.
     257            1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     258            1 :         if oserror.IsNotExist(err) {
     259            0 :                 return
     260            0 :         }
     261              : 
     262            1 :         switch fileType {
     263            1 :         case base.FileTypeLog:
     264            1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     265            1 :                         JobID:   int(jobID),
     266            1 :                         Path:    path,
     267            1 :                         FileNum: fileNum,
     268            1 :                         Err:     err,
     269            1 :                 })
     270            1 :         case base.FileTypeManifest:
     271            1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     272            1 :                         JobID:   int(jobID),
     273            1 :                         Path:    path,
     274            1 :                         FileNum: fileNum,
     275            1 :                         Err:     err,
     276            1 :                 })
     277            0 :         case base.FileTypeTable, base.FileTypeBlob:
     278            0 :                 panic("invalid deletion of object file")
     279              :         }
     280              : }
     281              : 
     282              : func (cm *cleanupManager) deleteObsoleteObject(
     283              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     284            1 : ) {
     285            1 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     286            0 :                 panic("not an object")
     287              :         }
     288              : 
     289            1 :         var path string
     290            1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     291            1 :         if err != nil {
     292            1 :                 path = "<nil>"
     293            1 :         } else {
     294            1 :                 path = cm.objProvider.Path(meta)
     295            1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     296            1 :         }
     297            1 :         if cm.objProvider.IsNotExistError(err) {
     298            1 :                 return
     299            1 :         }
     300              : 
     301            1 :         switch fileType {
     302            1 :         case base.FileTypeTable:
     303            1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     304            1 :                         JobID:   int(jobID),
     305            1 :                         Path:    path,
     306            1 :                         FileNum: fileNum,
     307            1 :                         Err:     err,
     308            1 :                 })
     309            1 :         case base.FileTypeBlob:
     310            1 :                 cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
     311            1 :                         JobID:   int(jobID),
     312            1 :                         Path:    path,
     313            1 :                         FileNum: fileNum,
     314            1 :                         Err:     err,
     315            1 :                 })
     316              :         }
     317              : }
     318              : 
     319              : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
     320              : // when the job queue gets back to less than 10% full.
     321              : //
     322              : // Must be called with cm.mu locked.
     323            1 : func (cm *cleanupManager) maybeLogLocked() {
     324            1 :         const highThreshold = jobsQueueDepth * 3 / 4
     325            1 :         const lowThreshold = jobsQueueDepth / 10
     326            1 : 
     327            1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     328            1 : 
     329            1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     330            0 :                 cm.mu.jobsQueueWarningIssued = true
     331            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     332            0 :         }
     333              : 
     334            1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     335            0 :                 cm.mu.jobsQueueWarningIssued = false
     336            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     337            0 :         }
     338              : }
     339              : 
     340            1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     341            1 :         var pacerInfo deletionPacerInfo
     342            1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     343            1 :         // but in practice this was observed to take constant time, regardless of
     344            1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     345            1 :         // take 10 microseconds or less.
     346            1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     347            1 :         d.mu.Lock()
     348            1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     349            1 :         total := d.mu.versions.metrics.Total()
     350            1 :         d.mu.Unlock()
     351            1 :         pacerInfo.liveBytes = uint64(total.AggregateSize())
     352            1 :         return pacerInfo
     353            1 : }
     354              : 
     355              : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     356              : // and adds those to the internal lists of obsolete files. Note that the files
     357              : // are not actually deleted by this method. A subsequent call to
     358              : // deleteObsoleteFiles must be performed. Must be not be called concurrently
     359              : // with compactions and flushes. db.mu must be held when calling this function.
     360            1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
     361            1 :         // Disable automatic compactions temporarily to avoid concurrent compactions /
     362            1 :         // flushes from interfering. The original value is restored on completion.
     363            1 :         disabledPrev := d.opts.DisableAutomaticCompactions
     364            1 :         defer func() {
     365            1 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     366            1 :         }()
     367            1 :         d.opts.DisableAutomaticCompactions = true
     368            1 : 
     369            1 :         // Wait for any ongoing compaction to complete before continuing.
     370            1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     371            1 :                 d.mu.compact.cond.Wait()
     372            1 :         }
     373              : 
     374            1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     375            1 :         d.mu.versions.addLiveFileNums(liveFileNums)
     376            1 :         // Protect against files which are only referred to by the ingestedFlushable
     377            1 :         // from being deleted. These are added to the flushable queue on WAL replay
     378            1 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     379            1 :         // file scan to avoid double-deleting these files.
     380            1 :         for _, f := range flushableIngests {
     381            1 :                 for _, file := range f.files {
     382            1 :                         liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
     383            1 :                 }
     384              :         }
     385              : 
     386            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     387            1 : 
     388            1 :         var obsoleteTables []obsoleteFile
     389            1 :         var obsoleteBlobs []obsoleteFile
     390            1 :         var obsoleteOptions []obsoleteFile
     391            1 :         var obsoleteManifests []obsoleteFile
     392            1 : 
     393            1 :         for _, filename := range list {
     394            1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     395            1 :                 if !ok {
     396            1 :                         continue
     397              :                 }
     398            1 :                 makeObsoleteFile := func() obsoleteFile {
     399            1 :                         of := obsoleteFile{
     400            1 :                                 fileType: fileType,
     401            1 :                                 fs:       d.opts.FS,
     402            1 :                                 path:     d.opts.FS.PathJoin(d.dirname, filename),
     403            1 :                                 fileNum:  diskFileNum,
     404            1 :                                 isLocal:  true,
     405            1 :                         }
     406            1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     407            1 :                                 of.fileSize = uint64(stat.Size())
     408            1 :                         }
     409            1 :                         return of
     410              :                 }
     411            1 :                 switch fileType {
     412            1 :                 case base.FileTypeManifest:
     413            1 :                         if diskFileNum >= manifestFileNum {
     414            1 :                                 continue
     415              :                         }
     416            1 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     417            1 :                 case base.FileTypeOptions:
     418            1 :                         if diskFileNum >= d.optionsFileNum {
     419            1 :                                 continue
     420              :                         }
     421            1 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     422            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     423              :                         // Objects are handled through the objstorage provider below.
     424            1 :                 default:
     425              :                         // Don't delete files we don't know about.
     426              :                 }
     427              :         }
     428              : 
     429            1 :         objects := d.objProvider.List()
     430            1 :         for _, obj := range objects {
     431            1 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     432            1 :                         continue
     433              :                 }
     434            1 :                 if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
     435            0 :                         // Ignore object types we don't know about.
     436            0 :                         continue
     437              :                 }
     438            1 :                 of := obsoleteFile{
     439            1 :                         fileType: obj.FileType,
     440            1 :                         fs:       d.opts.FS,
     441            1 :                         path:     base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
     442            1 :                         fileNum:  obj.DiskFileNum,
     443            1 :                         isLocal:  true,
     444            1 :                 }
     445            1 :                 if size, err := d.objProvider.Size(obj); err == nil {
     446            1 :                         of.fileSize = uint64(size)
     447            1 :                 }
     448            1 :                 if obj.FileType == base.FileTypeTable {
     449            1 :                         obsoleteTables = append(obsoleteTables, of)
     450            1 :                 } else {
     451            1 :                         obsoleteBlobs = append(obsoleteBlobs, of)
     452            1 :                 }
     453              :         }
     454              : 
     455            1 :         d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
     456            1 :         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     457            1 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     458            1 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     459            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     460              : }
     461              : 
// disableFileDeletions disables file deletions and then waits for any
// in-progress deletion to finish. The caller is required to call
// enableFileDeletions in order to enable file deletions again. It is ok for
// multiple callers to disable file deletions simultaneously, though they must
// all invoke enableFileDeletions in order for file deletions to be re-enabled
// (there is an internal reference count on file deletion disablement).
//
// d.mu must be held when calling this method. The mutex is released while
// waiting for the cleanup manager and re-acquired before returning.
func (d *DB) disableFileDeletions() {
	d.mu.fileDeletions.disableCount++
	// cleanupManager.Wait must not be called with d.mu held, so drop the mutex
	// for the duration of the wait; the deferred Lock restores it on return.
	d.mu.Unlock()
	defer d.mu.Lock()
	d.cleanupManager.Wait()
}
     476              : 
     477              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     478              : // is queued if necessary.
     479              : //
     480              : // d.mu must be held when calling this method.
     481            1 : func (d *DB) enableFileDeletions() {
     482            1 :         if d.mu.fileDeletions.disableCount <= 0 {
     483            1 :                 panic("pebble: file deletion disablement invariant violated")
     484              :         }
     485            1 :         d.mu.fileDeletions.disableCount--
     486            1 :         if d.mu.fileDeletions.disableCount > 0 {
     487            0 :                 return
     488            0 :         }
     489            1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     490              : }
     491              : 
// fileInfo is a package-local alias for base.FileInfo.
type fileInfo = base.FileInfo
     493              : 
     494              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     495              : //
     496              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     497              : //
     498              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     499              : // cleanup job will be scheduled when file deletions are re-enabled.
     500            1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     501            1 :         if d.mu.fileDeletions.disableCount > 0 {
     502            1 :                 return
     503            1 :         }
     504            1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     505            1 : 
     506            1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     507            1 :         // log that has not had its contents flushed to an sstable.
     508            1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     509            1 :         if err != nil {
     510            0 :                 panic(err)
     511              :         }
     512              : 
     513            1 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     514            1 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     515            1 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     516            1 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     517            1 : 
     518            1 :         // Ensure everything is already sorted. We want determinism for testing, and
     519            1 :         // we need the manifests to be sorted because we want to delete some
     520            1 :         // contiguous prefix of the older manifests.
     521            1 :         if invariants.Enabled {
     522            1 :                 switch {
     523            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
     524            0 :                         d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
     525            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
     526            0 :                         d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
     527            0 :                 case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
     528            0 :                         d.opts.Logger.Fatalf("obsoleteTables is not sorted")
     529            0 :                 case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
     530            0 :                         d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
     531              :                 }
     532              :         }
     533              : 
     534            1 :         var obsoleteManifests []obsoleteFile
     535            1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     536            1 :         if manifestsToDelete > 0 {
     537            1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     538            1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     539            1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     540            0 :                         d.mu.versions.obsoleteManifests = nil
     541            0 :                 }
     542              :         }
     543              : 
     544            1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     545            1 :         d.mu.versions.obsoleteOptions = nil
     546            1 : 
     547            1 :         // Compute the stats for the files being queued for deletion and add them to
     548            1 :         // the running total. These stats will be used during DB.Metrics() to
     549            1 :         // calculate the count and size of pending obsolete files by diffing these
     550            1 :         // stats and the stats reported by the cleanup manager.
     551            1 :         var objectStats obsoleteObjectStats
     552            1 :         objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
     553            1 :         objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
     554            1 :         d.mu.fileDeletions.queuedStats.Add(objectStats)
     555            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     556            1 : 
     557            1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     558            1 :         // Note the unusual order: Unlock and then Lock.
     559            1 :         d.mu.Unlock()
     560            1 :         defer d.mu.Lock()
     561            1 : 
     562            1 :         n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
     563            1 :         filesToDelete := make([]obsoleteFile, 0, n)
     564            1 :         filesToDelete = append(filesToDelete, obsoleteManifests...)
     565            1 :         filesToDelete = append(filesToDelete, obsoleteOptions...)
     566            1 :         filesToDelete = append(filesToDelete, obsoleteTables...)
     567            1 :         filesToDelete = append(filesToDelete, obsoleteBlobs...)
     568            1 :         for _, f := range obsoleteLogs {
     569            1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     570            1 :                         fileType: base.FileTypeLog,
     571            1 :                         fs:       f.FS,
     572            1 :                         path:     f.Path,
     573            1 :                         fileNum:  base.DiskFileNum(f.NumWAL),
     574            1 :                         fileSize: f.ApproxFileSize,
     575            1 :                         isLocal:  true,
     576            1 :                 })
     577            1 :         }
     578            1 :         for _, f := range obsoleteTables {
     579            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeTable)
     580            1 :         }
     581            1 :         for _, f := range obsoleteBlobs {
     582            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
     583            1 :         }
     584            1 :         if len(filesToDelete) > 0 {
     585            1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
     586            1 :         }
     587            1 :         if d.opts.private.testingAlwaysWaitForCleanup {
     588            1 :                 d.cleanupManager.Wait()
     589            1 :         }
     590              : }
     591              : 
     592            1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     593            1 :         d.mu.Lock()
     594            1 :         defer d.mu.Unlock()
     595            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     596            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     597            1 :         }
     598              : }
     599              : 
     600            1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
     601            1 :         if len(b) == 0 {
     602            1 :                 return a
     603            1 :         }
     604              : 
     605            1 :         a = append(a, b...)
     606            1 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     607            1 :         return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
     608            1 :                 return a.fileNum == b.fileNum
     609            1 :         })
     610              : }
     611              : 
// cmpObsoleteFileNumbers orders two obsolete files by their fileNum. It returns
// the result of cmp.Compare on the two file numbers, which makes it usable as
// the comparison function for slices.SortFunc and slices.IsSortedFunc.
func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
	return cmp.Compare(a.fileNum, b.fileNum)
}
     615              : 
// objectInfo describes an object in object storage (either a sstable or a blob
// file).
type objectInfo struct {
	// fileInfo is embedded; it supplies the FileNum and FileSize fields.
	fileInfo
	// isLocal is set to !meta.IsRemote() when the value is built by
	// zombieObjects.AddMetadata.
	isLocal bool
}
     622              : 
     623            1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
     624            1 :         return obsoleteFile{
     625            1 :                 fileType: fileType,
     626            1 :                 fs:       fs,
     627            1 :                 path:     base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     628            1 :                 fileNum:  o.FileNum,
     629            1 :                 fileSize: o.FileSize,
     630            1 :                 isLocal:  o.isLocal,
     631            1 :         }
     632            1 : }
     633              : 
     634            1 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
     635            1 :         for _, of := range files {
     636            1 :                 if of.isLocal {
     637            1 :                         local.count++
     638            1 :                         local.size += of.fileSize
     639            1 :                 }
     640            1 :                 total.count++
     641            1 :                 total.size += of.fileSize
     642              :         }
     643            1 :         return total, local
     644              : }
     645              : 
// obsoleteObjectStats holds count-and-size totals for obsolete sstables and
// blob files. For each kind there is an "All" total covering every object and
// a "Local" total covering only the objects whose isLocal flag is set (see
// calculateObsoleteObjectStats).
type obsoleteObjectStats struct {
	tablesLocal    countAndSize
	tablesAll      countAndSize
	blobFilesLocal countAndSize
	blobFilesAll   countAndSize
}
     652              : 
     653            1 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
     654            1 :         s.tablesLocal.Add(other.tablesLocal)
     655            1 :         s.tablesAll.Add(other.tablesAll)
     656            1 :         s.blobFilesLocal.Add(other.blobFilesLocal)
     657            1 :         s.blobFilesAll.Add(other.blobFilesAll)
     658            1 : }
     659              : 
// Sub subtracts each of other's four counters from the matching counter of s,
// delegating to countAndSize.Sub for every pair.
func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
	s.tablesLocal.Sub(other.tablesLocal)
	s.tablesAll.Sub(other.tablesAll)
	s.blobFilesLocal.Sub(other.blobFilesLocal)
	s.blobFilesAll.Sub(other.blobFilesAll)
}
     666              : 
     667              : type countAndSize struct {
     668              :         count uint64
     669              :         size  uint64
     670              : }
     671              : 
     672            1 : func (c *countAndSize) Add(other countAndSize) {
     673            1 :         c.count += other.count
     674            1 :         c.size += other.size
     675            1 : }
     676              : 
// Sub subtracts other from c, field by field, going through
// invariants.SafeSub rather than the plain - operator.
//
// NOTE(review): SafeSub presumably guards against unsigned underflow — confirm
// its exact behavior in the invariants package.
func (c *countAndSize) Sub(other countAndSize) {
	c.count = invariants.SafeSub(c.count, other.count)
	c.size = invariants.SafeSub(c.size, other.size)
}
     681              : 
     682            1 : func makeZombieObjects() zombieObjects {
     683            1 :         return zombieObjects{
     684            1 :                 objs: make(map[base.DiskFileNum]objectInfo),
     685            1 :         }
     686            1 : }
     687              : 
// zombieObjects tracks a set of objects that are no longer required by the most
// recent version of the LSM, but may still need to be accessed by an open
// iterator. Such objects are 'dead,' but cannot be deleted until iterators that
// may access them are closed.
//
// The aggregate fields are maintained incrementally by Add and Extract.
type zombieObjects struct {
	// objs holds the tracked objects, keyed by their file number.
	objs map[base.DiskFileNum]objectInfo
	// totalSize is the sum of FileSize over every object in objs.
	totalSize uint64
	// localSize is the sum of FileSize over the objects in objs with isLocal set.
	localSize uint64
	// localCount is the number of objects in objs with isLocal set.
	localCount uint64
}
     698              : 
     699              : // Add adds an object to the set of zombie objects.
     700            1 : func (z *zombieObjects) Add(obj objectInfo) {
     701            1 :         if _, ok := z.objs[obj.FileNum]; ok {
     702            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     703              :         }
     704            1 :         z.objs[obj.FileNum] = obj
     705            1 :         z.totalSize += obj.FileSize
     706            1 :         if obj.isLocal {
     707            1 :                 z.localSize += obj.FileSize
     708            1 :                 z.localCount++
     709            1 :         }
     710              : }
     711              : 
     712              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     713            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     714            1 :         z.Add(objectInfo{
     715            1 :                 fileInfo: fileInfo{
     716            1 :                         FileNum:  meta.DiskFileNum,
     717            1 :                         FileSize: size,
     718            1 :                 },
     719            1 :                 isLocal: !meta.IsRemote(),
     720            1 :         })
     721            1 : }
     722              : 
// Count returns the number of zombie objects currently in the set, local and
// non-local alike.
func (z *zombieObjects) Count() int {
	return len(z.objs)
}
     727              : 
     728              : // Extract removes an object from the set of zombie objects, returning the
     729              : // object that was removed.
     730            1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     731            1 :         obj, ok := z.objs[fileNum]
     732            1 :         if !ok {
     733            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     734              :         }
     735            1 :         delete(z.objs, fileNum)
     736            1 : 
     737            1 :         // Detect underflow in case we have a bug that causes an object's size to be
     738            1 :         // mutated.
     739            1 :         if z.totalSize < obj.FileSize {
     740            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
     741              :         }
     742            1 :         if obj.isLocal && z.localSize < obj.FileSize {
     743            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
     744              :         }
     745              : 
     746            1 :         z.totalSize -= obj.FileSize
     747            1 :         if obj.isLocal {
     748            1 :                 z.localSize -= obj.FileSize
     749            1 :                 z.localCount--
     750            1 :         }
     751            1 :         return obj
     752              : }
     753              : 
// TotalSize returns the combined FileSize of all objects in the set, local and
// non-local alike.
func (z *zombieObjects) TotalSize() uint64 {
	return z.totalSize
}
     758              : 
// LocalStats returns the count and combined FileSize of the objects in the set
// that were added with isLocal set.
func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
	return z.localCount, z.localSize
}
        

Generated by: LCOV version 2.0-1