LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions)
Test:         2025-10-21 08:19Z d10d78a2 - meta test only.lcov
Test Date:    2025-10-21 08:23:01
Coverage:     Lines: 86.5 % (417 of 482 hit)    Functions: - (0 of 0)

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/errors/oserror"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/pebble/wal"
      23              :         "github.com/cockroachdb/tokenbucket"
      24              : )
      25              : 
      26              : // Cleaner exports the base.Cleaner type.
      27              : type Cleaner = base.Cleaner
      28              : 
      29              : // DeleteCleaner exports the base.DeleteCleaner type.
      30              : type DeleteCleaner = base.DeleteCleaner
      31              : 
      32              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      33              : type ArchiveCleaner = base.ArchiveCleaner
      34              : 
      35              : type cleanupManager struct {
      36              :         opts        *Options
      37              :         objProvider objstorage.Provider
      38              :         deletePacer *deletionPacer
      39              : 
      40              :         // jobsCh is used as the cleanup job queue.
      41              :         jobsCh chan *cleanupJob
      42              :         // waitGroup is used to wait for the background goroutine to exit.
      43              :         waitGroup sync.WaitGroup
       44              :         // stopCh is closed when the cleanup manager is closed; pacing is disabled for any remaining jobs.
      45              :         stopCh chan struct{}
      46              : 
      47              :         mu struct {
      48              :                 sync.Mutex
      49              :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      50              :                 totalJobs              int
      51              :                 completedStats         obsoleteObjectStats
      52              :                 completedJobs          int
      53              :                 completedJobsCond      sync.Cond
      54              :                 jobsQueueWarningIssued bool
      55              :         }
      56              : }
      57              : 
      58              : // CompletedStats returns the stats summarizing objects deleted. The returned
      59              : // stats increase monotonically over the lifetime of the DB.
      60            0 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
      61            0 :         m.mu.Lock()
      62            0 :         defer m.mu.Unlock()
      63            0 :         return m.mu.completedStats
      64            0 : }
      65              : 
      66              : // We can queue this many jobs before we have to block EnqueueJob.
      67              : const jobsQueueDepth = 1000
      68              : const jobsQueueHighThreshold = jobsQueueDepth * 3 / 4
      69              : const jobsQueueLowThreshold = jobsQueueDepth / 10
      70              : 
      71              : // obsoleteFile holds information about a file that needs to be deleted soon.
      72              : type obsoleteFile struct {
      73              :         fileType base.FileType
      74              :         fs       vfs.FS
      75              :         path     string
      76              :         fileNum  base.DiskFileNum
      77              :         fileSize uint64 // approx for log files
      78              :         isLocal  bool
      79              : }
      80              : 
      81            1 : func (of *obsoleteFile) needsPacing() bool {
      82            1 :         // We only need to pace local objects--sstables and blob files.
      83            1 :         return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
      84            1 : }
      85              : 
      86              : type cleanupJob struct {
      87              :         jobID         JobID
      88              :         obsoleteFiles []obsoleteFile
      89              :         stats         obsoleteObjectStats
      90              : }
      91              : 
      92              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      93              : // The cleanupManager must be Close()d.
      94              : func openCleanupManager(
      95              :         opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
      96            1 : ) *cleanupManager {
      97            1 :         cm := &cleanupManager{
      98            1 :                 opts:        opts,
      99            1 :                 objProvider: objProvider,
     100            1 :                 deletePacer: newDeletionPacer(
     101            1 :                         crtime.NowMono(),
     102            1 :                         opts.FreeSpaceThresholdBytes,
     103            1 :                         opts.TargetByteDeletionRate,
     104            1 :                         opts.FreeSpaceTimeframe,
     105            1 :                         opts.ObsoleteBytesMaxRatio,
     106            1 :                         opts.ObsoleteBytesTimeframe,
     107            1 :                         getDeletePacerInfo,
     108            1 :                 ),
     109            1 :                 jobsCh: make(chan *cleanupJob, jobsQueueDepth),
     110            1 :                 stopCh: make(chan struct{}),
     111            1 :         }
     112            1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
     113            1 :         cm.waitGroup.Add(1)
     114            1 : 
     115            1 :         go func() {
     116            1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
     117            1 :                         cm.mainLoop()
     118            1 :                 })
     119              :         }()
     120              : 
     121            1 :         return cm
     122              : }
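
openCleanupManager labels its background goroutine via pprof.Do so cleanup work is attributable in CPU profiles. Below is a minimal, standalone sketch of that pattern, assuming only the standard runtime/pprof API; the label key/value are hypothetical (Pebble's actual gcLabels is defined elsewhere in the package).

    package main

    import (
            "context"
            "runtime/pprof"
    )

    func main() {
            done := make(chan struct{})
            go func() {
                    defer close(done)
                    // Attach profiler labels to everything this goroutine does;
                    // the key/value here are illustrative only.
                    pprof.Do(context.Background(), pprof.Labels("job", "cleanup"), func(context.Context) {
                            // The cleanup manager's mainLoop would run here.
                    })
            }()
            <-done
    }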
     123              : 
     124              : // Close stops the background goroutine, waiting until all queued jobs are completed.
     125              : // Delete pacing is disabled for the remaining jobs.
     126            1 : func (cm *cleanupManager) Close() {
     127            1 :         close(cm.jobsCh)
     128            1 :         close(cm.stopCh)
     129            1 :         cm.waitGroup.Wait()
     130            1 : }
     131              : 
     132              : // EnqueueJob adds a cleanup job to the manager's queue.
     133              : func (cm *cleanupManager) EnqueueJob(
     134              :         jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
     135            1 : ) {
     136            1 :         job := &cleanupJob{
     137            1 :                 jobID:         jobID,
     138            1 :                 obsoleteFiles: obsoleteFiles,
     139            1 :                 stats:         stats,
     140            1 :         }
     141            1 : 
     142            1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     143            1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     144            1 :         // rather than when we get to the job, otherwise the reported bytes will be
     145            1 :         // subject to the throttling rate which defeats the purpose.
     146            1 :         var pacingBytes uint64
     147            1 :         for _, of := range obsoleteFiles {
     148            1 :                 if of.needsPacing() {
     149            1 :                         pacingBytes += of.fileSize
     150            1 :                 }
     151              :         }
     152            1 :         if pacingBytes > 0 {
     153            1 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     154            1 :         }
     155              : 
     156            1 :         cm.mu.Lock()
     157            1 :         cm.mu.totalJobs++
     158            1 :         cm.maybeLogLocked()
     159            1 :         cm.mu.Unlock()
     160            1 : 
     161            1 :         cm.jobsCh <- job
     162              : }
     163              : 
     164              : // Wait until the completion of all jobs that were already queued.
     165              : //
     166              : // Does not wait for jobs that are enqueued during the call.
     167              : //
     168              : // Note that DB.mu should not be held while calling this method; the background
     169              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     170            1 : func (cm *cleanupManager) Wait() {
     171            1 :         cm.mu.Lock()
     172            1 :         defer cm.mu.Unlock()
     173            1 :         n := cm.mu.totalJobs
     174            1 :         for cm.mu.completedJobs < n {
     175            1 :                 cm.mu.completedJobsCond.Wait()
     176            1 :         }
     177              : }
     178              : 
     179              : // mainLoop runs the manager's background goroutine.
     180            1 : func (cm *cleanupManager) mainLoop() {
     181            1 :         defer cm.waitGroup.Done()
     182            1 : 
     183            1 :         paceTimer := time.NewTimer(time.Duration(0))
     184            1 :         defer paceTimer.Stop()
     185            1 : 
     186            1 :         var tb tokenbucket.TokenBucket
     187            1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     188            1 :         tb.Init(1.0, 1.0)
     189            1 :         for job := range cm.jobsCh {
     190            1 :                 cm.deleteObsoleteFilesInJob(job, &tb, paceTimer)
     191            1 :                 cm.mu.Lock()
     192            1 :                 cm.mu.completedJobs++
     193            1 :                 cm.mu.completedStats.Add(job.stats)
     194            1 :                 cm.mu.completedJobsCond.Broadcast()
     195            1 :                 cm.maybeLogLocked()
     196            1 :                 cm.mu.Unlock()
     197            1 :         }
     198              : }
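
EnqueueJob, mainLoop, and Wait together form a producer/worker/wait pattern: a buffered channel as the queue, a WaitGroup for shutdown, and a condition variable over totalJobs/completedJobs. The following is a self-contained sketch of that pattern under those assumptions; jobQueue and its methods are hypothetical names, not Pebble APIs.

    package main

    import (
            "fmt"
            "sync"
    )

    // jobQueue mirrors the structure above: one background worker drains a
    // buffered channel, and Wait blocks until every job enqueued before the
    // call has completed.
    type jobQueue struct {
            jobs chan func()
            wg   sync.WaitGroup

            mu struct {
                    sync.Mutex
                    total     int
                    completed int
                    cond      sync.Cond
            }
    }

    func newJobQueue(depth int) *jobQueue {
            q := &jobQueue{jobs: make(chan func(), depth)}
            q.mu.cond.L = &q.mu.Mutex
            q.wg.Add(1)
            go func() {
                    defer q.wg.Done()
                    for job := range q.jobs {
                            job()
                            q.mu.Lock()
                            q.mu.completed++
                            q.mu.cond.Broadcast()
                            q.mu.Unlock()
                    }
            }()
            return q
    }

    func (q *jobQueue) Enqueue(job func()) {
            q.mu.Lock()
            q.mu.total++
            q.mu.Unlock()
            q.jobs <- job // blocks once the buffer (the queue depth) is full
    }

    // Wait returns once all jobs enqueued before the call have run.
    func (q *jobQueue) Wait() {
            q.mu.Lock()
            defer q.mu.Unlock()
            n := q.mu.total
            for q.mu.completed < n {
                    q.mu.cond.Wait()
            }
    }

    func (q *jobQueue) Close() {
            close(q.jobs)
            q.wg.Wait()
    }

    func main() {
            q := newJobQueue(8)
            q.Enqueue(func() { fmt.Println("deleting obsolete file") })
            q.Wait()
            q.Close()
    }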
     199              : 
     200              : // deleteObsoleteFilesInJob deletes all obsolete files in the given job. If tb
     201              : // and paceTimer are provided, files that need pacing will be throttled
     202              : // according to the deletion rate. If tb is nil, files are deleted immediately
     203              : // without pacing (used when the cleanup manager is being closed).
     204              : func (cm *cleanupManager) deleteObsoleteFilesInJob(
     205              :         job *cleanupJob, tb *tokenbucket.TokenBucket, paceTimer *time.Timer,
     206            1 : ) {
     207            1 :         for _, of := range job.obsoleteFiles {
     208            1 :                 switch of.fileType {
     209            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     210            1 :                         if tb != nil {
     211            1 :                                 cm.maybePace(tb, &of, paceTimer)
     212            1 :                         }
     213            1 :                         cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     214            1 :                 default:
     215            1 :                         cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
     216              :                 }
     217              :         }
     218              : }
     219              : 
     220              : // maybePace sleeps before deleting an object if appropriate. It is always
     221              : // called from the background goroutine.
     222              : func (cm *cleanupManager) maybePace(
     223              :         tb *tokenbucket.TokenBucket, of *obsoleteFile, paceTimer *time.Timer,
     224            1 : ) {
     225            1 :         if !of.needsPacing() {
     226            1 :                 return
     227            1 :         }
     228            1 :         if len(cm.jobsCh) >= jobsQueueHighThreshold {
     229            0 :                 // If there are many jobs queued up, disable pacing. In this state, we
     230            0 :                 // execute deletion jobs at the same rate as new jobs get queued.
     231            0 :                 return
     232            0 :         }
     233            1 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
     234            1 :         if tokens == 0.0 {
     235            1 :                 // The token bucket might be in debt; it could make us wait even for 0
     236            1 :                 // tokens. We don't want that if the pacer decided throttling should be
     237            1 :                 // disabled.
     238            1 :                 return
     239            1 :         }
     240              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     241              :         // the token bucket accumulates up to one second of unused tokens.
     242            1 :         for {
     243            1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     244            1 :                 if ok {
     245            1 :                         break
     246              :                 }
     247            1 :                 paceTimer.Reset(d)
     248            1 :                 select {
     249            1 :                 case <-paceTimer.C:
     250            1 :                 case <-cm.stopCh:
     251            1 :                         // The cleanup manager is being closed. Delete without pacing.
     252            1 :                         return
     253              :                 }
     254              :         }
     255              : }
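
A standalone sketch of the pacing loop above, assuming only the github.com/cockroachdb/tokenbucket calls already used in this file (Init, TryToFulfill); waitForTokens and stop are hypothetical names.

    package main

    import (
            "time"

            "github.com/cockroachdb/tokenbucket"
    )

    // waitForTokens blocks until tb can fulfill the requested tokens, or until
    // stop is closed, reusing a single timer as maybePace does.
    func waitForTokens(tb *tokenbucket.TokenBucket, tokens float64, timer *time.Timer, stop <-chan struct{}) {
            if tokens == 0 {
                    return // the pacer decided no throttling is needed
            }
            for {
                    ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
                    if ok {
                            return
                    }
                    timer.Reset(d)
                    select {
                    case <-timer.C:
                    case <-stop:
                            return // shutting down: proceed without pacing
                    }
            }
    }

    func main() {
            var tb tokenbucket.TokenBucket
            tb.Init(1.0, 1.0) // 1 token/second refill, 1 token burst, as in mainLoop
            timer := time.NewTimer(0)
            defer timer.Stop()
            stop := make(chan struct{})
            waitForTokens(&tb, 2.5, timer, stop)
    }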
     256              : 
     257              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     258              : func (cm *cleanupManager) deleteObsoleteFile(
     259              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     260            1 : ) {
     261            1 :         // TODO(peter): need to handle this error, probably by re-adding the
     262            1 :         // file that couldn't be deleted to one of the obsolete slices map.
     263            1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     264            1 :         if oserror.IsNotExist(err) {
     265            0 :                 return
     266            0 :         }
     267              : 
     268            1 :         switch fileType {
     269            1 :         case base.FileTypeLog:
     270            1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     271            1 :                         JobID:   int(jobID),
     272            1 :                         Path:    path,
     273            1 :                         FileNum: fileNum,
     274            1 :                         Err:     err,
     275            1 :                 })
     276            1 :         case base.FileTypeManifest:
     277            1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     278            1 :                         JobID:   int(jobID),
     279            1 :                         Path:    path,
     280            1 :                         FileNum: fileNum,
     281            1 :                         Err:     err,
     282            1 :                 })
     283            0 :         case base.FileTypeTable, base.FileTypeBlob:
     284            0 :                 panic("invalid deletion of object file")
     285              :         }
     286              : }
     287              : 
     288              : func (cm *cleanupManager) deleteObsoleteObject(
     289              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     290            1 : ) {
     291            1 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     292            0 :                 panic("not an object")
     293              :         }
     294              : 
     295            1 :         var path string
     296            1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     297            1 :         if err != nil {
     298            0 :                 path = "<nil>"
     299            1 :         } else {
     300            1 :                 path = cm.objProvider.Path(meta)
     301            1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     302            1 :         }
     303            1 :         if cm.objProvider.IsNotExistError(err) {
     304            0 :                 return
     305            0 :         }
     306              : 
     307            1 :         switch fileType {
     308            1 :         case base.FileTypeTable:
     309            1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     310            1 :                         JobID:   int(jobID),
     311            1 :                         Path:    path,
     312            1 :                         FileNum: fileNum,
     313            1 :                         Err:     err,
     314            1 :                 })
     315            1 :         case base.FileTypeBlob:
     316            1 :                 cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
     317            1 :                         JobID:   int(jobID),
     318            1 :                         Path:    path,
     319            1 :                         FileNum: fileNum,
     320            1 :                         Err:     err,
     321            1 :                 })
     322              :         }
     323              : }
     324              : 
      325              : // maybeLogLocked logs a warning when the job queue becomes more than 75% full
      326              : // and logs again once the queue drains back below 10% full.
     327              : //
     328              : // Must be called with cm.mu locked.
     329            1 : func (cm *cleanupManager) maybeLogLocked() {
     330            1 : 
     331            1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     332            1 : 
     333            1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > jobsQueueHighThreshold {
     334            0 :                 cm.mu.jobsQueueWarningIssued = true
     335            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", jobsQueueHighThreshold)
     336            0 :         }
     337              : 
     338            1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < jobsQueueLowThreshold {
     339            0 :                 cm.mu.jobsQueueWarningIssued = false
     340            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", jobsQueueLowThreshold)
     341            0 :         }
     342              : }
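
The logging above uses hysteresis: one warning when the backlog crosses the high-water mark (75% of jobsQueueDepth) and one recovery message once it drains below the low-water mark (10%), so the log is not spammed while the queue hovers near a threshold. A hypothetical sketch of that pattern:

    package main

    import "fmt"

    // queueMonitor warns once when the backlog crosses high and logs recovery
    // only after it drops below low.
    type queueMonitor struct {
            high, low     int
            warningIssued bool
    }

    func (m *queueMonitor) observe(queued int) {
            if !m.warningIssued && queued > m.high {
                    m.warningIssued = true
                    fmt.Printf("cleanup falling behind; job queue has over %d jobs\n", m.high)
            }
            if m.warningIssued && queued < m.low {
                    m.warningIssued = false
                    fmt.Printf("cleanup back to normal; job queue has under %d jobs\n", m.low)
            }
    }

    func main() {
            m := &queueMonitor{high: 750, low: 100}
            m.observe(800) // warns
            m.observe(600) // no duplicate warning
            m.observe(50)  // logs recovery
    }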
     343              : 
     344            1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     345            1 :         var pacerInfo deletionPacerInfo
     346            1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     347            1 :         // but in practice this was observed to take constant time, regardless of
     348            1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     349            1 :         // take 10 microseconds or less.
     350            1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     351            1 :         d.mu.Lock()
     352            1 :         m := &d.mu.versions.metrics
     353            1 :         pacerInfo.obsoleteBytes = m.Table.ObsoleteSize + m.BlobFiles.ObsoleteSize
     354            1 :         total := m.Total()
     355            1 :         d.mu.Unlock()
     356            1 :         pacerInfo.liveBytes = uint64(total.AggregateSize())
     357            1 :         return pacerInfo
     358            1 : }
     359              : 
     360              : // scanObsoleteFiles compares the provided directory listing to the set of
     361              : // known, in-use files to find files no longer needed and adds those to the
     362              : // internal lists of obsolete files. Note that the files are not actually
     363              : // deleted by this method. A subsequent call to deleteObsoleteFiles must be
      364              : // performed. Must not be called concurrently with compactions and flushes
     365              : // and will panic if any are in-progress. db.mu must be held when calling this
     366              : // function.
     367            1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
     368            1 :         if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     369            0 :                 panic(errors.AssertionFailedf("compaction or flush in progress"))
     370              :         }
     371              : 
     372            1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     373            1 :         d.mu.versions.addLiveFileNums(liveFileNums)
      374            1 :         // Protect files that are referred to only by the ingestedFlushable
      375            1 :         // from being deleted. These are added to the flushable queue on WAL replay
     376            1 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     377            1 :         // file scan to avoid double-deleting these files.
     378            1 :         for _, f := range flushableIngests {
     379            1 :                 for _, file := range f.files {
     380            1 :                         liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
     381            1 :                 }
     382              :         }
     383              : 
     384            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     385            1 : 
     386            1 :         var obsoleteTables []obsoleteFile
     387            1 :         var obsoleteBlobs []obsoleteFile
     388            1 :         var obsoleteOptions []obsoleteFile
     389            1 :         var obsoleteManifests []obsoleteFile
     390            1 : 
     391            1 :         for _, filename := range list {
     392            1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     393            1 :                 if !ok {
     394            1 :                         continue
     395              :                 }
     396            1 :                 makeObsoleteFile := func() obsoleteFile {
     397            1 :                         of := obsoleteFile{
     398            1 :                                 fileType: fileType,
     399            1 :                                 fs:       d.opts.FS,
     400            1 :                                 path:     d.opts.FS.PathJoin(d.dirname, filename),
     401            1 :                                 fileNum:  diskFileNum,
     402            1 :                                 isLocal:  true,
     403            1 :                         }
     404            1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     405            0 :                                 of.fileSize = uint64(stat.Size())
     406            0 :                         }
     407            1 :                         return of
     408              :                 }
     409            1 :                 switch fileType {
     410            1 :                 case base.FileTypeManifest:
     411            1 :                         if diskFileNum >= manifestFileNum {
     412            1 :                                 continue
     413              :                         }
     414            1 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     415            1 :                 case base.FileTypeOptions:
     416            1 :                         if diskFileNum >= d.optionsFileNum {
     417            0 :                                 continue
     418              :                         }
     419            1 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     420            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     421              :                         // Objects are handled through the objstorage provider below.
     422            1 :                 default:
     423              :                         // Don't delete files we don't know about.
     424              :                 }
     425              :         }
     426              : 
     427            1 :         objects := d.objProvider.List()
     428            1 :         for _, obj := range objects {
     429            1 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     430            1 :                         continue
     431              :                 }
     432            1 :                 if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
     433            0 :                         // Ignore object types we don't know about.
     434            0 :                         continue
     435              :                 }
     436            1 :                 of := obsoleteFile{
     437            1 :                         fileType: obj.FileType,
     438            1 :                         fs:       d.opts.FS,
     439            1 :                         path:     base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
     440            1 :                         fileNum:  obj.DiskFileNum,
     441            1 :                         isLocal:  true,
     442            1 :                 }
     443            1 :                 if size, err := d.objProvider.Size(obj); err == nil {
     444            1 :                         of.fileSize = uint64(size)
     445            1 :                 }
     446            1 :                 if obj.FileType == base.FileTypeTable {
     447            1 :                         obsoleteTables = append(obsoleteTables, of)
     448            1 :                 } else {
     449            1 :                         obsoleteBlobs = append(obsoleteBlobs, of)
     450            1 :                 }
     451              :         }
     452              : 
     453            1 :         d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
     454            1 :         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     455            1 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     456            1 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     457            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     458              : }
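
At its core, the scan above is a set difference: anything in the directory listing (or the object provider's list) whose file number is not in the live set becomes an obsolete-file candidate. A simplified, hypothetical sketch follows; the real code also parses file names, checks file types, and stats files for sizes.

    package main

    import "fmt"

    // findObsolete returns the names from a directory listing whose file
    // numbers are not referenced by the current version.
    func findObsolete(listing map[string]uint64, live map[uint64]struct{}) []string {
            var obsolete []string
            for name, num := range listing {
                    if _, ok := live[num]; !ok {
                            obsolete = append(obsolete, name)
                    }
            }
            return obsolete
    }

    func main() {
            listing := map[string]uint64{"000004.sst": 4, "000007.sst": 7}
            live := map[uint64]struct{}{4: {}}
            fmt.Println(findObsolete(listing, live)) // [000007.sst]
    }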
     459              : 
     460              : // disableFileDeletions disables file deletions and then waits for any
     461              : // in-progress deletion to finish. The caller is required to call
     462              : // enableFileDeletions in order to enable file deletions again. It is ok for
     463              : // multiple callers to disable file deletions simultaneously, though they must
     464              : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     465              : // (there is an internal reference count on file deletion disablement).
     466              : //
     467              : // d.mu must be held when calling this method.
     468            1 : func (d *DB) disableFileDeletions() {
     469            1 :         d.mu.fileDeletions.disableCount++
     470            1 :         d.mu.Unlock()
     471            1 :         defer d.mu.Lock()
     472            1 :         d.cleanupManager.Wait()
     473            1 : }
     474              : 
     475              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     476              : // is queued if necessary.
     477              : //
     478              : // d.mu must be held when calling this method.
     479            1 : func (d *DB) enableFileDeletions() {
     480            1 :         if d.mu.fileDeletions.disableCount <= 0 {
     481            0 :                 panic("pebble: file deletion disablement invariant violated")
     482              :         }
     483            1 :         d.mu.fileDeletions.disableCount--
     484            1 :         if d.mu.fileDeletions.disableCount > 0 {
     485            0 :                 return
     486            0 :         }
     487            1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     488              : }
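
disableFileDeletions and enableFileDeletions keep a reference count so that nested disable calls from multiple callers compose, and deletions resume only when the count returns to zero. A minimal sketch of that invariant; deletionGate is a hypothetical name.

    package main

    import "fmt"

    // deletionGate models the disableCount bookkeeping: deletions are allowed
    // only while the count is zero.
    type deletionGate struct {
            disableCount int
    }

    func (g *deletionGate) disable() { g.disableCount++ }

    // enable reports whether deletions are allowed again, i.e. whether a
    // cleanup pass should now be scheduled.
    func (g *deletionGate) enable() bool {
            if g.disableCount <= 0 {
                    panic("file deletion disablement invariant violated")
            }
            g.disableCount--
            return g.disableCount == 0
    }

    func main() {
            var g deletionGate
            g.disable()
            g.disable()
            fmt.Println(g.enable()) // false: another caller still holds a disable
            fmt.Println(g.enable()) // true: schedule deleteObsoleteFiles
    }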
     489              : 
     490              : type fileInfo = base.FileInfo
     491              : 
     492              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     493              : //
      494              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     495              : //
     496              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     497              : // cleanup job will be scheduled when file deletions are re-enabled.
     498            1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     499            1 :         if d.mu.fileDeletions.disableCount > 0 {
     500            1 :                 return
     501            1 :         }
     502            1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     503            1 : 
     504            1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     505            1 :         // log that has not had its contents flushed to an sstable.
     506            1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     507            1 :         if err != nil {
     508            0 :                 panic(err)
     509              :         }
     510              : 
     511            1 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     512            1 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     513            1 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     514            1 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     515            1 : 
     516            1 :         // Ensure everything is already sorted. We want determinism for testing, and
     517            1 :         // we need the manifests to be sorted because we want to delete some
     518            1 :         // contiguous prefix of the older manifests.
     519            1 :         if invariants.Enabled {
     520            1 :                 switch {
     521            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
     522            0 :                         d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
     523            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
     524            0 :                         d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
     525            0 :                 case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
     526            0 :                         d.opts.Logger.Fatalf("obsoleteTables is not sorted")
     527            0 :                 case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
     528            0 :                         d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
     529              :                 }
     530              :         }
     531              : 
     532            1 :         var obsoleteManifests []obsoleteFile
     533            1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     534            1 :         if manifestsToDelete > 0 {
     535            1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     536            1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     537            1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     538            0 :                         d.mu.versions.obsoleteManifests = nil
     539            0 :                 }
     540              :         }
     541              : 
     542            1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     543            1 :         d.mu.versions.obsoleteOptions = nil
     544            1 : 
     545            1 :         // Compute the stats for the files being queued for deletion and add them to
     546            1 :         // the running total. These stats will be used during DB.Metrics() to
     547            1 :         // calculate the count and size of pending obsolete files by diffing these
     548            1 :         // stats and the stats reported by the cleanup manager.
     549            1 :         var objectStats obsoleteObjectStats
     550            1 :         objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
     551            1 :         objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
     552            1 :         d.mu.fileDeletions.queuedStats.Add(objectStats)
     553            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     554            1 : 
     555            1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     556            1 :         // Note the unusual order: Unlock and then Lock.
     557            1 :         d.mu.Unlock()
     558            1 :         defer d.mu.Lock()
     559            1 : 
     560            1 :         n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
     561            1 :         filesToDelete := make([]obsoleteFile, 0, n)
     562            1 :         filesToDelete = append(filesToDelete, obsoleteManifests...)
     563            1 :         filesToDelete = append(filesToDelete, obsoleteOptions...)
     564            1 :         filesToDelete = append(filesToDelete, obsoleteTables...)
     565            1 :         filesToDelete = append(filesToDelete, obsoleteBlobs...)
     566            1 :         for _, f := range obsoleteLogs {
     567            1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     568            1 :                         fileType: base.FileTypeLog,
     569            1 :                         fs:       f.FS,
     570            1 :                         path:     f.Path,
     571            1 :                         fileNum:  base.DiskFileNum(f.NumWAL),
     572            1 :                         fileSize: f.ApproxFileSize,
     573            1 :                         isLocal:  true,
     574            1 :                 })
     575            1 :         }
     576            1 :         for _, f := range obsoleteTables {
     577            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeTable)
     578            1 :         }
     579            1 :         for _, f := range obsoleteBlobs {
     580            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
     581            1 :         }
     582            1 :         if len(filesToDelete) > 0 {
     583            1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
     584            1 :         }
     585            1 :         if d.opts.private.testingAlwaysWaitForCleanup {
     586            0 :                 d.cleanupManager.Wait()
     587            0 :         }
     588              : }
     589              : 
     590            1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     591            1 :         d.mu.Lock()
     592            1 :         defer d.mu.Unlock()
     593            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     594            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     595            1 :         }
     596              : }
     597              : 
     598            1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
     599            1 :         if len(b) == 0 {
     600            1 :                 return a
     601            1 :         }
     602              : 
     603            1 :         a = append(a, b...)
     604            1 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     605            1 :         return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
     606            1 :                 return a.fileNum == b.fileNum
     607            1 :         })
     608              : }
     609              : 
     610            1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
     611            1 :         return cmp.Compare(a.fileNum, b.fileNum)
     612            1 : }
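
mergeObsoleteFiles is an append/sort/deduplicate sequence built on the standard slices and cmp packages. A small runnable sketch of the same sequence, using a hypothetical file type keyed by file number:

    package main

    import (
            "cmp"
            "fmt"
            "slices"
    )

    type file struct{ num uint64 }

    // merge appends b to a, sorts by file number, and drops duplicate numbers.
    func merge(a, b []file) []file {
            if len(b) == 0 {
                    return a
            }
            a = append(a, b...)
            slices.SortFunc(a, func(x, y file) int { return cmp.Compare(x.num, y.num) })
            return slices.CompactFunc(a, func(x, y file) bool { return x.num == y.num })
    }

    func main() {
            merged := merge([]file{{3}, {1}}, []file{{2}, {3}})
            fmt.Println(merged) // [{1} {2} {3}]
    }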
     613              : 
     614              : // objectInfo describes an object in object storage (either a sstable or a blob
     615              : // file).
     616              : type objectInfo struct {
     617              :         fileInfo
     618              :         isLocal bool
     619              : }
     620              : 
     621            1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
     622            1 :         return obsoleteFile{
     623            1 :                 fileType: fileType,
     624            1 :                 fs:       fs,
     625            1 :                 path:     base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     626            1 :                 fileNum:  o.FileNum,
     627            1 :                 fileSize: o.FileSize,
     628            1 :                 isLocal:  o.isLocal,
     629            1 :         }
     630            1 : }
     631              : 
     632            1 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
     633            1 :         for _, of := range files {
     634            1 :                 if of.isLocal {
     635            1 :                         local.count++
     636            1 :                         local.size += of.fileSize
     637            1 :                 }
     638            1 :                 total.count++
     639            1 :                 total.size += of.fileSize
     640              :         }
     641            1 :         return total, local
     642              : }
     643              : 
     644              : type obsoleteObjectStats struct {
     645              :         tablesLocal    countAndSize
     646              :         tablesAll      countAndSize
     647              :         blobFilesLocal countAndSize
     648              :         blobFilesAll   countAndSize
     649              : }
     650              : 
     651            1 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
     652            1 :         s.tablesLocal.Add(other.tablesLocal)
     653            1 :         s.tablesAll.Add(other.tablesAll)
     654            1 :         s.blobFilesLocal.Add(other.blobFilesLocal)
     655            1 :         s.blobFilesAll.Add(other.blobFilesAll)
     656            1 : }
     657              : 
     658            0 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
     659            0 :         s.tablesLocal.Sub(other.tablesLocal)
     660            0 :         s.tablesAll.Sub(other.tablesAll)
     661            0 :         s.blobFilesLocal.Sub(other.blobFilesLocal)
     662            0 :         s.blobFilesAll.Sub(other.blobFilesAll)
     663            0 : }
     664              : 
     665              : type countAndSize struct {
     666              :         count uint64
     667              :         size  uint64
     668              : }
     669              : 
     670            1 : func (c *countAndSize) Add(other countAndSize) {
     671            1 :         c.count += other.count
     672            1 :         c.size += other.size
     673            1 : }
     674              : 
     675            0 : func (c *countAndSize) Sub(other countAndSize) {
     676            0 :         c.count = invariants.SafeSub(c.count, other.count)
     677            0 :         c.size = invariants.SafeSub(c.size, other.size)
     678            0 : }
     679              : 
     680            1 : func makeZombieObjects() zombieObjects {
     681            1 :         return zombieObjects{
     682            1 :                 objs: make(map[base.DiskFileNum]objectInfo),
     683            1 :         }
     684            1 : }
     685              : 
     686              : // zombieObjects tracks a set of objects that are no longer required by the most
     687              : // recent version of the LSM, but may still need to be accessed by an open
     688              : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
     689              : // may access them are closed.
     690              : type zombieObjects struct {
     691              :         objs       map[base.DiskFileNum]objectInfo
     692              :         totalSize  uint64
     693              :         localSize  uint64
     694              :         localCount uint64
     695              : }
     696              : 
     697              : // Add adds an object to the set of zombie objects.
     698            1 : func (z *zombieObjects) Add(obj objectInfo) {
     699            1 :         if _, ok := z.objs[obj.FileNum]; ok {
     700            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     701              :         }
     702            1 :         z.objs[obj.FileNum] = obj
     703            1 :         z.totalSize += obj.FileSize
     704            1 :         if obj.isLocal {
     705            1 :                 z.localSize += obj.FileSize
     706            1 :                 z.localCount++
     707            1 :         }
     708              : }
     709              : 
     710              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     711            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     712            1 :         z.Add(objectInfo{
     713            1 :                 fileInfo: fileInfo{
     714            1 :                         FileNum:  meta.DiskFileNum,
     715            1 :                         FileSize: size,
     716            1 :                 },
     717            1 :                 isLocal: !meta.IsRemote(),
     718            1 :         })
     719            1 : }
     720              : 
     721              : // Count returns the number of zombie objects.
     722            1 : func (z *zombieObjects) Count() int {
     723            1 :         return len(z.objs)
     724            1 : }
     725              : 
     726              : // Extract removes an object from the set of zombie objects, returning the
     727              : // object that was removed.
     728            1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     729            1 :         obj, ok := z.objs[fileNum]
     730            1 :         if !ok {
     731            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     732              :         }
     733            1 :         delete(z.objs, fileNum)
     734            1 : 
     735            1 :         // Detect underflow in case we have a bug that causes an object's size to be
     736            1 :         // mutated.
     737            1 :         if z.totalSize < obj.FileSize {
     738            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
     739              :         }
     740            1 :         if obj.isLocal && z.localSize < obj.FileSize {
     741            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
     742              :         }
     743              : 
     744            1 :         z.totalSize -= obj.FileSize
     745            1 :         if obj.isLocal {
     746            1 :                 z.localSize -= obj.FileSize
     747            1 :                 z.localCount--
     748            1 :         }
     749            1 :         return obj
     750              : }
     751              : 
     752              : // TotalSize returns the size of all objects in the set.
     753            0 : func (z *zombieObjects) TotalSize() uint64 {
     754            0 :         return z.totalSize
     755            0 : }
     756              : 
     757              : // LocalStats returns the count and size of all local objects in the set.
     758            0 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
     759            0 :         return z.localCount, z.localSize
     760            0 : }
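
zombieObjects boils down to a map keyed by file number plus running size totals guarded against underflow. A stripped-down, hypothetical sketch of that accounting (local-vs-remote tracking omitted):

    package main

    import "fmt"

    // zombieSet tracks sizes of dead-but-still-referenced files.
    type zombieSet struct {
            objs      map[uint64]uint64 // file number -> size
            totalSize uint64
    }

    func (z *zombieSet) add(num, size uint64) {
            if _, ok := z.objs[num]; ok {
                    panic("zombie object already exists")
            }
            z.objs[num] = size
            z.totalSize += size
    }

    func (z *zombieSet) extract(num uint64) uint64 {
            size, ok := z.objs[num]
            if !ok {
                    panic("zombie object not found")
            }
            delete(z.objs, num)
            if z.totalSize < size {
                    panic("zombie size accounting underflow")
            }
            z.totalSize -= size
            return size
    }

    func main() {
            z := &zombieSet{objs: make(map[uint64]uint64)}
            z.add(7, 1024)
            fmt.Println(z.extract(7), z.totalSize) // 1024 0
    }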
        

Generated by: LCOV version 2.0-1