LCOV - code coverage report
Current view: top level - pebble - obsolete_files.go (source / functions) Coverage Total Hit
Test: 2025-11-23 08:19Z d7ce913e - meta test only.lcov Lines: 87.5 % 280 245
Test Date: 2025-11-23 08:20:36 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "slices"
      10              : 
      11              :         "github.com/cockroachdb/errors"
      12              :         "github.com/cockroachdb/errors/oserror"
      13              :         "github.com/cockroachdb/pebble/internal/base"
      14              :         "github.com/cockroachdb/pebble/internal/deletepacer"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/metrics"
      17              :         "github.com/cockroachdb/pebble/objstorage"
      18              :         "github.com/cockroachdb/pebble/vfs"
      19              :         "github.com/cockroachdb/pebble/wal"
      20              : )
      21              : 
      22              : // Cleaner exports the base.Cleaner type.
      23              : type Cleaner = base.Cleaner
      24              : 
      25              : // DeleteCleaner exports the base.DeleteCleaner type.
      26              : type DeleteCleaner = base.DeleteCleaner
      27              : 
      28              : // ArchiveCleaner exports the base.ArchiveCleaner type.
      29              : type ArchiveCleaner = base.ArchiveCleaner
      30              : 
      31              : func openDeletePacer(
      32              :         opts *Options, objProvider objstorage.Provider, diskFreeSpaceFn deletepacer.DiskFreeSpaceFn,
      33            1 : ) *deletepacer.DeletePacer {
      34            1 :         return deletepacer.Open(
      35            1 :                 opts.DeletionPacing,
      36            1 :                 opts.Logger,
      37            1 :                 diskFreeSpaceFn,
      38            1 :                 func(of deletepacer.ObsoleteFile, jobID int) {
      39            1 :                         deleteObsoleteFile(opts.Cleaner, objProvider, opts.EventListener, of, jobID)
      40            1 :                 },
      41              :         )
      42              : }
      43              : 
      44              : // deleteObsoleteFile deletes a file or object, once the delete pacer decided it is time.
      45              : func deleteObsoleteFile(
      46              :         cleaner Cleaner,
      47              :         objProvider objstorage.Provider,
      48              :         eventListener *EventListener,
      49              :         of deletepacer.ObsoleteFile,
      50              :         jobID int,
      51            1 : ) {
      52            1 :         path := of.Path
      53            1 :         var err error
      54            1 :         if of.FileType == base.FileTypeTable || of.FileType == base.FileTypeBlob {
      55            1 :                 var meta objstorage.ObjectMetadata
      56            1 :                 meta, err = objProvider.Lookup(of.FileType, of.FileNum)
      57            1 :                 if err != nil {
      58            0 :                         path = "<nil>"
      59            1 :                 } else {
      60            1 :                         path = objProvider.Path(meta)
      61            1 :                         err = objProvider.Remove(of.FileType, of.FileNum)
      62            1 :                 }
      63            1 :                 if objProvider.IsNotExistError(err) {
      64            0 :                         return
      65            0 :                 }
      66            1 :         } else {
      67            1 :                 // TODO(peter): need to handle this error, probably by re-adding the
      68            1 :                 // file that couldn't be deleted to one of the obsolete slices map.
      69            1 :                 err := cleaner.Clean(of.FS, of.FileType, path)
      70            1 :                 if oserror.IsNotExist(err) {
      71            0 :                         return
      72            0 :                 }
      73              :         }
      74              : 
      75            1 :         switch of.FileType {
      76            1 :         case base.FileTypeTable:
      77            1 :                 eventListener.TableDeleted(TableDeleteInfo{
      78            1 :                         JobID:   jobID,
      79            1 :                         Path:    path,
      80            1 :                         FileNum: of.FileNum,
      81            1 :                         Err:     err,
      82            1 :                 })
      83            1 :         case base.FileTypeBlob:
      84            1 :                 eventListener.BlobFileDeleted(BlobFileDeleteInfo{
      85            1 :                         JobID:   jobID,
      86            1 :                         Path:    path,
      87            1 :                         FileNum: of.FileNum,
      88            1 :                         Err:     err,
      89            1 :                 })
      90            1 :         case base.FileTypeLog:
      91            1 :                 eventListener.WALDeleted(WALDeleteInfo{
      92            1 :                         JobID:   jobID,
      93            1 :                         Path:    path,
      94            1 :                         FileNum: of.FileNum,
      95            1 :                         Err:     err,
      96            1 :                 })
      97            1 :         case base.FileTypeManifest:
      98            1 :                 eventListener.ManifestDeleted(ManifestDeleteInfo{
      99            1 :                         JobID:   jobID,
     100            1 :                         Path:    path,
     101            1 :                         FileNum: of.FileNum,
     102            1 :                         Err:     err,
     103            1 :                 })
     104              :         }
     105              : }
     106              : 
     107              : // scanObsoleteFiles compares the provided directory listing to the set of
     108              : // known, in-use files to find files no longer needed and adds those to the
     109              : // internal lists of obsolete files. Note that the files are not actually
     110              : // deleted by this method. A subsequent call to deleteObsoleteFiles must be
     111              : // performed. Must be not be called concurrently with compactions and flushes
     112              : // and will panic if any are in-progress. db.mu must be held when calling this
     113              : // function.
     114            1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
     115            1 :         if d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
     116            0 :                 panic(errors.AssertionFailedf("compaction or flush in progress"))
     117              :         }
     118              : 
     119            1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
     120            1 :         d.mu.versions.addLiveFileNums(liveFileNums)
     121            1 :         // Protect against files which are only referred to by the ingestedFlushable
     122            1 :         // from being deleted. These are added to the flushable queue on WAL replay
     123            1 :         // and handle their own obsoletion/deletion. We exclude them from this obsolete
     124            1 :         // file scan to avoid double-deleting these files.
     125            1 :         for _, f := range flushableIngests {
     126            1 :                 for _, file := range f.files {
     127            1 :                         liveFileNums[file.TableBacking.DiskFileNum] = struct{}{}
     128            1 :                 }
     129              :         }
     130              : 
     131            1 :         manifestFileNum := d.mu.versions.manifestFileNum
     132            1 : 
     133            1 :         var obsoleteTables []deletepacer.ObsoleteFile
     134            1 :         var obsoleteBlobs []deletepacer.ObsoleteFile
     135            1 :         var obsoleteOptions []deletepacer.ObsoleteFile
     136            1 :         var obsoleteManifests []deletepacer.ObsoleteFile
     137            1 : 
     138            1 :         for _, filename := range list {
     139            1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
     140            1 :                 if !ok {
     141            1 :                         continue
     142              :                 }
     143            1 :                 makeObsoleteFile := func() deletepacer.ObsoleteFile {
     144            1 :                         of := deletepacer.ObsoleteFile{
     145            1 :                                 FileType:  fileType,
     146            1 :                                 FS:        d.opts.FS,
     147            1 :                                 Path:      d.opts.FS.PathJoin(d.dirname, filename),
     148            1 :                                 FileNum:   diskFileNum,
     149            1 :                                 Placement: base.Local,
     150            1 :                         }
     151            1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
     152            0 :                                 of.FileSize = uint64(stat.Size())
     153            0 :                         }
     154            1 :                         return of
     155              :                 }
     156            1 :                 switch fileType {
     157            1 :                 case base.FileTypeManifest:
     158            1 :                         if diskFileNum >= manifestFileNum {
     159            1 :                                 continue
     160              :                         }
     161            1 :                         obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
     162            1 :                 case base.FileTypeOptions:
     163            1 :                         if diskFileNum >= d.optionsFileNum {
     164            0 :                                 continue
     165              :                         }
     166            1 :                         obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
     167            1 :                 case base.FileTypeTable, base.FileTypeBlob:
     168              :                         // Objects are handled through the objstorage provider below.
     169            1 :                 default:
     170              :                         // Don't delete files we don't know about.
     171              :                 }
     172              :         }
     173              : 
     174            1 :         objects := d.objProvider.List()
     175            1 :         for _, obj := range objects {
     176            1 :                 if _, ok := liveFileNums[obj.DiskFileNum]; ok {
     177            1 :                         continue
     178              :                 }
     179            1 :                 if obj.FileType != base.FileTypeTable && obj.FileType != base.FileTypeBlob {
     180            0 :                         // Ignore object types we don't know about.
     181            0 :                         continue
     182              :                 }
     183            1 :                 of := deletepacer.ObsoleteFile{
     184            1 :                         FileType:  obj.FileType,
     185            1 :                         FS:        d.opts.FS,
     186            1 :                         Path:      base.MakeFilepath(d.opts.FS, d.dirname, obj.FileType, obj.DiskFileNum),
     187            1 :                         FileNum:   obj.DiskFileNum,
     188            1 :                         Placement: obj.Placement(),
     189            1 :                 }
     190            1 :                 if size, err := d.objProvider.Size(obj); err == nil {
     191            1 :                         of.FileSize = uint64(size)
     192            1 :                 }
     193            1 :                 if obj.FileType == base.FileTypeTable {
     194            1 :                         obsoleteTables = append(obsoleteTables, of)
     195            1 :                 } else {
     196            1 :                         obsoleteBlobs = append(obsoleteBlobs, of)
     197            1 :                 }
     198              :         }
     199              : 
     200            1 :         d.mu.versions.obsoleteTables = mergeObsoleteFiles(d.mu.versions.obsoleteTables, obsoleteTables)
     201            1 :         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
     202            1 :         d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
     203            1 :         d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
     204              : }
     205              : 
// disableFileDeletions disables file deletions and then waits for any
// in-progress deletion to finish. The caller is required to call
// enableFileDeletions in order to enable file deletions again. It is ok for
// multiple callers to disable file deletions simultaneously, though they must
// all invoke enableFileDeletions in order for file deletions to be re-enabled
// (there is an internal reference count on file deletion disablement).
//
// d.mu must be held when calling this method. The mutex is released while
// waiting on the delete pacer and re-acquired before returning.
func (d *DB) disableFileDeletions() {
	d.mu.fileDeletions.disableCount++
	// Note the unusual order: Unlock now, and Lock again (via defer) on the
	// way out, so that d.mu is not held while waiting on the delete pacer.
	d.mu.Unlock()
	defer d.mu.Lock()
	d.deletePacer.WaitForTesting()
}
     220              : 
     221              : // enableFileDeletions enables previously disabled file deletions. A cleanup job
     222              : // is queued if necessary.
     223              : //
     224              : // d.mu must be held when calling this method.
     225            1 : func (d *DB) enableFileDeletions() {
     226            1 :         if d.mu.fileDeletions.disableCount <= 0 {
     227            0 :                 panic("pebble: file deletion disablement invariant violated")
     228              :         }
     229            1 :         d.mu.fileDeletions.disableCount--
     230            1 :         if d.mu.fileDeletions.disableCount > 0 {
     231            0 :                 return
     232            0 :         }
     233            1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
     234              : }
     235              : 
// fileInfo is a package-local alias for base.FileInfo.
type fileInfo = base.FileInfo
     237              : 
// deleteObsoleteFiles collects the currently known obsolete files (WALs,
// tables, blob files, manifests and OPTIONS files) and enqueues them with the
// delete pacer, if there are any.
//
// d.mu must be held when calling this. The function will release and
// re-acquire the mutex.
//
// Does nothing if file deletions are disabled (see disableFileDeletions). A
// cleanup job will be scheduled when file deletions are re-enabled.
func (d *DB) deleteObsoleteFiles(jobID JobID) {
	if d.mu.fileDeletions.disableCount > 0 {
		return
	}
	// noRecycle is true when the configured Cleaner implements
	// base.NeedsFileContents; it is passed through to the WAL manager.
	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)

	// NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
	// log that has not had its contents flushed to an sstable.
	obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
	if err != nil {
		panic(err)
	}

	// Take private copies of the table and blob lists, then truncate the
	// shared slices in place so their backing arrays can be reused.
	obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
	d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
	obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
	d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]

	// Ensure everything is already sorted. We want determinism for testing, and
	// we need the manifests to be sorted because we want to delete some
	// contiguous prefix of the older manifests.
	if invariants.Enabled {
		switch {
		case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
		case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
		case !slices.IsSortedFunc(obsoleteTables, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteTables is not sorted")
		case !slices.IsSortedFunc(obsoleteBlobs, cmpObsoleteFileNumbers):
			d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
		}
	}

	// Retain the last d.opts.NumPrevManifest entries of the (sorted) obsolete
	// manifest list; only the prefix before them is handed off for deletion.
	var obsoleteManifests []deletepacer.ObsoleteFile
	manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
	if manifestsToDelete > 0 {
		obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
		d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
		if len(d.mu.versions.obsoleteManifests) == 0 {
			d.mu.versions.obsoleteManifests = nil
		}
	}

	// All obsolete OPTIONS files are handed off; the shared list is reset.
	obsoleteOptions := d.mu.versions.obsoleteOptions
	d.mu.versions.obsoleteOptions = nil

	// Release d.mu while preparing the cleanup job and possibly waiting.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	// Assemble the job in a fixed order: manifests, OPTIONS files, tables,
	// blob files, then WALs.
	n := len(obsoleteLogs) + len(obsoleteTables) + len(obsoleteBlobs) + len(obsoleteManifests) + len(obsoleteOptions)
	filesToDelete := make([]deletepacer.ObsoleteFile, 0, n)
	filesToDelete = append(filesToDelete, obsoleteManifests...)
	filesToDelete = append(filesToDelete, obsoleteOptions...)
	filesToDelete = append(filesToDelete, obsoleteTables...)
	filesToDelete = append(filesToDelete, obsoleteBlobs...)
	for _, f := range obsoleteLogs {
		filesToDelete = append(filesToDelete, deletepacer.ObsoleteFile{
			FileType:  base.FileTypeLog,
			FS:        f.FS,
			Path:      f.Path,
			FileNum:   base.DiskFileNum(f.NumWAL),
			FileSize:  f.ApproxFileSize,
			Placement: base.Local,
		})
	}
	// Evict the tables and blob files from the file cache before the files
	// are enqueued for deletion.
	for _, f := range obsoleteTables {
		d.fileCache.Evict(f.FileNum, base.FileTypeTable)
	}
	for _, f := range obsoleteBlobs {
		d.fileCache.Evict(f.FileNum, base.FileTypeBlob)
	}
	if len(filesToDelete) > 0 {
		d.deletePacer.Enqueue(int(jobID), filesToDelete...)
	}
	// Testing hook: optionally block until the delete pacer has caught up.
	if d.opts.private.testingAlwaysWaitForCleanup {
		d.deletePacer.WaitForTesting()
	}
}
     325              : 
     326            1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
     327            1 :         d.mu.Lock()
     328            1 :         defer d.mu.Unlock()
     329            1 :         if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
     330            1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
     331            1 :         }
     332              : }
     333              : 
     334            1 : func mergeObsoleteFiles(a, b []deletepacer.ObsoleteFile) []deletepacer.ObsoleteFile {
     335            1 :         if len(b) == 0 {
     336            1 :                 return a
     337            1 :         }
     338              : 
     339            1 :         a = append(a, b...)
     340            1 :         slices.SortFunc(a, cmpObsoleteFileNumbers)
     341            1 :         return slices.CompactFunc(a, func(a, b deletepacer.ObsoleteFile) bool {
     342            1 :                 return a.FileNum == b.FileNum
     343            1 :         })
     344              : }
     345              : 
     346            1 : func cmpObsoleteFileNumbers(a, b deletepacer.ObsoleteFile) int {
     347            1 :         return cmp.Compare(a.FileNum, b.FileNum)
     348            1 : }
     349              : 
     350              : // objectInfo describes an object in object storage (either a sstable or a blob
     351              : // file).
     352              : type objectInfo struct {
     353              :         fileInfo
     354              :         placement base.Placement
     355              : }
     356              : 
     357              : func (o objectInfo) asObsoleteFile(
     358              :         fs vfs.FS, fileType base.FileType, dirname string,
     359            1 : ) deletepacer.ObsoleteFile {
     360            1 :         return deletepacer.ObsoleteFile{
     361            1 :                 FileType:  fileType,
     362            1 :                 FS:        fs,
     363            1 :                 Path:      base.MakeFilepath(fs, dirname, fileType, o.FileNum),
     364            1 :                 FileNum:   o.FileNum,
     365            1 :                 FileSize:  o.FileSize,
     366            1 :                 Placement: o.placement,
     367            1 :         }
     368            1 : }
     369              : 
     370            1 : func makeZombieObjects() zombieObjects {
     371            1 :         return zombieObjects{
     372            1 :                 objs: make(map[base.DiskFileNum]objectInfo),
     373            1 :         }
     374            1 : }
     375              : 
     376              : // zombieObjects tracks a set of objects that are no longer required by the most
     377              : // recent version of the LSM, but may still need to be accessed by an open
     378              : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
     379              : // may access them are closed.
     380              : type zombieObjects struct {
     381              :         objs map[base.DiskFileNum]objectInfo
     382              : }
     383              : 
     384              : // Add adds an object to the set of zombie objects.
     385            1 : func (z *zombieObjects) Add(obj objectInfo) {
     386            1 :         if _, ok := z.objs[obj.FileNum]; ok {
     387            0 :                 panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
     388              :         }
     389            1 :         z.objs[obj.FileNum] = obj
     390              : }
     391              : 
     392              : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
     393            1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
     394            1 :         z.Add(objectInfo{
     395            1 :                 fileInfo: fileInfo{
     396            1 :                         FileNum:  meta.DiskFileNum,
     397            1 :                         FileSize: size,
     398            1 :                 },
     399            1 :                 placement: meta.Placement(),
     400            1 :         })
     401            1 : }
     402              : 
     403              : // Count returns the number of zombie objects.
     404            1 : func (z *zombieObjects) Count() int {
     405            1 :         return len(z.objs)
     406            1 : }
     407              : 
     408              : // Extract removes an object from the set of zombie objects, returning the
     409              : // object that was removed.
     410            1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
     411            1 :         obj, ok := z.objs[fileNum]
     412            1 :         if !ok {
     413            0 :                 panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
     414              :         }
     415            1 :         delete(z.objs, fileNum)
     416            1 :         return obj
     417              : }
     418              : 
     419              : // Metrics returns the count and size of all objects in the set, broken down by placement.
     420            0 : func (z *zombieObjects) Metrics() metrics.CountAndSizeByPlacement {
     421            0 :         var res metrics.CountAndSizeByPlacement
     422            0 :         for _, obj := range z.objs {
     423            0 :                 res.Inc(obj.FileSize, obj.placement)
     424            0 :         }
     425            0 :         return res
     426              : }
        

Generated by: LCOV version 2.0-1