LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions)
Test:         2025-08-19 08:18Z 07506b8d - tests + meta.lcov
Test Date:    2025-08-19 08:19:58
Coverage:     Lines: 94.1 % (448 of 476 hit)    Functions: - (0 of 0)

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/errors/oserror"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/pebble/wal"
      23              :         "github.com/cockroachdb/tokenbucket"
      24              : )
      25              : 
      26              : // Cleaner exports the base.Cleaner type.
      27              : type Cleaner = base.Cleaner
      28              : 
      29              : // DeleteCleaner exports the base.DeleteCleaner type.
      30              : type DeleteCleaner = base.DeleteCleaner
      31              : 
      32              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      33              : type ArchiveCleaner = base.ArchiveCleaner
      34              : 
      35              : type cleanupManager struct {
      36              :         opts        *Options
      37              :         objProvider objstorage.Provider
      38              :         deletePacer *deletionPacer
      39              : 
      40              :         // jobsCh is used as the cleanup job queue.
      41              :         jobsCh chan *cleanupJob
      42              :         // waitGroup is used to wait for the background goroutine to exit.
      43              :         waitGroup sync.WaitGroup
      44              : 
      45              :         mu struct {
      46              :                 sync.Mutex
      47              :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      48              :                 totalJobs              int
      49              :                 completedStats         obsoleteObjectStats
      50              :                 completedJobs          int
      51              :                 completedJobsCond      sync.Cond
      52              :                 jobsQueueWarningIssued bool
      53              :         }
      54              : }
      55              : 
      56              : // CompletedStats returns the stats summarizing objects deleted. The returned
      57              : // stats increase monotonically over the lifetime of the DB.
      58            2 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
      59            2 :         m.mu.Lock()
      60            2 :         defer m.mu.Unlock()
      61            2 :         return m.mu.completedStats
      62            2 : }
      63              : 
      64              : // We can queue this many jobs before we have to block EnqueueJob.
      65              : const jobsQueueDepth = 1000
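
// The queue is a buffered channel of jobsQueueDepth slots (see jobsCh above
// and EnqueueJob/mainLoop below); once it fills up, the send in EnqueueJob
// blocks until mainLoop drains a job. A minimal standalone sketch of that
// behavior, using a tiny depth and string jobs purely for illustration
// (exampleBoundedQueue is a hypothetical name, not part of this file):
func exampleBoundedQueue() {
        jobs := make(chan string, 2) // analogous to jobsCh with jobsQueueDepth slots
        done := make(chan struct{})
        go func() { // analogous to mainLoop draining the queue
                for range jobs {
                }
                close(done)
        }()
        jobs <- "job-1" // fits in the buffer (or is handed to the worker)
        jobs <- "job-2"
        jobs <- "job-3" // blocks only until the worker frees a slot
        close(jobs)     // as in Close: no more sends; the worker exits after draining
        <-done
}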
      66              : 
      67              : // obsoleteFile holds information about a file that needs to be deleted soon.
      68              : type obsoleteFile struct {
      69              :         fileType base.FileType
      70              :         fs       vfs.FS
      71              :         path     string
      72              :         fileNum  base.DiskFileNum
      73              :         fileSize uint64 // approx for log files
      74              :         isLocal  bool
      75              : }
      76              : 
      77            2 : func (of *obsoleteFile) needsPacing() bool {
      78            2 :         // We only need to pace local objects--sstables and blob files.
      79            2 :         return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
      80            2 : }
      81              : 
      82              : type cleanupJob struct {
      83              :         jobID         JobID
      84              :         obsoleteFiles []obsoleteFile
      85              :         stats         obsoleteObjectStats
      86              : }
      87              : 
      88              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      89              : // The cleanupManager must be Close()d.
      90              : func openCleanupManager(
      91              :         opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
      92            2 : ) *cleanupManager {
      93            2 :         cm := &cleanupManager{
      94            2 :                 opts:        opts,
      95            2 :                 objProvider: objProvider,
      96            2 :                 deletePacer: newDeletionPacer(
      97            2 :                         crtime.NowMono(),
      98            2 :                         opts.FreeSpaceThresholdBytes,
      99            2 :                         int64(opts.TargetByteDeletionRate),
     100            2 :                         opts.FreeSpaceTimeframe,
     101            2 :                         opts.ObsoleteBytesMaxRatio,
     102            2 :                         opts.ObsoleteBytesTimeframe,
     103            2 :                         getDeletePacerInfo,
     104            2 :                 ),
     105            2 :                 jobsCh: make(chan *cleanupJob, jobsQueueDepth),
     106            2 :         }
     107            2 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
     108            2 :         cm.waitGroup.Add(1)
     109            2 : 
     110            2 :         go func() {
     111            2 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
     112            2 :                         cm.mainLoop()
     113            2 :                 })
     114              :         }()
     115              : 
     116            2 :         return cm
     117              : }
     118              : 
     119              : // Close stops the background goroutine, waiting until all queued jobs are completed.
     120              : // Delete pacing is disabled for the remaining jobs.
     121            2 : func (cm *cleanupManager) Close() {
     122            2 :         close(cm.jobsCh)
     123            2 :         cm.waitGroup.Wait()
     124            2 : }
     125              : 
     126              : // EnqueueJob adds a cleanup job to the manager's queue.
     127              : func (cm *cleanupManager) EnqueueJob(
     128              :         jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
     129            2 : ) {
     130            2 :         job := &cleanupJob{
     131            2 :                 jobID:         jobID,
     132            2 :                 obsoleteFiles: obsoleteFiles,
     133            2 :                 stats:         stats,
     134            2 :         }
     135            2 : 
     136            2 :         // Report deleted bytes to the pacer, which can use this data to potentially
     137            2 :         // increase the deletion rate to keep up. We want to do this at enqueue time
      138            2 :         // rather than when we get to the job; otherwise the reported bytes would be
      139            2 :         // subject to the throttling rate, which defeats the purpose.
     140            2 :         var pacingBytes uint64
     141            2 :         for _, of := range obsoleteFiles {
     142            2 :                 if of.needsPacing() {
     143            2 :                         pacingBytes += of.fileSize
     144            2 :                 }
     145              :         }
     146            2 :         if pacingBytes > 0 {
     147            2 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     148            2 :         }
     149              : 
     150            2 :         cm.mu.Lock()
     151            2 :         cm.mu.totalJobs++
     152            2 :         cm.maybeLogLocked()
     153            2 :         cm.mu.Unlock()
     154            2 : 
     155            2 :         cm.jobsCh <- job
     156              : }
     157              : 
     158              : // Wait until the completion of all jobs that were already queued.
     159              : //
     160              : // Does not wait for jobs that are enqueued during the call.
     161              : //
     162              : // Note that DB.mu should not be held while calling this method; the background
     163              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     164            2 : func (cm *cleanupManager) Wait() {
     165            2 :         cm.mu.Lock()
     166            2 :         defer cm.mu.Unlock()
     167            2 :         n := cm.mu.totalJobs
     168            2 :         for cm.mu.completedJobs < n {
     169            2 :                 cm.mu.completedJobsCond.Wait()
     170            2 :         }
     171              : }
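
// The snapshot of totalJobs above (n) is what gives Wait its documented
// behavior: jobs enqueued after the snapshot raise totalJobs but not n, so the
// loop exits as soon as the previously queued jobs complete, even if new work
// keeps arriving.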
     172              : 
     173              : // mainLoop runs the manager's background goroutine.
     174            2 : func (cm *cleanupManager) mainLoop() {
     175            2 :         defer cm.waitGroup.Done()
     176            2 : 
     177            2 :         var tb tokenbucket.TokenBucket
     178            2 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     179            2 :         tb.Init(1.0, 1.0)
     180            2 :         for job := range cm.jobsCh {
     181            2 :                 for _, of := range job.obsoleteFiles {
     182            2 :                         switch of.fileType {
     183            2 :                         case base.FileTypeTable:
     184            2 :                                 cm.maybePace(&tb, &of)
     185            2 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     186            2 :                         case base.FileTypeBlob:
     187            2 :                                 cm.maybePace(&tb, &of)
     188            2 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     189            2 :                         default:
     190            2 :                                 cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
     191              :                         }
     192              :                 }
     193            2 :                 cm.mu.Lock()
     194            2 :                 cm.mu.completedJobs++
     195            2 :                 cm.mu.completedStats.Add(job.stats)
     196            2 :                 cm.mu.completedJobsCond.Broadcast()
     197            2 :                 cm.maybeLogLocked()
     198            2 :                 cm.mu.Unlock()
     199              :         }
     200              : }
     201              : 
     202              : // maybePace sleeps before deleting an object if appropriate. It is always
     203              : // called from the background goroutine.
     204            2 : func (cm *cleanupManager) maybePace(tb *tokenbucket.TokenBucket, of *obsoleteFile) {
     205            2 :         if !of.needsPacing() {
     206            2 :                 return
     207            2 :         }
     208              : 
     209            2 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
     210            2 :         if tokens == 0.0 {
     211            2 :                 // The token bucket might be in debt; it could make us wait even for 0
     212            2 :                 // tokens. We don't want that if the pacer decided throttling should be
     213            2 :                 // disabled.
     214            2 :                 return
     215            2 :         }
     216              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     217              :         // the token bucket accumulates up to one second of unused tokens.
     218            2 :         for {
     219            2 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     220            2 :                 if ok {
     221            2 :                         break
     222              :                 }
     223            1 :                 time.Sleep(d)
     224              :         }
     225              : }
     226              : 
     227              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     228              : func (cm *cleanupManager) deleteObsoleteFile(
     229              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     230            2 : ) {
     231            2 :         // TODO(peter): need to handle this error, probably by re-adding the
     232            2 :         // file that couldn't be deleted to one of the obsolete slices map.
     233            2 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     234            2 :         if oserror.IsNotExist(err) {
     235            2 :                 return
     236            2 :         }
     237              : 
     238            2 :         switch fileType {
     239            2 :         case base.FileTypeLog:
     240            2 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     241            2 :                         JobID:   int(jobID),
     242            2 :                         Path:    path,
     243            2 :                         FileNum: fileNum,
     244            2 :                         Err:     err,
     245            2 :                 })
     246            2 :         case base.FileTypeManifest:
     247            2 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     248            2 :                         JobID:   int(jobID),
     249            2 :                         Path:    path,
     250            2 :                         FileNum: fileNum,
     251            2 :                         Err:     err,
     252            2 :                 })
     253            0 :         case base.FileTypeTable, base.FileTypeBlob:
     254            0 :                 panic("invalid deletion of object file")
     255              :         }
     256              : }
     257              : 
     258              : func (cm *cleanupManager) deleteObsoleteObject(
     259              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     260            2 : ) {
     261            2 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     262            0 :                 panic("not an object")
     263              :         }
     264              : 
     265            2 :         var path string
     266            2 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     267            2 :         if err != nil {
     268            2 :                 path = "<nil>"
     269            2 :         } else {
     270            2 :                 path = cm.objProvider.Path(meta)
     271            2 :                 err = cm.objProvider.Remove(fileType, fileNum)
     272            2 :         }
     273            2 :         if cm.objProvider.IsNotExistError(err) {
     274            2 :                 return
     275            2 :         }
     276              : 
     277            2 :         switch fileType {
     278            2 :         case base.FileTypeTable:
     279            2 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     280            2 :                         JobID:   int(jobID),
     281            2 :                         Path:    path,
     282            2 :                         FileNum: fileNum,
     283            2 :                         Err:     err,
     284            2 :                 })
     285            2 :         case base.FileTypeBlob:
     286            2 :                 cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
     287            2 :                         JobID:   int(jobID),
     288            2 :                         Path:    path,
     289            2 :                         FileNum: fileNum,
     290            2 :                         Err:     err,
     291            2 :                 })
     292              :         }
     293              : }
     294              : 
      295              : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
      296              : // full, and another log once the queue drains back below 10% full.
     297              : //
     298              : // Must be called with cm.mu locked.
     299            2 : func (cm *cleanupManager) maybeLogLocked() {
     300            2 :         const highThreshold = jobsQueueDepth * 3 / 4
     301            2 :         const lowThreshold = jobsQueueDepth / 10
     302            2 : 
     303            2 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     304            2 : 
     305            2 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     306            0 :                 cm.mu.jobsQueueWarningIssued = true
     307            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     308            0 :         }
     309              : 
     310            2 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     311            0 :                 cm.mu.jobsQueueWarningIssued = false
     312            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     313            0 :         }
     314              : }
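
// With the current jobsQueueDepth of 1000, highThreshold works out to 750 and
// lowThreshold to 100: the warning fires once more than 750 jobs are pending
// and clears only after the backlog drains below 100, giving the logging some
// hysteresis so it does not flap around a single threshold.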
     315              : 
     316            2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     317            2 :         var pacerInfo deletionPacerInfo
     318            2 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     319            2 :         // but in practice this was observed to take constant time, regardless of
      320            2 :         // volume size used, at least on Linux with ext4 and ZFS. All invocations
     321            2 :         // take 10 microseconds or less.
     322            2 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     323            2 :         d.mu.Lock()
     324            2 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     325            2 :         total := d.mu.versions.metrics.Total()
     326            2 :         d.mu.Unlock()
     327            2 :         pacerInfo.liveBytes = uint64(total.AggregateSize())
     328            2 :         return pacerInfo
     329            2 : }
     330              : 
     331              : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     332              : // and adds those to the internal lists of obsolete files. Note that the files
     333              : // are not actually deleted by this method. A subsequent call to
      334              : // deleteObsoleteFiles must be performed. Must not be called concurrently
     335              : // with compactions and flushes. db.mu must be held when calling this function.
     336            2 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
      337            2 :         // Disable automatic compactions temporarily to prevent concurrent compactions /
     338            2 :         // flushes from interfering. The original value is restored on completion.
     339            2 :         disabledPrev := d.opts.DisableAutomaticCompactions
     340            2 :         defer func() {
     341            2 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     342            2 :         }()
     343            2 :         d.opts.DisableAutomaticCompactions = true
     344            2 : 
     345            2 :         // Wait for any ongoing compaction to complete before continuing.
     346            2 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     347            2 :                 d.mu.compact.cond.Wait()
     348            2 :         }
     349              : 
     350            2 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     351            2 :         d.mu.versions.addLiveFileNums(liveFileNums)
      352            2 :         // Prevent files that are only referred to by the ingestedFlushable
     353            2 :         // from being deleted. These are added to the flushable queue on WAL replay
     354            2 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     355            2 :         // file scan to avoid double-deleting these files.
     356            2 :         for _, f := range flushableIngests {
     357            2 :                 for _, file := range f.files {
     358            2 :                         liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
     359            2 :                 }
     360              :         }
     361              : 
     362            2 :         manifestFileNum := d.mu.versions.manifestFileNum
     363            2 : 
     364            2 :         var obsoleteTables []obsoleteFile
     365            2 :         var obsoleteBlobs []obsoleteFile
     366            2 :         var obsoleteOptions []obsoleteFile
     367            2 :         var obsoleteManifests []obsoleteFile
     368            2 : 
     369            2 :         for _, filename := range list {
     370            2 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     371            2 :                 if !ok {
     372            2 :                         continue
     373              :                 }
     374            2 :                 makeObsoleteFile := func() obsoleteFile {
     375            2 :                         of := obsoleteFile{
     376            2 :                                 fileType: fileType,
     377            2 :                                 fs:       d.opts.FS,
     378            2 :                                 path:     d.opts.FS.PathJoin(d.dirname, filename),
     379            2 :                                 fileNum:  diskFileNum,
     380            2 :                                 isLocal:  true,
     381            2 :                         }
     382            2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     383            1 :                                 of.fileSize = uint64(stat.Size())
     384            1 :                         }
     385            2 :                         return of
     386              :                 }
     387            2 :                 switch fileType {
     388            2 :                 case base.FileTypeManifest:
     389            2 :                         if diskFileNum >= manifestFileNum {
     390            2 :                                 continue
     391              :                         }
     392            2 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     393            2 :                 case base.FileTypeOptions:
     394            2 :                         if diskFileNum >= d.optionsFileNum {
     395            2 :                                 continue
     396              :                         }
     397            2 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     398            2 :                 case base.FileTypeTable, base.FileTypeBlob:
     399              :                         // Objects are handled through the objstorage provider below.
     400            2 :                 default:
     401              :                         // Don't delete files we don't know about.
     402              :                 }
     403              :         }
     404              : 
     405            2 :         objects := d.objProvider.List()
     406            2 :         for _, obj := range objects {
     407            2 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     408            2 :                         continue
     409              :                 }
     410            2 :                 if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
     411            0 :                         // Ignore object types we don't know about.
     412            0 :                         continue
     413              :                 }
     414            2 :                 of := obsoleteFile{
     415            2 :                         fileType: obj.FileType,
     416            2 :                         fs:       d.opts.FS,
     417            2 :                         path:     base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
     418            2 :                         fileNum:  obj.DiskFileNum,
     419            2 :                         isLocal:  true,
     420            2 :                 }
     421            2 :                 if size, err := d.objProvider.Size(obj); err == nil {
     422            2 :                         of.fileSize = uint64(size)
     423            2 :                 }
     424            2 :                 if obj.FileType == base.FileTypeTable {
     425            2 :                         obsoleteTables = append(obsoleteTables, of)
     426            2 :                 } else {
     427            2 :                         obsoleteBlobs = append(obsoleteBlobs, of)
     428            2 :                 }
     429              :         }
     430              : 
     431            2 :         d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
     432            2 :         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     433            2 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     434            2 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     435            2 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     436              : }
     437              : 
     438              : // disableFileDeletions disables file deletions and then waits for any
     439              : // in-progress deletion to finish. The caller is required to call
     440              : // enableFileDeletions in order to enable file deletions again. It is ok for
     441              : // multiple callers to disable file deletions simultaneously, though they must
     442              : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     443              : // (there is an internal reference count on file deletion disablement).
     444              : //
     445              : // d.mu must be held when calling this method.
     446            2 : func (d *DB) disableFileDeletions() {
     447            2 :         d.mu.fileDeletions.disableCount++
     448            2 :         d.mu.Unlock()
     449            2 :         defer d.mu.Lock()
     450            2 :         d.cleanupManager.Wait()
     451            2 : }
     452              : 
     453              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     454              : // is queued if necessary.
     455              : //
     456              : // d.mu must be held when calling this method.
     457            2 : func (d *DB) enableFileDeletions() {
     458            2 :         if d.mu.fileDeletions.disableCount <= 0 {
     459            1 :                 panic("pebble: file deletion disablement invariant violated")
     460              :         }
     461            2 :         d.mu.fileDeletions.disableCount--
     462            2 :         if d.mu.fileDeletions.disableCount > 0 {
     463            0 :                 return
     464            0 :         }
     465            2 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     466              : }
     467              : 
     468              : type fileInfo = base.FileInfo
     469              : 
     470              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     471              : //
      472              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     473              : //
     474              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     475              : // cleanup job will be scheduled when file deletions are re-enabled.
     476            2 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     477            2 :         if d.mu.fileDeletions.disableCount > 0 {
     478            2 :                 return
     479            2 :         }
     480            2 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     481            2 : 
     482            2 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     483            2 :         // log that has not had its contents flushed to an sstable.
     484            2 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     485            2 :         if err != nil {
     486            0 :                 panic(err)
     487              :         }
     488              : 
     489            2 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     490            2 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     491            2 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     492            2 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     493            2 : 
     494            2 :         // Ensure everything is already sorted. We want determinism for testing, and
     495            2 :         // we need the manifests to be sorted because we want to delete some
     496            2 :         // contiguous prefix of the older manifests.
     497            2 :         if invariants.Enabled {
     498            2 :                 switch {
     499            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
     500            0 :                         d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
     501            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
     502            0 :                         d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
     503            0 :                 case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
     504            0 :                         d.opts.Logger.Fatalf("obsoleteTables is not sorted")
     505            0 :                 case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
     506            0 :                         d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
     507              :                 }
     508              :         }
     509              : 
     510            2 :         var obsoleteManifests []obsoleteFile
     511            2 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     512            2 :         if manifestsToDelete > 0 {
     513            2 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     514            2 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     515            2 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     516            0 :                         d.mu.versions.obsoleteManifests = nil
     517            0 :                 }
     518              :         }
     519              : 
     520            2 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     521            2 :         d.mu.versions.obsoleteOptions = nil
     522            2 : 
     523            2 :         // Compute the stats for the files being queued for deletion and add them to
     524            2 :         // the running total. These stats will be used during DB.Metrics() to
     525            2 :         // calculate the count and size of pending obsolete files by diffing these
     526            2 :         // stats and the stats reported by the cleanup manager.
     527            2 :         var objectStats obsoleteObjectStats
     528            2 :         objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
     529            2 :         objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
     530            2 :         d.mu.fileDeletions.queuedStats.Add(objectStats)
     531            2 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     532            2 : 
     533            2 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     534            2 :         // Note the unusual order: Unlock and then Lock.
     535            2 :         d.mu.Unlock()
     536            2 :         defer d.mu.Lock()
     537            2 : 
     538            2 :         n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
     539            2 :         filesToDelete := make([]obsoleteFile, 0, n)
     540            2 :         filesToDelete = append(filesToDelete, obsoleteManifests...)
     541            2 :         filesToDelete = append(filesToDelete, obsoleteOptions...)
     542            2 :         filesToDelete = append(filesToDelete, obsoleteTables...)
     543            2 :         filesToDelete = append(filesToDelete, obsoleteBlobs...)
     544            2 :         for _, f := range obsoleteLogs {
     545            2 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     546            2 :                         fileType: base.FileTypeLog,
     547            2 :                         fs:       f.FS,
     548            2 :                         path:     f.Path,
     549            2 :                         fileNum:  base.DiskFileNum(f.NumWAL),
     550            2 :                         fileSize: f.ApproxFileSize,
     551            2 :                         isLocal:  true,
     552            2 :                 })
     553            2 :         }
     554            2 :         for _, f := range obsoleteTables {
     555            2 :                 d.fileCache.Evict(f.fileNum, base.FileTypeTable)
     556            2 :         }
     557            2 :         for _, f := range obsoleteBlobs {
     558            2 :                 d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
     559            2 :         }
     560            2 :         if len(filesToDelete) > 0 {
     561            2 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
     562            2 :         }
     563            2 :         if d.opts.private.testingAlwaysWaitForCleanup {
     564            1 :                 d.cleanupManager.Wait()
     565            1 :         }
     566              : }
     567              : 
     568            2 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     569            2 :         d.mu.Lock()
     570            2 :         defer d.mu.Unlock()
     571            2 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     572            2 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     573            2 :         }
     574              : }
     575              : 
     576            2 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
     577            2 :         if len(b) == 0 {
     578            2 :                 return a
     579            2 :         }
     580              : 
     581            2 :         a = append(a, b...)
     582            2 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     583            2 :         return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
     584            2 :                 return a.fileNum == b.fileNum
     585            2 :         })
     586              : }
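
// A standalone sketch (illustrative only) of the same merge-then-dedup idiom
// used above: append, sort with a comparator, then drop adjacent duplicates.
// mergeSortedUnique is a hypothetical generic helper, not part of Pebble's API.
func mergeSortedUnique[T any](a, b []T, compare func(T, T) int) []T {
        if len(b) == 0 {
                return a
        }
        a = append(a, b...)
        slices.SortFunc(a, compare)
        return slices.CompactFunc(a, func(x, y T) bool { return compare(x, y) == 0 })
}

// For example, merging []int{1, 3} with []int{2, 3, 5} using cmp.Compare[int]
// as the comparator yields []int{1, 2, 3, 5}.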
     587              : 
     588            2 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
     589            2 :         return cmp.Compare(a.fileNum, b.fileNum)
     590            2 : }
     591              : 
     592              : // objectInfo describes an object in object storage (either a sstable or a blob
     593              : // file).
     594              : type objectInfo struct {
     595              :         fileInfo
     596              :         isLocal bool
     597              : }
     598              : 
     599            2 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
     600            2 :         return obsoleteFile{
     601            2 :                 fileType: fileType,
     602            2 :                 fs:       fs,
     603            2 :                 path:     base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     604            2 :                 fileNum:  o.FileNum,
     605            2 :                 fileSize: o.FileSize,
     606            2 :                 isLocal:  o.isLocal,
     607            2 :         }
     608            2 : }
     609              : 
     610            2 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
     611            2 :         for _, of := range files {
     612            2 :                 if of.isLocal {
     613            2 :                         local.count++
     614            2 :                         local.size += of.fileSize
     615            2 :                 }
     616            2 :                 total.count++
     617            2 :                 total.size += of.fileSize
     618              :         }
     619            2 :         return total, local
     620              : }
     621              : 
     622              : type obsoleteObjectStats struct {
     623              :         tablesLocal    countAndSize
     624              :         tablesAll      countAndSize
     625              :         blobFilesLocal countAndSize
     626              :         blobFilesAll   countAndSize
     627              : }
     628              : 
     629            2 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
     630            2 :         s.tablesLocal.Add(other.tablesLocal)
     631            2 :         s.tablesAll.Add(other.tablesAll)
     632            2 :         s.blobFilesLocal.Add(other.blobFilesLocal)
     633            2 :         s.blobFilesAll.Add(other.blobFilesAll)
     634            2 : }
     635              : 
     636            2 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
     637            2 :         s.tablesLocal.Sub(other.tablesLocal)
     638            2 :         s.tablesAll.Sub(other.tablesAll)
     639            2 :         s.blobFilesLocal.Sub(other.blobFilesLocal)
     640            2 :         s.blobFilesAll.Sub(other.blobFilesAll)
     641            2 : }
     642              : 
     643              : type countAndSize struct {
     644              :         count uint64
     645              :         size  uint64
     646              : }
     647              : 
     648            2 : func (c *countAndSize) Add(other countAndSize) {
     649            2 :         c.count += other.count
     650            2 :         c.size += other.size
     651            2 : }
     652              : 
     653            2 : func (c *countAndSize) Sub(other countAndSize) {
     654            2 :         c.count = invariants.SafeSub(c.count, other.count)
     655            2 :         c.size = invariants.SafeSub(c.size, other.size)
     656            2 : }
     657              : 
     658            2 : func makeZombieObjects() zombieObjects {
     659            2 :         return zombieObjects{
     660            2 :                 objs: make(map[base.DiskFileNum]objectInfo),
     661            2 :         }
     662            2 : }
     663              : 
     664              : // zombieObjects tracks a set of objects that are no longer required by the most
     665              : // recent version of the LSM, but may still need to be accessed by an open
     666              : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
     667              : // may access them are closed.
     668              : type zombieObjects struct {
     669              :         objs       map[base.DiskFileNum]objectInfo
     670              :         totalSize  uint64
     671              :         localSize  uint64
     672              :         localCount uint64
     673              : }
     674              : 
     675              : // Add adds an object to the set of zombie objects.
     676            2 : func (z *zombieObjects) Add(obj objectInfo) {
     677            2 :         if _, ok := z.objs[obj.FileNum]; ok {
     678            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     679              :         }
     680            2 :         z.objs[obj.FileNum] = obj
     681            2 :         z.totalSize += obj.FileSize
     682            2 :         if obj.isLocal {
     683            2 :                 z.localSize += obj.FileSize
     684            2 :                 z.localCount++
     685            2 :         }
     686              : }
     687              : 
     688              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     689            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     690            1 :         z.Add(objectInfo{
     691            1 :                 fileInfo: fileInfo{
     692            1 :                         FileNum:  meta.DiskFileNum,
     693            1 :                         FileSize: size,
     694            1 :                 },
     695            1 :                 isLocal: !meta.IsRemote(),
     696            1 :         })
     697            1 : }
     698              : 
     699              : // Count returns the number of zombie objects.
     700            2 : func (z *zombieObjects) Count() int {
     701            2 :         return len(z.objs)
     702            2 : }
     703              : 
     704              : // Extract removes an object from the set of zombie objects, returning the
     705              : // object that was removed.
     706            2 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     707            2 :         obj, ok := z.objs[fileNum]
     708            2 :         if !ok {
     709            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     710              :         }
     711            2 :         delete(z.objs, fileNum)
     712            2 : 
     713            2 :         // Detect underflow in case we have a bug that causes an object's size to be
     714            2 :         // mutated.
     715            2 :         if z.totalSize < obj.FileSize {
     716            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
     717              :         }
     718            2 :         if obj.isLocal && z.localSize < obj.FileSize {
     719            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
     720              :         }
     721              : 
     722            2 :         z.totalSize -= obj.FileSize
     723            2 :         if obj.isLocal {
     724            2 :                 z.localSize -= obj.FileSize
     725            2 :                 z.localCount--
     726            2 :         }
     727            2 :         return obj
     728              : }
     729              : 
     730              : // TotalSize returns the size of all objects in the set.
     731            2 : func (z *zombieObjects) TotalSize() uint64 {
     732            2 :         return z.totalSize
     733            2 : }
     734              : 
     735              : // LocalStats returns the count and size of all local objects in the set.
     736            2 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
     737            2 :         return z.localCount, z.localSize
     738            2 : }
        

Generated by: LCOV version 2.0-1