LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions)
Test:         2025-05-18 08:18Z f2f316e7 - tests only.lcov
Test Date:    2025-05-18 08:19:41
Coverage:     Lines: 93.9 % (428 of 456 hit)    Functions: - (0 of 0 hit)

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/errors/oserror"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/pebble/wal"
      23              :         "github.com/cockroachdb/tokenbucket"
      24              : )
      25              : 
      26              : // Cleaner exports the base.Cleaner type.
      27              : type Cleaner = base.Cleaner
      28              : 
      29              : // DeleteCleaner exports the base.DeleteCleaner type.
      30              : type DeleteCleaner = base.DeleteCleaner
      31              : 
      32              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      33              : type ArchiveCleaner = base.ArchiveCleaner
      34              : 
      35              : type cleanupManager struct {
      36              :         opts            *Options
      37              :         objProvider     objstorage.Provider
      38              :         onTableDeleteFn func(fileSize uint64, isLocal bool)
      39              :         deletePacer     *deletionPacer
      40              : 
      41              :         // jobsCh is used as the cleanup job queue.
      42              :         jobsCh chan *cleanupJob
      43              :         // waitGroup is used to wait for the background goroutine to exit.
      44              :         waitGroup sync.WaitGroup
      45              : 
      46              :         mu struct {
      47              :                 sync.Mutex
      48              :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      49              :                 totalJobs              int
      50              :                 completedJobs          int
      51              :                 completedJobsCond      sync.Cond
      52              :                 jobsQueueWarningIssued bool
      53              :         }
      54              : }
      55              : 
       56              : // jobsQueueDepth is the number of jobs that can be queued before EnqueueJob blocks.
      57              : const jobsQueueDepth = 1000
      58              : 
      59              : // obsoleteFile holds information about a file that needs to be deleted soon.
      60              : type obsoleteFile struct {
      61              :         fileType base.FileType
      62              :         fs       vfs.FS
      63              :         path     string
      64              :         fileNum  base.DiskFileNum
      65              :         fileSize uint64 // approx for log files
      66              :         isLocal  bool
      67              : }
      68              : 
      69              : type cleanupJob struct {
      70              :         jobID         JobID
      71              :         obsoleteFiles []obsoleteFile
      72              : }
      73              : 
      74              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      75              : // The cleanupManager must be Close()d.
      76              : func openCleanupManager(
      77              :         opts *Options,
      78              :         objProvider objstorage.Provider,
      79              :         onTableDeleteFn func(fileSize uint64, isLocal bool),
      80              :         getDeletePacerInfo func() deletionPacerInfo,
      81            1 : ) *cleanupManager {
      82            1 :         cm := &cleanupManager{
      83            1 :                 opts:            opts,
      84            1 :                 objProvider:     objProvider,
      85            1 :                 onTableDeleteFn: onTableDeleteFn,
      86            1 :                 deletePacer: newDeletionPacer(
      87            1 :                         crtime.NowMono(),
      88            1 :                         opts.FreeSpaceThresholdBytes,
      89            1 :                         int64(opts.TargetByteDeletionRate),
      90            1 :                         opts.FreeSpaceTimeframe,
      91            1 :                         opts.ObsoleteBytesMaxRatio,
      92            1 :                         opts.ObsoleteBytesTimeframe,
      93            1 :                         getDeletePacerInfo,
      94            1 :                 ),
      95            1 :                 jobsCh: make(chan *cleanupJob, jobsQueueDepth),
      96            1 :         }
      97            1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
      98            1 :         cm.waitGroup.Add(1)
      99            1 : 
     100            1 :         go func() {
     101            1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
     102            1 :                         cm.mainLoop()
     103            1 :                 })
     104              :         }()
     105              : 
     106            1 :         return cm
     107              : }
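
For reference, a minimal standalone sketch of the goroutine-labeling pattern used by openCleanupManager: the cleanup goroutine runs its main loop under pprof labels (gcLabels in this file) so its CPU samples are attributable in profiles. The label key/value below are illustrative stand-ins, not the actual gcLabels definition.

package main

import (
        "context"
        "fmt"
        "runtime/pprof"
)

func main() {
        // Assumed stand-in for gcLabels; the real labels are defined elsewhere
        // in the pebble package.
        labels := pprof.Labels("pebble", "gc")

        done := make(chan struct{})
        go func() {
                // pprof.Do attaches the labels to this goroutine for the duration
                // of the callback, mirroring how openCleanupManager wraps
                // cm.mainLoop.
                pprof.Do(context.Background(), labels, func(context.Context) {
                        fmt.Println("background cleanup loop would run here")
                        close(done)
                })
        }()
        <-done
}
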
     108              : 
     109              : // Close stops the background goroutine, waiting until all queued jobs are completed.
     110              : // Delete pacing is disabled for the remaining jobs.
     111            1 : func (cm *cleanupManager) Close() {
     112            1 :         close(cm.jobsCh)
     113            1 :         cm.waitGroup.Wait()
     114            1 : }
     115              : 
     116              : // EnqueueJob adds a cleanup job to the manager's queue.
     117            1 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
     118            1 :         job := &cleanupJob{
     119            1 :                 jobID:         jobID,
     120            1 :                 obsoleteFiles: obsoleteFiles,
     121            1 :         }
     122            1 : 
     123            1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     124            1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     125            1 :         // rather than when we get to the job, otherwise the reported bytes will be
     126            1 :         // subject to the throttling rate which defeats the purpose.
     127            1 :         var pacingBytes uint64
     128            1 :         for _, of := range obsoleteFiles {
     129            1 :                 if cm.needsPacing(of.fileType, of.fileNum) {
     130            1 :                         pacingBytes += of.fileSize
     131            1 :                 }
     132              :         }
     133            1 :         if pacingBytes > 0 {
     134            1 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     135            1 :         }
     136              : 
     137            1 :         cm.mu.Lock()
     138            1 :         cm.mu.totalJobs++
     139            1 :         cm.maybeLogLocked()
     140            1 :         cm.mu.Unlock()
     141            1 : 
     142            1 :         cm.jobsCh <- job
     143              : }
     144              : 
     145              : // Wait until the completion of all jobs that were already queued.
     146              : //
     147              : // Does not wait for jobs that are enqueued during the call.
     148              : //
     149              : // Note that DB.mu should not be held while calling this method; the background
     150              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     151            1 : func (cm *cleanupManager) Wait() {
     152            1 :         cm.mu.Lock()
     153            1 :         defer cm.mu.Unlock()
     154            1 :         n := cm.mu.totalJobs
     155            1 :         for cm.mu.completedJobs < n {
     156            1 :                 cm.mu.completedJobsCond.Wait()
     157            1 :         }
     158              : }
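
For reference, a self-contained sketch of the snapshot-and-wait pattern that Wait implements: the caller records totalJobs at call time and waits on the condition variable until at least that many jobs have completed, so jobs enqueued after the call are not waited for. The jobTracker type below is an illustrative stand-in for cleanupManager's mu fields.

package main

import (
        "fmt"
        "sync"
)

type jobTracker struct {
        mu            sync.Mutex
        cond          sync.Cond
        totalJobs     int
        completedJobs int
}

func newJobTracker() *jobTracker {
        t := &jobTracker{}
        t.cond.L = &t.mu
        return t
}

func (t *jobTracker) enqueue() {
        t.mu.Lock()
        t.totalJobs++
        t.mu.Unlock()
}

func (t *jobTracker) complete() {
        t.mu.Lock()
        t.completedJobs++
        t.cond.Broadcast() // wake any waiters, as mainLoop does
        t.mu.Unlock()
}

// wait blocks until all jobs enqueued before the call have completed.
func (t *jobTracker) wait() {
        t.mu.Lock()
        defer t.mu.Unlock()
        n := t.totalJobs // snapshot; later enqueues are not waited for
        for t.completedJobs < n {
                t.cond.Wait()
        }
}

func main() {
        t := newJobTracker()
        t.enqueue()
        go t.complete()
        t.wait()
        fmt.Println("all previously queued jobs completed")
}
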
     159              : 
     160              : // mainLoop runs the manager's background goroutine.
     161            1 : func (cm *cleanupManager) mainLoop() {
     162            1 :         defer cm.waitGroup.Done()
     163            1 : 
     164            1 :         var tb tokenbucket.TokenBucket
     165            1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     166            1 :         tb.Init(1.0, 1.0)
     167            1 :         for job := range cm.jobsCh {
     168            1 :                 for _, of := range job.obsoleteFiles {
     169            1 :                         switch of.fileType {
     170            1 :                         case base.FileTypeTable:
     171            1 :                                 cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
     172            1 :                                 cm.onTableDeleteFn(of.fileSize, of.isLocal)
     173            1 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     174            1 :                         case base.FileTypeBlob:
     175            1 :                                 cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
     176            1 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     177            1 :                         default:
     178            1 :                                 cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
     179              :                         }
     180              :                 }
     181            1 :                 cm.mu.Lock()
     182            1 :                 cm.mu.completedJobs++
     183            1 :                 cm.mu.completedJobsCond.Broadcast()
     184            1 :                 cm.maybeLogLocked()
     185            1 :                 cm.mu.Unlock()
     186              :         }
     187              : }
     188              : 
      189              : // fileNumIfSST is read only if fileType is base.FileTypeTable or base.FileTypeBlob.
     190            1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     191            1 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     192            1 :                 return false
     193            1 :         }
     194            1 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     195            1 :         if err != nil {
     196            1 :                 // The object was already removed from the provider; we won't actually
     197            1 :                 // delete anything, so we don't need to pace.
     198            1 :                 return false
     199            1 :         }
     200              :         // Don't throttle deletion of remote objects.
     201            1 :         return !meta.IsRemote()
     202              : }
     203              : 
     204              : // maybePace sleeps before deleting an object if appropriate. It is always
     205              : // called from the background goroutine.
     206              : func (cm *cleanupManager) maybePace(
     207              :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     208            1 : ) {
     209            1 :         if !cm.needsPacing(fileType, fileNum) {
     210            1 :                 return
     211            1 :         }
     212              : 
     213            1 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
     214            1 :         if tokens == 0.0 {
     215            1 :                 // The token bucket might be in debt; it could make us wait even for 0
     216            1 :                 // tokens. We don't want that if the pacer decided throttling should be
     217            1 :                 // disabled.
     218            1 :                 return
     219            1 :         }
     220              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     221              :         // the token bucket accumulates up to one second of unused tokens.
     222            1 :         for {
     223            1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     224            1 :                 if ok {
     225            1 :                         break
     226              :                 }
     227            0 :                 time.Sleep(d)
     228              :         }
     229              : }
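
For reference, a standalone sketch of the pacing loop above, using the same cockroachdb/tokenbucket calls that appear in mainLoop and maybePace; the rate, burst, and token amounts are illustrative values rather than output of pebble's deletion pacer.

package main

import (
        "fmt"
        "time"

        "github.com/cockroachdb/tokenbucket"
)

func main() {
        var tb tokenbucket.TokenBucket
        // 1 token/second refill rate with a burst of 1, as in mainLoop.
        tb.Init(1.0, 1.0)

        // Pretend the pacer asked for 0.75 tokens per deletion.
        for i := 0; i < 3; i++ {
                for {
                        ok, d := tb.TryToFulfill(tokenbucket.Tokens(0.75))
                        if ok {
                                break
                        }
                        // Not enough tokens yet: sleep for the suggested duration
                        // and retry, exactly as maybePace does.
                        time.Sleep(d)
                }
                fmt.Printf("deletion %d paced\n", i)
        }
}
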
     230              : 
     231              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     232              : func (cm *cleanupManager) deleteObsoleteFile(
     233              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     234            1 : ) {
      235            1 :         // TODO(peter): need to handle this error, probably by re-adding the
      236            1 :         // file that couldn't be deleted to one of the obsolete file slices.
     237            1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     238            1 :         if oserror.IsNotExist(err) {
     239            1 :                 return
     240            1 :         }
     241              : 
     242            1 :         switch fileType {
     243            1 :         case base.FileTypeLog:
     244            1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     245            1 :                         JobID:   int(jobID),
     246            1 :                         Path:    path,
     247            1 :                         FileNum: fileNum,
     248            1 :                         Err:     err,
     249            1 :                 })
     250            1 :         case base.FileTypeManifest:
     251            1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     252            1 :                         JobID:   int(jobID),
     253            1 :                         Path:    path,
     254            1 :                         FileNum: fileNum,
     255            1 :                         Err:     err,
     256            1 :                 })
     257            0 :         case base.FileTypeTable, base.FileTypeBlob:
     258            0 :                 panic("invalid deletion of object file")
     259              :         }
     260              : }
     261              : 
     262              : func (cm *cleanupManager) deleteObsoleteObject(
     263              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     264            1 : ) {
     265            1 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     266            0 :                 panic("not an object")
     267              :         }
     268              : 
     269            1 :         var path string
     270            1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     271            1 :         if err != nil {
     272            1 :                 path = "<nil>"
     273            1 :         } else {
     274            1 :                 path = cm.objProvider.Path(meta)
     275            1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     276            1 :         }
     277            1 :         if cm.objProvider.IsNotExistError(err) {
     278            1 :                 return
     279            1 :         }
     280              : 
     281            1 :         switch fileType {
     282            1 :         case base.FileTypeTable:
     283            1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     284            1 :                         JobID:   int(jobID),
     285            1 :                         Path:    path,
     286            1 :                         FileNum: fileNum,
     287            1 :                         Err:     err,
     288            1 :                 })
     289            1 :         case base.FileTypeBlob:
     290            1 :                 cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
     291            1 :                         JobID:   int(jobID),
     292            1 :                         Path:    path,
     293            1 :                         FileNum: fileNum,
     294            1 :                         Err:     err,
     295            1 :                 })
     296              :         }
     297              : }
     298              : 
      299              : // maybeLogLocked issues a warning log once the job queue exceeds 75% of its
      300              : // capacity, and a follow-up log once it drops back below 10% of capacity.
     301              : //
     302              : // Must be called with cm.mu locked.
     303            1 : func (cm *cleanupManager) maybeLogLocked() {
     304            1 :         const highThreshold = jobsQueueDepth * 3 / 4
     305            1 :         const lowThreshold = jobsQueueDepth / 10
     306            1 : 
     307            1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     308            1 : 
     309            1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     310            0 :                 cm.mu.jobsQueueWarningIssued = true
     311            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     312            0 :         }
     313              : 
     314            1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     315            0 :                 cm.mu.jobsQueueWarningIssued = false
     316            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     317            0 :         }
     318              : }
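
For reference, a small standalone sketch of the warn/recover hysteresis above, with the concrete thresholds implied by jobsQueueDepth = 1000: one warning when the backlog first exceeds 750 jobs, and one recovery message once it drops back below 100. The backlog values are made up.

package main

import "fmt"

func main() {
        const jobsQueueDepth = 1000
        const highThreshold = jobsQueueDepth * 3 / 4 // 750
        const lowThreshold = jobsQueueDepth / 10     // 100

        warned := false
        for _, backlog := range []int{500, 800, 900, 400, 50} {
                if !warned && backlog > highThreshold {
                        warned = true
                        fmt.Printf("cleanup falling behind; job queue has over %d jobs\n", highThreshold)
                }
                if warned && backlog < lowThreshold {
                        warned = false
                        fmt.Printf("cleanup back to normal; job queue has under %d jobs\n", lowThreshold)
                }
        }
}
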
     319              : 
     320            1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     321            1 :         var pacerInfo deletionPacerInfo
     322            1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     323            1 :         // but in practice this was observed to take constant time, regardless of
      324            1 :         // the volume size in use, at least on Linux with ext4 and ZFS. All invocations
     325            1 :         // take 10 microseconds or less.
     326            1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     327            1 :         d.mu.Lock()
     328            1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     329            1 :         total := d.mu.versions.metrics.Total()
     330            1 :         d.mu.Unlock()
     331            1 :         pacerInfo.liveBytes = uint64(total.AggregateSize())
     332            1 :         return pacerInfo
     333            1 : }
     334              : 
     335              : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
     336            1 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
     337            1 :         d.mu.Lock()
     338            1 :         d.mu.versions.metrics.Table.ObsoleteCount--
     339            1 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
     340            1 :         if isLocal {
     341            1 :                 d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
     342            1 :                 d.mu.versions.metrics.Table.Local.ObsoleteCount--
     343            1 :         }
     344            1 :         d.mu.Unlock()
     345              : }
     346              : 
     347              : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     348              : // and adds those to the internal lists of obsolete files. Note that the files
     349              : // are not actually deleted by this method. A subsequent call to
      350              : // deleteObsoleteFiles must be performed. Must not be called concurrently
     351              : // with compactions and flushes. db.mu must be held when calling this function.
     352            1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
      353            1 :         // Temporarily disable automatic compactions so that concurrent compactions/
      354            1 :         // flushes do not interfere. The original value is restored on completion.
     355            1 :         disabledPrev := d.opts.DisableAutomaticCompactions
     356            1 :         defer func() {
     357            1 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     358            1 :         }()
     359            1 :         d.opts.DisableAutomaticCompactions = true
     360            1 : 
     361            1 :         // Wait for any ongoing compaction to complete before continuing.
     362            1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     363            1 :                 d.mu.compact.cond.Wait()
     364            1 :         }
     365              : 
     366            1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     367            1 :         d.mu.versions.addLiveFileNums(liveFileNums)
      368            1 :         // Protect files that are only referred to by an ingestedFlushable from
      369            1 :         // being deleted. These are added to the flushable queue on WAL replay and
      370            1 :         // handle their own obsoletion/deletion; we exclude them from this obsolete
      371            1 :         // file scan to avoid double-deleting them.
     372            1 :         for _, f := range flushableIngests {
     373            1 :                 for _, file := range f.files {
     374            1 :                         liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
     375            1 :                 }
     376              :         }
     377              : 
     378            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     379            1 : 
     380            1 :         var obsoleteTables []objectInfo
     381            1 :         var obsoleteBlobs []objectInfo
     382            1 :         var obsoleteOptions []obsoleteFile
     383            1 :         var obsoleteManifests []obsoleteFile
     384            1 : 
     385            1 :         for _, filename := range list {
     386            1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     387            1 :                 if !ok {
     388            1 :                         continue
     389              :                 }
     390            1 :                 makeObsoleteFile := func() obsoleteFile {
     391            1 :                         of := obsoleteFile{
     392            1 :                                 fileType: fileType,
     393            1 :                                 fs:       d.opts.FS,
     394            1 :                                 path:     d.opts.FS.PathJoin(d.dirname, filename),
     395            1 :                                 fileNum:  diskFileNum,
     396            1 :                                 isLocal:  true,
     397            1 :                         }
     398            1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     399            1 :                                 of.fileSize = uint64(stat.Size())
     400            1 :                         }
     401            1 :                         return of
     402              :                 }
     403            1 :                 switch fileType {
     404            1 :                 case base.FileTypeManifest:
     405            1 :                         if diskFileNum >= manifestFileNum {
     406            1 :                                 continue
     407              :                         }
     408            1 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     409            1 :                 case base.FileTypeOptions:
     410            1 :                         if diskFileNum >= d.optionsFileNum {
     411            1 :                                 continue
     412              :                         }
     413            1 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     414            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     415              :                         // Objects are handled through the objstorage provider below.
     416            1 :                 default:
     417              :                         // Don't delete files we don't know about.
     418              :                 }
     419              :         }
     420              : 
     421            1 :         objects := d.objProvider.List()
     422            1 :         for _, obj := range objects {
     423            1 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     424            1 :                         continue
     425              :                 }
     426            1 :                 makeObjectInfo := func() objectInfo {
     427            1 :                         fileInfo := fileInfo{FileNum: obj.DiskFileNum}
     428            1 :                         if size, err := d.objProvider.Size(obj); err == nil {
     429            1 :                                 fileInfo.FileSize = uint64(size)
     430            1 :                         }
     431            1 :                         return objectInfo{
     432            1 :                                 fileInfo: fileInfo,
     433            1 :                                 isLocal:  !obj.IsRemote(),
     434            1 :                         }
     435              :                 }
     436              : 
     437            1 :                 switch obj.FileType {
     438            1 :                 case base.FileTypeTable:
     439            1 :                         obsoleteTables = append(obsoleteTables, makeObjectInfo())
     440            1 :                 case base.FileTypeBlob:
     441            1 :                         obsoleteBlobs = append(obsoleteBlobs, makeObjectInfo())
     442            0 :                 default:
     443              :                         // Ignore object types we don't know about.
     444              :                 }
     445              :         }
     446              : 
     447            1 :         d.mu.versions.obsoleteTables = mergeObjectInfos(d.mu.versions.obsoleteTables, obsoleteTables)
     448            1 :         d.mu.versions.obsoleteBlobs = mergeObjectInfos(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     449            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     450            1 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     451            1 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     452              : }
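
For reference, a minimal sketch of the core filtering step in scanObsoleteFiles: gather the disk file numbers still referenced by the current version (plus files owned by flushable ingests) into a set, then treat any on-disk number that is not in the set as obsolete. The file numbers below are illustrative.

package main

import "fmt"

func main() {
        // Stand-in for liveFileNums as populated by addLiveFileNums and the
        // ingestedFlushable loop above.
        live := map[uint64]struct{}{7: {}, 9: {}}

        // Stand-in for the directory listing / objProvider.List().
        onDisk := []uint64{5, 7, 8, 9}

        var obsolete []uint64
        for _, n := range onDisk {
                if _, ok := live[n]; ok {
                        continue // still referenced; keep it
                }
                obsolete = append(obsolete, n)
        }
        fmt.Println(obsolete) // [5 8]
}
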
     453              : 
     454              : // disableFileDeletions disables file deletions and then waits for any
     455              : // in-progress deletion to finish. The caller is required to call
     456              : // enableFileDeletions in order to enable file deletions again. It is ok for
     457              : // multiple callers to disable file deletions simultaneously, though they must
     458              : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     459              : // (there is an internal reference count on file deletion disablement).
     460              : //
     461              : // d.mu must be held when calling this method.
     462            1 : func (d *DB) disableFileDeletions() {
     463            1 :         d.mu.disableFileDeletions++
     464            1 :         d.mu.Unlock()
     465            1 :         defer d.mu.Lock()
     466            1 :         d.cleanupManager.Wait()
     467            1 : }
     468              : 
     469              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     470              : // is queued if necessary.
     471              : //
     472              : // d.mu must be held when calling this method.
     473            1 : func (d *DB) enableFileDeletions() {
     474            1 :         if d.mu.disableFileDeletions <= 0 {
     475            1 :                 panic("pebble: file deletion disablement invariant violated")
     476              :         }
     477            1 :         d.mu.disableFileDeletions--
     478            1 :         if d.mu.disableFileDeletions > 0 {
     479            0 :                 return
     480            0 :         }
     481            1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     482              : }
     483              : 
     484              : type fileInfo = base.FileInfo
     485              : 
     486              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     487              : //
      488              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     489              : //
     490              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     491              : // cleanup job will be scheduled when file deletions are re-enabled.
     492            1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     493            1 :         if d.mu.disableFileDeletions > 0 {
     494            1 :                 return
     495            1 :         }
     496            1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     497            1 : 
     498            1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     499            1 :         // log that has not had its contents flushed to an sstable.
     500            1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     501            1 :         if err != nil {
     502            0 :                 panic(err)
     503              :         }
     504              : 
     505            1 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     506            1 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     507            1 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     508            1 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     509            1 : 
     510            1 :         // Ensure everything is already sorted. We want determinism for testing, and
     511            1 :         // we need the manifests to be sorted because we want to delete some
     512            1 :         // contiguous prefix of the older manifests.
     513            1 :         if invariants.Enabled {
     514            1 :                 switch {
     515            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
     516            0 :                         d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
     517            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
     518            0 :                         d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
     519            0 :                 case !slices.IsSortedFunc(obsoleteTables, cmpObjectFileNums):
     520            0 :                         d.opts.Logger.Fatalf("obsoleteTables is not sorted")
     521            0 :                 case !slices.IsSortedFunc(obsoleteBlobs, cmpObjectFileNums):
     522            0 :                         d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
     523              :                 }
     524              :         }
     525              : 
     526            1 :         var obsoleteManifests []obsoleteFile
     527            1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     528            1 :         if manifestsToDelete > 0 {
     529            1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     530            1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     531            1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     532            0 :                         d.mu.versions.obsoleteManifests = nil
     533            0 :                 }
     534              :         }
     535              : 
     536            1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     537            1 :         d.mu.versions.obsoleteOptions = nil
     538            1 : 
     539            1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     540            1 :         // Note the unusual order: Unlock and then Lock.
     541            1 :         d.mu.Unlock()
     542            1 :         defer d.mu.Lock()
     543            1 : 
     544            1 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
     545            1 :         filesToDelete = append(filesToDelete, obsoleteManifests...)
     546            1 :         filesToDelete = append(filesToDelete, obsoleteOptions...)
     547            1 :         for _, f := range obsoleteLogs {
     548            1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     549            1 :                         fileType: base.FileTypeLog,
     550            1 :                         fs:       f.FS,
     551            1 :                         path:     f.Path,
     552            1 :                         fileNum:  base.DiskFileNum(f.NumWAL),
     553            1 :                         fileSize: f.ApproxFileSize,
     554            1 :                         isLocal:  true,
     555            1 :                 })
     556            1 :         }
     557            1 :         for _, f := range obsoleteTables {
     558            1 :                 d.fileCache.Evict(f.FileNum, base.FileTypeTable)
     559            1 :                 filesToDelete = append(filesToDelete, f.asObsoleteFile(d.opts.FS, base.FileTypeTable, d.dirname))
     560            1 :         }
     561            1 :         for _, f := range obsoleteBlobs {
     562            1 :                 d.fileCache.Evict(f.FileNum, base.FileTypeBlob)
     563            1 :                 filesToDelete = append(filesToDelete, f.asObsoleteFile(d.opts.FS, base.FileTypeBlob, d.dirname))
     564            1 :         }
     565              : 
     566            1 :         if len(filesToDelete) > 0 {
     567            1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
     568            1 :         }
     569            1 :         if d.opts.private.testingAlwaysWaitForCleanup {
     570            1 :                 d.cleanupManager.Wait()
     571            1 :         }
     572              : }
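
For reference, a self-contained sketch of the "Unlock and then Lock" pattern noted above (and also used by disableFileDeletions): the method is entered with the mutex held, releases it around slow work such as building the job list or waiting on the cleanup manager, and the deferred Lock re-acquires it so the caller still holds the mutex when the method returns. The db type here is a stand-in.

package main

import (
        "fmt"
        "sync"
)

type db struct {
        mu sync.Mutex
}

// slowWorkLocked must be called with d.mu held and returns with d.mu held,
// but drops the mutex while the slow work runs.
func (d *db) slowWorkLocked() {
        d.mu.Unlock()
        defer d.mu.Lock()
        fmt.Println("doing slow work without holding d.mu")
}

func main() {
        d := &db{}
        d.mu.Lock()
        d.slowWorkLocked()
        d.mu.Unlock()
}
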
     573              : 
     574            1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     575            1 :         d.mu.Lock()
     576            1 :         defer d.mu.Unlock()
     577            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     578            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     579            1 :         }
     580              : }
     581              : 
     582            1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
     583            1 :         if len(b) == 0 {
     584            1 :                 return a
     585            1 :         }
     586              : 
     587            1 :         a = append(a, b...)
     588            1 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     589            1 :         return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
     590            1 :                 return a.fileNum == b.fileNum
     591            1 :         })
     592              : }
     593              : 
     594            1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
     595            1 :         return cmp.Compare(a.fileNum, b.fileNum)
     596            1 : }
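
For reference, a standalone sketch of the append-sort-dedupe pattern shared by mergeObsoleteFiles and mergeObjectInfos, with plain integers standing in for the obsoleteFile/objectInfo structs keyed by file number.

package main

import (
        "cmp"
        "fmt"
        "slices"
)

// mergeFileNums appends b to a, sorts by file number, and drops adjacent
// duplicates, mirroring mergeObsoleteFiles.
func mergeFileNums(a, b []uint64) []uint64 {
        if len(b) == 0 {
                return a
        }
        a = append(a, b...)
        slices.SortFunc(a, func(x, y uint64) int { return cmp.Compare(x, y) })
        return slices.Compact(a)
}

func main() {
        fmt.Println(mergeFileNums([]uint64{3, 7}, []uint64{5, 7, 1})) // [1 3 5 7]
}
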
     597              : 
      598              : // objectInfo describes an object in object storage (either an sstable or a blob
     599              : // file).
     600              : type objectInfo struct {
     601              :         fileInfo
     602              :         isLocal bool
     603              : }
     604              : 
     605            1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
     606            1 :         return obsoleteFile{
     607            1 :                 fileType: fileType,
     608            1 :                 fs:       fs,
     609            1 :                 path:     base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     610            1 :                 fileNum:  o.FileNum,
     611            1 :                 fileSize: o.FileSize,
     612            1 :                 isLocal:  o.isLocal,
     613            1 :         }
     614            1 : }
     615              : 
     616            1 : func makeZombieObjects() zombieObjects {
     617            1 :         return zombieObjects{
     618            1 :                 objs: make(map[base.DiskFileNum]objectInfo),
     619            1 :         }
     620            1 : }
     621              : 
     622              : // zombieObjects tracks a set of objects that are no longer required by the most
     623              : // recent version of the LSM, but may still need to be accessed by an open
     624              : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
     625              : // may access them are closed.
     626              : type zombieObjects struct {
     627              :         objs       map[base.DiskFileNum]objectInfo
     628              :         totalSize  uint64
     629              :         localSize  uint64
     630              :         localCount uint64
     631              : }
     632              : 
     633              : // Add adds an object to the set of zombie objects.
     634            1 : func (z *zombieObjects) Add(obj objectInfo) {
     635            1 :         if _, ok := z.objs[obj.FileNum]; ok {
     636            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     637              :         }
     638            1 :         z.objs[obj.FileNum] = obj
     639            1 :         z.totalSize += obj.FileSize
     640            1 :         if obj.isLocal {
     641            1 :                 z.localSize += obj.FileSize
     642            1 :                 z.localCount++
     643            1 :         }
     644              : }
     645              : 
     646              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     647            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     648            1 :         z.Add(objectInfo{
     649            1 :                 fileInfo: fileInfo{
     650            1 :                         FileNum:  meta.DiskFileNum,
     651            1 :                         FileSize: size,
     652            1 :                 },
     653            1 :                 isLocal: !meta.IsRemote(),
     654            1 :         })
     655            1 : }
     656              : 
     657              : // Count returns the number of zombie objects.
     658            1 : func (z *zombieObjects) Count() int {
     659            1 :         return len(z.objs)
     660            1 : }
     661              : 
     662              : // Extract removes an object from the set of zombie objects, returning the
     663              : // object that was removed.
     664            1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     665            1 :         obj, ok := z.objs[fileNum]
     666            1 :         if !ok {
     667            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     668              :         }
     669            1 :         delete(z.objs, fileNum)
     670            1 : 
     671            1 :         // Detect underflow in case we have a bug that causes an object's size to be
     672            1 :         // mutated.
     673            1 :         if z.totalSize < obj.FileSize {
     674            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
     675              :         }
     676            1 :         if obj.isLocal && z.localSize < obj.FileSize {
     677            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
     678              :         }
     679              : 
     680            1 :         z.totalSize -= obj.FileSize
     681            1 :         if obj.isLocal {
     682            1 :                 z.localSize -= obj.FileSize
     683            1 :                 z.localCount--
     684            1 :         }
     685            1 :         return obj
     686              : }
     687              : 
     688              : // TotalSize returns the size of all objects in the set.
     689            1 : func (z *zombieObjects) TotalSize() uint64 {
     690            1 :         return z.totalSize
     691            1 : }
     692              : 
     693              : // LocalStats returns the count and size of all local objects in the set.
     694            1 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
     695            1 :         return z.localCount, z.localSize
     696            1 : }
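
For reference, a simplified, self-contained sketch of the zombieObjects bookkeeping: a map keyed by disk file number plus running size totals, updated on Add and Extract with the same duplicate and underflow checks. Types, field names, and sizes below are illustrative.

package main

import "fmt"

type zombieSet struct {
        objs      map[uint64]uint64 // file number -> size in bytes
        totalSize uint64
}

func (z *zombieSet) add(fileNum, size uint64) {
        if _, ok := z.objs[fileNum]; ok {
                panic("zombie object already exists")
        }
        z.objs[fileNum] = size
        z.totalSize += size
}

func (z *zombieSet) extract(fileNum uint64) uint64 {
        size, ok := z.objs[fileNum]
        if !ok {
                panic("zombie object not found")
        }
        delete(z.objs, fileNum)
        if z.totalSize < size {
                panic("zombie size accounting underflow")
        }
        z.totalSize -= size
        return size
}

func main() {
        z := &zombieSet{objs: make(map[uint64]uint64)}
        z.add(12, 4096)
        z.add(13, 8192)
        _ = z.extract(12)
        fmt.Println(z.totalSize) // 8192
}
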
     697              : 
     698            1 : func mergeObjectInfos(a, b []objectInfo) []objectInfo {
     699            1 :         if len(b) == 0 {
     700            1 :                 return a
     701            1 :         }
     702            1 :         a = append(a, b...)
     703            1 :         slices.SortFunc(a, cmpObjectFileNums)
     704            1 :         return slices.CompactFunc(a, func(a, b objectInfo) bool {
     705            1 :                 return a.FileNum == b.FileNum
     706            1 :         })
     707              : }
     708              : 
     709            1 : func cmpObjectFileNums(a, b objectInfo) int {
     710            1 :         return cmp.Compare(a.FileNum, b.FileNum)
     711            1 : }
        

Generated by: LCOV version 2.0-1