Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "io"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/invariants"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
16 : "github.com/cockroachdb/pebble/record"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/pebble/vfs/atomicfs"
19 : )
20 :
21 : // recoverState reads the named database directory and recovers the set of files
22 : // encoding the database state at the moment the previous process exited.
23 : // recoverState is read only and does not mutate the on-disk state.
24 1 : func recoverState(opts *Options, dirname string) (s *recoveredState, err error) {
25 1 : rs := &recoveredState{
26 1 : dirname: dirname,
27 1 : fs: opts.FS,
28 1 : }
29 1 : if err := rs.init(opts, dirname); err != nil {
30 1 : return nil, errors.CombineErrors(err, rs.Close())
31 1 : }
32 1 : return rs, nil
33 : }
34 :
35 1 : func (rs *recoveredState) init(opts *Options, dirname string) error {
36 1 : // List the directory contents. This also happens to include WAL log files,
37 1 : // if they are in the same dir.
38 1 : var err error
39 1 : if rs.ls, err = opts.FS.List(dirname); err != nil {
40 1 : return errors.Wrapf(err, "pebble: database %q", dirname)
41 1 : }
42 : // Find the currently format major version and active manifest.
43 1 : rs.fmv, rs.fmvMarker, err = lookupFormatMajorVersion(opts.FS, dirname, rs.ls)
44 1 : if err != nil {
45 1 : return errors.Wrapf(err, "pebble: database %q", dirname)
46 1 : }
47 :
48 : // Open the object storage provider.
49 1 : providerSettings := opts.MakeObjStorageProviderSettings(dirname)
50 1 : providerSettings.FSDirInitialListing = rs.ls
51 1 : rs.objProvider, err = objstorageprovider.Open(providerSettings)
52 1 : if err != nil {
53 1 : return errors.Wrapf(err, "pebble: database %q", dirname)
54 1 : }
55 :
56 : // Determine which manifest is current, and if one exists, replay it to
57 : // recover the current Version of the LSM.
58 1 : var manifestExists bool
59 1 : rs.manifestMarker, rs.manifestFileNum, manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
60 1 : if err != nil {
61 1 : return errors.Wrapf(err, "pebble: database %q", dirname)
62 1 : }
63 1 : if manifestExists {
64 1 : recoveredVersion, err := recoverVersion(opts, dirname, rs.objProvider, rs.manifestFileNum)
65 1 : if err != nil {
66 1 : return err
67 1 : }
68 1 : if !opts.DisableConsistencyCheck {
69 1 : if err := checkConsistency(recoveredVersion.version, rs.objProvider); err != nil {
70 1 : return err
71 1 : }
72 : }
73 1 : rs.recoveredVersion = recoveredVersion
74 : }
75 :
76 : // Identify the maximal file number in the directory. We do not want to
77 : // reuse any existing file numbers even if they are obsolete file numbers to
78 : // avoid modifying an ingested sstable's original external file.
79 : //
80 : // We also identify the most recent OPTIONS file, so we can validate our
81 : // configured Options against the previous options, and we collect any
82 : // orphaned temporary files that should be removed.
83 1 : var previousOptionsFileNum base.DiskFileNum
84 1 : for _, filename := range rs.ls {
85 1 : ft, fn, ok := base.ParseFilename(opts.FS, filename)
86 1 : if !ok {
87 1 : continue
88 : }
89 1 : rs.maxFilenumUsed = max(rs.maxFilenumUsed, fn)
90 1 : switch ft {
91 0 : case base.FileTypeLog:
92 : // Ignore.
93 1 : case base.FileTypeOptions:
94 1 : if previousOptionsFileNum < fn {
95 1 : previousOptionsFileNum = fn
96 1 : rs.previousOptionsFilename = filename
97 1 : }
98 1 : case base.FileTypeTemp, base.FileTypeOldTemp:
99 1 : rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
100 : }
101 : }
102 1 : return nil
103 : }
104 :
105 : // recoveredState encapsulates state recovered from reading the database
106 : // directory.
107 : type recoveredState struct {
108 : dirname string
109 : fmv FormatMajorVersion
110 : fmvMarker *atomicfs.Marker
111 : fs vfs.FS
112 : ls []string
113 : manifestMarker *atomicfs.Marker
114 : manifestFileNum base.DiskFileNum
115 : maxFilenumUsed base.DiskFileNum
116 : obsoleteTempFilenames []string
117 : objProvider objstorage.Provider
118 : previousOptionsFilename string
119 : recoveredVersion *recoveredVersion
120 : }
121 :
122 : // RemoveObsolete removes obsolete files uncovered during recovery.
123 1 : func (rs *recoveredState) RemoveObsolete() error {
124 1 : var err error
125 1 : // Atomic markers may leave behind obsolete files if there's a crash
126 1 : // mid-update.
127 1 : if rs.fmvMarker != nil {
128 1 : err = errors.CombineErrors(err, rs.fmvMarker.RemoveObsolete())
129 1 : }
130 1 : if rs.manifestMarker != nil {
131 1 : err = errors.CombineErrors(err, rs.manifestMarker.RemoveObsolete())
132 1 : }
133 : // Some codepaths write to a temporary file and then rename it to its final
134 : // location when complete. A temp file is leftover if a process exits
135 : // before the rename. Remove any that were found.
136 1 : for _, filename := range rs.obsoleteTempFilenames {
137 1 : err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
138 1 : }
139 1 : return err
140 : }
141 :
142 : // Close closes resources held by the RecoveredState, including open file
143 : // descriptors.
144 1 : func (rs *recoveredState) Close() error {
145 1 : var err error
146 1 : if rs.fmvMarker != nil {
147 1 : err = errors.CombineErrors(err, rs.fmvMarker.Close())
148 1 : }
149 1 : if rs.manifestMarker != nil {
150 1 : err = errors.CombineErrors(err, rs.manifestMarker.Close())
151 1 : }
152 1 : if rs.objProvider != nil {
153 1 : err = errors.CombineErrors(err, rs.objProvider.Close())
154 1 : }
155 1 : return err
156 : }
157 :
158 : // recoveredVersion describes the latest Version of the LSM recovered by
159 : // replaying a manifest file.
160 : type recoveredVersion struct {
161 : manifestFileNum base.DiskFileNum
162 : minUnflushedLogNum base.DiskFileNum
163 : nextFileNum base.DiskFileNum
164 : logSeqNum base.SeqNum
165 : latest *latestVersionState
166 : metrics Metrics
167 : version *manifest.Version
168 : }
169 :
170 : // recoverVersion replays the named manifest file to recover the latest version
171 : // of the LSM from persisted state.
172 : func recoverVersion(
173 : opts *Options, dirname string, provider objstorage.Provider, manifestFileNum base.DiskFileNum,
174 1 : ) (*recoveredVersion, error) {
175 1 : vs := &recoveredVersion{
176 1 : manifestFileNum: manifestFileNum,
177 1 : nextFileNum: 1,
178 1 : logSeqNum: base.SeqNumStart,
179 1 : latest: &latestVersionState{
180 1 : l0Organizer: manifest.NewL0Organizer(opts.Comparer, opts.FlushSplitBytes),
181 1 : virtualBackings: manifest.MakeVirtualBackings(),
182 1 : },
183 1 : }
184 1 : manifestPath := base.MakeFilepath(opts.FS, dirname, base.FileTypeManifest, vs.manifestFileNum)
185 1 : manifestFilename := opts.FS.PathBase(manifestPath)
186 1 :
187 1 : // Read the versionEdits in the manifest file.
188 1 : var bve manifest.BulkVersionEdit
189 1 : bve.AllAddedTables = make(map[base.TableNum]*manifest.TableMetadata)
190 1 : manifestFile, err := opts.FS.Open(manifestPath)
191 1 : if err != nil {
192 0 : return nil, errors.Wrapf(err, "pebble: could not open manifest file %q for DB %q",
193 0 : errors.Safe(manifestFilename), dirname)
194 0 : }
195 1 : defer manifestFile.Close()
196 1 : rr := record.NewReader(manifestFile, 0 /* logNum */)
197 1 : for {
198 1 : r, err := rr.Next()
199 1 : if err == io.EOF || record.IsInvalidRecord(err) {
200 1 : break
201 : }
202 1 : if err != nil {
203 0 : return nil, errors.Wrapf(err, "pebble: error when loading manifest file %q",
204 0 : errors.Safe(manifestFilename))
205 0 : }
206 1 : var ve manifest.VersionEdit
207 1 : err = ve.Decode(r)
208 1 : if err != nil {
209 0 : // Break instead of returning an error if the record is corrupted
210 0 : // or invalid.
211 0 : if err == io.EOF || record.IsInvalidRecord(err) {
212 0 : break
213 : }
214 0 : return nil, err
215 : }
216 1 : if ve.ComparerName != "" {
217 1 : if ve.ComparerName != opts.Comparer.Name {
218 1 : return nil, errors.Errorf("pebble: manifest file %q for DB %q: "+
219 1 : "comparer name from file %q != comparer name from Options %q",
220 1 : errors.Safe(manifestFilename), dirname, errors.Safe(ve.ComparerName), errors.Safe(opts.Comparer.Name))
221 1 : }
222 : }
223 1 : if err := bve.Accumulate(&ve); err != nil {
224 0 : return nil, err
225 0 : }
226 1 : if ve.MinUnflushedLogNum != 0 {
227 1 : vs.minUnflushedLogNum = ve.MinUnflushedLogNum
228 1 : }
229 1 : if ve.NextFileNum != 0 {
230 1 : vs.nextFileNum = base.DiskFileNum(ve.NextFileNum)
231 1 : }
232 1 : if ve.LastSeqNum != 0 {
233 1 : // logSeqNum is the _next_ sequence number that will be assigned,
234 1 : // while LastSeqNum is the last assigned sequence number. Note that
235 1 : // this behaviour mimics that in RocksDB; the first sequence number
236 1 : // assigned is one greater than the one present in the manifest
237 1 : // (assuming no WALs contain higher sequence numbers than the
238 1 : // manifest's LastSeqNum). Increment LastSeqNum by 1 to get the
239 1 : // next sequence number that will be assigned.
240 1 : //
241 1 : // If LastSeqNum is less than SeqNumStart, increase it to at least
242 1 : // SeqNumStart to leave ample room for reserved sequence numbers.
243 1 : vs.logSeqNum = max(ve.LastSeqNum+1, base.SeqNumStart)
244 1 : }
245 : }
246 :
247 : // We have already set vs.nextFileNum=1 at the beginning of the function and
248 : // could have only updated it to some other non-zero value, so it cannot be
249 : // 0 here.
250 1 : if vs.minUnflushedLogNum == 0 {
251 1 : if vs.nextFileNum >= 2 {
252 1 : // We either have a freshly created DB, or a DB created by RocksDB
253 1 : // that has not had a single flushed SSTable yet. This is because
254 1 : // RocksDB bumps up nextFileNum in this case without bumping up
255 1 : // minUnflushedLogNum, even if WALs with non-zero file numbers are
256 1 : // present in the directory.
257 1 : } else {
258 0 : return nil, base.CorruptionErrorf("pebble: malformed manifest file %q for DB %q",
259 0 : errors.Safe(manifestFilename), dirname)
260 0 : }
261 : }
262 1 : vs.nextFileNum = max(vs.nextFileNum, vs.minUnflushedLogNum+1)
263 1 :
264 1 : // Populate the virtual backings for virtual sstables since we have finished
265 1 : // version edit accumulation.
266 1 : for _, b := range bve.AddedFileBacking {
267 1 : isLocal := objstorage.IsLocalTable(provider, b.DiskFileNum)
268 1 : vs.latest.virtualBackings.AddAndRef(b, isLocal)
269 1 : }
270 1 : for l, addedLevel := range bve.AddedTables {
271 1 : for _, m := range addedLevel {
272 1 : if m.Virtual {
273 1 : vs.latest.virtualBackings.AddTable(m, l)
274 1 : }
275 : }
276 : }
277 :
278 1 : if invariants.Enabled {
279 1 : // There should be no deleted tables or backings, since we're starting
280 1 : // from an empty state.
281 1 : for _, deletedLevel := range bve.DeletedTables {
282 1 : if len(deletedLevel) != 0 {
283 0 : panic("deleted files after manifest replay")
284 : }
285 : }
286 1 : if len(bve.RemovedFileBacking) > 0 {
287 0 : panic("deleted backings after manifest replay")
288 : }
289 : }
290 :
291 1 : emptyVersion := manifest.NewInitialVersion(opts.Comparer)
292 1 : newVersion, err := bve.Apply(emptyVersion, opts.Experimental.ReadCompactionRate)
293 1 : if err != nil {
294 0 : return nil, err
295 0 : }
296 1 : vs.latest.l0Organizer.PerformUpdate(vs.latest.l0Organizer.PrepareUpdate(&bve, newVersion), newVersion)
297 1 : vs.latest.l0Organizer.InitCompactingFileInfo(nil /* in-progress compactions */)
298 1 : vs.latest.blobFiles.Init(&bve, manifest.BlobRewriteHeuristic{
299 1 : CurrentTime: opts.private.timeNow,
300 1 : MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
301 1 : })
302 1 : vs.version = newVersion
303 1 :
304 1 : for i := range vs.metrics.Levels {
305 1 : l := &vs.metrics.Levels[i]
306 1 : l.TablesCount = int64(newVersion.Levels[i].Len())
307 1 : files := newVersion.Levels[i].Slice()
308 1 : l.TablesSize = int64(files.TableSizeSum())
309 1 : }
310 1 : for _, l := range newVersion.Levels {
311 1 : for f := range l.All() {
312 1 : if !f.Virtual {
313 1 : isLocal, localSize := sizeIfLocal(f.TableBacking, provider)
314 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
315 1 : if isLocal {
316 1 : vs.metrics.Table.Local.LiveCount++
317 1 : }
318 : }
319 : }
320 : }
321 1 : for backing := range vs.latest.virtualBackings.All() {
322 1 : isLocal, localSize := sizeIfLocal(backing, provider)
323 1 : vs.metrics.Table.Local.LiveSize = uint64(int64(vs.metrics.Table.Local.LiveSize) + localSize)
324 1 : if isLocal {
325 1 : vs.metrics.Table.Local.LiveCount++
326 1 : }
327 : }
328 1 : return vs, nil
329 : }
|