LCOV - code coverage report
Current view: top level - pebble/vfs/atomicfs - marker.go (source / functions) Hit Total Coverage
Test: 2024-06-21 08:16Z a654bd3e - meta test only.lcov Lines: 80 136 58.8 %
Date: 2024-06-21 08:17:34 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package atomicfs
       6             : 
       7             : import (
       8             :         "fmt"
       9             :         "strconv"
      10             :         "strings"
      11             : 
      12             :         "github.com/cockroachdb/errors"
      13             :         "github.com/cockroachdb/errors/oserror"
      14             :         "github.com/cockroachdb/pebble/vfs"
      15             : )
      16             : 
      17             : // ReadMarker looks up the current state of a marker returning just the
      18             : // current value of the marker. Callers that may need to move the marker
      19             : // to a new value should use LocateMarker.
      20           0 : func ReadMarker(fs vfs.FS, dir, markerName string) (string, error) {
      21           0 :         ls, err := fs.List(dir)
      22           0 :         if err != nil {
      23           0 :                 return "", err
      24           0 :         }
      25           0 :         state, err := scanForMarker(fs, ls, markerName)
      26           0 :         if err != nil {
      27           0 :                 return "", err
      28           0 :         }
      29           0 :         return state.value, nil
      30             : }
      31             : 
      32             : // LocateMarker loads the current state of a marker. It returns a handle
      33             : // to the Marker that may be used to move the marker and the
      34             : // current value of the marker.
      35           1 : func LocateMarker(fs vfs.FS, dir, markerName string) (*Marker, string, error) {
      36           1 :         ls, err := fs.List(dir)
      37           1 :         if err != nil {
      38           0 :                 return nil, "", err
      39           0 :         }
      40           1 :         return LocateMarkerInListing(fs, dir, markerName, ls)
      41             : }
      42             : 
      43             : // LocateMarkerInListing is like LocateMarker but takes a listing of the files
      44             : // contained within dir. It's useful when the caller has already listed the
      45             : // directory entries of dir for its own purposes.
      46             : func LocateMarkerInListing(
      47             :         fs vfs.FS, dir, markerName string, ls []string,
      48           1 : ) (*Marker, string, error) {
      49           1 :         state, err := scanForMarker(fs, ls, markerName)
      50           1 :         if err != nil {
      51           0 :                 return nil, "", err
      52           0 :         }
      53           1 :         dirFD, err := fs.OpenDir(dir)
      54           1 :         if err != nil {
      55           0 :                 return nil, "", err
      56           0 :         }
      57           1 :         return &Marker{
      58           1 :                 fs:            fs,
      59           1 :                 dir:           dir,
      60           1 :                 dirFD:         dirFD,
      61           1 :                 name:          markerName,
      62           1 :                 filename:      state.filename,
      63           1 :                 iter:          state.iter,
      64           1 :                 obsoleteFiles: state.obsolete,
      65           1 :         }, state.value, nil
      66             : }
      67             : 
      68             : type scannedState struct {
      69             :         // filename is the latest marker file found (the one with the highest iter value).
      70             :         filename string
      71             :         iter     uint64
      72             :         value    string
      73             :         // obsolete is a list of earlier markers that were found.
      74             :         obsolete []string
      75             : }
      76             : 
      77           1 : func scanForMarker(fs vfs.FS, ls []string, markerName string) (scannedState, error) {
      78           1 :         var state scannedState
      79           1 :         for _, filename := range ls {
      80           1 :                 if !strings.HasPrefix(filename, `marker.`) {
      81           1 :                         continue
      82             :                 }
      83             :                 // Any filenames with the `marker.` prefix are required to be
      84             :                 // well-formed and parse as markers.
      85           1 :                 name, iter, value, err := parseMarkerFilename(filename)
      86           1 :                 if err != nil {
      87           0 :                         return scannedState{}, err
      88           0 :                 }
      89           1 :                 if name != markerName {
      90           1 :                         continue
      91             :                 }
      92             : 
      93           1 :                 if state.filename == "" || state.iter < iter {
      94           1 :                         if state.filename != "" {
      95           0 :                                 state.obsolete = append(state.obsolete, state.filename)
      96           0 :                         }
      97           1 :                         state.filename = filename
      98           1 :                         state.iter = iter
      99           1 :                         state.value = value
     100           0 :                 } else {
     101           0 :                         state.obsolete = append(state.obsolete, filename)
     102           0 :                 }
     103             :         }
     104           1 :         return state, nil
     105             : }
     106             : 
     107             : // A Marker provides an interface for maintaining a single string value on the
     108             : // filesystem. The marker may be atomically moved from value to value.
     109             : //
     110             : // The implementation creates a new marker file for each new value, embedding
     111             : // the value in the marker filename.
     112             : //
     113             : // Marker is not safe for concurrent use. Multiple processes may not read or
     114             : // move the same marker simultaneously. A Marker may only be constructed through
     115             : // LocateMarker.
     116             : //
     117             : // Marker names must be unique within the directory.
     118             : type Marker struct {
     119             :         fs    vfs.FS
     120             :         dir   string
     121             :         dirFD vfs.File
     122             :         // name identifies the marker.
     123             :         name string
     124             :         // filename contains the entire filename of the current marker. It
     125             :         // has a format of `marker.<name>.<iter>.<value>`. It's not
     126             :         // necessarily in sync with iter, since filename is only updated
     127             :         // when the marker is successfully moved.
     128             :         filename string
     129             :         // iter holds the current iteration value. It matches the iteration
     130             :         // value encoded within filename, if filename is non-empty. Iter is
     131             :         // monotonically increasing over the lifetime of a marker. Actual
     132             :         // marker files will always have a positive iter value.
     133             :         iter uint64
     134             :         // obsoleteFiles holds a list of marker files discovered by LocateMarker that
     135             :         // are old values for this marker. These files may exist in certain error
     136             :         // cases or crashes (e.g. if the deletion of the previous marker file failed
     137             :         // during Move).
     138             :         obsoleteFiles []string
     139             : }
     140             : 
     141           1 : func markerFilename(name string, iter uint64, value string) string {
     142           1 :         return fmt.Sprintf("marker.%s.%06d.%s", name, iter, value)
     143           1 : }
     144             : 
     145           1 : func parseMarkerFilename(s string) (name string, iter uint64, value string, err error) {
     146           1 :         // Check for and remove the `marker.` prefix.
     147           1 :         if !strings.HasPrefix(s, `marker.`) {
     148           0 :                 return "", 0, "", errors.Newf("invalid marker filename: %q", s)
     149           0 :         }
     150           1 :         s = s[len(`marker.`):]
     151           1 : 
     152           1 :         // Extract the marker's name.
     153           1 :         i := strings.IndexByte(s, '.')
     154           1 :         if i == -1 {
     155           0 :                 return "", 0, "", errors.Newf("invalid marker filename: %q", s)
     156           0 :         }
     157           1 :         name = s[:i]
     158           1 :         s = s[i+1:]
     159           1 : 
     160           1 :         // Extract the marker's iteration number.
     161           1 :         i = strings.IndexByte(s, '.')
     162           1 :         if i == -1 {
     163           0 :                 return "", 0, "", errors.Newf("invalid marker filename: %q", s)
     164           0 :         }
     165           1 :         iter, err = strconv.ParseUint(s[:i], 10, 64)
     166           1 :         if err != nil {
     167           0 :                 return "", 0, "", errors.Newf("invalid marker filename: %q", s)
     168           0 :         }
     169             : 
     170             :         // Everything after the iteration's `.` delimiter is the value.
     171           1 :         s = s[i+1:]
     172           1 : 
     173           1 :         return name, iter, s, nil
     174             : }
     175             : 
     176             : // Close releases all resources in use by the marker.
     177           1 : func (a *Marker) Close() error {
     178           1 :         return a.dirFD.Close()
     179           1 : }
     180             : 
     181             : // Move atomically moves the marker to a new value.
     182             : //
     183             : // If Move returns a nil error, the new marker value is guaranteed to be
     184             : // persisted to stable storage. If Move returns an error, the current
     185             : // value of the marker may be the old value or the new value. Callers
     186             : // may retry a Move error.
     187             : //
     188             : // If an error occurs while syncing the directory, Move panics.
     189           1 : func (a *Marker) Move(newValue string) error {
     190           1 :         a.iter++
     191           1 :         dstFilename := markerFilename(a.name, a.iter, newValue)
     192           1 :         dstPath := a.fs.PathJoin(a.dir, dstFilename)
     193           1 :         oldFilename := a.filename
     194           1 : 
     195           1 :         // Create the new marker.
     196           1 :         f, err := a.fs.Create(dstPath, vfs.WriteCategoryUnspecified)
     197           1 :         if err != nil {
     198           0 :                 // On a distributed filesystem, an error doesn't guarantee that
     199           0 :                 // the file wasn't created. A retry of the same Move call will
     200           0 :                 // use a new iteration value, and try to a create a new file. If
     201           0 :                 // the errored invocation was actually successful in creating
     202           0 :                 // the file, we'll leak a file. That's okay, because the next
     203           0 :                 // time the marker is located we'll add it to the obsolete files
     204           0 :                 // list.
     205           0 :                 //
     206           0 :                 // Note that the unconditional increment of `a.iter` means that
     207           0 :                 // `a.iter` and `a.filename` are not necessarily in sync,
     208           0 :                 // because `a.filename` is only updated on success.
     209           0 :                 return err
     210           0 :         }
     211           1 :         a.filename = dstFilename
     212           1 :         if err := f.Close(); err != nil {
     213           0 :                 return err
     214           0 :         }
     215             : 
     216             :         // Remove the now defunct file. If an error is surfaced, we record
     217             :         // the file as an obsolete file.  The file's presence does not
     218             :         // affect correctness, and it will be cleaned up the next time
     219             :         // RemoveObsolete is called, either by this process or the next.
     220           1 :         if oldFilename != "" {
     221           1 :                 if err := a.fs.Remove(a.fs.PathJoin(a.dir, oldFilename)); err != nil && !oserror.IsNotExist(err) {
     222           0 :                         a.obsoleteFiles = append(a.obsoleteFiles, oldFilename)
     223           0 :                 }
     224             :         }
     225             : 
     226             :         // Sync the directory to ensure marker movement is synced.
     227           1 :         if err := a.dirFD.Sync(); err != nil {
     228           0 :                 // Fsync errors are unrecoverable.
     229           0 :                 // See https://wiki.postgresql.org/wiki/Fsync_Errors and
     230           0 :                 // https://danluu.com/fsyncgate.
     231           0 :                 panic(errors.WithStack(err))
     232             :         }
     233           1 :         return nil
     234             : }
     235             : 
     236             : // NextIter returns the next iteration number that the marker will use.
     237             : // Clients may use this number for formulating new values that are
     238             : // unused.
     239           1 : func (a *Marker) NextIter() uint64 {
     240           1 :         return a.iter + 1
     241           1 : }
     242             : 
     243             : // RemoveObsolete removes any obsolete files discovered while locating
     244             : // the marker or files unable to be removed during Move.
     245           1 : func (a *Marker) RemoveObsolete() error {
     246           1 :         for i, filename := range a.obsoleteFiles {
     247           0 :                 if err := a.fs.Remove(a.fs.PathJoin(a.dir, filename)); err != nil && !oserror.IsNotExist(err) {
     248           0 :                         a.obsoleteFiles = a.obsoleteFiles[i:]
     249           0 :                         return err
     250           0 :                 }
     251             :         }
     252           1 :         a.obsoleteFiles = nil
     253           1 :         return nil
     254             : }

Generated by: LCOV version 1.14