1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package remoteobjcat
6 :
7 : import (
8 : "cmp"
9 : "fmt"
10 : "io"
11 : "path/filepath"
12 : "slices"
13 : "sync"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/remote"
19 : "github.com/cockroachdb/pebble/record"
20 : "github.com/cockroachdb/pebble/vfs"
21 : "github.com/cockroachdb/pebble/vfs/atomicfs"
22 : )
23 :
24 : // Catalog is used to manage the on-disk remote object catalog.
25 : //
26 : // The catalog file is a log of records, where each record is an encoded
27 : // VersionEdit.
28 : type Catalog struct {
29 : fs vfs.FS
30 : dirname string
31 : mu struct {
32 : sync.Mutex
33 :
34 : creatorID objstorage.CreatorID
35 : objects map[base.DiskFileNum]RemoteObjectMetadata
36 :
37 : marker *atomicfs.Marker
38 :
39 : catalogFile vfs.File
40 : catalogRecWriter *record.Writer
41 :
42 : rotationHelper record.RotationHelper
43 :
44 : // catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
45 : // it is the filename of the last catalog file.
46 : catalogFilename string
47 : }
48 : }
49 :
50 : // RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
51 : type RemoteObjectMetadata struct {
52 : // FileNum is the identifier for the object within the context of a single DB
53 : // instance.
54 : FileNum base.DiskFileNum
55 : // FileType is the type of the object. Only certain FileTypes are possible.
56 : FileType base.FileType
57 : // CreatorID identifies the DB instance that originally created the object.
58 : CreatorID objstorage.CreatorID
59 : // CreatorFileNum is the identifier for the object within the context of the
60 : // DB instance that originally created the object.
61 : CreatorFileNum base.DiskFileNum
62 : // CleanupMethod indicates the method for cleaning up unused shared objects.
63 : CleanupMethod objstorage.SharedCleanupMethod
64 : // Locator identifies a remote.Storage implementation.
65 : Locator remote.Locator
66 : // CustomObjectName (if it is set) overrides the object name that is normally
67 : // derived from the CreatorID and CreatorFileNum.
68 : CustomObjectName string
69 : }
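// Illustrative sketch (hypothetical values, not part of the original source):
// a catalog entry for a shared table might be populated roughly as follows.
// base.FileTypeTable and objstorage.SharedRefTracking are assumed to exist in
// the imported packages; the numeric IDs and the locator are placeholders.
//
//	meta := RemoteObjectMetadata{
//		FileNum:        base.DiskFileNum(123),
//		FileType:       base.FileTypeTable,
//		CreatorID:      objstorage.CreatorID(5),
//		CreatorFileNum: base.DiskFileNum(123),
//		CleanupMethod:  objstorage.SharedRefTracking,
//		Locator:        "shared-bucket",
//	}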
70 :
71 : const (
72 : catalogFilenameBase = "REMOTE-OBJ-CATALOG"
73 : catalogMarkerName = "remote-obj-catalog"
74 :
75 : // We create a new file when the size exceeds 1MB (and some other conditions
76 : // hold; see record.RotationHelper).
77 : rotateFileSize = 1024 * 1024 // 1MB
78 : )
79 :
80 : // CatalogContents contains the remote objects in the catalog.
81 : type CatalogContents struct {
82 : // CreatorID, if it is set.
83 : CreatorID objstorage.CreatorID
84 : Objects []RemoteObjectMetadata
85 : }
86 :
87 : // Open creates a Catalog and loads any existing catalog file, returning the
88 : // creator ID (if it is set) and the contents.
89 2 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
90 2 : c := &Catalog{
91 2 : fs: fs,
92 2 : dirname: dirname,
93 2 : }
94 2 : c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
95 2 :
96 2 : var err error
97 2 : c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
98 2 : if err != nil {
99 0 : return nil, CatalogContents{}, err
100 0 : }
101 : // If the filename is empty, there is no existing catalog.
102 2 : if c.mu.catalogFilename != "" {
103 2 : if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
104 0 : return nil, CatalogContents{}, err
105 0 : }
106 2 : if err := c.mu.marker.RemoveObsolete(); err != nil {
107 0 : return nil, CatalogContents{}, err
108 0 : }
109 : // TODO(radu): remove obsolete catalog files.
110 : }
111 2 : res := CatalogContents{
112 2 : CreatorID: c.mu.creatorID,
113 2 : Objects: make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
114 2 : }
115 2 : for _, meta := range c.mu.objects {
116 2 : res.Objects = append(res.Objects, meta)
117 2 : }
118 : // Sort the objects so the function is deterministic.
119 2 : slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
120 2 : return cmp.Compare(a.FileNum, b.FileNum)
121 2 : })
122 2 : return c, res, nil
123 : }
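// Illustrative sketch (hypothetical usage, not part of the original source):
// a store is expected to open the catalog once per directory and keep it open;
// fs and dirname below are placeholders supplied by the caller.
//
//	cat, contents, err := Open(fs, dirname)
//	if err != nil {
//		return err
//	}
//	defer cat.Close()
//	if contents.CreatorID.IsSet() {
//		// Reuse the persisted creator ID and the previously recorded objects.
//		_ = contents.Objects
//	}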
124 :
125 : // SetCreatorID sets the creator ID. If it is already set, it must match.
126 2 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
127 2 : if !id.IsSet() {
128 0 : return base.AssertionFailedf("attempt to unset CreatorID")
129 0 : }
130 :
131 2 : c.mu.Lock()
132 2 : defer c.mu.Unlock()
133 2 :
134 2 : if c.mu.creatorID.IsSet() {
135 1 : if c.mu.creatorID != id {
136 1 : return base.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
137 1 : }
138 1 : return nil
139 : }
140 :
141 2 : ve := VersionEdit{CreatorID: id}
142 2 : if err := c.writeToCatalogFileLocked(&ve); err != nil {
143 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog")
144 0 : }
145 2 : c.mu.creatorID = id
146 2 : return nil
147 : }
148 :
149 : // Close closes any open files.
150 2 : func (c *Catalog) Close() error {
151 2 : var err error
152 2 : if c.mu.marker != nil {
153 2 : err = c.mu.marker.Close()
154 2 : c.mu.marker = nil
155 2 : }
156 2 : return errors.CombineErrors(err, c.closeCatalogFile())
157 : }
158 :
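// closeCatalogFile closes the current catalog record writer and file (if any)
// and clears the corresponding fields.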
159 2 : func (c *Catalog) closeCatalogFile() error {
160 2 : if c.mu.catalogFile == nil {
161 2 : return nil
162 2 : }
163 2 : err1 := c.mu.catalogRecWriter.Close()
164 2 : err2 := c.mu.catalogFile.Close()
165 2 : c.mu.catalogRecWriter = nil
166 2 : c.mu.catalogFile = nil
167 2 : if err1 != nil {
168 0 : return err1
169 0 : }
170 2 : return err2
171 : }
172 :
173 : // Batch is used to perform multiple object additions/deletions at once.
174 : type Batch struct {
175 : ve VersionEdit
176 : }
177 :
178 : // AddObject adds a new object to the batch.
179 : //
180 : // The given FileNum must be new - it must not match that of any object that was
181 : // ever in the catalog.
182 2 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
183 2 : b.ve.NewObjects = append(b.ve.NewObjects, meta)
184 2 : }
185 :
186 : // DeleteObject adds an object removal to the batch.
187 2 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
188 2 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
189 2 : }
190 :
191 : // Reset clears the batch.
192 2 : func (b *Batch) Reset() {
193 2 : b.ve.NewObjects = b.ve.NewObjects[:0]
194 2 : b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
195 2 : }
196 :
197 : // IsEmpty returns true if the batch is empty.
198 2 : func (b *Batch) IsEmpty() bool {
199 2 : return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
200 2 : }
201 :
202 : // Copy returns a copy of the Batch.
203 2 : func (b *Batch) Copy() Batch {
204 2 : var res Batch
205 2 : if len(b.ve.NewObjects) > 0 {
206 2 : res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
207 2 : copy(res.ve.NewObjects, b.ve.NewObjects)
208 2 : }
209 2 : if len(b.ve.DeletedObjects) > 0 {
210 2 : res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
211 2 : copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
212 2 : }
213 2 : return res
214 : }
215 :
216 : // Append appends the contents of the other batch to this batch.
217 0 : func (b *Batch) Append(other Batch) {
218 0 : b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
219 0 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
220 0 : }
221 :
222 : // ApplyBatch applies a batch of updates; returns after the change is stably
223 : // recorded on storage.
224 2 : func (c *Catalog) ApplyBatch(b Batch) error {
225 2 : c.mu.Lock()
226 2 : defer c.mu.Unlock()
227 2 :
228 2 : // Sanity checks.
229 2 : toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
230 2 : exists := func(n base.DiskFileNum) bool {
231 2 : _, ok := c.mu.objects[n]
232 2 : if !ok {
233 2 : _, ok = toAdd[n]
234 2 : }
235 2 : return ok
236 : }
237 2 : for _, meta := range b.ve.NewObjects {
238 2 : if exists(meta.FileNum) {
239 1 : return base.AssertionFailedf("adding existing object %s", meta.FileNum)
240 1 : }
241 2 : toAdd[meta.FileNum] = struct{}{}
242 : }
243 2 : for _, n := range b.ve.DeletedObjects {
244 2 : if !exists(n) {
245 1 : return base.AssertionFailedf("deleting non-existent object %s", n)
246 1 : }
247 : }
248 :
249 2 : if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
250 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog")
251 0 : }
252 :
253 : // Add new objects before deleting any objects. This allows for cases where
254 : // the same batch adds and deletes an object.
255 2 : for _, meta := range b.ve.NewObjects {
256 2 : c.mu.objects[meta.FileNum] = meta
257 2 : }
258 2 : for _, n := range b.ve.DeletedObjects {
259 2 : delete(c.mu.objects, n)
260 2 : }
261 :
262 2 : return nil
263 : }
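// Illustrative sketch (hypothetical usage, not part of the original source):
// additions and deletions are staged in a Batch and persisted together by
// ApplyBatch; meta and oldFileNum below are placeholders.
//
//	var b Batch
//	b.AddObject(meta)          // meta must carry a FileNum never used before
//	b.DeleteObject(oldFileNum) // oldFileNum must refer to an existing object
//	if err := cat.ApplyBatch(b); err != nil {
//		return err
//	}
//	b.Reset() // the batch can be reused once it has been applied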
264 :
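// loadFromCatalogFile replays every version edit in the given catalog file,
// populating c.mu.creatorID and c.mu.objects.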
265 2 : func (c *Catalog) loadFromCatalogFile(filename string) error {
266 2 : catalogPath := c.fs.PathJoin(c.dirname, filename)
267 2 : f, err := c.fs.Open(catalogPath)
268 2 : if err != nil {
269 0 : return errors.Wrapf(
270 0 : err, "pebble: could not open remote object catalog file %q for DB %q",
271 0 : errors.Safe(filename), c.dirname,
272 0 : )
273 0 : }
274 2 : defer f.Close()
275 2 : rr := record.NewReader(f, 0 /* logNum */)
276 2 : for {
277 2 : r, err := rr.Next()
278 2 : if err == io.EOF || record.IsInvalidRecord(err) {
279 2 : break
280 : }
281 2 : if err != nil {
282 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
283 0 : errors.Safe(filename))
284 0 : }
285 2 : var ve VersionEdit
286 2 : if err := ve.Decode(r); err != nil {
287 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
288 0 : errors.Safe(filename))
289 0 : }
290 : // Apply the version edit to the current state.
291 2 : if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
292 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
293 0 : errors.Safe(filename))
294 0 : }
295 : }
296 2 : return nil
297 : }
298 :
299 : // writeToCatalogFileLocked writes a VersionEdit to the catalog file. It
300 : // creates a new file on the first write and when the file is due for rotation.
301 2 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
302 2 : c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
303 2 : snapshotSize := int64(len(c.mu.objects))
304 2 :
305 2 : var shouldRotate bool
306 2 : if c.mu.catalogFile == nil {
307 2 : shouldRotate = true
308 2 : } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
309 1 : shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
310 1 : }
311 :
312 2 : if shouldRotate {
313 2 : if c.mu.catalogFile != nil {
314 1 : if err := c.closeCatalogFile(); err != nil {
315 0 : return err
316 0 : }
317 : }
318 2 : if err := c.createNewCatalogFileLocked(); err != nil {
319 0 : return err
320 0 : }
321 2 : c.mu.rotationHelper.Rotate(snapshotSize)
322 : }
323 2 : return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
324 : }
325 :
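// makeCatalogFilename returns the catalog filename for the given marker
// iteration, e.g. "REMOTE-OBJ-CATALOG-000007" for iter 7.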
326 2 : func makeCatalogFilename(iter uint64) string {
327 2 : return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
328 2 : }
329 :
330 : // createNewCatalogFileLocked creates a new catalog file, populates it with the
331 : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
332 2 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
333 2 : if c.mu.catalogFile != nil {
334 0 : return base.AssertionFailedf("catalogFile already open")
335 0 : }
336 2 : filename := makeCatalogFilename(c.mu.marker.NextIter())
337 2 : filepath := c.fs.PathJoin(c.dirname, filename)
338 2 : file, err := c.fs.Create(filepath, "pebble-manifest")
339 2 : if err != nil {
340 0 : return err
341 0 : }
342 2 : recWriter := record.NewWriter(file)
343 2 : err = func() error {
344 2 : // Create a VersionEdit that gets us from an empty catalog to the current state.
345 2 : var ve VersionEdit
346 2 : ve.CreatorID = c.mu.creatorID
347 2 : ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
348 2 : for _, meta := range c.mu.objects {
349 2 : ve.NewObjects = append(ve.NewObjects, meta)
350 2 : }
351 2 : if err := writeRecord(&ve, file, recWriter); err != nil {
352 0 : return err
353 0 : }
354 :
355 : // Move the marker to the new filename. Move handles syncing the data
356 : // directory as well.
357 2 : if err := c.mu.marker.Move(filename); err != nil {
358 0 : return errors.Wrap(err, "moving marker")
359 0 : }
360 :
361 2 : return nil
362 : }()
363 :
364 2 : if err != nil {
365 0 : _ = recWriter.Close()
366 0 : _ = file.Close()
367 0 : _ = c.fs.Remove(filepath)
368 0 : return err
369 0 : }
370 :
371 : // Remove any previous file (ignoring any error).
372 2 : if c.mu.catalogFilename != "" {
373 2 : _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
374 2 : }
375 :
376 2 : c.mu.catalogFile = file
377 2 : c.mu.catalogRecWriter = recWriter
378 2 : c.mu.catalogFilename = filename
379 2 : return nil
380 : }
381 :
382 : // Checkpoint copies the current catalog file, along with a marker for it, to the specified directory.
383 1 : func (c *Catalog) Checkpoint(fs vfs.FS, dir string) error {
384 1 : c.mu.Lock()
385 1 : defer c.mu.Unlock()
386 1 :
387 1 : // NB: every write to recWriter is flushed to the catalog file, so the copy
388 1 : // made below is guaranteed to contain all applied catalog entries.
389 1 : existingCatalogFilepath := filepath.Join(c.dirname, c.mu.catalogFilename)
390 1 : destPath := filepath.Join(dir, c.mu.catalogFilename)
391 1 : if err := vfs.CopyAcrossFS(c.fs, existingCatalogFilepath, fs, destPath); err != nil {
392 0 : return err
393 0 : }
394 1 : catalogMarker, _, err := atomicfs.LocateMarker(fs, dir, catalogMarkerName)
395 1 : if err != nil {
396 0 : return err
397 0 : }
398 1 : if err := catalogMarker.Move(c.mu.catalogFilename); err != nil {
399 0 : return err
400 0 : }
401 1 : return catalogMarker.Close()
402 : }
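// Illustrative sketch (hypothetical usage, not part of the original source):
// producing a checkpoint of the catalog; destFS and checkpointDir are
// placeholders, and checkpointDir is assumed to already exist on destFS.
//
//	if err := cat.Checkpoint(destFS, checkpointDir); err != nil {
//		return err
//	}
//	// checkpointDir now contains a copy of the current catalog file plus a
//	// marker pointing at it, so it can later be opened with Open.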
403 :
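// writeRecord encodes ve as a single record, flushes the record writer, and
// syncs the file so that the edit is durably stored before returning.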
404 2 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
405 2 : w, err := recWriter.Next()
406 2 : if err != nil {
407 0 : return err
408 0 : }
409 2 : if err := ve.Encode(w); err != nil {
410 0 : return err
411 0 : }
412 2 : if err := recWriter.Flush(); err != nil {
413 0 : return err
414 0 : }
415 2 : return file.Sync()
416 : }