// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"cmp"
	"context"
	"runtime/pprof"
	"slices"
	"sync"
	"time"

	"github.com/cockroachdb/crlib/crtime"
	"github.com/cockroachdb/errors/oserror"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
	"github.com/cockroachdb/tokenbucket"
)

// Cleaner exports the base.Cleaner type.
type Cleaner = base.Cleaner

// DeleteCleaner exports the base.DeleteCleaner type.
type DeleteCleaner = base.DeleteCleaner

// ArchiveCleaner exports the base.ArchiveCleaner type.
type ArchiveCleaner = base.ArchiveCleaner

type cleanupManager struct {
	opts            *Options
	objProvider     objstorage.Provider
	onTableDeleteFn func(fileSize uint64, isLocal bool)
	deletePacer     *deletionPacer

	// jobsCh is used as the cleanup job queue.
	jobsCh chan *cleanupJob
	// waitGroup is used to wait for the background goroutine to exit.
	waitGroup sync.WaitGroup

	mu struct {
		sync.Mutex
		// totalJobs is the total number of enqueued jobs (completed or in progress).
		totalJobs              int
		completedJobs          int
		completedJobsCond      sync.Cond
		jobsQueueWarningIssued bool
	}
}

// We can queue this many jobs before we have to block EnqueueJob.
const jobsQueueDepth = 1000

// deletableFile is used for non-log files.
type deletableFile struct {
	dir      string
	fileNum  base.DiskFileNum
	fileSize uint64
	isLocal  bool
}

// obsoleteFile holds information about a file that needs to be deleted soon.
type obsoleteFile struct {
	fileType fileType
	// nonLogFile is populated when fileType != fileTypeLog.
	nonLogFile deletableFile
	// logFile is populated when fileType == fileTypeLog.
	logFile wal.DeletableLog
}

type cleanupJob struct {
	jobID         JobID
	obsoleteFiles []obsoleteFile
}

// openCleanupManager creates a cleanupManager and starts its background goroutine.
// The cleanupManager must be Close()d.
func openCleanupManager(
	opts *Options,
	objProvider objstorage.Provider,
	onTableDeleteFn func(fileSize uint64, isLocal bool),
	getDeletePacerInfo func() deletionPacerInfo,
) *cleanupManager {
	cm := &cleanupManager{
		opts:            opts,
		objProvider:     objProvider,
		onTableDeleteFn: onTableDeleteFn,
		deletePacer:     newDeletionPacer(crtime.NowMono(), int64(opts.TargetByteDeletionRate), getDeletePacerInfo),
		jobsCh:          make(chan *cleanupJob, jobsQueueDepth),
	}
	cm.mu.completedJobsCond.L = &cm.mu.Mutex
	cm.waitGroup.Add(1)

	go func() {
		pprof.Do(context.Background(), gcLabels, func(context.Context) {
			cm.mainLoop()
		})
	}()

	return cm
}
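
// For illustration, a sketch of how the DB layer wires this up (the opts and
// objProvider variables are hypothetical here; the two callbacks are the DB
// methods defined later in this file):
//
//	cm := openCleanupManager(opts, objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo)
//	defer cm.Close()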

// Close stops the background goroutine, waiting until all queued jobs are completed.
// Delete pacing is disabled for the remaining jobs.
func (cm *cleanupManager) Close() {
	close(cm.jobsCh)
	cm.waitGroup.Wait()
}

// EnqueueJob adds a cleanup job to the manager's queue.
func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
	job := &cleanupJob{
		jobID:         jobID,
		obsoleteFiles: obsoleteFiles,
	}

	// Report deleted bytes to the pacer, which can use this data to potentially
	// increase the deletion rate to keep up. We want to do this at enqueue time
	// rather than when we get to the job; otherwise the reported bytes would be
	// subject to the throttling rate, which defeats the purpose.
	var pacingBytes uint64
	for _, of := range obsoleteFiles {
		if cm.needsPacing(of.fileType, of.nonLogFile.fileNum) {
			pacingBytes += of.nonLogFile.fileSize
		}
	}
	if pacingBytes > 0 {
		cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
	}

	cm.mu.Lock()
	cm.mu.totalJobs++
	cm.maybeLogLocked()
	cm.mu.Unlock()

	cm.jobsCh <- job
}
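
// As a concrete example of why reporting happens at enqueue time: if a large
// compaction makes 1 GB of sstables obsolete at once, the pacer learns about
// the full 1 GB immediately and can raise the deletion rate right away. If we
// instead reported each file only as its (throttled) deletion ran, the pacer
// would see a trickle of bytes bounded by its own rate and never detect the
// backlog.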

// Wait until the completion of all jobs that were already queued.
//
// Does not wait for jobs that are enqueued during the call.
//
// Note that DB.mu should not be held while calling this method; the background
// goroutine needs to acquire DB.mu to update deleted table metrics.
func (cm *cleanupManager) Wait() {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	n := cm.mu.totalJobs
	for cm.mu.completedJobs < n {
		cm.mu.completedJobsCond.Wait()
	}
}
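
// A minimal usage sketch (hypothetical caller; jobID and filesToDelete are
// assumed to come from the DB's usual bookkeeping):
//
//	cm.EnqueueJob(jobID, filesToDelete)
//	cm.Wait() // returns once this job, and all jobs queued before it, are done
//
// Wait snapshots totalJobs under cm.mu, so jobs enqueued concurrently after
// the snapshot may still be in flight when it returns.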

// mainLoop runs the manager's background goroutine.
func (cm *cleanupManager) mainLoop() {
	defer cm.waitGroup.Done()

	var tb tokenbucket.TokenBucket
	// Use a token bucket with 1 token / second refill rate and 1 token burst.
	tb.Init(1.0, 1.0)
	for job := range cm.jobsCh {
		for _, of := range job.obsoleteFiles {
			switch of.fileType {
			case fileTypeTable:
				cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
				cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
				cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
			case fileTypeLog:
				cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
					base.DiskFileNum(of.logFile.NumWAL), of.logFile.ApproxFileSize)
			default:
				path := base.MakeFilepath(cm.opts.FS, of.nonLogFile.dir, of.fileType, of.nonLogFile.fileNum)
				cm.deleteObsoleteFile(
					cm.opts.FS, of.fileType, job.jobID, path, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
			}
		}
		cm.mu.Lock()
		cm.mu.completedJobs++
		cm.mu.completedJobsCond.Broadcast()
		cm.maybeLogLocked()
		cm.mu.Unlock()
	}
}

// fileNumIfSST is read iff fileType is fileTypeTable.
func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
	if fileType != fileTypeTable {
		return false
	}
	meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
	if err != nil {
		// The object was already removed from the provider; we won't actually
		// delete anything, so we don't need to pace.
		return false
	}
	// Don't throttle deletion of remote objects.
	return !meta.IsRemote()
}

// maybePace sleeps before deleting an object if appropriate. It is always
// called from the background goroutine.
func (cm *cleanupManager) maybePace(
	tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
) {
	if !cm.needsPacing(fileType, fileNum) {
		return
	}

	tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
	if tokens == 0.0 {
		// The token bucket might be in debt; it could make us wait even for 0
		// tokens. We don't want that if the pacer decided throttling should be
		// disabled.
		return
	}
	// Wait for tokens. We use a token bucket instead of sleeping outright because
	// the token bucket accumulates up to one second of unused tokens.
	for {
		ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
		if ok {
			break
		}
		time.Sleep(d)
	}
}
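
// A rough worked example of the pacing math (assuming PacingDelay scales its
// result as fileSize divided by the configured TargetByteDeletionRate): with
// a 64 MB/s target rate, deleting a 128 MB sstable costs about 2.0 tokens,
// i.e. roughly two seconds at the bucket's 1 token/second refill rate, less
// any unused tokens accumulated while the goroutine was idle (capped at the
// 1-token burst).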

// deleteObsoleteFile deletes a (non-object) file that is no longer needed.
func (cm *cleanupManager) deleteObsoleteFile(
	fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
) {
	// TODO(peter): need to handle this error, probably by re-adding the
	// file that couldn't be deleted to one of the obsolete file slices.
	err := cm.opts.Cleaner.Clean(fs, fileType, path)
	if oserror.IsNotExist(err) {
		return
	}

	switch fileType {
	case fileTypeLog:
		cm.opts.EventListener.WALDeleted(WALDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	case fileTypeManifest:
		cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	case fileTypeTable:
		panic("invalid deletion of object file")
	}
}

func (cm *cleanupManager) deleteObsoleteObject(
	fileType fileType, jobID JobID, fileNum base.DiskFileNum,
) {
	if fileType != fileTypeTable {
		panic("not an object")
	}

	var path string
	meta, err := cm.objProvider.Lookup(fileType, fileNum)
	if err != nil {
		path = "<nil>"
	} else {
		path = cm.objProvider.Path(meta)
		err = cm.objProvider.Remove(fileType, fileNum)
	}
	if cm.objProvider.IsNotExistError(err) {
		return
	}

	switch fileType {
	case fileTypeTable:
		cm.opts.EventListener.TableDeleted(TableDeleteInfo{
			JobID:   int(jobID),
			Path:    path,
			FileNum: fileNum,
			Err:     err,
		})
	}
}

// maybeLogLocked issues a warning log when the job queue becomes more than
// 75% full, and a follow-up log once the queue drains back below 10% full.
//
// Must be called with cm.mu locked.
func (cm *cleanupManager) maybeLogLocked() {
	const highThreshold = jobsQueueDepth * 3 / 4
	const lowThreshold = jobsQueueDepth / 10

	jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs

	if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
		cm.mu.jobsQueueWarningIssued = true
		cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
	}

	if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
		cm.mu.jobsQueueWarningIssued = false
		cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
	}
}
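
// With jobsQueueDepth = 1000, highThreshold is 750 and lowThreshold is 100.
// The gap between the two thresholds provides hysteresis: a queue length
// oscillating around either boundary produces at most one log line per
// excursion rather than one per job.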

func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
	var pacerInfo deletionPacerInfo
	// Call GetDiskUsage after every file deletion. This may seem inefficient,
	// but in practice this was observed to take constant time, regardless of
	// volume size used, at least on Linux with ext4 and zfs. All invocations
	// take 10 microseconds or less.
	pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
	d.mu.Lock()
	pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
	pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
	d.mu.Unlock()
	return pacerInfo
}

// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
	d.mu.Lock()
	d.mu.versions.metrics.Table.ObsoleteCount--
	d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
	if isLocal {
		d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
	}
	d.mu.Unlock()
}

// scanObsoleteFiles scans the filesystem for files that are no longer needed
// and adds those to the internal lists of obsolete files. Note that the files
// are not actually deleted by this method. A subsequent call to
// deleteObsoleteFiles must be performed. Must not be called concurrently with
// compactions and flushes. db.mu must be held when calling this function.
func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
	// Disable automatic compactions temporarily to prevent concurrent
	// compactions / flushes from interfering. The original value is restored
	// on completion.
	disabledPrev := d.opts.DisableAutomaticCompactions
	defer func() {
		d.opts.DisableAutomaticCompactions = disabledPrev
	}()
	d.opts.DisableAutomaticCompactions = true

	// Wait for any ongoing compaction to complete before continuing.
	for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
		d.mu.compact.cond.Wait()
	}

	liveFileNums := make(map[base.DiskFileNum]struct{})
	d.mu.versions.addLiveFileNums(liveFileNums)
	// Protect against files which are only referred to by the ingestedFlushable
	// from being deleted. These are added to the flushable queue on WAL replay
	// and handle their own obsoletion/deletion. We exclude them from this obsolete
	// file scan to avoid double-deleting these files.
	for _, f := range flushableIngests {
		for _, file := range f.files {
			liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
		}
	}

	manifestFileNum := d.mu.versions.manifestFileNum

	var obsoleteTables []tableInfo
	var obsoleteManifests []fileInfo
	var obsoleteOptions []fileInfo

	for _, filename := range list {
		fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
		if !ok {
			continue
		}
		switch fileType {
		case fileTypeManifest:
			if diskFileNum >= manifestFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteManifests = append(obsoleteManifests, fi)
		case fileTypeOptions:
			if diskFileNum >= d.optionsFileNum {
				continue
			}
			fi := fileInfo{FileNum: diskFileNum}
			if stat, err := d.opts.FS.Stat(filename); err == nil {
				fi.FileSize = uint64(stat.Size())
			}
			obsoleteOptions = append(obsoleteOptions, fi)
		case fileTypeTable:
			// Objects are handled through the objstorage provider below.
		default:
			// Don't delete files we don't know about.
		}
	}

	objects := d.objProvider.List()
	for _, obj := range objects {
		switch obj.FileType {
		case fileTypeTable:
			if _, ok := liveFileNums[obj.DiskFileNum]; ok {
				continue
			}
			fileInfo := fileInfo{
				FileNum: obj.DiskFileNum,
			}
			if size, err := d.objProvider.Size(obj); err == nil {
				fileInfo.FileSize = uint64(size)
			}
			obsoleteTables = append(obsoleteTables, tableInfo{
				fileInfo: fileInfo,
				isLocal:  !obj.IsRemote(),
			})
		default:
			// Ignore object types we don't know about.
		}
	}

	d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
	d.mu.versions.updateObsoleteTableMetricsLocked()
	d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
	d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
}

// disableFileDeletions disables file deletions and then waits for any
// in-progress deletion to finish. The caller is required to call
// enableFileDeletions in order to enable file deletions again. It is ok for
// multiple callers to disable file deletions simultaneously, though they must
// all invoke enableFileDeletions in order for file deletions to be re-enabled
// (there is an internal reference count on file deletion disablement).
//
// d.mu must be held when calling this method.
func (d *DB) disableFileDeletions() {
	d.mu.disableFileDeletions++
	d.mu.Unlock()
	defer d.mu.Lock()
	d.cleanupManager.Wait()
}

// enableFileDeletions enables previously disabled file deletions. A cleanup job
// is queued if necessary.
//
// d.mu must be held when calling this method.
func (d *DB) enableFileDeletions() {
	if d.mu.disableFileDeletions <= 0 {
		panic("pebble: file deletion disablement invariant violated")
	}
	d.mu.disableFileDeletions--
	if d.mu.disableFileDeletions > 0 {
		return
	}
	d.deleteObsoleteFiles(d.newJobIDLocked())
}
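
// The two calls are reference-counted and must be paired, e.g. (sketch, with
// d.mu held; the work in between is hypothetical):
//
//	d.disableFileDeletions()
//	// ... read or copy files that must not be deleted underneath us ...
//	d.enableFileDeletions()
//
// Only the call that drops the count back to zero schedules the deferred
// cleanup job.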

type fileInfo = base.FileInfo

// deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
//
// d.mu must be held when calling this. The function will release and re-acquire the mutex.
//
// Does nothing if file deletions are disabled (see disableFileDeletions). A
// cleanup job will be scheduled when file deletions are re-enabled.
func (d *DB) deleteObsoleteFiles(jobID JobID) {
	if d.mu.disableFileDeletions > 0 {
		return
	}
	_, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)

	// NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
	// log that has not had its contents flushed to an sstable.
	obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
	if err != nil {
		panic(err)
	}

	obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
	d.mu.versions.obsoleteTables = nil

	for _, tbl := range obsoleteTables {
		delete(d.mu.versions.zombieTables, tbl.FileNum)
	}

	// Sort the manifests because we want to delete some contiguous prefix
	// of the older manifests.
	slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})

	var obsoleteManifests []fileInfo
	manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
	if manifestsToDelete > 0 {
		obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
		d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
		if len(d.mu.versions.obsoleteManifests) == 0 {
			d.mu.versions.obsoleteManifests = nil
		}
	}

	obsoleteOptions := d.mu.versions.obsoleteOptions
	d.mu.versions.obsoleteOptions = nil

	// Release d.mu while preparing the cleanup job and possibly waiting.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
	for _, f := range obsoleteLogs {
		filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
	}
	// We sort to make the order of deletions deterministic, which is nice for
	// tests.
	slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	for _, f := range obsoleteTables {
		d.tableCache.evict(f.FileNum)
		filesToDelete = append(filesToDelete, obsoleteFile{
			fileType: fileTypeTable,
			nonLogFile: deletableFile{
				dir:      d.dirname,
				fileNum:  f.FileNum,
				fileSize: f.FileSize,
				isLocal:  f.isLocal,
			},
		})
	}
	files := [2]struct {
		fileType fileType
		obsolete []fileInfo
	}{
		{fileTypeManifest, obsoleteManifests},
		{fileTypeOptions, obsoleteOptions},
	}
	for _, f := range files {
		// We sort to make the order of deletions deterministic, which is nice
		// for tests.
		slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
			return cmp.Compare(a.FileNum, b.FileNum)
		})
		for _, fi := range f.obsolete {
			dir := d.dirname
			filesToDelete = append(filesToDelete, obsoleteFile{
				fileType: f.fileType,
				nonLogFile: deletableFile{
					dir:      dir,
					fileNum:  fi.FileNum,
					fileSize: fi.FileSize,
					isLocal:  true,
				},
			})
		}
	}
	if len(filesToDelete) > 0 {
		d.cleanupManager.EnqueueJob(jobID, filesToDelete)
	}
	if d.opts.private.testingAlwaysWaitForCleanup {
		d.cleanupManager.Wait()
	}
}

func (d *DB) maybeScheduleObsoleteTableDeletion() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.maybeScheduleObsoleteTableDeletionLocked()
}

func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
	if len(d.mu.versions.obsoleteTables) > 0 {
		d.deleteObsoleteFiles(d.newJobIDLocked())
	}
}

func merge(a, b []fileInfo) []fileInfo {
	if len(b) == 0 {
		return a
	}

	a = append(a, b...)
	slices.SortFunc(a, func(a, b fileInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	return slices.CompactFunc(a, func(a, b fileInfo) bool {
		return a.FileNum == b.FileNum
	})
}
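
// For example (hypothetical file numbers):
//
//	a := []fileInfo{{FileNum: 3}, {FileNum: 7}}
//	b := []fileInfo{{FileNum: 5}, {FileNum: 7}}
//	merged := merge(a, b) // file numbers 3, 5, 7: sorted, deduplicated by FileNum
//
// The result may reuse a's backing array (append then sort/compact in place),
// which is why callers reassign the returned slice.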

func mergeTableInfos(a, b []tableInfo) []tableInfo {
	if len(b) == 0 {
		return a
	}

	a = append(a, b...)
	slices.SortFunc(a, func(a, b tableInfo) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
	return slices.CompactFunc(a, func(a, b tableInfo) bool {
		return a.FileNum == b.FileNum
	})
}