LCOV - code coverage report
Current view: top level - pebble - pacer.go (source / functions) Hit Total Coverage
Test: 2024-06-14 08:16Z 29f4bd63 - meta test only.lcov Lines: 76 81 93.8 %
Date: 2024-06-14 08:16:56 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "sync"
       9             :         "time"
      10             : )
      11             : 
      12             : // deletionPacerInfo contains any info from the db necessary to make deletion
      13             : // pacing decisions (to limit background IO usage so that it does not contend
      14             : // with foreground traffic).
      15             : type deletionPacerInfo struct {
      16             :         freeBytes     uint64
      17             :         obsoleteBytes uint64
      18             :         liveBytes     uint64
      19             : }
      20             : 
      21             : // deletionPacer rate limits deletions of obsolete files. This is necessary to
      22             : // prevent overloading the disk with too many deletions too quickly after a
      23             : // large compaction, or an iterator close. On some SSDs, disk performance can be
      24             : // negatively impacted if too many blocks are deleted very quickly, so this
      25             : // mechanism helps mitigate that.
      26             : type deletionPacer struct {
      27             :         // If there are less than freeSpaceThreshold bytes of free space on
      28             :         // disk, increase the pace of deletions such that we delete enough bytes to
      29             :         // get back to the threshold within the freeSpaceTimeframe.
      30             :         freeSpaceThreshold uint64
      31             :         freeSpaceTimeframe time.Duration
      32             : 
      33             :         // If the ratio of obsolete bytes to live bytes is greater than
      34             :         // obsoleteBytesMaxRatio, increase the pace of deletions such that we delete
      35             :         // enough bytes to get back to the threshold within the obsoleteBytesTimeframe.
      36             :         obsoleteBytesMaxRatio  float64
      37             :         obsoleteBytesTimeframe time.Duration
      38             : 
      39             :         mu struct {
      40             :                 sync.Mutex
      41             : 
      42             :                 // history keeps rack of recent deletion history; it used to increase the
      43             :                 // deletion rate to match the pace of deletions.
      44             :                 history history
      45             :         }
      46             : 
      47             :         targetByteDeletionRate int64
      48             : 
      49             :         getInfo func() deletionPacerInfo
      50             : }
      51             : 
      52             : const deletePacerHistory = 5 * time.Minute
      53             : 
      54             : // newDeletionPacer instantiates a new deletionPacer for use when deleting
      55             : // obsolete files.
      56             : //
      57             : // targetByteDeletionRate is the rate (in bytes/sec) at which we want to
      58             : // normally limit deletes (when we are not falling behind or running out of
      59             : // space). A value of 0.0 disables pacing.
      60             : func newDeletionPacer(
      61             :         now time.Time, targetByteDeletionRate int64, getInfo func() deletionPacerInfo,
      62           1 : ) *deletionPacer {
      63           1 :         d := &deletionPacer{
      64           1 :                 freeSpaceThreshold: 16 << 30, // 16 GB
      65           1 :                 freeSpaceTimeframe: 10 * time.Second,
      66           1 : 
      67           1 :                 obsoleteBytesMaxRatio:  0.20,
      68           1 :                 obsoleteBytesTimeframe: 5 * time.Minute,
      69           1 : 
      70           1 :                 targetByteDeletionRate: targetByteDeletionRate,
      71           1 :                 getInfo:                getInfo,
      72           1 :         }
      73           1 :         d.mu.history.Init(now, deletePacerHistory)
      74           1 :         return d
      75           1 : }
      76             : 
      77             : // ReportDeletion is used to report a deletion to the pacer. The pacer uses it
      78             : // to keep track of the recent rate of deletions and potentially increase the
      79             : // deletion rate accordingly.
      80             : //
      81             : // ReportDeletion is thread-safe.
      82           1 : func (p *deletionPacer) ReportDeletion(now time.Time, bytesToDelete uint64) {
      83           1 :         p.mu.Lock()
      84           1 :         defer p.mu.Unlock()
      85           1 :         p.mu.history.Add(now, int64(bytesToDelete))
      86           1 : }
      87             : 
      88             : // PacingDelay returns the recommended pacing wait time (in seconds) for
      89             : // deleting the given number of bytes.
      90             : //
      91             : // PacingDelay is thread-safe.
      92           1 : func (p *deletionPacer) PacingDelay(now time.Time, bytesToDelete uint64) (waitSeconds float64) {
      93           1 :         if p.targetByteDeletionRate == 0 {
      94           1 :                 // Pacing disabled.
      95           1 :                 return 0.0
      96           1 :         }
      97             : 
      98           1 :         baseRate := float64(p.targetByteDeletionRate)
      99           1 :         // If recent deletion rate is more than our target, use that so that we don't
     100           1 :         // fall behind.
     101           1 :         historicRate := func() float64 {
     102           1 :                 p.mu.Lock()
     103           1 :                 defer p.mu.Unlock()
     104           1 :                 return float64(p.mu.history.Sum(now)) / deletePacerHistory.Seconds()
     105           1 :         }()
     106           1 :         if historicRate > baseRate {
     107           0 :                 baseRate = historicRate
     108           0 :         }
     109             : 
     110             :         // Apply heuristics to increase the deletion rate.
     111           1 :         var extraRate float64
     112           1 :         info := p.getInfo()
     113           1 :         if info.freeBytes <= p.freeSpaceThreshold {
     114           0 :                 // Increase the rate so that we can free up enough bytes within the timeframe.
     115           0 :                 extraRate = float64(p.freeSpaceThreshold-info.freeBytes) / p.freeSpaceTimeframe.Seconds()
     116           0 :         }
     117           1 :         if info.liveBytes == 0 {
     118           1 :                 // We don't know the obsolete bytes ratio. Disable pacing altogether.
     119           1 :                 return 0.0
     120           1 :         }
     121           1 :         obsoleteBytesRatio := float64(info.obsoleteBytes) / float64(info.liveBytes)
     122           1 :         if obsoleteBytesRatio >= p.obsoleteBytesMaxRatio {
     123           1 :                 // Increase the rate so that we can free up enough bytes within the timeframe.
     124           1 :                 r := (obsoleteBytesRatio - p.obsoleteBytesMaxRatio) * float64(info.liveBytes) / p.obsoleteBytesTimeframe.Seconds()
     125           1 :                 if extraRate < r {
     126           1 :                         extraRate = r
     127           1 :                 }
     128             :         }
     129             : 
     130           1 :         return float64(bytesToDelete) / (baseRate + extraRate)
     131             : }
     132             : 
     133             : // history is a helper used to keep track of the recent history of a set of
     134             : // data points (in our case deleted bytes), at limited granularity.
     135             : // Specifically, we split the desired timeframe into 100 "epochs" and all times
     136             : // are effectively rounded down to the nearest epoch boundary.
     137             : type history struct {
     138             :         epochDuration time.Duration
     139             :         startTime     time.Time
     140             :         // currEpoch is the epoch of the most recent operation.
     141             :         currEpoch int64
     142             :         // val contains the recent epoch values.
     143             :         // val[currEpoch % historyEpochs] is the current epoch.
     144             :         // val[(currEpoch + 1) % historyEpochs] is the oldest epoch.
     145             :         val [historyEpochs]int64
     146             :         // sum is always equal to the sum of values in val.
     147             :         sum int64
     148             : }
     149             : 
     150             : const historyEpochs = 100
     151             : 
     152             : // Init the history helper to keep track of data over the given number of
     153             : // seconds.
     154           1 : func (h *history) Init(now time.Time, timeframe time.Duration) {
     155           1 :         *h = history{
     156           1 :                 epochDuration: timeframe / time.Duration(historyEpochs),
     157           1 :                 startTime:     now,
     158           1 :                 currEpoch:     0,
     159           1 :                 sum:           0,
     160           1 :         }
     161           1 : }
     162             : 
     163             : // Add adds a value for the current time.
     164           1 : func (h *history) Add(now time.Time, val int64) {
     165           1 :         h.advance(now)
     166           1 :         h.val[h.currEpoch%historyEpochs] += val
     167           1 :         h.sum += val
     168           1 : }
     169             : 
     170             : // Sum returns the sum of recent values. The result is approximate in that the
     171             : // cut-off time is within 1% of the exact one.
     172           1 : func (h *history) Sum(now time.Time) int64 {
     173           1 :         h.advance(now)
     174           1 :         return h.sum
     175           1 : }
     176             : 
     177           1 : func (h *history) epoch(t time.Time) int64 {
     178           1 :         return int64(t.Sub(h.startTime) / h.epochDuration)
     179           1 : }
     180             : 
     181             : // advance advances the time to the given time.
     182           1 : func (h *history) advance(now time.Time) {
     183           1 :         epoch := h.epoch(now)
     184           1 :         for h.currEpoch < epoch {
     185           1 :                 h.currEpoch++
     186           1 :                 // Forget the data for the oldest epoch.
     187           1 :                 h.sum -= h.val[h.currEpoch%historyEpochs]
     188           1 :                 h.val[h.currEpoch%historyEpochs] = 0
     189           1 :         }
     190             : }

Generated by: LCOV version 1.14