Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/errors/oserror"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/vfs"
19 : "github.com/cockroachdb/pebble/wal"
20 : "github.com/cockroachdb/tokenbucket"
21 : )
22 :
// Cleaner exports the base.Cleaner type. In this file a cleaner is reached
// through cm.opts.Cleaner, whose Clean method is called for every obsolete
// file that is removed by path (see deleteObsoleteFile).
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
// NOTE(review): presumably the cleaner that removes files outright; the
// definition lives in internal/base and is not visible here — confirm.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
// NOTE(review): presumably the cleaner that archives files instead of
// removing them; the definition lives in internal/base — confirm.
type ArchiveCleaner = base.ArchiveCleaner
31 :
// cleanupManager deletes obsolete files on a single background goroutine.
// Jobs are submitted through EnqueueJob, queued on jobsCh and processed in
// order by mainLoop. Deletions of local tables may be delayed by deletePacer.
type cleanupManager struct {
	opts        *Options
	objProvider objstorage.Provider
	// onTableDeleteFn is invoked by the background goroutine for every obsolete
	// table, right before the table object is removed.
	onTableDeleteFn func(fileSize uint64, isLocal bool)
	// deletePacer is told about the bytes of each enqueued job and is asked
	// for the delay to apply before each paced deletion.
	deletePacer *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs int
		// completedJobs is the number of jobs the background goroutine has
		// finished processing.
		completedJobs int
		// completedJobsCond is broadcast each time completedJobs is incremented.
		// Its locker is the Mutex embedded in this struct.
		completedJobsCond sync.Cond
		// jobsQueueWarningIssued records that the "falling behind" message was
		// logged and the matching "back to normal" message has not been yet.
		jobsQueueWarningIssued bool
	}
}
52 :
// jobsQueueDepth is the buffer size of cleanupManager.jobsCh: this many jobs
// can be queued before EnqueueJob has to block. The logging thresholds in
// maybeLogLocked are derived from it.
const jobsQueueDepth = 1000
55 :
// deletableFile is used for non log files.
type deletableFile struct {
	// dir is the directory used to build the file's path when the file is
	// deleted by path. It is not read when deleting tables, which go through
	// the object provider instead.
	dir     string
	fileNum base.DiskFileNum
	// fileSize is the file's size; for tables it is the amount reported to the
	// deletion pacer and to the table-deleted callback.
	fileSize uint64
	// isLocal is forwarded to the table-deleted callback so that local-only
	// metrics can be updated.
	isLocal bool
}
63 :
// obsoleteFile holds information about a file that needs to be deleted soon.
// Exactly one of nonLogFile and logFile is meaningful, selected by fileType.
type obsoleteFile struct {
	fileType fileType
	// nonLogFile is populated when fileType != fileTypeLog.
	nonLogFile deletableFile
	// logFile is populated when fileType == fileTypeLog.
	logFile wal.DeletableLog
}
72 :
// cleanupJob is one unit of work for the cleanupManager: a batch of obsolete
// files to delete.
type cleanupJob struct {
	// jobID is reported in the deletion events emitted for the job's files.
	jobID JobID
	// obsoleteFiles are deleted in slice order by the background goroutine.
	obsoleteFiles []obsoleteFile
}
77 :
78 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
79 : // The cleanupManager must be Close()d.
80 : func openCleanupManager(
81 : opts *Options,
82 : objProvider objstorage.Provider,
83 : onTableDeleteFn func(fileSize uint64, isLocal bool),
84 : getDeletePacerInfo func() deletionPacerInfo,
85 2 : ) *cleanupManager {
86 2 : cm := &cleanupManager{
87 2 : opts: opts,
88 2 : objProvider: objProvider,
89 2 : onTableDeleteFn: onTableDeleteFn,
90 2 : deletePacer: newDeletionPacer(time.Now(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
91 2 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
92 2 : }
93 2 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
94 2 : cm.waitGroup.Add(1)
95 2 :
96 2 : go func() {
97 2 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
98 2 : cm.mainLoop()
99 2 : })
100 : }()
101 :
102 2 : return cm
103 : }
104 :
// Close stops the background goroutine, waiting until all queued jobs are completed.
//
// Close closes the job queue, so EnqueueJob must not be called concurrently
// with or after Close (sending on a closed channel panics), and Close must not
// be called twice.
//
// NOTE(review): this comment used to state that delete pacing is disabled for
// the remaining jobs, but nothing in Close or mainLoop visibly does that —
// confirm whether the pacer is expected to handle it.
func (cm *cleanupManager) Close() {
	// Closing the channel ends mainLoop's range loop once the queue is drained.
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}
111 :
112 : // EnqueueJob adds a cleanup job to the manager's queue.
113 2 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
114 2 : job := &cleanupJob{
115 2 : jobID: jobID,
116 2 : obsoleteFiles: obsoleteFiles,
117 2 : }
118 2 :
119 2 : // Report deleted bytes to the pacer, which can use this data to potentially
120 2 : // increase the deletion rate to keep up. We want to do this at enqueue time
121 2 : // rather than when we get to the job, otherwise the reported bytes will be
122 2 : // subject to the throttling rate which defeats the purpose.
123 2 : var pacingBytes uint64
124 2 : for _, of := range obsoleteFiles {
125 2 : if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
126 2 : pacingBytes += of.nonLogFile.fileSize
127 2 : }
128 : }
129 2 : if pacingBytes > 0 {
130 2 : cm.deletePacer.ReportDeletion(time.Now(), pacingBytes)
131 2 : }
132 :
133 2 : cm.mu.Lock()
134 2 : cm.mu.totalJobs++
135 2 : cm.maybeLogLocked()
136 2 : cm.mu.Unlock()
137 2 :
138 2 : cm.jobsCh <- job
139 : }
140 :
141 : // Wait until the completion of all jobs that were already queued.
142 : //
143 : // Does not wait for jobs that are enqueued during the call.
144 : //
145 : // Note that DB.mu should not be held while calling this method; the background
146 : // goroutine needs to acquire DB.mu to update deleted table metrics.
147 2 : func (cm *cleanupManager) Wait() {
148 2 : cm.mu.Lock()
149 2 : defer cm.mu.Unlock()
150 2 : n := cm.mu.totalJobs
151 2 : for cm.mu.completedJobs < n {
152 2 : cm.mu.completedJobsCond.Wait()
153 2 : }
154 : }
155 :
156 : // mainLoop runs the manager's background goroutine.
157 2 : func (cm *cleanupManager) mainLoop() {
158 2 : defer cm.waitGroup.Done()
159 2 :
160 2 : var tb tokenbucket.TokenBucket
161 2 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
162 2 : tb.Init(1.0, 1.0)
163 2 : for job := range cm.jobsCh {
164 2 : for _, of := range job.obsoleteFiles {
165 2 : switch of.fileType {
166 2 : case fileTypeTable:
167 2 : cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
168 2 : cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
169 2 : cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
170 2 : case fileTypeLog:
171 2 : cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
172 2 : base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
173 2 : default:
174 2 : path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
175 2 : cm.deleteObsoleteFile(
176 2 : cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
177 : }
178 : }
179 2 : cm.mu.Lock()
180 2 : cm.mu.completedJobs++
181 2 : cm.mu.completedJobsCond.Broadcast()
182 2 : cm.maybeLogLocked()
183 2 : cm.mu.Unlock()
184 : }
185 : }
186 :
187 : // fileNumIfSST is read iff fileType is fileTypeTable.
188 2 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
189 2 : if fileType != fileTypeTable {
190 2 : return false
191 2 : }
192 2 : meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
193 2 : if err != nil {
194 1 : // The object was already removed from the provider; we won't actually
195 1 : // delete anything, so we don't need to pace.
196 1 : return false
197 1 : }
198 : // Don't throttle deletion of remote objects.
199 2 : return !meta.IsRemote()
200 : }
201 :
202 : // maybePace sleeps before deleting an object if appropriate. It is always
203 : // called from the background goroutine.
204 : func (cm *cleanupManager) maybePace(
205 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
206 2 : ) {
207 2 : if !cm.needsPacing(fileType, fileNum) {
208 2 : return
209 2 : }
210 :
211 2 : tokens := cm.deletePacer.PacingDelay(time.Now(), fileSize)
212 2 : if tokens == 0.0 {
213 2 : // The token bucket might be in debt; it could make us wait even for 0
214 2 : // tokens. We don't want that if the pacer decided throttling should be
215 2 : // disabled.
216 2 : return
217 2 : }
218 : // Wait for tokens. We use a token bucket instead of sleeping outright because
219 : // the token bucket accumulates up to one second of unused tokens.
220 2 : for {
221 2 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
222 2 : if ok {
223 2 : break
224 : }
225 1 : time.Sleep(d)
226 : }
227 : }
228 :
229 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
230 : func (cm *cleanupManager) deleteObsoleteFile(
231 : fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
232 2 : ) {
233 2 : // TODO(peter): need to handle this error, probably by re-adding the
234 2 : // file that couldn't be deleted to one of the obsolete slices map.
235 2 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
236 2 : if oserror.IsNotExist(err) {
237 0 : return
238 0 : }
239 :
240 2 : switch fileType {
241 2 : case fileTypeLog:
242 2 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
243 2 : JobID: int(jobID),
244 2 : Path: path,
245 2 : FileNum: fileNum,
246 2 : Err: err,
247 2 : })
248 2 : case fileTypeManifest:
249 2 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
250 2 : JobID: int(jobID),
251 2 : Path: path,
252 2 : FileNum: fileNum,
253 2 : Err: err,
254 2 : })
255 0 : case fileTypeTable:
256 0 : panic("invalid deletion of object file")
257 : }
258 : }
259 :
260 : func (cm *cleanupManager) deleteObsoleteObject(
261 : fileType fileType, jobID JobID, fileNum base.DiskFileNum,
262 2 : ) {
263 2 : if fileType != fileTypeTable {
264 0 : panic("not an object")
265 : }
266 :
267 2 : var path string
268 2 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
269 2 : if err != nil {
270 1 : path = "<nil>"
271 2 : } else {
272 2 : path = cm.objProvider.Path(meta)
273 2 : err = cm.objProvider.Remove(fileType, fileNum)
274 2 : }
275 2 : if cm.objProvider.IsNotExistError(err) {
276 1 : return
277 1 : }
278 :
279 2 : switch fileType {
280 2 : case fileTypeTable:
281 2 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
282 2 : JobID: int(jobID),
283 2 : Path: path,
284 2 : FileNum: fileNum,
285 2 : Err: err,
286 2 : })
287 : }
288 : }
289 :
290 : // maybeLogLocked issues a log if the job queue gets 75% full and issues a log
291 : // when the job queue gets back to less than 10% full.
292 : //
293 : // Must be called with cm.mu locked.
294 2 : func (cm *cleanupManager) maybeLogLocked() {
295 2 : const highThreshold = jobsQueueDepth * 3 / 4
296 2 : const lowThreshold = jobsQueueDepth / 10
297 2 :
298 2 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
299 2 :
300 2 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
301 0 : cm.mu.jobsQueueWarningIssued = true
302 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
303 0 : }
304 :
305 2 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
306 0 : cm.mu.jobsQueueWarningIssued = false
307 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
308 0 : }
309 : }
310 :
311 2 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
312 2 : var pacerInfo deletionPacerInfo
313 2 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
314 2 : // but in practice this was observed to take constant time, regardless of
315 2 : // volume size used, at least on linux with ext4 and zfs. All invocations
316 2 : // take 10 microseconds or less.
317 2 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
318 2 : d.mu.Lock()
319 2 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
320 2 : pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
321 2 : d.mu.Unlock()
322 2 : return pacerInfo
323 2 : }
324 :
325 : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
326 2 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
327 2 : d.mu.Lock()
328 2 : d.mu.versions.metrics.Table.ObsoleteCount--
329 2 : d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
330 2 : if isLocal {
331 2 : d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
332 2 : }
333 2 : d.mu.Unlock()
334 : }
335 :
// scanObsoleteFiles scans the filesystem for files that are no longer needed
// and adds those to the internal lists of obsolete files. Note that the files
// are not actually deleted by this method. A subsequent call to
// deleteObsoleteFiles must be performed. Must not be called concurrently
// with compactions and flushes. db.mu must be held when calling this function.
//
// list holds the filenames to classify; entries that base.ParseFilename does
// not recognize are skipped. Tables are not taken from list but from the
// object provider's own listing.
func (d *DB) scanObsoleteFiles(list []string) {
	// Disable automatic compactions temporarily to avoid concurrent compactions /
	// flushes from interfering. The original value is restored on completion.
	disabledPrev := d.opts.DisableAutomaticCompactions
	defer func() {
		d.opts.DisableAutomaticCompactions = disabledPrev
	}()
	d.opts.DisableAutomaticCompactions = true

	// Wait for any ongoing compaction, download or flush to complete before
	// continuing.
	for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
		d.mu.compact.cond.Wait()
	}

	// Collect the file numbers that must not be treated as obsolete.
	liveFileNums := make(map[base.DiskFileNum]struct{})
	d.mu.versions.addLiveFileNums(liveFileNums)
	// Protect against files which are only referred to by the ingestedFlushable
	// from being deleted. These are added to the flushable queue on WAL replay
	// during read only mode and aren't part of the Version. Note that if
	// !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
	// already been flushed.
	for _, fEntry := range d.mu.mem.queue {
		if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
			for _, file := range f.files {
				liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
			}
		}
	}

	manifestFileNum := d.mu.versions.manifestFileNum

	var obsoleteTables []tableInfo
	var obsoleteManifests []fileInfo
	var obsoleteOptions []fileInfo

	for _, filename := range list {
		fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
		if !ok {
			continue
		}
		switch fileType {
		case fileTypeManifest:
			// Only manifests older than the current one are obsolete.
			if diskFileNum >= manifestFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			// A Stat failure is tolerated; the size is then left at zero.
			// NOTE(review): filename is passed to Stat exactly as it appears in
			// list — confirm the caller supplies names Stat can resolve.
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteManifests = append(obsoleteManifests, fi)
		case fileTypeOptions:
			// Only options files older than the current one are obsolete.
			if diskFileNum >= d.optionsFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			// As above, a Stat failure leaves the size at zero.
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteOptions = append(obsoleteOptions, fi)
		case fileTypeTable:
			// Objects are handled through the objstorage provider below.
		default:
			// Don't delete files we don't know about.
		}
	}

	objects := d.objProvider.List()
	for _, obj := range objects {
		switch obj.FileType {
		case fileTypeTable:
			if _, ok := liveFileNums[obj.DiskFileNum]; ok {
				continue
			}
			// Note: this local variable shadows the fileInfo type for the rest
			// of the case.
			fileInfo := fileInfo{
				FileNum: obj.DiskFileNum,
			}
			// A Size failure is tolerated; the size is then left at zero.
			if size, err := d.objProvider.Size(obj); err == nil {
				fileInfo.FileSize = uint64(size)
			}
			obsoleteTables = append(obsoleteTables, tableInfo{
				fileInfo: fileInfo,
				isLocal:  !obj.IsRemote(),
			})

		default:
			// Ignore object types we don't know about.
		}
	}

	// Fold the findings into the version set's obsolete lists; the merge
	// helpers sort by file number and drop duplicates.
	d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
	d.mu.versions.updateObsoleteTableMetricsLocked()
	d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
	d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
}
435 :
// disableFileDeletions disables file deletions and then waits for any
// in-progress deletion to finish. The caller is required to call
// enableFileDeletions in order to enable file deletions again. It is ok for
// multiple callers to disable file deletions simultaneously, though they must
// all invoke enableFileDeletions in order for file deletions to be re-enabled
// (there is an internal reference count on file deletion disablement).
//
// d.mu must be held when calling this method. The mutex is released while
// waiting for the cleanup manager and re-acquired before returning.
func (d *DB) disableFileDeletions() {
	d.mu.disableFileDeletions++
	// The cleanup goroutine needs d.mu to update metrics, so d.mu cannot be
	// held across Wait. Note the unusual order: Unlock and then deferred Lock.
	d.mu.Unlock()
	defer d.mu.Lock()
	d.cleanupManager.Wait()
}
450 :
451 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
452 : // is queued if necessary.
453 : //
454 : // d.mu must be held when calling this method.
455 2 : func (d *DB) enableFileDeletions() {
456 2 : if d.mu.disableFileDeletions <= 0 {
457 1 : panic("pebble: file deletion disablement invariant violated")
458 : }
459 2 : d.mu.disableFileDeletions--
460 2 : if d.mu.disableFileDeletions > 0 {
461 0 : return
462 0 : }
463 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
464 : }
465 :
// fileInfo is a package-local alias for base.FileInfo. This file uses its
// FileNum and FileSize fields to describe obsolete manifests, options files
// and tables.
type fileInfo = base.FileInfo
467 :
// deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
//
// d.mu must be held when calling this. The function will release and re-acquire the mutex.
//
// Does nothing if file deletions are disabled (see disableFileDeletions). A
// cleanup job will be scheduled when file deletions are re-enabled.
//
// The obsolete logs, tables, manifests and options files are claimed while
// d.mu is held; the job itself is assembled and enqueued with d.mu released.
// It panics if the WAL manager fails to report the obsolete logs.
func (d *DB) deleteObsoleteFiles(jobID JobID) {
	if d.mu.disableFileDeletions > 0 {
		return
	}
	// A cleaner that implements base.NeedsFileContents rules out recycling.
	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)

	// NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
	// log that has not had its contents flushed to an sstable.
	obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
	if err != nil {
		panic(err)
	}

	// Take a private copy of the obsolete tables and clear the shared list.
	obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
	d.mu.versions.obsoleteTables = nil

	for _, tbl := range obsoleteTables {
		delete(d.mu.versions.zombieTables, tbl.FileNum)
	}

	// Sort the manifests cause we want to delete some contiguous prefix
	// of the older manifests.
	slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})

	// Keep the newest d.opts.NumPrevManifest obsolete manifests; only the
	// older ones in front of them are deleted.
	var obsoleteManifests []fileInfo
	manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
	if manifestsToDelete > 0 {
		obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
		d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
		if len(d.mu.versions.obsoleteManifests) == 0 {
			d.mu.versions.obsoleteManifests = nil
		}
	}

	obsoleteOptions := d.mu.versions.obsoleteOptions
	d.mu.versions.obsoleteOptions = nil

	// Release d.mu while preparing the cleanup job and possibly waiting.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
	for _, f := range obsoleteLogs {
		filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
	}
	// We sort to make the order of deletions deterministic, which is nice for
	// tests.
	slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	for _, f := range obsoleteTables {
		// Evict the table from the table cache before queueing its deletion.
		d.tableCache.evict(f.FileNum)
		filesToDelete = append(filesToDelete, obsoleteFile{
			fileType: fileTypeTable,
			nonLogFile: deletableFile{
				dir:      d.dirname,
				fileNum:  f.FileNum,
				fileSize: f.FileSize,
				isLocal:  f.isLocal,
			},
		})
	}
	// Manifests and options files are handled the same way, in that order.
	files := [2]struct {
		fileType fileType
		obsolete []fileInfo
	}{
		{fileTypeManifest, obsoleteManifests},
		{fileTypeOptions, obsoleteOptions},
	}
	for _, f := range files {
		// We sort to make the order of deletions deterministic, which is nice for
		// tests.
		slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
			return cmp.Compare(a.FileNum, b.FileNum)
		})
		for _, fi := range f.obsolete {
			dir := d.dirname
			filesToDelete = append(filesToDelete, obsoleteFile{
				fileType: f.fileType,
				nonLogFile: deletableFile{
					dir:      dir,
					fileNum:  fi.FileNum,
					fileSize: fi.FileSize,
					isLocal:  true,
				},
			})
		}
	}
	if len(filesToDelete) > 0 {
		d.cleanupManager.EnqueueJob(jobID, filesToDelete)
	}
	// Testing hook: block until the cleanup manager has caught up.
	if d.opts.private.testingAlwaysWaitForCleanup {
		d.cleanupManager.Wait()
	}
}
572 :
// maybeScheduleObsoleteTableDeletion acquires d.mu and schedules a cleanup job
// if there are obsolete tables pending deletion. d.mu must not be held by the
// caller.
func (d *DB) maybeScheduleObsoleteTableDeletion() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.maybeScheduleObsoleteTableDeletionLocked()
}
578 :
579 2 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
580 2 : if len(d.mu.versions.obsoleteTables) > 0 {
581 2 : d.deleteObsoleteFiles(d.newJobIDLocked())
582 2 : }
583 : }
584 :
585 2 : func merge(a, b []fileInfo) []fileInfo {
586 2 : if len(b) == 0 {
587 2 : return a
588 2 : }
589 :
590 2 : a = append(a, b...)
591 2 : slices.SortFunc(a, func(a, b fileInfo) int {
592 2 : return cmp.Compare(a.FileNum, b.FileNum)
593 2 : })
594 2 : return slices.CompactFunc(a, func(a, b fileInfo) bool {
595 2 : return a.FileNum == b.FileNum
596 2 : })
597 : }
598 :
599 2 : func mergeTableInfos(a, b []tableInfo) []tableInfo {
600 2 : if len(b) == 0 {
601 2 : return a
602 2 : }
603 :
604 2 : a = append(a, b...)
605 2 : slices.SortFunc(a, func(a, b tableInfo) int {
606 2 : return cmp.Compare(a.FileNum, b.FileNum)
607 2 : })
608 2 : return slices.CompactFunc(a, func(a, b tableInfo) bool {
609 2 : return a.FileNum == b.FileNum
610 2 : })
611 : }
|