LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions)
Test:      2025-07-09 08:19Z a370be5e - tests only.lcov
Test Date: 2025-07-09 08:20:52
Coverage:  Lines: 93.9 % (447 of 476 hit)   Functions: - (0 of 0 hit)

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "context"
      10              :         "runtime/pprof"
      11              :         "slices"
      12              :         "sync"
      13              :         "time"
      14              : 
      15              :         "github.com/cockroachdb/crlib/crtime"
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/errors/oserror"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/invariants"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              :         "github.com/cockroachdb/pebble/wal"
      23              :         "github.com/cockroachdb/tokenbucket"
      24              : )
      25              : 
      26              : // Cleaner exports the base.Cleaner type.
      27              : type Cleaner = base.Cleaner
      28              : 
      29              : // DeleteCleaner exports the base.DeleteCleaner type.
      30              : type DeleteCleaner = base.DeleteCleaner
      31              : 
      32              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      33              : type ArchiveCleaner = base.ArchiveCleaner
      34              : 
      35              : type cleanupManager struct {
      36              :         opts        *Options
      37              :         objProvider objstorage.Provider
      38              :         deletePacer *deletionPacer
      39              : 
      40              :         // jobsCh is used as the cleanup job queue.
      41              :         jobsCh chan *cleanupJob
      42              :         // waitGroup is used to wait for the background goroutine to exit.
      43              :         waitGroup sync.WaitGroup
      44              : 
      45              :         mu struct {
      46              :                 sync.Mutex
      47              :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      48              :                 totalJobs              int
      49              :                 completedStats         obsoleteObjectStats
      50              :                 completedJobs          int
      51              :                 completedJobsCond      sync.Cond
      52              :                 jobsQueueWarningIssued bool
      53              :         }
      54              : }
      55              : 
      56              : // CompletedStats returns the stats summarizing objects deleted. The returned
      57              : // stats increase monotonically over the lifetime of the DB.
      58            1 : func (m *cleanupManager) CompletedStats() obsoleteObjectStats {
      59            1 :         m.mu.Lock()
      60            1 :         defer m.mu.Unlock()
      61            1 :         return m.mu.completedStats
      62            1 : }
      63              : 
      64              : // We can queue this many jobs before we have to block EnqueueJob.
      65              : const jobsQueueDepth = 1000
      66              : 
      67              : // obsoleteFile holds information about a file that needs to be deleted soon.
      68              : type obsoleteFile struct {
      69              :         fileType base.FileType
      70              :         fs       vfs.FS
      71              :         path     string
      72              :         fileNum  base.DiskFileNum
      73              :         fileSize uint64 // approx for log files
      74              :         isLocal  bool
      75              : }
      76              : 
      77            1 : func (of *obsoleteFile) needsPacing() bool {
      78            1 :         // We only need to pace local objects--sstables and blob files.
      79            1 :         return of.isLocal && (of.fileType == base.FileTypeTable || of.fileType == base.FileTypeBlob)
      80            1 : }
      81              : 
      82              : type cleanupJob struct {
      83              :         jobID         JobID
      84              :         obsoleteFiles []obsoleteFile
      85              :         stats         obsoleteObjectStats
      86              : }
      87              : 
      88              : // openCleanupManager creates a cleanupManager and starts its background goroutine.
      89              : // The cleanupManager must be Close()d.
      90              : func openCleanupManager(
      91              :         opts *Options, objProvider objstorage.Provider, getDeletePacerInfo func() deletionPacerInfo,
      92            1 : ) *cleanupManager {
      93            1 :         cm := &cleanupManager{
      94            1 :                 opts:        opts,
      95            1 :                 objProvider: objProvider,
      96            1 :                 deletePacer: newDeletionPacer(
      97            1 :                         crtime.NowMono(),
      98            1 :                         opts.FreeSpaceThresholdBytes,
      99            1 :                         int64(opts.TargetByteDeletionRate),
     100            1 :                         opts.FreeSpaceTimeframe,
     101            1 :                         opts.ObsoleteBytesMaxRatio,
     102            1 :                         opts.ObsoleteBytesTimeframe,
     103            1 :                         getDeletePacerInfo,
     104            1 :                 ),
     105            1 :                 jobsCh: make(chan *cleanupJob, jobsQueueDepth),
     106            1 :         }
     107            1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex
     108            1 :         cm.waitGroup.Add(1)
     109            1 : 
     110            1 :         go func() {
     111            1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
     112            1 :                         cm.mainLoop()
     113            1 :                 })
     114              :         }()
     115              : 
     116            1 :         return cm
     117              : }
     118              : 
     119              : // Close stops the background goroutine, waiting until all queued jobs are completed.
     120              : // Delete pacing is disabled for the remaining jobs.
     121            1 : func (cm *cleanupManager) Close() {
     122            1 :         close(cm.jobsCh)
     123            1 :         cm.waitGroup.Wait()
     124            1 : }
     125              : 
     126              : // EnqueueJob adds a cleanup job to the manager's queue.
     127              : func (cm *cleanupManager) EnqueueJob(
     128              :         jobID JobID, obsoleteFiles []obsoleteFile, stats obsoleteObjectStats,
     129            1 : ) {
     130            1 :         job := &cleanupJob{
     131            1 :                 jobID:         jobID,
     132            1 :                 obsoleteFiles: obsoleteFiles,
     133            1 :                 stats:         stats,
     134            1 :         }
     135            1 : 
     136            1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     137            1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
      138            1 :         // rather than when we get to the job; otherwise the reported bytes will be
      139            1 :         // subject to the throttling rate, which defeats the purpose.
     140            1 :         var pacingBytes uint64
     141            1 :         for _, of := range obsoleteFiles {
     142            1 :                 if of.needsPacing() {
     143            1 :                         pacingBytes += of.fileSize
     144            1 :                 }
     145              :         }
     146            1 :         if pacingBytes > 0 {
     147            1 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     148            1 :         }
     149              : 
     150            1 :         cm.mu.Lock()
     151            1 :         cm.mu.totalJobs++
     152            1 :         cm.maybeLogLocked()
     153            1 :         cm.mu.Unlock()
     154            1 : 
     155            1 :         cm.jobsCh <- job
     156              : }
     157              : 
     158              : // Wait until the completion of all jobs that were already queued.
     159              : //
     160              : // Does not wait for jobs that are enqueued during the call.
     161              : //
     162              : // Note that DB.mu should not be held while calling this method; the background
     163              : // goroutine needs to acquire DB.mu to update deleted table metrics.
     164            1 : func (cm *cleanupManager) Wait() {
     165            1 :         cm.mu.Lock()
     166            1 :         defer cm.mu.Unlock()
     167            1 :         n := cm.mu.totalJobs
     168            1 :         for cm.mu.completedJobs < n {
     169            1 :                 cm.mu.completedJobsCond.Wait()
     170            1 :         }
     171              : }
     172              : 
     173              : // mainLoop runs the manager's background goroutine.
     174            1 : func (cm *cleanupManager) mainLoop() {
     175            1 :         defer cm.waitGroup.Done()
     176            1 : 
     177            1 :         var tb tokenbucket.TokenBucket
     178            1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     179            1 :         tb.Init(1.0, 1.0)
     180            1 :         for job := range cm.jobsCh {
     181            1 :                 for _, of := range job.obsoleteFiles {
     182            1 :                         switch of.fileType {
     183            1 :                         case base.FileTypeTable:
     184            1 :                                 cm.maybePace(&tb, &of)
     185            1 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     186            1 :                         case base.FileTypeBlob:
     187            1 :                                 cm.maybePace(&tb, &of)
     188            1 :                                 cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
     189            1 :                         default:
     190            1 :                                 cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
     191              :                         }
     192              :                 }
     193            1 :                 cm.mu.Lock()
     194            1 :                 cm.mu.completedJobs++
     195            1 :                 cm.mu.completedStats.Add(job.stats)
     196            1 :                 cm.mu.completedJobsCond.Broadcast()
     197            1 :                 cm.maybeLogLocked()
     198            1 :                 cm.mu.Unlock()
     199              :         }
     200              : }
     201              : 
     202              : // maybePace sleeps before deleting an object if appropriate. It is always
     203              : // called from the background goroutine.
     204            1 : func (cm *cleanupManager) maybePace(tb *tokenbucket.TokenBucket, of *obsoleteFile) {
     205            1 :         if !of.needsPacing() {
     206            1 :                 return
     207            1 :         }
     208              : 
     209            1 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), of.fileSize)
     210            1 :         if tokens == 0.0 {
     211            1 :                 // The token bucket might be in debt; it could make us wait even for 0
     212            1 :                 // tokens. We don't want that if the pacer decided throttling should be
     213            1 :                 // disabled.
     214            1 :                 return
     215            1 :         }
     216              :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     217              :         // the token bucket accumulates up to one second of unused tokens.
     218            1 :         for {
     219            1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     220            1 :                 if ok {
     221            1 :                         break
     222              :                 }
     223            0 :                 time.Sleep(d)
     224              :         }
     225              : }
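
The pacing loop above relies on the token bucket to smooth deletions while letting up to one second of unused capacity accumulate. The following is a minimal, self-contained sketch of the same wait-loop pattern, using only the tokenbucket calls already exercised above; the refill rate, burst, and requested token count are illustrative:

package main

import (
        "time"

        "github.com/cockroachdb/tokenbucket"
)

// waitForTokens blocks until the bucket can fulfill the request, sleeping for
// the suggested duration between attempts (the same loop shape as maybePace).
func waitForTokens(tb *tokenbucket.TokenBucket, tokens tokenbucket.Tokens) {
        for {
                ok, d := tb.TryToFulfill(tokens)
                if ok {
                        return
                }
                time.Sleep(d)
        }
}

func main() {
        var tb tokenbucket.TokenBucket
        tb.Init(1.0, 1.0)       // 1 token/second refill, 1 token burst, as in mainLoop
        waitForTokens(&tb, 2.5) // illustrative request; waits until enough tokens accrue
}
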
     226              : 
     227              : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
     228              : func (cm *cleanupManager) deleteObsoleteFile(
     229              :         fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
     230            1 : ) {
     231            1 :         // TODO(peter): need to handle this error, probably by re-adding the
     232            1 :         // file that couldn't be deleted to one of the obsolete slices map.
     233            1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     234            1 :         if oserror.IsNotExist(err) {
     235            1 :                 return
     236            1 :         }
     237              : 
     238            1 :         switch fileType {
     239            1 :         case base.FileTypeLog:
     240            1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     241            1 :                         JobID:   int(jobID),
     242            1 :                         Path:    path,
     243            1 :                         FileNum: fileNum,
     244            1 :                         Err:     err,
     245            1 :                 })
     246            1 :         case base.FileTypeManifest:
     247            1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     248            1 :                         JobID:   int(jobID),
     249            1 :                         Path:    path,
     250            1 :                         FileNum: fileNum,
     251            1 :                         Err:     err,
     252            1 :                 })
     253            0 :         case base.FileTypeTable, base.FileTypeBlob:
     254            0 :                 panic("invalid deletion of object file")
     255              :         }
     256              : }
     257              : 
     258              : func (cm *cleanupManager) deleteObsoleteObject(
     259              :         fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
     260            1 : ) {
     261            1 :         if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
     262            0 :                 panic("not an object")
     263              :         }
     264              : 
     265            1 :         var path string
     266            1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     267            1 :         if err != nil {
     268            1 :                 path = "<nil>"
     269            1 :         } else {
     270            1 :                 path = cm.objProvider.Path(meta)
     271            1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     272            1 :         }
     273            1 :         if cm.objProvider.IsNotExistError(err) {
     274            1 :                 return
     275            1 :         }
     276              : 
     277            1 :         switch fileType {
     278            1 :         case base.FileTypeTable:
     279            1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     280            1 :                         JobID:   int(jobID),
     281            1 :                         Path:    path,
     282            1 :                         FileNum: fileNum,
     283            1 :                         Err:     err,
     284            1 :                 })
     285            1 :         case base.FileTypeBlob:
     286            1 :                 cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
     287            1 :                         JobID:   int(jobID),
     288            1 :                         Path:    path,
     289            1 :                         FileNum: fileNum,
     290            1 :                         Err:     err,
     291            1 :                 })
     292              :         }
     293              : }
     294              : 
      295              : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
      296              : // full, and another log once the queue drains back below 10% full.
     297              : //
     298              : // Must be called with cm.mu locked.
     299            1 : func (cm *cleanupManager) maybeLogLocked() {
     300            1 :         const highThreshold = jobsQueueDepth * 3 / 4
     301            1 :         const lowThreshold = jobsQueueDepth / 10
     302            1 : 
     303            1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
     304            1 : 
     305            1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     306            0 :                 cm.mu.jobsQueueWarningIssued = true
     307            0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     308            0 :         }
     309              : 
     310            1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     311            0 :                 cm.mu.jobsQueueWarningIssued = false
     312            0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     313            0 :         }
     314              : }
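
With the constants above and jobsQueueDepth = 1000, highThreshold works out to 750 and lowThreshold to 100: the warning is issued once more than 750 jobs are pending and cleared once fewer than 100 remain.
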
     315              : 
     316            1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     317            1 :         var pacerInfo deletionPacerInfo
     318            1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     319            1 :         // but in practice this was observed to take constant time, regardless of
     320            1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     321            1 :         // take 10 microseconds or less.
     322            1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
     323            1 :         d.mu.Lock()
     324            1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     325            1 :         total := d.mu.versions.metrics.Total()
     326            1 :         d.mu.Unlock()
     327            1 :         pacerInfo.liveBytes = uint64(total.AggregateSize())
     328            1 :         return pacerInfo
     329            1 : }
     330              : 
     331              : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     332              : // and adds those to the internal lists of obsolete files. Note that the files
     333              : // are not actually deleted by this method. A subsequent call to
      334              : // deleteObsoleteFiles must be performed. Must not be called concurrently
     335              : // with compactions and flushes. db.mu must be held when calling this function.
     336            1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
      337            1 :         // Disable automatic compactions temporarily to prevent concurrent compactions /
      338            1 :         // flushes from interfering. The original value is restored on completion.
     339            1 :         disabledPrev := d.opts.DisableAutomaticCompactions
     340            1 :         defer func() {
     341            1 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     342            1 :         }()
     343            1 :         d.opts.DisableAutomaticCompactions = true
     344            1 : 
      345            1 :         // Wait for any ongoing compactions and flushes to complete before continuing.
     346            1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     347            1 :                 d.mu.compact.cond.Wait()
     348            1 :         }
     349              : 
     350            1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     351            1 :         d.mu.versions.addLiveFileNums(liveFileNums)
      352            1 :         // Prevent files that are only referred to by the ingestedFlushable
      353            1 :         // from being deleted. These are added to the flushable queue on WAL replay
     354            1 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     355            1 :         // file scan to avoid double-deleting these files.
     356            1 :         for _, f := range flushableIngests {
     357            1 :                 for _, file := range f.files {
     358            1 :                         liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
     359            1 :                 }
     360              :         }
     361              : 
     362            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     363            1 : 
     364            1 :         var obsoleteTables []obsoleteFile
     365            1 :         var obsoleteBlobs []obsoleteFile
     366            1 :         var obsoleteOptions []obsoleteFile
     367            1 :         var obsoleteManifests []obsoleteFile
     368            1 : 
     369            1 :         for _, filename := range list {
     370            1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     371            1 :                 if !ok {
     372            1 :                         continue
     373              :                 }
     374            1 :                 makeObsoleteFile := func() obsoleteFile {
     375            1 :                         of := obsoleteFile{
     376            1 :                                 fileType: fileType,
     377            1 :                                 fs:       d.opts.FS,
     378            1 :                                 path:     d.opts.FS.PathJoin(d.dirname, filename),
     379            1 :                                 fileNum:  diskFileNum,
     380            1 :                                 isLocal:  true,
     381            1 :                         }
     382            1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     383            1 :                                 of.fileSize = uint64(stat.Size())
     384            1 :                         }
     385            1 :                         return of
     386              :                 }
     387            1 :                 switch fileType {
     388            1 :                 case base.FileTypeManifest:
     389            1 :                         if diskFileNum >= manifestFileNum {
     390            1 :                                 continue
     391              :                         }
     392            1 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     393            1 :                 case base.FileTypeOptions:
     394            1 :                         if diskFileNum >= d.optionsFileNum {
     395            1 :                                 continue
     396              :                         }
     397            1 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     398            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     399              :                         // Objects are handled through the objstorage provider below.
     400            1 :                 default:
     401              :                         // Don't delete files we don't know about.
     402              :                 }
     403              :         }
     404              : 
     405            1 :         objects := d.objProvider.List()
     406            1 :         for _, obj := range objects {
     407            1 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     408            1 :                         continue
     409              :                 }
     410            1 :                 if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
     411            0 :                         // Ignore object types we don't know about.
     412            0 :                         continue
     413              :                 }
     414            1 :                 of := obsoleteFile{
     415            1 :                         fileType: obj.FileType,
     416            1 :                         fs:       d.opts.FS,
     417            1 :                         path:     base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
     418            1 :                         fileNum:  obj.DiskFileNum,
     419            1 :                         isLocal:  true,
     420            1 :                 }
     421            1 :                 if size, err := d.objProvider.Size(obj); err == nil {
     422            1 :                         of.fileSize = uint64(size)
     423            1 :                 }
     424            1 :                 if obj.FileType == base.FileTypeTable {
     425            1 :                         obsoleteTables = append(obsoleteTables, of)
     426            1 :                 } else {
     427            1 :                         obsoleteBlobs = append(obsoleteBlobs, of)
     428            1 :                 }
     429              :         }
     430              : 
     431            1 :         d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
     432            1 :         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     433            1 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     434            1 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     435            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     436              : }
     437              : 
     438              : // disableFileDeletions disables file deletions and then waits for any
     439              : // in-progress deletion to finish. The caller is required to call
     440              : // enableFileDeletions in order to enable file deletions again. It is ok for
     441              : // multiple callers to disable file deletions simultaneously, though they must
     442              : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     443              : // (there is an internal reference count on file deletion disablement).
     444              : //
     445              : // d.mu must be held when calling this method.
     446            1 : func (d *DB) disableFileDeletions() {
     447            1 :         d.mu.fileDeletions.disableCount++
     448            1 :         d.mu.Unlock()
     449            1 :         defer d.mu.Lock()
     450            1 :         d.cleanupManager.Wait()
     451            1 : }
     452              : 
     453              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     454              : // is queued if necessary.
     455              : //
     456              : // d.mu must be held when calling this method.
     457            1 : func (d *DB) enableFileDeletions() {
     458            1 :         if d.mu.fileDeletions.disableCount <= 0 {
     459            1 :                 panic("pebble: file deletion disablement invariant violated")
     460              :         }
     461            1 :         d.mu.fileDeletions.disableCount--
     462            1 :         if d.mu.fileDeletions.disableCount > 0 {
     463            0 :                 return
     464            0 :         }
     465            1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     466              : }
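
Callers of the two methods above pair each disable with an enable so the internal reference count returns to zero. A hypothetical internal caller might look like the following, with d.mu handling as required by the comments above:

d.mu.Lock()
d.disableFileDeletions()
// ... work that must not race with file deletions ...
d.enableFileDeletions() // schedules a cleanup job once the count reaches zero
d.mu.Unlock()
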
     467              : 
     468              : type fileInfo = base.FileInfo
     469              : 
     470              : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     471              : //
      472              : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     473              : //
     474              : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     475              : // cleanup job will be scheduled when file deletions are re-enabled.
     476            1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     477            1 :         if d.mu.fileDeletions.disableCount > 0 {
     478            1 :                 return
     479            1 :         }
     480            1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     481            1 : 
     482            1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     483            1 :         // log that has not had its contents flushed to an sstable.
     484            1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     485            1 :         if err != nil {
     486            0 :                 panic(err)
     487              :         }
     488              : 
     489            1 :         obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
     490            1 :         d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
     491            1 :         obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
     492            1 :         d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
     493            1 : 
     494            1 :         // Ensure everything is already sorted. We want determinism for testing, and
     495            1 :         // we need the manifests to be sorted because we want to delete some
     496            1 :         // contiguous prefix of the older manifests.
     497            1 :         if invariants.Enabled {
     498            1 :                 switch {
     499            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
     500            0 :                         d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
     501            0 :                 case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
     502            0 :                         d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
     503            0 :                 case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
     504            0 :                         d.opts.Logger.Fatalf("obsoleteTables is not sorted")
     505            0 :                 case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
     506            0 :                         d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
     507              :                 }
     508              :         }
     509              : 
     510            1 :         var obsoleteManifests []obsoleteFile
     511            1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     512            1 :         if manifestsToDelete > 0 {
     513            1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     514            1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     515            1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     516            0 :                         d.mu.versions.obsoleteManifests = nil
     517            0 :                 }
     518              :         }
     519              : 
     520            1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     521            1 :         d.mu.versions.obsoleteOptions = nil
     522            1 : 
     523            1 :         // Compute the stats for the files being queued for deletion and add them to
     524            1 :         // the running total. These stats will be used during DB.Metrics() to
     525            1 :         // calculate the count and size of pending obsolete files by diffing these
     526            1 :         // stats and the stats reported by the cleanup manager.
     527            1 :         var objectStats obsoleteObjectStats
     528            1 :         objectStats.tablesAll, objectStats.tablesLocal = calculateObsoleteObjectStats(obsoleteTables)
     529            1 :         objectStats.blobFilesAll, objectStats.blobFilesLocal = calculateObsoleteObjectStats(obsoleteBlobs)
     530            1 :         d.mu.fileDeletions.queuedStats.Add(objectStats)
     531            1 :         d.mu.versions.updateObsoleteObjectMetricsLocked()
     532            1 : 
     533            1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     534            1 :         // Note the unusual order: Unlock and then Lock.
     535            1 :         d.mu.Unlock()
     536            1 :         defer d.mu.Lock()
     537            1 : 
     538            1 :         n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
     539            1 :         filesToDelete := make([]obsoleteFile, 0, n)
     540            1 :         filesToDelete = append(filesToDelete, obsoleteManifests...)
     541            1 :         filesToDelete = append(filesToDelete, obsoleteOptions...)
     542            1 :         filesToDelete = append(filesToDelete, obsoleteTables...)
     543            1 :         filesToDelete = append(filesToDelete, obsoleteBlobs...)
     544            1 :         for _, f := range obsoleteLogs {
     545            1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     546            1 :                         fileType: base.FileTypeLog,
     547            1 :                         fs:       f.FS,
     548            1 :                         path:     f.Path,
     549            1 :                         fileNum:  base.DiskFileNum(f.NumWAL),
     550            1 :                         fileSize: f.ApproxFileSize,
     551            1 :                         isLocal:  true,
     552            1 :                 })
     553            1 :         }
     554            1 :         for _, f := range obsoleteTables {
     555            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeTable)
     556            1 :         }
     557            1 :         for _, f := range obsoleteBlobs {
     558            1 :                 d.fileCache.Evict(f.fileNum, base.FileTypeBlob)
     559            1 :         }
     560            1 :         if len(filesToDelete) > 0 {
     561            1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete, objectStats)
     562            1 :         }
     563            1 :         if d.opts.private.testingAlwaysWaitForCleanup {
     564            1 :                 d.cleanupManager.Wait()
     565            1 :         }
     566              : }
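
The comment in deleteObsoleteFiles above notes that DB.Metrics() derives the pending obsolete-object counts and sizes by diffing the queued stats against the stats reported by the cleanup manager. A hypothetical helper (not part of this file) sketching that diff:

// pendingObsoleteObjectStats illustrates the diff described above:
// pending = stats queued for deletion - stats of completed deletions.
func (d *DB) pendingObsoleteObjectStats() obsoleteObjectStats {
        // CompletedStats only grows for jobs whose stats were already added to
        // queuedStats, so the subtraction cannot underflow (Sub also guards
        // against underflow via SafeSub).
        completed := d.cleanupManager.CompletedStats()
        d.mu.Lock()
        defer d.mu.Unlock()
        pending := d.mu.fileDeletions.queuedStats
        pending.Sub(completed)
        return pending
}
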
     567              : 
     568            1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     569            1 :         d.mu.Lock()
     570            1 :         defer d.mu.Unlock()
     571            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     572            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     573            1 :         }
     574              : }
     575              : 
     576            1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
     577            1 :         if len(b) == 0 {
     578            1 :                 return a
     579            1 :         }
     580              : 
     581            1 :         a = append(a, b...)
     582            1 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     583            1 :         return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
     584            1 :                 return a.fileNum == b.fileNum
     585            1 :         })
     586              : }
     587              : 
     588            1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
     589            1 :         return cmp.Compare(a.fileNum, b.fileNum)
     590            1 : }
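
mergeObsoleteFiles above sorts the combined slice by fileNum and collapses duplicates. A test-style sketch of that behavior (it would have to live in package pebble; the file numbers are made up):

package pebble

import (
        "testing"

        "github.com/cockroachdb/pebble/internal/base"
)

func TestMergeObsoleteFilesDedup(t *testing.T) {
        a := []obsoleteFile{{fileNum: 7}, {fileNum: 9}}
        b := []obsoleteFile{{fileNum: 9}, {fileNum: 3}}
        got := mergeObsoleteFiles(a, b)
        want := []base.DiskFileNum{3, 7, 9} // sorted by fileNum, duplicate 9 collapsed
        if len(got) != len(want) {
                t.Fatalf("got %d files, want %d", len(got), len(want))
        }
        for i := range got {
                if got[i].fileNum != want[i] {
                        t.Fatalf("index %d: got fileNum %d, want %d", i, got[i].fileNum, want[i])
                }
        }
}
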
     591              : 
      592              : // objectInfo describes an object in object storage (either an sstable or a blob
     593              : // file).
     594              : type objectInfo struct {
     595              :         fileInfo
     596              :         isLocal bool
     597              : }
     598              : 
     599            1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
     600            1 :         return obsoleteFile{
     601            1 :                 fileType: fileType,
     602            1 :                 fs:       fs,
     603            1 :                 path:     base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     604            1 :                 fileNum:  o.FileNum,
     605            1 :                 fileSize: o.FileSize,
     606            1 :                 isLocal:  o.isLocal,
     607            1 :         }
     608            1 : }
     609              : 
     610            1 : func calculateObsoleteObjectStats(files []obsoleteFile) (total, local countAndSize) {
     611            1 :         for _, of := range files {
     612            1 :                 if of.isLocal {
     613            1 :                         local.count++
     614            1 :                         local.size += of.fileSize
     615            1 :                 }
     616            1 :                 total.count++
     617            1 :                 total.size += of.fileSize
     618              :         }
     619            1 :         return total, local
     620              : }
     621              : 
     622              : type obsoleteObjectStats struct {
     623              :         tablesLocal    countAndSize
     624              :         tablesAll      countAndSize
     625              :         blobFilesLocal countAndSize
     626              :         blobFilesAll   countAndSize
     627              : }
     628              : 
     629            1 : func (s *obsoleteObjectStats) Add(other obsoleteObjectStats) {
     630            1 :         s.tablesLocal.Add(other.tablesLocal)
     631            1 :         s.tablesAll.Add(other.tablesAll)
     632            1 :         s.blobFilesLocal.Add(other.blobFilesLocal)
     633            1 :         s.blobFilesAll.Add(other.blobFilesAll)
     634            1 : }
     635              : 
     636            1 : func (s *obsoleteObjectStats) Sub(other obsoleteObjectStats) {
     637            1 :         s.tablesLocal.Sub(other.tablesLocal)
     638            1 :         s.tablesAll.Sub(other.tablesAll)
     639            1 :         s.blobFilesLocal.Sub(other.blobFilesLocal)
     640            1 :         s.blobFilesAll.Sub(other.blobFilesAll)
     641            1 : }
     642              : 
     643              : type countAndSize struct {
     644              :         count uint64
     645              :         size  uint64
     646              : }
     647              : 
     648            1 : func (c *countAndSize) Add(other countAndSize) {
     649            1 :         c.count += other.count
     650            1 :         c.size += other.size
     651            1 : }
     652              : 
     653            1 : func (c *countAndSize) Sub(other countAndSize) {
     654            1 :         c.count = invariants.SafeSub(c.count, other.count)
     655            1 :         c.size = invariants.SafeSub(c.size, other.size)
     656            1 : }
     657              : 
     658            1 : func makeZombieObjects() zombieObjects {
     659            1 :         return zombieObjects{
     660            1 :                 objs: make(map[base.DiskFileNum]objectInfo),
     661            1 :         }
     662            1 : }
     663              : 
     664              : // zombieObjects tracks a set of objects that are no longer required by the most
     665              : // recent version of the LSM, but may still need to be accessed by an open
     666              : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
     667              : // may access them are closed.
     668              : type zombieObjects struct {
     669              :         objs       map[base.DiskFileNum]objectInfo
     670              :         totalSize  uint64
     671              :         localSize  uint64
     672              :         localCount uint64
     673              : }
     674              : 
     675              : // Add adds an object to the set of zombie objects.
     676            1 : func (z *zombieObjects) Add(obj objectInfo) {
     677            1 :         if _, ok := z.objs[obj.FileNum]; ok {
     678            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     679              :         }
     680            1 :         z.objs[obj.FileNum] = obj
     681            1 :         z.totalSize += obj.FileSize
     682            1 :         if obj.isLocal {
     683            1 :                 z.localSize += obj.FileSize
     684            1 :                 z.localCount++
     685            1 :         }
     686              : }
     687              : 
     688              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     689            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     690            1 :         z.Add(objectInfo{
     691            1 :                 fileInfo: fileInfo{
     692            1 :                         FileNum:  meta.DiskFileNum,
     693            1 :                         FileSize: size,
     694            1 :                 },
     695            1 :                 isLocal: !meta.IsRemote(),
     696            1 :         })
     697            1 : }
     698              : 
     699              : // Count returns the number of zombie objects.
     700            1 : func (z *zombieObjects) Count() int {
     701            1 :         return len(z.objs)
     702            1 : }
     703              : 
     704              : // Extract removes an object from the set of zombie objects, returning the
     705              : // object that was removed.
     706            1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     707            1 :         obj, ok := z.objs[fileNum]
     708            1 :         if !ok {
     709            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     710              :         }
     711            1 :         delete(z.objs, fileNum)
     712            1 : 
     713            1 :         // Detect underflow in case we have a bug that causes an object's size to be
     714            1 :         // mutated.
     715            1 :         if z.totalSize < obj.FileSize {
     716            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
     717              :         }
     718            1 :         if obj.isLocal && z.localSize < obj.FileSize {
     719            0 :                 panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
     720              :         }
     721              : 
     722            1 :         z.totalSize -= obj.FileSize
     723            1 :         if obj.isLocal {
     724            1 :                 z.localSize -= obj.FileSize
     725            1 :                 z.localCount--
     726            1 :         }
     727            1 :         return obj
     728              : }
     729              : 
     730              : // TotalSize returns the size of all objects in the set.
     731            1 : func (z *zombieObjects) TotalSize() uint64 {
     732            1 :         return z.totalSize
     733            1 : }
     734              : 
     735              : // LocalStats returns the count and size of all local objects in the set.
     736            1 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
     737            1 :         return z.localCount, z.localSize
     738            1 : }
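
A short sketch of the zombieObjects bookkeeping above: Add accumulates the total and local byte counts, and Extract reverses that accounting once the object finally becomes deletable. The snippet is package-internal, and the file number and size are made up:

z := makeZombieObjects()
z.Add(objectInfo{
        fileInfo: fileInfo{FileNum: base.DiskFileNum(5), FileSize: 1 << 20}, // hypothetical local object
        isLocal:  true,
})
localCount, localSize := z.LocalStats() // 1, 1<<20
obj := z.Extract(base.DiskFileNum(5))   // removes the entry and reverses the size accounting
_, _, _ = localCount, localSize, obj
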
        

Generated by: LCOV version 2.0-1