Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "runtime/pprof"
11 : "slices"
12 : "sync"
13 : "time"
14 :
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/errors/oserror"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/vfs"
22 : "github.com/cockroachdb/pebble/wal"
23 : "github.com/cockroachdb/tokenbucket"
24 : )
25 :
26 : // Cleaner exports the base.Cleaner type.
27 : type Cleaner = base.Cleaner
28 :
29 : // DeleteCleaner exports the base.DeleteCleaner type.
30 : type DeleteCleaner = base.DeleteCleaner
31 :
32 : // ArchiveCleaner exports the base.ArchiveCleaner type.
33 : type ArchiveCleaner = base.ArchiveCleaner
34 :
35 : type cleanupManager struct {
36 : opts *Options
37 : objProvider objstorage.Provider
38 : onTableDeleteFn func(fileSize uint64, isLocal bool)
39 : deletePacer *deletionPacer
40 :
41 : // jobsCh is used as the cleanup job queue.
42 : jobsCh chan *cleanupJob
43 : // waitGroup is used to wait for the background goroutine to exit.
44 : waitGroup sync.WaitGroup
45 :
46 : mu struct {
47 : sync.Mutex
48 : // totalJobs is the total number of jobs that have been enqueued, whether pending, in progress, or completed.
49 : totalJobs int
50 : completedJobs int
51 : completedJobsCond sync.Cond
52 : jobsQueueWarningIssued bool
53 : }
54 : }
55 :
56 : // We can queue this many jobs before we have to block EnqueueJob.
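// Once the buffered jobs channel is full, EnqueueJob blocks the calling
// goroutine until the cleanup goroutine drains some of the queue.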
57 : const jobsQueueDepth = 1000
58 :
59 : // obsoleteFile holds information about a file that needs to be deleted soon.
60 : type obsoleteFile struct {
61 : fileType base.FileType
62 : fs vfs.FS
63 : path string
64 : fileNum base.DiskFileNum
65 : fileSize uint64 // approx for log files
66 : isLocal bool
67 : }
68 :
69 : type cleanupJob struct {
70 : jobID JobID
71 : obsoleteFiles []obsoleteFile
72 : }
73 :
74 : // openCleanupManager creates a cleanupManager and starts its background goroutine.
75 : // The cleanupManager must be Close()d.
76 : func openCleanupManager(
77 : opts *Options,
78 : objProvider objstorage.Provider,
79 : onTableDeleteFn func(fileSize uint64, isLocal bool),
80 : getDeletePacerInfo func() deletionPacerInfo,
81 1 : ) *cleanupManager {
82 1 : cm := &cleanupManager{
83 1 : opts: opts,
84 1 : objProvider: objProvider,
85 1 : onTableDeleteFn: onTableDeleteFn,
86 1 : deletePacer: newDeletionPacer(
87 1 : crtime.NowMono(),
88 1 : opts.FreeSpaceThresholdBytes,
89 1 : int64(opts.TargetByteDeletionRate),
90 1 : opts.FreeSpaceTimeframe,
91 1 : opts.ObsoleteBytesMaxRatio,
92 1 : opts.ObsoleteBytesTimeframe,
93 1 : getDeletePacerInfo,
94 1 : ),
95 1 : jobsCh: make(chan *cleanupJob, jobsQueueDepth),
96 1 : }
97 1 : cm.mu.completedJobsCond.L = &cm.mu.Mutex
98 1 : cm.waitGroup.Add(1)
99 1 :
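// Run the main loop under the gc pprof labels so that time spent deleting
// obsolete files is attributed to cleanup in CPU profiles.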
100 1 : go func() {
101 1 : pprof.Do(context.Background(), gcLabels, func(context.Context) {
102 1 : cm.mainLoop()
103 1 : })
104 : }()
105 :
106 1 : return cm
107 : }
108 :
109 : // Close stops the background goroutine, waiting until all queued jobs are completed.
110 : // Delete pacing is disabled for the remaining jobs.
111 1 : func (cm *cleanupManager) Close() {
112 1 : close(cm.jobsCh)
113 1 : cm.waitGroup.Wait()
114 1 : }
115 :
116 : // EnqueueJob adds a cleanup job to the manager's queue.
117 1 : func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
118 1 : job := &cleanupJob{
119 1 : jobID: jobID,
120 1 : obsoleteFiles: obsoleteFiles,
121 1 : }
122 1 :
123 1 : // Report deleted bytes to the pacer, which can use this data to potentially
124 1 : // increase the deletion rate to keep up. We want to do this at enqueue time
125 1 : // rather than when we get to the job, otherwise the reported bytes will be
126 1 : // subject to the throttling rate which defeats the purpose.
127 1 : var pacingBytes uint64
128 1 : for _, of := range obsoleteFiles {
129 1 : if cm.needsPacing(of.fileType, of.fileNum) {
130 1 : pacingBytes += of.fileSize
131 1 : }
132 : }
133 1 : if pacingBytes > 0 {
134 1 : cm.deletePacer.ReportDeletion(crtime.NowMono(), pacingBytes)
135 1 : }
136 :
137 1 : cm.mu.Lock()
138 1 : cm.mu.totalJobs++
139 1 : cm.maybeLogLocked()
140 1 : cm.mu.Unlock()
141 1 :
142 1 : cm.jobsCh <- job
143 : }
144 :
145 : // Wait blocks until all jobs that were already queued have completed.
146 : //
147 : // Does not wait for jobs that are enqueued during the call.
148 : //
149 : // Note that DB.mu should not be held while calling this method; the background
150 : // goroutine needs to acquire DB.mu to update deleted table metrics.
151 1 : func (cm *cleanupManager) Wait() {
152 1 : cm.mu.Lock()
153 1 : defer cm.mu.Unlock()
154 1 : n := cm.mu.totalJobs
155 1 : for cm.mu.completedJobs < n {
156 1 : cm.mu.completedJobsCond.Wait()
157 1 : }
158 : }
159 :
160 : // mainLoop runs the manager's background goroutine.
161 1 : func (cm *cleanupManager) mainLoop() {
162 1 : defer cm.waitGroup.Done()
163 1 :
164 1 : var tb tokenbucket.TokenBucket
165 1 : // Use a token bucket with 1 token / second refill rate and 1 token burst.
166 1 : tb.Init(1.0, 1.0)
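// With a refill rate of 1 token/sec, the token count returned by the pacer's
// PacingDelay below is effectively a delay expressed in seconds.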
167 1 : for job := range cm.jobsCh {
168 1 : for _, of := range job.obsoleteFiles {
169 1 : switch of.fileType {
170 1 : case base.FileTypeTable:
171 1 : cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
172 1 : cm.onTableDeleteFn(of.fileSize, of.isLocal)
173 1 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
174 0 : case base.FileTypeBlob:
175 0 : cm.maybePace(&tb, of.fileType, of.fileNum, of.fileSize)
176 0 : cm.deleteObsoleteObject(of.fileType, job.jobID, of.fileNum)
177 1 : default:
178 1 : cm.deleteObsoleteFile(of.fs, of.fileType, job.jobID, of.path, of.fileNum)
179 : }
180 : }
181 1 : cm.mu.Lock()
182 1 : cm.mu.completedJobs++
183 1 : cm.mu.completedJobsCond.Broadcast()
184 1 : cm.maybeLogLocked()
185 1 : cm.mu.Unlock()
186 : }
187 : }
188 :
189 : // fileNumIfSST is read only if fileType is base.FileTypeTable or base.FileTypeBlob.
190 1 : func (cm *cleanupManager) needsPacing(fileType base.FileType, fileNumIfSST base.DiskFileNum) bool {
191 1 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
192 1 : return false
193 1 : }
194 1 : meta, err := cm.objProvider.Lookup(fileType, fileNumIfSST)
195 1 : if err != nil {
196 1 : // The object was already removed from the provider; we won't actually
197 1 : // delete anything, so we don't need to pace.
198 1 : return false
199 1 : }
200 : // Don't throttle deletion of remote objects.
201 1 : return !meta.IsRemote()
202 : }
203 :
204 : // maybePace sleeps before deleting an object if appropriate. It is always
205 : // called from the background goroutine.
206 : func (cm *cleanupManager) maybePace(
207 : tb *tokenbucket.TokenBucket, fileType base.FileType, fileNum base.DiskFileNum, fileSize uint64,
208 1 : ) {
209 1 : if !cm.needsPacing(fileType, fileNum) {
210 1 : return
211 1 : }
212 :
213 1 : tokens := cm.deletePacer.PacingDelay(crtime.NowMono(), fileSize)
214 1 : if tokens == 0.0 {
215 1 : // The token bucket might be in debt; it could make us wait even for 0
216 1 : // tokens. We don't want that if the pacer decided throttling should be
217 1 : // disabled.
218 1 : return
219 1 : }
220 : // Wait for tokens. We use a token bucket instead of sleeping outright because
221 : // the token bucket accumulates up to one second of unused tokens.
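// TryToFulfill either deducts the requested tokens and succeeds, or reports
// how long to sleep before trying again.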
222 1 : for {
223 1 : ok, d := tb.TryToFulfill(tokenbucket.Tokens(tokens))
224 1 : if ok {
225 1 : break
226 : }
227 0 : time.Sleep(d)
228 : }
229 : }
230 :
231 : // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
232 : func (cm *cleanupManager) deleteObsoleteFile(
233 : fs vfs.FS, fileType base.FileType, jobID JobID, path string, fileNum base.DiskFileNum,
234 1 : ) {
235 1 : // TODO(peter): need to handle this error, probably by re-adding the
236 1 : // file that couldn't be deleted to one of the obsolete file slices.
237 1 : err := cm.opts.Cleaner.Clean(fs, fileType, path)
238 1 : if oserror.IsNotExist(err) {
239 1 : return
240 1 : }
241 :
242 1 : switch fileType {
243 1 : case base.FileTypeLog:
244 1 : cm.opts.EventListener.WALDeleted(WALDeleteInfo{
245 1 : JobID: int(jobID),
246 1 : Path: path,
247 1 : FileNum: fileNum,
248 1 : Err: err,
249 1 : })
250 1 : case base.FileTypeManifest:
251 1 : cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
252 1 : JobID: int(jobID),
253 1 : Path: path,
254 1 : FileNum: fileNum,
255 1 : Err: err,
256 1 : })
257 0 : case base.FileTypeTable, base.FileTypeBlob:
258 0 : panic("invalid deletion of object file")
259 : }
260 : }
261 :
262 : func (cm *cleanupManager) deleteObsoleteObject(
263 : fileType base.FileType, jobID JobID, fileNum base.DiskFileNum,
264 1 : ) {
265 1 : if fileType != base.FileTypeTable && fileType != base.FileTypeBlob {
266 0 : panic("not an object")
267 : }
268 :
269 1 : var path string
270 1 : meta, err := cm.objProvider.Lookup(fileType, fileNum)
271 1 : if err != nil {
272 1 : path = "<nil>"
273 1 : } else {
274 1 : path = cm.objProvider.Path(meta)
275 1 : err = cm.objProvider.Remove(fileType, fileNum)
276 1 : }
277 1 : if cm.objProvider.IsNotExistError(err) {
278 1 : return
279 1 : }
280 :
281 1 : switch fileType {
282 1 : case base.FileTypeTable:
283 1 : cm.opts.EventListener.TableDeleted(TableDeleteInfo{
284 1 : JobID: int(jobID),
285 1 : Path: path,
286 1 : FileNum: fileNum,
287 1 : Err: err,
288 1 : })
289 0 : case base.FileTypeBlob:
290 0 : cm.opts.EventListener.BlobFileDeleted(BlobFileDeleteInfo{
291 0 : JobID: int(jobID),
292 0 : Path: path,
293 0 : FileNum: fileNum,
294 0 : Err: err,
295 0 : })
296 : }
297 : }
298 :
299 : // maybeLogLocked issues a warning log when the job queue becomes more than 75%
300 : // full, and another log once the queue drains back below 10% full.
301 : //
302 : // Must be called with cm.mu locked.
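// With jobsQueueDepth=1000, the warning is issued above 750 queued jobs and
// cleared once the queue drains below 100.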
303 1 : func (cm *cleanupManager) maybeLogLocked() {
304 1 : const highThreshold = jobsQueueDepth * 3 / 4
305 1 : const lowThreshold = jobsQueueDepth / 10
306 1 :
307 1 : jobsInQueue := cm.mu.totalJobs - cm.mu.completedJobs
308 1 :
309 1 : if !cm.mu.jobsQueueWarningIssued && jobsInQueue > highThreshold {
310 0 : cm.mu.jobsQueueWarningIssued = true
311 0 : cm.opts.Logger.Infof("cleanup falling behind; job queue has over %d jobs", highThreshold)
312 0 : }
313 :
314 1 : if cm.mu.jobsQueueWarningIssued && jobsInQueue < lowThreshold {
315 0 : cm.mu.jobsQueueWarningIssued = false
316 0 : cm.opts.Logger.Infof("cleanup back to normal; job queue has under %d jobs", lowThreshold)
317 0 : }
318 : }
319 :
320 1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
321 1 : var pacerInfo deletionPacerInfo
322 1 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
323 1 : // but in practice this was observed to take constant time, regardless of
324 1 : // volume size used, at least on linux with ext4 and zfs. All invocations
325 1 : // take 10 microseconds or less.
326 1 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
327 1 : d.mu.Lock()
328 1 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
329 1 : total := d.mu.versions.metrics.Total()
330 1 : d.mu.Unlock()
331 1 : pacerInfo.liveBytes = uint64(total.AggregateSize())
332 1 : return pacerInfo
333 1 : }
334 :
335 : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
336 1 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
337 1 : d.mu.Lock()
338 1 : d.mu.versions.metrics.Table.ObsoleteCount--
339 1 : d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
340 1 : if isLocal {
341 1 : d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
342 1 : d.mu.versions.metrics.Table.Local.ObsoleteCount--
343 1 : }
344 1 : d.mu.Unlock()
345 : }
346 :
347 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
348 : // and adds those to the internal lists of obsolete files. Note that the files
349 : // are not actually deleted by this method. A subsequent call to
350 : // deleteObsoleteFiles must be performed. Must not be called concurrently
351 : // with compactions and flushes. d.mu must be held when calling this function.
352 1 : func (d *DB) scanObsoleteFiles(list []string, flushableIngests []*ingestedFlushable) {
353 1 : // Temporarily disable automatic compactions so that concurrent compactions and
354 1 : // flushes do not interfere. The original value is restored on completion.
355 1 : disabledPrev := d.opts.DisableAutomaticCompactions
356 1 : defer func() {
357 1 : d.opts.DisableAutomaticCompactions = disabledPrev
358 1 : }()
359 1 : d.opts.DisableAutomaticCompactions = true
360 1 :
361 1 : // Wait for any ongoing compaction to complete before continuing.
362 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
363 1 : d.mu.compact.cond.Wait()
364 1 : }
365 :
366 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
367 1 : d.mu.versions.addLiveFileNums(liveFileNums)
368 1 : // Protect files that are referred to only by an ingestedFlushable from being
369 1 : // deleted. These are added to the flushable queue on WAL replay and handle
370 1 : // their own obsoletion/deletion. We exclude them from this obsolete file scan
371 1 : // to avoid double-deleting them.
372 1 : for _, f := range flushableIngests {
373 1 : for _, file := range f.files {
374 1 : liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
375 1 : }
376 : }
377 :
378 1 : manifestFileNum := d.mu.versions.manifestFileNum
379 1 :
380 1 : var obsoleteTables []objectInfo
381 1 : var obsoleteBlobs []objectInfo
382 1 : var obsoleteOptions []obsoleteFile
383 1 : var obsoleteManifests []obsoleteFile
384 1 :
385 1 : for _, filename := range list {
386 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
387 1 : if !ok {
388 1 : continue
389 : }
390 1 : makeObsoleteFile := func() obsoleteFile {
391 1 : of := obsoleteFile{
392 1 : fileType: fileType,
393 1 : fs: d.opts.FS,
394 1 : path: d.opts.FS.PathJoin(d.dirname, filename),
395 1 : fileNum: diskFileNum,
396 1 : isLocal: true,
397 1 : }
398 1 : if stat, err := d.opts.FS.Stat(of.path); err == nil { // use the full path; the bare filename is relative to the working directory
399 0 : of.fileSize = uint64(stat.Size())
400 0 : }
401 1 : return of
402 : }
403 1 : switch fileType {
404 1 : case base.FileTypeManifest:
405 1 : if diskFileNum >= manifestFileNum {
406 1 : continue
407 : }
408 1 : obsoleteManifests = append(obsoleteManifests, makeObsoleteFile())
409 1 : case base.FileTypeOptions:
410 1 : if diskFileNum >= d.optionsFileNum {
411 1 : continue
412 : }
413 1 : obsoleteOptions = append(obsoleteOptions, makeObsoleteFile())
414 1 : case base.FileTypeTable, base.FileTypeBlob:
415 : // Objects are handled through the objstorage provider below.
416 1 : default:
417 : // Don't delete files we don't know about.
418 : }
419 : }
420 :
421 1 : objects := d.objProvider.List()
422 1 : for _, obj := range objects {
423 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
424 1 : continue
425 : }
426 1 : makeObjectInfo := func() objectInfo {
427 1 : fileInfo := fileInfo{FileNum: obj.DiskFileNum}
428 1 : if size, err := d.objProvider.Size(obj); err == nil {
429 1 : fileInfo.FileSize = uint64(size)
430 1 : }
431 1 : return objectInfo{
432 1 : fileInfo: fileInfo,
433 1 : isLocal: !obj.IsRemote(),
434 1 : }
435 : }
436 :
437 1 : switch obj.FileType {
438 1 : case base.FileTypeTable:
439 1 : obsoleteTables = append(obsoleteTables, makeObjectInfo())
440 0 : case base.FileTypeBlob:
441 0 : obsoleteBlobs = append(obsoleteBlobs, makeObjectInfo())
442 0 : default:
443 : // Ignore object types we don't know about.
444 : }
445 : }
446 :
447 1 : d.mu.versions.obsoleteTables = mergeObjectInfos(d.mu.versions.obsoleteTables, obsoleteTables)
448 1 : d.mu.versions.obsoleteBlobs = mergeObjectInfos(d.mu.versions.obsoleteBlobs, obsoleteBlobs)
449 1 : d.mu.versions.updateObsoleteObjectMetricsLocked()
450 1 : d.mu.versions.obsoleteManifests = mergeObsoleteFiles(d.mu.versions.obsoleteManifests, obsoleteManifests)
451 1 : d.mu.versions.obsoleteOptions = mergeObsoleteFiles(d.mu.versions.obsoleteOptions, obsoleteOptions)
452 : }
453 :
454 : // disableFileDeletions disables file deletions and then waits for any
455 : // in-progress deletion to finish. The caller is required to call
456 : // enableFileDeletions in order to enable file deletions again. It is ok for
457 : // multiple callers to disable file deletions simultaneously, though they must
458 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
459 : // (there is an internal reference count on file deletion disablement).
460 : //
461 : // d.mu must be held when calling this method.
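// The mutex is dropped while waiting, since the cleanup goroutine needs d.mu
// to update deletion metrics.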
462 1 : func (d *DB) disableFileDeletions() {
463 1 : d.mu.disableFileDeletions++
464 1 : d.mu.Unlock()
465 1 : defer d.mu.Lock()
466 1 : d.cleanupManager.Wait()
467 1 : }
468 :
469 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
470 : // is queued if necessary.
471 : //
472 : // d.mu must be held when calling this method.
473 1 : func (d *DB) enableFileDeletions() {
474 1 : if d.mu.disableFileDeletions <= 0 {
475 0 : panic("pebble: file deletion disablement invariant violated")
476 : }
477 1 : d.mu.disableFileDeletions--
478 1 : if d.mu.disableFileDeletions > 0 {
479 0 : return
480 0 : }
481 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
482 : }
483 :
484 : type fileInfo = base.FileInfo
485 :
486 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
487 : //
488 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
489 : //
490 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
491 : // cleanup job will be scheduled when file deletions are re-enabled.
492 1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
493 1 : if d.mu.disableFileDeletions > 0 {
494 1 : return
495 1 : }
496 1 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
497 1 :
498 1 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
499 1 : // log that has not had its contents flushed to an sstable.
500 1 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
501 1 : if err != nil {
502 0 : panic(err)
503 : }
504 :
505 1 : obsoleteTables := slices.Clone(d.mu.versions.obsoleteTables)
506 1 : d.mu.versions.obsoleteTables = d.mu.versions.obsoleteTables[:0]
507 1 : obsoleteBlobs := slices.Clone(d.mu.versions.obsoleteBlobs)
508 1 : d.mu.versions.obsoleteBlobs = d.mu.versions.obsoleteBlobs[:0]
509 1 :
510 1 : // Ensure everything is already sorted. We want determinism for testing, and
511 1 : // we need the manifests to be sorted because we want to delete some
512 1 : // contiguous prefix of the older manifests.
513 1 : if invariants.Enabled {
514 1 : switch {
515 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteManifests, cmpObsoleteFileNumbers):
516 0 : d.opts.Logger.Fatalf("obsoleteManifests is not sorted")
517 0 : case !slices.IsSortedFunc(d.mu.versions.obsoleteOptions, cmpObsoleteFileNumbers):
518 0 : d.opts.Logger.Fatalf("obsoleteOptions is not sorted")
519 0 : case !slices.IsSortedFunc(obsoleteTables, cmpObjectFileNums):
520 0 : d.opts.Logger.Fatalf("obsoleteTables is not sorted")
521 0 : case !slices.IsSortedFunc(obsoleteBlobs, cmpObjectFileNums):
522 0 : d.opts.Logger.Fatalf("obsoleteBlobs is not sorted")
523 : }
524 : }
525 :
526 1 : var obsoleteManifests []obsoleteFile
527 1 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
528 1 : if manifestsToDelete > 0 {
529 1 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
530 1 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
531 1 : if len(d.mu.versions.obsoleteManifests) == 0 {
532 0 : d.mu.versions.obsoleteManifests = nil
533 0 : }
534 : }
535 :
536 1 : obsoleteOptions := d.mu.versions.obsoleteOptions
537 1 : d.mu.versions.obsoleteOptions = nil
538 1 :
539 1 : // Release d.mu while preparing the cleanup job and possibly waiting.
540 1 : // Note the unusual order: Unlock and then Lock.
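// The deferred Lock re-acquires d.mu before returning, so the caller still
// holds the mutex across this call as documented.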
541 1 : d.mu.Unlock()
542 1 : defer d.mu.Lock()
543 1 :
544 1 : filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
545 1 : filesToDelete = append(filesToDelete, obsoleteManifests...)
546 1 : filesToDelete = append(filesToDelete, obsoleteOptions...)
547 1 : for _, f := range obsoleteLogs {
548 1 : filesToDelete = append(filesToDelete, obsoleteFile{
549 1 : fileType: base.FileTypeLog,
550 1 : fs: f.FS,
551 1 : path: f.Path,
552 1 : fileNum: base.DiskFileNum(f.NumWAL),
553 1 : fileSize: f.ApproxFileSize,
554 1 : isLocal: true,
555 1 : })
556 1 : }
557 1 : for _, f := range obsoleteTables {
558 1 : d.fileCache.Evict(f.FileNum, base.FileTypeTable)
559 1 : filesToDelete = append(filesToDelete, f.asObsoleteFile(d.opts.FS, base.FileTypeTable, d.dirname))
560 1 : }
561 1 : for _, f := range obsoleteBlobs {
562 0 : d.fileCache.Evict(f.FileNum, base.FileTypeBlob)
563 0 : filesToDelete = append(filesToDelete, f.asObsoleteFile(d.opts.FS, base.FileTypeBlob, d.dirname))
564 0 : }
565 :
566 1 : if len(filesToDelete) > 0 {
567 1 : d.cleanupManager.EnqueueJob(jobID, filesToDelete)
568 1 : }
569 1 : if d.opts.private.testingAlwaysWaitForCleanup {
570 0 : d.cleanupManager.Wait()
571 0 : }
572 : }
573 :
574 1 : func (d *DB) maybeScheduleObsoleteObjectDeletion() {
575 1 : d.mu.Lock()
576 1 : defer d.mu.Unlock()
577 1 : if len(d.mu.versions.obsoleteTables) > 0 || len(d.mu.versions.obsoleteBlobs) > 0 {
578 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
579 1 : }
580 : }
581 :
582 1 : func mergeObsoleteFiles(a, b []obsoleteFile) []obsoleteFile {
583 1 : if len(b) == 0 {
584 1 : return a
585 1 : }
586 :
587 1 : a = append(a, b...)
588 1 : slices.SortFunc(a, cmpObsoleteFileNumbers)
589 1 : return slices.CompactFunc(a, func(a, b obsoleteFile) bool {
590 1 : return a.fileNum == b.fileNum
591 1 : })
592 : }
593 :
594 1 : func cmpObsoleteFileNumbers(a, b obsoleteFile) int {
595 1 : return cmp.Compare(a.fileNum, b.fileNum)
596 1 : }
597 :
598 : // objectInfo describes an object in object storage (either an sstable or a blob
599 : // file).
600 : type objectInfo struct {
601 : fileInfo
602 : isLocal bool
603 : }
604 :
605 1 : func (o objectInfo) asObsoleteFile(fs vfs.FS, fileType base.FileType, dirname string) obsoleteFile {
606 1 : return obsoleteFile{
607 1 : fileType: fileType,
608 1 : fs: fs,
609 1 : path: base.MakeFilepath(fs, dirname, fileType, o.FileNum),
610 1 : fileNum: o.FileNum,
611 1 : fileSize: o.FileSize,
612 1 : isLocal: o.isLocal,
613 1 : }
614 1 : }
615 :
616 1 : func makeZombieObjects() zombieObjects {
617 1 : return zombieObjects{
618 1 : objs: make(map[base.DiskFileNum]objectInfo),
619 1 : }
620 1 : }
621 :
622 : // zombieObjects tracks a set of objects that are no longer required by the most
623 : // recent version of the LSM, but may still need to be accessed by an open
624 : // iterator. Such objects are 'dead,' but cannot be deleted until iterators that
625 : // may access them are closed.
626 : type zombieObjects struct {
627 : objs map[base.DiskFileNum]objectInfo
628 : totalSize uint64
629 : localSize uint64
630 : localCount uint64
631 : }
632 :
633 : // Add adds an object to the set of zombie objects.
634 1 : func (z *zombieObjects) Add(obj objectInfo) {
635 1 : if _, ok := z.objs[obj.FileNum]; ok {
636 0 : panic(errors.AssertionFailedf("zombie object %s already exists", obj.FileNum))
637 : }
638 1 : z.objs[obj.FileNum] = obj
639 1 : z.totalSize += obj.FileSize
640 1 : if obj.isLocal {
641 1 : z.localSize += obj.FileSize
642 1 : z.localCount++
643 1 : }
644 : }
645 :
646 : // AddMetadata is like Add, but takes an ObjectMetadata and the object's size.
647 1 : func (z *zombieObjects) AddMetadata(meta *objstorage.ObjectMetadata, size uint64) {
648 1 : z.Add(objectInfo{
649 1 : fileInfo: fileInfo{
650 1 : FileNum: meta.DiskFileNum,
651 1 : FileSize: size,
652 1 : },
653 1 : isLocal: !meta.IsRemote(),
654 1 : })
655 1 : }
656 :
657 : // Count returns the number of zombie objects.
658 1 : func (z *zombieObjects) Count() int {
659 1 : return len(z.objs)
660 1 : }
661 :
662 : // Extract removes an object from the set of zombie objects, returning the
663 : // object that was removed.
664 1 : func (z *zombieObjects) Extract(fileNum base.DiskFileNum) objectInfo {
665 1 : obj, ok := z.objs[fileNum]
666 1 : if !ok {
667 0 : panic(errors.AssertionFailedf("zombie object %s not found", fileNum))
668 : }
669 1 : delete(z.objs, fileNum)
670 1 :
671 1 : // Detect underflow in case we have a bug that causes an object's size to be
672 1 : // mutated.
673 1 : if z.totalSize < obj.FileSize {
674 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than total size %d", fileNum, obj.FileSize, z.totalSize))
675 : }
676 1 : if obj.isLocal && z.localSize < obj.FileSize {
677 0 : panic(errors.AssertionFailedf("zombie object %s size %d is greater than local size %d", fileNum, obj.FileSize, z.localSize))
678 : }
679 :
680 1 : z.totalSize -= obj.FileSize
681 1 : if obj.isLocal {
682 1 : z.localSize -= obj.FileSize
683 1 : z.localCount--
684 1 : }
685 1 : return obj
686 : }
687 :
688 : // TotalSize returns the size of all objects in the set.
689 1 : func (z *zombieObjects) TotalSize() uint64 {
690 1 : return z.totalSize
691 1 : }
692 :
693 : // LocalStats returns the count and size of all local objects in the set.
694 1 : func (z *zombieObjects) LocalStats() (count uint64, size uint64) {
695 1 : return z.localCount, z.localSize
696 1 : }
697 :
698 1 : func mergeObjectInfos(a, b []objectInfo) []objectInfo {
699 1 : if len(b) == 0 {
700 1 : return a
701 1 : }
702 1 : a = append(a, b...)
703 1 : slices.SortFunc(a, cmpObjectFileNums)
704 1 : return slices.CompactFunc(a, func(a, b objectInfo) bool {
705 1 : return a.FileNum == b.FileNum
706 1 : })
707 : }
708 :
709 1 : func cmpObjectFileNums(a, b objectInfo) int {
710 1 : return cmp.Compare(a.FileNum, b.FileNum)
711 1 : }
|