LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions) Hit Total Coverage
Test: 2024-11-16 08:16Z 9ed54bc4 - tests only.lcov Lines: 370 388 95.4 %
Date: 2024-11-16 08:17:17 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "cmp"
       9             :         "context"
      10             :         "runtime/pprof"
      11             :         "slices"
      12             :         "sync"
      13             :         "time"
      14             : 
      15             :         "github.com/cockroachdb/crlib/crtime"
      16             :         "github.com/cockroachdb/errors/oserror"
      17             :         "github.com/cockroachdb/pebble/internal/base"
      18             :         "github.com/cockroachdb/pebble/objstorage"
      19             :         "github.com/cockroachdb/pebble/vfs"
      20             :         "github.com/cockroachdb/pebble/wal"
      21             :         "github.com/cockroachdb/tokenbucket"
      22             : )
      23             : 
      24             : // Cleaner is an alias that re-exports the base.Cleaner type from package pebble.
      25             : type Cleaner = base.Cleaner
      26             : 
      27             : // DeleteCleaner is an alias that re-exports the base.DeleteCleaner type from package pebble.
      28             : type DeleteCleaner = base.DeleteCleaner
      29             : 
      30             : // ArchiveCleaner is an alias that re-exports the base.ArchiveCleaner type from package pebble.
      31             : type ArchiveCleaner = base.ArchiveCleaner
      32             : // cleanupManager deletes obsolete files on a background goroutine that consumes jobs from jobsCh.
      33             : type cleanupManager struct {
      34             :         opts            *Options                            // source of FS, Cleaner, EventListener and Logger
      35             :         objProvider     objstorage.Provider                 // used to look up and remove table objects
      36             :         onTableDeleteFn func(fileSize uint64, isLocal bool) // called by mainLoop before each table deletion
      37             :         deletePacer     *deletionPacer                      // decides how much to delay paced table deletions
      38             : 
      39             :         // jobsCh is used as the cleanup job queue. It is buffered with capacity jobsQueueDepth.
      40             :         jobsCh chan *cleanupJob
      41             :         // waitGroup is used to wait for the background goroutine to exit.
      42             :         waitGroup sync.WaitGroup
      43             : 
      44             :         mu struct {
      45             :                 sync.Mutex
      46             :                 // totalJobs is the total number of enqueued jobs (completed or in progress).
      47             :                 totalJobs              int
      48             :                 completedJobs          int       // jobs fully processed by mainLoop
      49             :                 completedJobsCond      sync.Cond // broadcast after each completedJobs increment; uses this Mutex
      50             :                 jobsQueueWarningIssued bool      // true between the "falling behind" and "back to normal" logs
      51             :         }
      52             : }
      53             : 
      54             : // jobsQueueDepth is the capacity of jobsCh: this many jobs can be queued before EnqueueJob has to block.
      55             : const jobsQueueDepth = 1000
      56             : 
      57             : // deletableFile describes an obsolete file that is not a log (WAL) file.
      58             : type deletableFile struct {
      59             :         dir      string           // directory used to build the path of non-table files
      60             :         fileNum  base.DiskFileNum
      61             :         fileSize uint64           // size used for pacing and passed to onTableDeleteFn
      62             :         isLocal  bool             // forwarded to onTableDeleteFn for tables
      63             : }
      64             : 
      65             : // obsoleteFile describes one file queued for deletion; fileType selects which of the two payload fields is set.
      66             : type obsoleteFile struct {
      67             :         fileType fileType
      68             :         // nonLogFile is populated when fileType != fileTypeLog.
      69             :         nonLogFile deletableFile
      70             :         // logFile is populated when fileType == fileTypeLog.
      71             :         logFile wal.DeletableLog
      72             : }
      73             : // cleanupJob is one unit of work for the cleanup goroutine: a batch of files deleted in slice order.
      74             : type cleanupJob struct {
      75             :         jobID         JobID          // reported in the deletion events for this batch
      76             :         obsoleteFiles []obsoleteFile // files to delete
      77             : }
      78             : 
      79             : // openCleanupManager creates a cleanupManager and starts its background goroutine,
      80             : // which runs mainLoop under the gcLabels pprof labels. The cleanupManager must be Close()d.
      81             : func openCleanupManager(
      82             :         opts *Options,
      83             :         objProvider objstorage.Provider,
      84             :         onTableDeleteFn func(fileSize uint64, isLocal bool),
      85             :         getDeletePacerInfo func() deletionPacerInfo,
      86           1 : ) *cleanupManager {
      87           1 :         cm := &cleanupManager{
      88           1 :                 opts:            opts,
      89           1 :                 objProvider:     objProvider,
      90           1 :                 onTableDeleteFn: onTableDeleteFn,
      91           1 :                 deletePacer:     newDeletionPacer(crtime.NowMono(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
      92           1 :                 jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
      93           1 :         }
      94           1 :         cm.mu.completedJobsCond.L = &cm.mu.Mutex // the condition variable shares cm.mu's lock
      95           1 :         cm.waitGroup.Add(1) // matched by the deferred Done in mainLoop
      96           1 : 
      97           1 :         go func() {
      98           1 :                 pprof.Do(context.Background(), gcLabels, func(context.Context) {
      99           1 :                         cm.mainLoop()
     100           1 :                 })
     101             :         }()
     102             : 
     103           1 :         return cm
     104             : }
     105             : 
     106             : // Close closes the job queue and waits until the background goroutine has completed all queued jobs and exited.
     107             : // Delete pacing is disabled for the remaining jobs. NOTE(review): not visible in Close or mainLoop; confirm in deletionPacer.
     108           1 : func (cm *cleanupManager) Close() {
     109           1 :         close(cm.jobsCh) // ends mainLoop's range once the queue is drained; EnqueueJob must not be called afterwards
     110           1 :         cm.waitGroup.Wait()
     111           1 : }
     112             : 
     113             : // EnqueueJob adds a cleanup job to the manager's queue, blocking while the queue is full.
     114           1 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
     115           1 :         job := &cleanupJob{
     116           1 :                 jobID:         jobID,
     117           1 :                 obsoleteFiles: obsoleteFiles,
     118           1 :         }
     119           1 : 
     120           1 :         // Report deleted bytes to the pacer, which can use this data to potentially
     121           1 :         // increase the deletion rate to keep up. We want to do this at enqueue time
     122           1 :         // rather than when we get to the job, otherwise the reported bytes will be
     123           1 :         // subject to the throttling rate which defeats the purpose.
     124           1 :         var pacingBytes uint64
     125           1 :         for _, of := range obsoleteFiles {
     126           1 :                 if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) { // only tables known to the provider and not remote
     127           1 :                         pacingBytes += of.nonLogFile.fileSize
     128           1 :                 }
     129             :         }
     130           1 :         if pacingBytes > 0 {
     131           1 :                 cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
     132           1 :         }
     133             : 
     134           1 :         cm.mu.Lock()
     135           1 :         cm.mu.totalJobs++ // counted before the send, so a concurrent Wait covers this job
     136           1 :         cm.maybeLogLocked()
     137           1 :         cm.mu.Unlock()
     138           1 : 
     139           1 :         cm.jobsCh <- job // blocks once jobsQueueDepth jobs are buffered
     140             : }
     141             : 
     142             : // Wait blocks until the completion of all jobs that were already queued.
     143             : //
     144             : // Does not wait for jobs that are enqueued during the call.
     145             : //
     146             : // Note that DB.mu should not be held while calling this method; the background
     147             : // goroutine needs to acquire DB.mu to update deleted table metrics.
     148           1 : func (cm *cleanupManager) Wait() {
     149           1 :         cm.mu.Lock()
     150           1 :         defer cm.mu.Unlock()
     151           1 :         n := cm.mu.totalJobs // snapshot of the jobs enqueued so far
     152           1 :         for cm.mu.completedJobs < n {
     153           1 :                 cm.mu.completedJobsCond.Wait() // releases cm.mu while blocked; woken by mainLoop's Broadcast
     154           1 :         }
     155             : }
     156             : 
     157             : // mainLoop is the body of the manager's background goroutine: it processes jobs from jobsCh in order until the channel is closed and drained.
     158           1 : func (cm *cleanupManager) mainLoop() {
     159           1 :         defer cm.waitGroup.Done()
     160           1 : 
     161           1 :         var tb tokenbucket.TokenBucket
     162           1 :         // Use a token bucket with 1 token / second refill rate and 1 token burst.
     163           1 :         tb.Init(1.0, 1.0)
     164           1 :         for job := range cm.jobsCh {
     165           1 :                 for _, of := range job.obsoleteFiles {
     166           1 :                         switch of.fileType {
     167           1 :                         case fileTypeTable:
     168           1 :                                 cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     169           1 :                                 cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal) // callback runs before the deletion is attempted
     170           1 :                                 cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
     171           1 :                         case fileTypeLog:
     172           1 :                                 cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
     173           1 :                                         base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
     174           1 :                         default:
     175           1 :                                 path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
     176           1 :                                 cm.deleteObsoleteFile(
     177           1 :                                         cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
     178             :                         }
     179             :                 }
     180           1 :                 cm.mu.Lock()
     181           1 :                 cm.mu.completedJobs++
     182           1 :                 cm.mu.completedJobsCond.Broadcast() // wakes every goroutine blocked in Wait
     183           1 :                 cm.maybeLogLocked()
     184           1 :                 cm.mu.Unlock()
     185             :         }
     186             : }
     187             : 
     188             : // needsPacing reports whether deleting the file should be throttled. fileNumIfSST is read iff fileType is fileTypeTable.
     189           1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
     190           1 :         if fileType != fileTypeTable {
     191           1 :                 return false
     192           1 :         }
     193           1 :         meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
     194           1 :         if err != nil {
     195           1 :                 // The object was already removed from the provider; we won't actually
     196           1 :                 // delete anything, so we don't need to pace.
     197           1 :                 return false
     198           1 :         }
     199             :         // Don't throttle deletion of remote objects.
     200           1 :         return !meta.IsRemote()
     201             : }
     202             : 
     203             : // maybePace blocks before deleting an object if appropriate, for as long as the
     204             : // deletion pacer and token bucket require. It is always called from the background goroutine.
     205             : func (cm *cleanupManager) maybePace(
     206             :         tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
     207           1 : ) {
     208           1 :         if !cm.needsPacing(fileType, fileNum) {
     209           1 :                 return
     210           1 :         }
     211             : 
     212           1 :         tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize) // presumably seconds, given the 1 token/second bucket — confirm
     213           1 :         if tokens == 0.0 {
     214           1 :                 // The token bucket might be in debt; it could make us wait even for 0
     215           1 :                 // tokens. We don't want that if the pacer decided throttling should be
     216           1 :                 // disabled.
     217           1 :                 return
     218           1 :         }
     219             :         // Wait for tokens. We use a token bucket instead of sleeping outright because
     220             :         // the token bucket accumulates up to one second of unused tokens.
     221           1 :         for {
     222           1 :                 ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
     223           1 :                 if ok {
     224           1 :                         break
     225             :                 }
     226           0 :                 time.Sleep(d) // presumably the time until enough tokens accrue; retry afterwards
     227             :         }
     228             : }
     229             : 
     230             : // deleteObsoleteFile deletes a (non-object) file that is no longer needed via opts.Cleaner and reports WAL and manifest deletions to the EventListener.
     231             : func (cm *cleanupManager) deleteObsoleteFile(
     232             :         fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
     233           1 : ) {
     234           1 :         // TODO(peter): need to handle this error, probably by re-adding the
     235           1 :         // file that couldn't be deleted to one of the obsolete slices map.
     236           1 :         err := cm.opts.Cleaner.Clean(fs, fileType, path)
     237           1 :         if oserror.IsNotExist(err) { // already gone: nothing to report
     238           0 :                 return
     239           0 :         }
     240             : 
     241           1 :         switch fileType { // types not listed here (e.g. options files) produce no event and their err is dropped
     242           1 :         case fileTypeLog:
     243           1 :                 cm.opts.EventListener.WALDeleted(WALDeleteInfo{
     244           1 :                         JobID:   int(jobID),
     245           1 :                         Path:    path,
     246           1 :                         FileNum: fileNum,
     247           1 :                         Err:     err,
     248           1 :                 })
     249           1 :         case fileTypeManifest:
     250           1 :                 cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
     251           1 :                         JobID:   int(jobID),
     252           1 :                         Path:    path,
     253           1 :                         FileNum: fileNum,
     254           1 :                         Err:     err,
     255           1 :                 })
     256           0 :         case fileTypeTable:
     257           0 :                 panic("invalid deletion of object file")
     258             :         }
     259             : }
     260             : // deleteObsoleteObject removes a table object through the object provider and reports the result via EventListener.TableDeleted; not-exist errors are not reported.
     261             : func (cm *cleanupManager) deleteObsoleteObject(
     262             :         fileType fileType, jobID JobID, fileNum base.DiskFileNum,
     263           1 : ) {
     264           1 :         if fileType != fileTypeTable {
     265           0 :                 panic("not an object")
     266             :         }
     267             : 
     268           1 :         var path string
     269           1 :         meta, err := cm.objProvider.Lookup(fileType, fileNum)
     270           1 :         if err != nil { // lookup failed: Remove is skipped and the lookup error is what gets checked below
     271           1 :                 path = "<nil>"
     272           1 :         } else {
     273           1 :                 path = cm.objProvider.Path(meta)
     274           1 :                 err = cm.objProvider.Remove(fileType, fileNum)
     275           1 :         }
     276           1 :         if cm.objProvider.IsNotExistError(err) {
     277           1 :                 return
     278           1 :         }
     279             : 
     280           1 :         switch fileType {
     281           1 :         case fileTypeTable:
     282           1 :                 cm.opts.EventListener.TableDeleted(TableDeleteInfo{
     283           1 :                         JobID:   int(jobID),
     284           1 :                         Path:    path,
     285           1 :                         FileNum: fileNum,
     286           1 :                         Err:     err,
     287           1 :                 })
     288             :         }
     289             : }
     290             : 
     291             : // maybeLogLocked logs once when more than 75% of jobsQueueDepth jobs are outstanding,
     292             : // and once more when the count subsequently drops below 10% of jobsQueueDepth.
     293             : //
     294             : // Must be called with cm.mu locked.
     295           1 : func (cm *cleanupManager) maybeLogLocked() {
     296           1 :         const highThreshold = jobsQueueDepth * 3 / 4
     297           1 :         const lowThreshold = jobsQueueDepth / 10
     298           1 : 
     299           1 :         jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs // includes the job being processed and any blocked in EnqueueJob's send
     300           1 : 
     301           1 :         if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
     302           0 :                 cm.mu.jobsQueueWarningIssued = true
     303           0 :                 cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
     304           0 :         }
     305             : 
     306           1 :         if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
     307           0 :                 cm.mu.jobsQueueWarningIssued = false
     308           0 :                 cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
     309           0 :         }
     310             : }
     311             : // getDeletionPacerInfo gathers the deletion pacer's inputs: available disk bytes, plus obsolete and live byte counts read under d.mu.
     312           1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
     313           1 :         var pacerInfo deletionPacerInfo
     314           1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
     315           1 :         // but in practice this was observed to take constant time, regardless of
     316           1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
     317           1 :         // take 10 microseconds or less.
     318           1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes() // computed before d.mu is taken
     319           1 :         d.mu.Lock()
     320           1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
     321           1 :         pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
     322           1 :         d.mu.Unlock()
     323           1 :         return pacerInfo
     324           1 : }
     325             : 
     326             : // onObsoleteTableDelete is called to update metrics when an sstable is deleted. It acquires d.mu itself, so the caller must not hold it.
     327           1 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
     328           1 :         d.mu.Lock()
     329           1 :         d.mu.versions.metrics.Table.ObsoleteCount--
     330           1 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
     331           1 :         if isLocal { // local tables are additionally tracked in the Local sub-metric
     332           1 :                 d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
     333           1 :         }
     334           1 :         d.mu.Unlock()
     335             : }
     336             : 
     337             : // scanObsoleteFiles scans the filesystem for files that are no longer needed
     338             : // and adds those to the internal lists of obsolete files. Note that the files
     339             : // are not actually deleted by this method. A subsequent call to
     340             : // deleteObsoleteFiles must be performed. Must not be called concurrently
     341             : // with compactions and flushes. d.mu must be held when calling this function.
     342           1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
     343           1 :         // Disable automatic compactions temporarily to avoid concurrent compactions /
     344           1 :         // flushes from interfering. The original value is restored on completion.
     345           1 :         disabledPrev := d.opts.DisableAutomaticCompactions
     346           1 :         defer func() {
     347           1 :                 d.opts.DisableAutomaticCompactions = disabledPrev
     348           1 :         }()
     349           1 :         d.opts.DisableAutomaticCompactions = true
     350           1 : 
     351           1 :         // Wait for any ongoing compaction, download or flush to complete before continuing.
     352           1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     353           1 :                 d.mu.compact.cond.Wait()
     354           1 :         }
     355             : 
     356           1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     357           1 :         d.mu.versions.addLiveFileNums(liveFileNums)
     358           1 :         // Protect against files which are only referred to by the ingestedFlushable
     359           1 :         // from being deleted. These are added to the flushable queue on WAL replay
     360           1 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     361           1 :         // file scan to avoid double-deleting these files.
     362           1 :         for _, f := range flushableIngests {
     363           1 :                 for _, file := range f.files {
     364           1 :                         liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
     365           1 :                 }
     366             :         }
     367             : 
     368           1 :         manifestFileNum := d.mu.versions.manifestFileNum
     369           1 : 
     370           1 :         var obsoleteTables []tableInfo
     371           1 :         var obsoleteManifests []fileInfo
     372           1 :         var obsoleteOptions []fileInfo
     373           1 : 
     374           1 :         for _, filename := range list {
     375           1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     376           1 :                 if !ok {
     377           1 :                         continue
     378             :                 }
     379           1 :                 switch fileType {
     380           1 :                 case fileTypeManifest:
     381           1 :                         if diskFileNum >= manifestFileNum { // keep manifests numbered at or above versions.manifestFileNum
     382           1 :                                 continue
     383             :                         }
     384           1 :                         fi := fileInfo{FileNum: diskFileNum}
     385           1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil { // on Stat failure FileSize stays 0
     386           1 :                                 fi.FileSize = uint64(stat.Size())
     387           1 :                         }
     388           1 :                         obsoleteManifests = append(obsoleteManifests, fi)
     389           1 :                 case fileTypeOptions:
     390           1 :                         if diskFileNum >= d.optionsFileNum { // keep options files numbered at or above d.optionsFileNum
     391           1 :                                 continue
     392             :                         }
     393           1 :                         fi := fileInfo{FileNum: diskFileNum}
     394           1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil { // on Stat failure FileSize stays 0
     395           1 :                                 fi.FileSize = uint64(stat.Size())
     396           1 :                         }
     397           1 :                         obsoleteOptions = append(obsoleteOptions, fi)
     398           1 :                 case fileTypeTable:
     399             :                         // Objects are handled through the objstorage provider below.
     400           1 :                 default:
     401             :                         // Don't delete files we don't know about.
     402             :                 }
     403             :         }
     404             : 
     405           1 :         objects := d.objProvider.List()
     406           1 :         for _, obj := range objects {
     407           1 :                 switch obj.FileType {
     408           1 :                 case fileTypeTable:
     409           1 :                         if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     410           1 :                                 continue
     411             :                         }
     412           1 :                         fileInfo := fileInfo{
     413           1 :                                 FileNum: obj.DiskFileNum,
     414           1 :                         }
     415           1 :                         if size, err := d.objProvider.Size(obj); err == nil { // on error FileSize stays 0
     416           1 :                                 fileInfo.FileSize = uint64(size)
     417           1 :                         }
     418           1 :                         obsoleteTables = append(obsoleteTables, tableInfo{
     419           1 :                                 fileInfo: fileInfo,
     420           1 :                                 isLocal:  !obj.IsRemote(),
     421           1 :                         })
     422             : 
     423           0 :                 default:
     424             :                         // Ignore object types we don't know about.
     425             :                 }
     426             :         }
     427             : 
     428           1 :         d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
     429           1 :         d.mu.versions.updateObsoleteTableMetricsLocked()
     430           1 :         d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
     431           1 :         d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
     432             : }
     433             : 
     434             : // disableFileDeletions disables file deletions and then waits for any
     435             : // in-progress deletion to finish. The caller is required to call
     436             : // enableFileDeletions in order to enable file deletions again. It is ok for
     437             : // multiple callers to disable file deletions simultaneously, though they must
     438             : // all invoke enableFileDeletions in order for file deletions to be re-enabled
     439             : // (there is an internal reference count on file deletion disablement).
     440             : //
     441             : // d.mu must be held when calling this method; it is released while waiting and re-acquired before returning.
     442           1 : func (d *DB) disableFileDeletions() {
     443           1 :         d.mu.disableFileDeletions++
     444           1 :         d.mu.Unlock() // cleanupManager.Wait must not be called with d.mu held
     445           1 :         defer d.mu.Lock()
     446           1 :         d.cleanupManager.Wait()
     447           1 : }
     448             : 
     449             : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     450             : // is queued if necessary. Panics if called without a matching disableFileDeletions.
     451             : //
     452             : // d.mu must be held when calling this method.
     453           1 : func (d *DB) enableFileDeletions() {
     454           1 :         if d.mu.disableFileDeletions <= 0 {
     455           1 :                 panic("pebble: file deletion disablement invariant violated")
     456             :         }
     457           1 :         d.mu.disableFileDeletions--
     458           1 :         if d.mu.disableFileDeletions > 0 { // other disablers are still outstanding
     459           0 :                 return
     460           0 :         }
     461           1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     462             : }
     463             : // fileInfo is a package-local alias for base.FileInfo.
     464             : type fileInfo = base.FileInfo
     465             : 
     466             : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
     467             : //
     468             : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
     469             : //
     470             : // Does nothing if file deletions are disabled (see disableFileDeletions). A
     471             : // cleanup job will be scheduled when file deletions are re-enabled.
     472           1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
     473           1 :         if d.mu.disableFileDeletions > 0 {
     474           1 :                 return
     475           1 :         }
     476           1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
     477           1 : 
     478           1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
     479           1 :         // log that has not had its contents flushed to an sstable.
     480           1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
     481           1 :         if err != nil {
     482           0 :                 panic(err)
     483             :         }
     484             : 
     485           1 :         obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
     486           1 :         d.mu.versions.obsoleteTables = nil
     487           1 : 
     488           1 :         for _, tbl := range obsoleteTables {
     489           1 :                 delete(d.mu.versions.zombieTables, tbl.FileNum)
     490           1 :         }
     491             : 
     492             :         // Sort the manifests cause we want to delete some contiguous prefix
     493             :         // of the older manifests.
     494           1 :         slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
     495           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     496           1 :         })
     497             : 
     498           1 :         var obsoleteManifests []fileInfo
     499           1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
     500           1 :         if manifestsToDelete > 0 {
     501           1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
     502           1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
     503           1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
     504           0 :                         d.mu.versions.obsoleteManifests = nil
     505           0 :                 }
     506             :         }
     507             : 
     508           1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
     509           1 :         d.mu.versions.obsoleteOptions = nil
     510           1 : 
     511           1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
     512           1 :         // Note the unusual order: Unlock and then Lock.
     513           1 :         d.mu.Unlock()
     514           1 :         defer d.mu.Lock()
     515           1 : 
     516           1 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
     517           1 :         for _, f := range obsoleteLogs {
     518           1 :                 filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
     519           1 :         }
     520             :         // We sort to make the order of deletions deterministic, which is nice for
     521             :         // tests.
     522           1 :         slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
     523           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     524           1 :         })
     525           1 :         for _, f := range obsoleteTables {
     526           1 :                 d.tableCache.evict(f.FileNum)
     527           1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
     528           1 :                         fileType: fileTypeTable,
     529           1 :                         nonLogFile: deletableFile{
     530           1 :                                 dir:      d.dirname,
     531           1 :                                 fileNum:  f.FileNum,
     532           1 :                                 fileSize: f.FileSize,
     533           1 :                                 isLocal:  f.isLocal,
     534           1 :                         },
     535           1 :                 })
     536           1 :         }
     537           1 :         files := [2]struct {
     538           1 :                 fileType fileType
     539           1 :                 obsolete []fileInfo
     540           1 :         }{
     541           1 :                 {fileTypeManifest, obsoleteManifests},
     542           1 :                 {fileTypeOptions, obsoleteOptions},
     543           1 :         }
     544           1 :         for _, f := range files {
     545           1 :                 // We sort to make the order of deletions deterministic, which is nice for
     546           1 :                 // tests.
     547           1 :                 slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
     548           1 :                         return cmp.Compare(a.FileNum, b.FileNum)
     549           1 :                 })
     550           1 :                 for _, fi := range f.obsolete {
     551           1 :                         dir := d.dirname
     552           1 :                         filesToDelete = append(filesToDelete, obsoleteFile{
     553           1 :                                 fileType: f.fileType,
     554           1 :                                 nonLogFile: deletableFile{
     555           1 :                                         dir:      dir,
     556           1 :                                         fileNum:  fi.FileNum,
     557           1 :                                         fileSize: fi.FileSize,
     558           1 :                                         isLocal:  true,
     559           1 :                                 },
     560           1 :                         })
     561           1 :                 }
     562             :         }
     563           1 :         if len(filesToDelete) > 0 {
     564           1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
     565           1 :         }
     566           1 :         if d.opts.private.testingAlwaysWaitForCleanup {
     567           1 :                 d.cleanupManager.Wait()
     568           1 :         }
     569             : }
     570             : 
     571           1 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
     572           1 :         d.mu.Lock()
     573           1 :         defer d.mu.Unlock()
     574           1 :         d.maybeScheduleObsoleteTableDeletionLocked()
     575           1 : }
     576             : 
     577           1 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
     578           1 :         if len(d.mu.versions.obsoleteTables) > 0 {
     579           1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     580           1 :         }
     581             : }
     582             : 
     583           1 : func merge(a, b []fileInfo) []fileInfo {
     584           1 :         if len(b) == 0 {
     585           1 :                 return a
     586           1 :         }
     587             : 
     588           1 :         a = append(a, b...)
     589           1 :         slices.SortFunc(a, func(a, b fileInfo) int {
     590           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     591           1 :         })
     592           1 :         return slices.CompactFunc(a, func(a, b fileInfo) bool {
     593           1 :                 return a.FileNum == b.FileNum
     594           1 :         })
     595             : }
     596             : 
     597           1 : func mergeTableInfos(a, b []tableInfo) []tableInfo {
     598           1 :         if len(b) == 0 {
     599           1 :                 return a
     600           1 :         }
     601             : 
     602           1 :         a = append(a, b...)
     603           1 :         slices.SortFunc(a, func(a, b tableInfo) int {
     604           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     605           1 :         })
     606           1 :         return slices.CompactFunc(a, func(a, b tableInfo) bool {
     607           1 :                 return a.FileNum == b.FileNum
     608           1 :         })
     609             : }

Generated by: LCOV version 1.14