LCOV - code coverage report
Current view: top level - pebble/replay - workload_capture.go (source / functions) Hit Total Coverage
Test: 2024-05-17 08:16Z 2ac449bb - tests + meta.lcov Lines: 233 248 94.0 %
Date: 2024-05-17 08:17:28 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package replay
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "io"
      10             :         "sync"
      11             :         "sync/atomic"
      12             : 
      13             :         "github.com/cockroachdb/pebble"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/vfs"
      16             : )
      17             : 
      18             : type workloadCaptureState uint8 // bit set of per-file capture flags; test a flag with is()
      19             : 
      20             : const (
      21             :         obsolete = workloadCaptureState(1) << iota // set by clean when deletion is deferred until the file has been copied
      22             :         readyForProcessing // set when an sstable or manifest is registered for copying
      23             :         capturedSuccessfully // set by copySSTables once a non-obsolete sstable has been copied
      24             : )
      25             : 
      26           1 : func (wcs workloadCaptureState) is(flag workloadCaptureState) bool { return wcs&flag != 0 } // is reports whether wcs has any bit of flag set
      27             : 
      28             : type manifestDetails struct { // manifestDetails holds the source path and open file handles for one manifest being copied
      29             :         sourceFilepath string // path of the manifest; opened on srcFS by copyManifests
      30             :         sourceFile     vfs.File // opened by copyManifests on first use; reset to nil once closed
      31             : 
      32             :         destFile vfs.File // created by copyManifests on destFS on first use; reset to nil once closed
      33             : }
      34             : 
      35             : // WorkloadCollector is designed to capture workloads by handling manifest
      36             : // files, flushed SSTs and ingested SSTs. The collector hooks into the
      37             : // pebble.EventListener and pebble.Cleaner in order to keep track of file states.
      38             : type WorkloadCollector struct {
      39             :         mu struct {
      40             :                 sync.Mutex
      41             :                 fileState map[string]workloadCaptureState // keyed by file base name
      42             :                 // pendingSSTables holds a slice of file paths to sstables that need to
      43             :                 // be copied but haven't yet. The `copyFiles` goroutine grabs these
      44             :                 // files, and the flush and ingest event handlers append them.
      45             :                 pendingSSTables []string
      46             :                 // manifestIndex is an index into `manifests`, pointing to the
      47             :                 // manifest currently being copied.
      48             :                 manifestIndex int
      49             :                 // Appending to manifests requires holding mu. Only the `copyFiles`
      50             :                 // goroutine is permitted to read or edit the struct contents once
      51             :                 // appended, so it does not need to hold mu while accessing the structs'
      52             :                 // fields.
      53             :                 manifests []*manifestDetails
      54             : 
      55             :                 // The following condition variable and counts are used by WaitAndStop and
      56             :                 // in tests to synchronize with the copying goroutine.
      57             :                 copyCond       sync.Cond // broadcast by copyFiles after each copy pass
      58             :                 tablesCopied   int // incremented by copyFiles
      59             :                 tablesEnqueued int // incremented by enqueueCopyLocked
      60             :         }
      61             :         // Stores the file number of the manifest most recently reported through
      62             :         // onManifestCreated; zero until the first such event.
      62             :         curManifest atomic.Uint64
      63             :         // Stores whether the workload collector is enabled.
      64             :         enabled atomic.Bool
      65             :         buffer  []byte // scratch buffer handed to io.CopyBuffer when copying manifests
      66             :         // config contains information that is only set on the creation of the
      67             :         // WorkloadCollector.
      68             :         config struct {
      69             :                 // srcFS and srcDir represent the location from which the workload collector
      70             :                 // collects the files from.
      71             :                 srcFS  vfs.FS
      72             :                 srcDir string
      73             :                 // destFS and destDir represent the location to which the workload collector
      74             :                 // sends the files to.
      75             :                 destFS  vfs.FS
      76             :                 destDir string
      77             :                 // cleaner stores the cleaner to use when files become obsolete and need to
      78             :                 // be cleaned.
      79             :                 cleaner base.Cleaner
      80             :         }
      81             :         copier struct { // state used to wake, stop and wait for the copyFiles goroutine
      82             :                 sync.Cond // broadcast when sstables are enqueued or stop is set; its L is mu.Mutex
      83             :                 stop bool // set by Stop while holding mu; makes copyFiles leave its loop
      84             :                 done chan struct{} // closed by copyFiles just before it returns
      85             :         }
      86             : }
      87             : 
      88             : // NewWorkloadCollector returns a WorkloadCollector that collects files from srcDir. It allocates the copy buffer and file-state map and points both condition variables at wc.mu's mutex.
      89           1 : func NewWorkloadCollector(srcDir string) *WorkloadCollector {
      90           1 :         wc := &WorkloadCollector{}
      91           1 :         wc.buffer = make([]byte, 1<<10 /* 1KB */)
      92           1 :         wc.config.srcDir = srcDir
      93           1 :         wc.mu.copyCond.L = &wc.mu.Mutex // copyCond waits/broadcasts under wc.mu
      94           1 :         wc.mu.fileState = make(map[string]workloadCaptureState)
      95           1 :         wc.copier.Cond.L = &wc.mu.Mutex // the copier condition variable shares the same mutex
      96           1 :         return wc
      97           1 : }
      98             : 
      99             : // Attach registers the collector's FlushEnd, ManifestCreated and TableIngested handlers on opts and
     100             : // replaces opts.Cleaner with a wrapper around w.clean. The previous Cleaner and opts.FS are saved in w.config.
     101           1 : func (w *WorkloadCollector) Attach(opts *pebble.Options) {
     102           1 :         opts.AddEventListener(pebble.EventListener{
     103           1 :                 FlushEnd:        w.onFlushEnd,
     104           1 :                 ManifestCreated: w.onManifestCreated,
     105           1 :                 TableIngested:   w.onTableIngest,
     106           1 :         })
     107           1 : 
     108           1 :         opts.EnsureDefaults() // presumably fills in opts.Cleaner and opts.FS when unset — TODO confirm
     109           1 :         // Replace the original Cleaner with the workload collector's implementation,
     110           1 :         // which will invoke the original Cleaner, but only once the collector has
     111           1 :         // copied what it needs.
     112           1 :         c := cleaner{
     113           1 :                 name:  fmt.Sprintf("replay.WorkloadCollector(%q)", opts.Cleaner),
     114           1 :                 clean: w.clean,
     115           1 :         }
     116           1 :         w.config.cleaner, opts.Cleaner = opts.Cleaner, c // keep the original cleaner, install the wrapper
     117           1 :         w.config.srcFS = opts.FS
     118           1 : }
     119             : 
     120             : // enqueueCopyLocked enqueues the sstable with the provided filenum to be copied in
     121             : // the background, marking it readyForProcessing and counting it in tablesEnqueued. Requires w.mu.
     122           1 : func (w *WorkloadCollector) enqueueCopyLocked(fileNum base.DiskFileNum) {
     123           1 :         fileName := base.MakeFilename(base.FileTypeTable, fileNum)
     124           1 :         w.mu.fileState[fileName] |= readyForProcessing
     125           1 :         w.mu.pendingSSTables = append(w.mu.pendingSSTables, w.srcFilepath(fileName))
     126           1 :         w.mu.tablesEnqueued++
     127           1 : }
     128             : 
     129             : // cleanFile calls the wrapped cleaner on the specified path and, only if that succeeds, removes the
     130             : // file's base name from the fileState map. It acquires w.mu itself, so callers must not hold it.
     131           1 : func (w *WorkloadCollector) cleanFile(fileType base.FileType, path string) error {
     132           1 :         err := w.config.cleaner.Clean(w.config.srcFS, fileType, path)
     133           1 :         if err == nil {
     134           1 :                 w.mu.Lock()
     135           1 :                 delete(w.mu.fileState, w.config.srcFS.PathBase(path))
     136           1 :                 w.mu.Unlock()
     137           1 :         }
     138           1 :         return err
     139             : }
     140             : 
     141             : // clean is the function Attach installs as the Cleaner. It cleans immediately when the collector is not running, or when the file is untracked or already captured.
     142             : // Otherwise it marks the file obsolete and returns nil. NOTE(review): only copySSTables is seen cleaning obsolete files here, and only sstables — confirm how obsolete manifests get cleaned.
     143           1 : func (w *WorkloadCollector) clean(fs vfs.FS, fileType base.FileType, path string) error {
     144           1 :         if !w.IsRunning() {
     145           1 :                 return w.cleanFile(fileType, path)
     146           1 :         }
     147           1 :         w.mu.Lock()
     148           1 :         fileName := fs.PathBase(path)
     149           1 :         if fileState, ok := w.mu.fileState[fileName]; !ok || fileState.is(capturedSuccessfully) {
     150           1 :                 // Delete the file if it has already been captured, or if it is not
     151           1 :                 // tracked at all and therefore does not need to be captured.
     152           1 :                 w.mu.Unlock()
     153           1 :                 return w.cleanFile(fileType, path)
     154           1 :         }
     155           1 :         w.mu.fileState[fileName] |= obsolete // defer the delete; copySSTables cleans obsolete sstables after copying them
     156           1 :         w.mu.Unlock()
     157           1 :         return nil
     158             : }
     159             : 
     160             : // onTableIngest is attached to a pebble.DB as an EventListener.TableIngested
     161             : // func. While the collector is running it enqueues all ingested tables to be copied and wakes the copier.
     162           1 : func (w *WorkloadCollector) onTableIngest(info pebble.TableIngestInfo) {
     163           1 :         if !w.IsRunning() {
     164           0 :                 return
     165           0 :         }
     166           1 :         w.mu.Lock()
     167           1 :         defer w.mu.Unlock()
     168           1 :         for _, table := range info.Tables {
     169           1 :                 w.enqueueCopyLocked(base.PhysicalTableDiskFileNum(table.FileNum))
     170           1 :         }
     171           1 :         w.copier.Broadcast() // wake copyFiles, which waits on w.copier
     172             : }
     173             : 
     174             : // onFlushEnd is attached to a pebble.DB as an EventListener.FlushEnd func. While the
     175             : // collector is running it enqueues all flushed tables to be copied and wakes the copier.
     176           1 : func (w *WorkloadCollector) onFlushEnd(info pebble.FlushInfo) {
     177           1 :         if !w.IsRunning() {
     178           1 :                 return
     179           1 :         }
     180           1 :         w.mu.Lock()
     181           1 :         defer w.mu.Unlock()
     182           1 :         for _, table := range info.Output {
     183           1 :                 w.enqueueCopyLocked(base.PhysicalTableDiskFileNum(table.FileNum))
     184           1 :         }
     185           1 :         w.copier.Broadcast() // wake copyFiles, which waits on w.copier
     186             : }
     187             : 
     188             : // onManifestCreated is attached to a pebble.DB as an
     189             : // EventListener.ManifestCreated func. It always records the new manifest's file
     190             : // number and, if collection is enabled, queues the manifest to be copied in the background.
     191           1 : func (w *WorkloadCollector) onManifestCreated(info pebble.ManifestCreateInfo) {
     192           1 :         w.curManifest.Store(uint64(info.FileNum)) // stored even when disabled so that Start can find the current manifest
     193           1 :         if !w.enabled.Load() {
     194           1 :                 return
     195           1 :         }
     196           1 :         w.mu.Lock()
     197           1 :         defer w.mu.Unlock()
     198           1 : 
     199           1 :         // Mark the manifest file as ready for processing to prevent it from being
     200           1 :         // cleaned before we process it.
     201           1 :         fileName := base.MakeFilename(base.FileTypeManifest, info.FileNum)
     202           1 :         w.mu.fileState[fileName] |= readyForProcessing
     203           1 :         w.mu.manifests = append(w.mu.manifests, &manifestDetails{
     204           1 :                 sourceFilepath: info.Path,
     205           1 :         })
     206             : }
     207             : 
     208             : // copyFiles is run in a separate goroutine, copying sstables and manifests until w.copier.stop is set; it then closes any open manifest files and closes w.copier.done.
     209           1 : func (w *WorkloadCollector) copyFiles() {
     210           1 :         w.mu.Lock()
     211           1 :         defer w.mu.Unlock()
     212           1 :         // NB: This loop must hold w.mu at the beginning of each iteration. It may
     213           1 :         // drop w.mu at times, but it must reacquire it before the next iteration.
     214           1 :         for !w.copier.stop {
     215           1 :                 // The following performs the workload capture. It waits on the w.copier
     216           1 :                 // condition variable, which is broadcast when new sstables are enqueued
     217           1 :                 // or when Stop is called.
     218           1 :                 if len(w.mu.pendingSSTables) == 0 {
     219           1 :                         w.copier.Wait() // not re-checked in a loop: whatever the wake-up reason, one copy pass runs below
     220           1 :                 }
     221             :                 // Grab the manifests to copy.
     222           1 :                 index := w.mu.manifestIndex
     223           1 :                 pendingManifests := w.mu.manifests[index:]
     224           1 :                 var pending []string
     225           1 :                 pending, w.mu.pendingSSTables = w.mu.pendingSSTables, nil
     226           1 :                 func() {
     227           1 :                         // Note the unusual lock order; Temporarily unlock the
     228           1 :                         // mutex, but re-acquire it before returning.
     229           1 :                         w.mu.Unlock()
     230           1 :                         defer w.mu.Lock()
     231           1 : 
     232           1 :                         // Copy any updates to the manifests files.
     233           1 :                         w.copyManifests(index, pendingManifests)
     234           1 :                         // Copy the SSTables provided in pending. copySSTables takes
     235           1 :                         // ownership of the pending slice.
     236           1 :                         w.copySSTables(pending)
     237           1 :                 }()
     238             : 
     239             :                 // This helps in tests; Tests can wait on the copyCond condition
     240             :                 // variable until the necessary bits have been copied.
     241           1 :                 w.mu.tablesCopied += len(pending)
     242           1 :                 w.mu.copyCond.Broadcast()
     243             :         }
     244             : 
     245           1 :         for idx := range w.mu.manifests { // after stop: close every manifest file that is still open
     246           1 :                 if f := w.mu.manifests[idx].sourceFile; f != nil {
     247           1 :                         if err := f.Close(); err != nil {
     248           0 :                                 panic(err)
     249             :                         }
     250           1 :                         w.mu.manifests[idx].sourceFile = nil
     251             :                 }
     252           1 :                 if f := w.mu.manifests[idx].destFile; f != nil {
     253           1 :                         if err := f.Close(); err != nil {
     254           0 :                                 panic(err)
     255             :                         }
     256           1 :                         w.mu.manifests[idx].destFile = nil
     257             :                 }
     258             :         }
     259           1 :         close(w.copier.done) // lets Stop, which receives from done, return
     260             : }
     261             : 
     262             : // copyManifests copies any un-copied portions of the source manifests. startAtIndex is the position of manifests[0] within w.mu.manifests. It must be called without w.mu held and panics on any file error.
     263           1 : func (w *WorkloadCollector) copyManifests(startAtIndex int, manifests []*manifestDetails) {
     264           1 :         destFS := w.config.destFS
     265           1 : 
     266           1 :         for index, manifest := range manifests {
     267           1 :                 if manifest.destFile == nil && manifest.sourceFile == nil {
     268           1 :                         // This is the first time we've read from this manifest, and we
     269           1 :                         // don't yet have open file descriptors for the src or dst files. It
     270           1 :                         // is safe to write to manifest.{destFile,sourceFile} without
     271           1 :                         // holding w.mu, because the copyFiles goroutine is the only
     272           1 :                         // goroutine that accesses the fields of the `manifestDetails`
     273           1 :                         // struct.
     274           1 :                         var err error
     275           1 :                         manifest.destFile, err = destFS.Create(w.destFilepath(destFS.PathBase(manifest.sourceFilepath)), vfs.WriteCategoryUnspecified)
     276           1 :                         if err != nil {
     277           0 :                                 panic(err)
     278             :                         }
     279           1 :                         manifest.sourceFile, err = w.config.srcFS.Open(manifest.sourceFilepath)
     280           1 :                         if err != nil {
     281           0 :                                 panic(err)
     282             :                         }
     283             :                 }
     284             : 
     285           1 :                 numBytesRead, err := io.CopyBuffer(manifest.destFile, manifest.sourceFile, w.buffer) // the files stay open between passes, so this copies only what was appended since the last pass
     286           1 :                 if err != nil {
     287           0 :                         panic(err)
     288             :                 }
     289             : 
     290             :                 // We read 0 bytes from the current manifest and it is not the
     291             :                 // latest/newest manifest, which means we have read its entirety. No new
     292             :                 // data will be written to it, because only the latest manifest may
     293             :                 // receive edits. Close the current source and destination files and
     294             :                 // move the manifest to start at the next index in w.mu.manifests.
     295           1 :                 if numBytesRead == 0 && index != len(manifests)-1 {
     296           1 :                         // Rotating the manifests so we can close the files.
     297           1 :                         if err := manifests[index].sourceFile.Close(); err != nil {
     298           0 :                                 panic(err)
     299             :                         }
     300           1 :                         manifests[index].sourceFile = nil
     301           1 :                         if err := manifests[index].destFile.Close(); err != nil {
     302           0 :                                 panic(err)
     303             :                         }
     304           1 :                         manifests[index].destFile = nil
     305           1 :                         w.mu.Lock()
     306           1 :                         w.mu.manifestIndex = startAtIndex + index + 1
     307           1 :                         w.mu.Unlock()
     308             :                 }
     309             :         }
     310             : }
     311             : 
     312             : // copySSTables copies the provided sstables to the stored workload. If a file
     313             : // has already been marked as obsolete, then the file will be cleaned by the
     314             : // w.config.cleaner after it is copied. The provided slice will be mutated and
     315             : // should not be used following the call to this function. Panics if a copy fails.
     316           1 : func (w *WorkloadCollector) copySSTables(pending []string) {
     317           1 :         for _, filePath := range pending {
     318           1 :                 err := vfs.CopyAcrossFS(w.config.srcFS,
     319           1 :                         filePath,
     320           1 :                         w.config.destFS,
     321           1 :                         w.destFilepath(w.config.srcFS.PathBase(filePath)))
     322           1 :                 if err != nil {
     323           0 :                         panic(err)
     324             :                 }
     325             :         }
     326             : 
     327             :         // Identify the subset of `pending` files that should now be cleaned. The
     328             :         // WorkloadCollector intercepts Cleaner.Clean calls to defer cleaning until
     329             :         // copying has completed. If Cleaner.Clean has already been invoked for any
     330             :         // of the files that were copied, we can now actually Clean them.
     331           1 :         pendingClean := pending[:0] // filters in place, reusing pending's backing array
     332           1 :         w.mu.Lock()
     333           1 :         for _, filePath := range pending {
     334           1 :                 fileName := w.config.srcFS.PathBase(filePath)
     335           1 :                 if w.mu.fileState[fileName].is(obsolete) {
     336           1 :                         pendingClean = append(pendingClean, filePath)
     337           1 :                 } else {
     338           1 :                         w.mu.fileState[fileName] |= capturedSuccessfully
     339           1 :                 }
     340             :         }
     341           1 :         w.mu.Unlock()
     342           1 : 
     343           1 :         for _, path := range pendingClean {
     344           1 :                 _ = w.cleanFile(base.FileTypeTable, path) // error is discarded; on failure the entry stays in fileState
     345           1 :         }
     346             : }
     347             : 
     348             : // Start begins collecting a workload. All flushed and ingested sstables, plus
     349             : // corresponding manifests are copied to the provided destination path on the
     350             : // provided FS. Calling Start while the collector is already running is a no-op.
     351           1 : func (w *WorkloadCollector) Start(destFS vfs.FS, destPath string) {
     352           1 :         w.mu.Lock()
     353           1 :         defer w.mu.Unlock()
     354           1 : 
     355           1 :         // If the collector is not running then w.enabled is false, so swap it to
     356           1 :         // true and continue; otherwise return since it is already running.
     357           1 :         if !w.enabled.CompareAndSwap(false, true) {
     358           0 :                 return
     359           0 :         }
     360           1 :         w.config.destFS = destFS
     361           1 :         w.config.destDir = destPath
     362           1 : 
     363           1 :         // Initialize the tracked manifests to the database's current manifest, if
     364           1 :         // the database has already started. Every database Open creates a new
     365           1 :         // manifest. There are two cases:
     366           1 :         //   1. The database has already been opened. Then `w.curManifest`
     367           1 :         //      contains the file number of the current manifest. We must initialize
     368           1 :         //      the w.mu.manifests slice to contain this first manifest.
     369           1 :         //   2. The database has not yet been opened. Then `w.curManifest` is
     370           1 :         //      still zero. Once the associated database is opened, it'll invoke
     371           1 :         //      onManifestCreated which will handle enqueuing the manifest on
     372           1 :         //      `w.mu.manifests`.
     373           1 :         if fileNum := base.DiskFileNum(w.curManifest.Load()); fileNum != 0 {
     374           1 :                 fileName := base.MakeFilename(base.FileTypeManifest, fileNum)
     375           1 :                 w.mu.manifests = append(w.mu.manifests[:0], &manifestDetails{sourceFilepath: w.srcFilepath(fileName)}) // NOTE(review): the slice is truncated here but w.mu.manifestIndex is not reset — confirm a restart after a manifest rotation cannot slice past len in copyFiles
     376           1 :                 w.mu.fileState[fileName] |= readyForProcessing
     377           1 :         }
     378             : 
     379             :         // Begin copying files asynchronously in the background.
     380           1 :         w.copier.done = make(chan struct{})
     381           1 :         w.copier.stop = false
     382           1 :         go w.copyFiles()
     383             : }
     384             : 
     385             : // WaitAndStop waits for all enqueued sstables to be copied over, and then
     386             : // calls Stop. Gracefully ensures that all sstables referenced in the collected
     387             : // manifest's latest version edit will exist in the copy directory.
     388           1 : func (w *WorkloadCollector) WaitAndStop() {
     389           1 :         w.mu.Lock()
     390           1 :         for w.mu.tablesEnqueued != w.mu.tablesCopied { // copyFiles broadcasts copyCond after each copy pass
     391           1 :                 w.mu.copyCond.Wait()
     392           1 :         }
     393           1 :         w.mu.Unlock() // released before Stop, which takes w.mu itself
     394           1 :         w.Stop()
     395             : }
     396             : 
     397             : // Stop stops collection of the workload: it sets the stop flag, wakes the copying goroutine and blocks until that goroutine has exited. It is a no-op if the collector is not running.
     398           1 : func (w *WorkloadCollector) Stop() {
     399           1 :         w.mu.Lock()
     400           1 :         // If the collector is running then w.enabled is true, so swap it to
     401           1 :         // false and continue; otherwise return since it is not running.
     402           1 :         if !w.enabled.CompareAndSwap(true, false) {
     403           0 :                 w.mu.Unlock()
     404           0 :                 return
     405           0 :         }
     406           1 :         w.copier.stop = true
     407           1 :         w.copier.Broadcast()
     408           1 :         w.mu.Unlock() // must be released first: copyFiles needs w.mu to finish
     409           1 :         <-w.copier.done // copyFiles closes done as its last step
     410             : }
     411             : 
     412             : // IsRunning reports whether the WorkloadCollector is currently enabled, i.e. Start has been called without a subsequent Stop.
     413           1 : func (w *WorkloadCollector) IsRunning() bool {
     414           1 :         return w.enabled.Load()
     415           1 : }
     416             : 
     417             : // srcFilepath returns the path of the named file inside the source directory,
     418             : // joined using the source filesystem's PathJoin.
     419           1 : func (w *WorkloadCollector) srcFilepath(name string) string {
     420           1 :         return w.config.srcFS.PathJoin(w.config.srcDir, name)
     421           1 : }
     422             : 
     423             : // destFilepath returns the path of the named file inside the destination
     424             : // directory, joined using the destination filesystem's PathJoin.
     425           1 : func (w *WorkloadCollector) destFilepath(name string) string {
     426           1 :         return w.config.destFS.PathJoin(w.config.destDir, name)
     427           1 : }
     428             : 
     429             : type cleaner struct { // cleaner pairs a display name with a clean func; Attach installs one as opts.Cleaner
     430             :         name  string // returned by String
     431             :         clean func(vfs.FS, base.FileType, string) error // invoked by Clean with its arguments unchanged
     432             : }
     433             : 
     434           1 : func (c cleaner) String() string { return c.name } // String returns the name given at construction
     435           1 : func (c cleaner) Clean(fs vfs.FS, fileType base.FileType, path string) error { // Clean forwards to the wrapped clean func
     436           1 :         return c.clean(fs, fileType, path)
     437           1 : }

Generated by: LCOV version 1.14