Line data Source code
1 : // Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package atomicfs
6 :
7 : import (
8 : "fmt"
9 : "strconv"
10 : "strings"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/errors/oserror"
14 : "github.com/cockroachdb/pebble/vfs"
15 : )
16 :
17 : // ReadMarker looks up the current state of a marker returning just the
18 : // current value of the marker. Callers that may need to move the marker
19 : // to a new value should use LocateMarker.
20 1 : func ReadMarker(fs vfs.FS, dir, markerName string) (string, error) {
21 1 : ls, err := fs.List(dir)
22 1 : if err != nil {
23 0 : return "", err
24 0 : }
25 1 : state, err := scanForMarker(fs, ls, markerName)
26 1 : if err != nil {
27 0 : return "", err
28 0 : }
29 1 : return state.value, nil
30 : }
31 :
32 : // LocateMarker loads the current state of a marker. It returns a handle
33 : // to the Marker that may be used to move the marker and the
34 : // current value of the marker.
35 1 : func LocateMarker(fs vfs.FS, dir, markerName string) (*Marker, string, error) {
36 1 : ls, err := fs.List(dir)
37 1 : if err != nil {
38 1 : return nil, "", err
39 1 : }
40 1 : return LocateMarkerInListing(fs, dir, markerName, ls)
41 : }
42 :
43 : // LocateMarkerInListing is like LocateMarker but takes a listing of the files
44 : // contained within dir. It's useful when the caller has already listed the
45 : // directory entries of dir for its own purposes.
46 : func LocateMarkerInListing(
47 : fs vfs.FS, dir, markerName string, ls []string,
48 1 : ) (*Marker, string, error) {
49 1 : state, err := scanForMarker(fs, ls, markerName)
50 1 : if err != nil {
51 0 : return nil, "", err
52 0 : }
53 1 : dirFD, err := fs.OpenDir(dir)
54 1 : if err != nil {
55 1 : return nil, "", err
56 1 : }
57 1 : return &Marker{
58 1 : fs: fs,
59 1 : dir: dir,
60 1 : dirFD: dirFD,
61 1 : name: markerName,
62 1 : filename: state.filename,
63 1 : iter: state.iter,
64 1 : obsoleteFiles: state.obsolete,
65 1 : }, state.value, nil
66 : }
67 :
68 : type scannedState struct {
69 : // filename is the latest marker file found (the one with the highest iter value).
70 : filename string
71 : iter uint64
72 : value string
73 : // obsolete is a list of earlier markers that were found.
74 : obsolete []string
75 : }
76 :
77 1 : func scanForMarker(fs vfs.FS, ls []string, markerName string) (scannedState, error) {
78 1 : var state scannedState
79 1 : for _, filename := range ls {
80 1 : if !strings.HasPrefix(filename, `marker.`) {
81 1 : continue
82 : }
83 : // Any filenames with the `marker.` prefix are required to be
84 : // well-formed and parse as markers.
85 1 : name, iter, value, err := parseMarkerFilename(filename)
86 1 : if err != nil {
87 0 : return scannedState{}, err
88 0 : }
89 1 : if name != markerName {
90 1 : continue
91 : }
92 :
93 1 : if state.filename == "" || state.iter < iter {
94 1 : if state.filename != "" {
95 1 : state.obsolete = append(state.obsolete, state.filename)
96 1 : }
97 1 : state.filename = filename
98 1 : state.iter = iter
99 1 : state.value = value
100 1 : } else {
101 1 : state.obsolete = append(state.obsolete, filename)
102 1 : }
103 : }
104 1 : return state, nil
105 : }
106 :
107 : // A Marker provides an interface for maintaining a single string value on the
108 : // filesystem. The marker may be atomically moved from value to value.
109 : //
110 : // The implementation creates a new marker file for each new value, embedding
111 : // the value in the marker filename.
112 : //
113 : // Marker is not safe for concurrent use. Multiple processes may not read or
114 : // move the same marker simultaneously. A Marker may only be constructed through
115 : // LocateMarker.
116 : //
117 : // Marker names must be unique within the directory.
118 : type Marker struct {
119 : fs vfs.FS
120 : dir string
121 : dirFD vfs.File
122 : // name identifies the marker.
123 : name string
124 : // filename contains the entire filename of the current marker. It
125 : // has a format of `marker.<name>.<iter>.<value>`. It's not
126 : // necessarily in sync with iter, since filename is only updated
127 : // when the marker is successfully moved.
128 : filename string
129 : // iter holds the current iteration value. It matches the iteration
130 : // value encoded within filename, if filename is non-empty. Iter is
131 : // monotonically increasing over the lifetime of a marker. Actual
132 : // marker files will always have a positive iter value.
133 : iter uint64
134 : // obsoleteFiles holds a list of marker files discovered by LocateMarker that
135 : // are old values for this marker. These files may exist in certain error
136 : // cases or crashes (e.g. if the deletion of the previous marker file failed
137 : // during Move).
138 : obsoleteFiles []string
139 : }
140 :
141 1 : func markerFilename(name string, iter uint64, value string) string {
142 1 : return fmt.Sprintf("marker.%s.%06d.%s", name, iter, value)
143 1 : }
144 :
145 1 : func parseMarkerFilename(s string) (name string, iter uint64, value string, err error) {
146 1 : // Check for and remove the `marker.` prefix.
147 1 : if !strings.HasPrefix(s, `marker.`) {
148 1 : return "", 0, "", errors.Newf("invalid marker filename: %q", s)
149 1 : }
150 1 : s = s[len(`marker.`):]
151 1 :
152 1 : // Extract the marker's name.
153 1 : i := strings.IndexByte(s, '.')
154 1 : if i == -1 {
155 0 : return "", 0, "", errors.Newf("invalid marker filename: %q", s)
156 0 : }
157 1 : name = s[:i]
158 1 : s = s[i+1:]
159 1 :
160 1 : // Extract the marker's iteration number.
161 1 : i = strings.IndexByte(s, '.')
162 1 : if i == -1 {
163 0 : return "", 0, "", errors.Newf("invalid marker filename: %q", s)
164 0 : }
165 1 : iter, err = strconv.ParseUint(s[:i], 10, 64)
166 1 : if err != nil {
167 1 : return "", 0, "", errors.Newf("invalid marker filename: %q", s)
168 1 : }
169 :
170 : // Everything after the iteration's `.` delimiter is the value.
171 1 : s = s[i+1:]
172 1 :
173 1 : return name, iter, s, nil
174 : }
175 :
176 : // Close releases all resources in use by the marker.
177 1 : func (a *Marker) Close() error {
178 1 : return a.dirFD.Close()
179 1 : }
180 :
181 : // Move atomically moves the marker to a new value.
182 : //
183 : // If Move returns a nil error, the new marker value is guaranteed to be
184 : // persisted to stable storage. If Move returns an error, the current
185 : // value of the marker may be the old value or the new value. Callers
186 : // may retry a Move error.
187 : //
188 : // If an error occurs while syncing the directory, Move panics.
189 1 : func (a *Marker) Move(newValue string) error {
190 1 : a.iter++
191 1 : dstFilename := markerFilename(a.name, a.iter, newValue)
192 1 : dstPath := a.fs.PathJoin(a.dir, dstFilename)
193 1 : oldFilename := a.filename
194 1 :
195 1 : // Create the new marker.
196 1 : f, err := a.fs.Create(dstPath)
197 1 : if err != nil {
198 1 : // On a distributed filesystem, an error doesn't guarantee that
199 1 : // the file wasn't created. A retry of the same Move call will
200 1 : // use a new iteration value, and try to a create a new file. If
201 1 : // the errored invocation was actually successful in creating
202 1 : // the file, we'll leak a file. That's okay, because the next
203 1 : // time the marker is located we'll add it to the obsolete files
204 1 : // list.
205 1 : //
206 1 : // Note that the unconditional increment of `a.iter` means that
207 1 : // `a.iter` and `a.filename` are not necessarily in sync,
208 1 : // because `a.filename` is only updated on success.
209 1 : return err
210 1 : }
211 1 : a.filename = dstFilename
212 1 : if err := f.Close(); err != nil {
213 0 : return err
214 0 : }
215 :
216 : // Remove the now defunct file. If an error is surfaced, we record
217 : // the file as an obsolete file. The file's presence does not
218 : // affect correctness, and it will be cleaned up the next time
219 : // RemoveObsolete is called, either by this process or the next.
220 1 : if oldFilename != "" {
221 1 : if err := a.fs.Remove(a.fs.PathJoin(a.dir, oldFilename)); err != nil && !oserror.IsNotExist(err) {
222 1 : a.obsoleteFiles = append(a.obsoleteFiles, oldFilename)
223 1 : }
224 : }
225 :
226 : // Sync the directory to ensure marker movement is synced.
227 1 : if err := a.dirFD.Sync(); err != nil {
228 1 : // Fsync errors are unrecoverable.
229 1 : // See https://wiki.postgresql.org/wiki/Fsync_Errors and
230 1 : // https://danluu.com/fsyncgate.
231 1 : panic(errors.WithStack(err))
232 : }
233 1 : return nil
234 : }
235 :
236 : // NextIter returns the next iteration number that the marker will use.
237 : // Clients may use this number for formulating new values that are
238 : // unused.
239 1 : func (a *Marker) NextIter() uint64 {
240 1 : return a.iter + 1
241 1 : }
242 :
243 : // RemoveObsolete removes any obsolete files discovered while locating
244 : // the marker or files unable to be removed during Move.
245 1 : func (a *Marker) RemoveObsolete() error {
246 1 : for i, filename := range a.obsoleteFiles {
247 1 : if err := a.fs.Remove(a.fs.PathJoin(a.dir, filename)); err != nil && !oserror.IsNotExist(err) {
248 0 : a.obsoleteFiles = a.obsoleteFiles[i:]
249 0 : return err
250 0 : }
251 : }
252 1 : a.obsoleteFiles = nil
253 1 : return nil
254 : }
|