Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package remoteobjcat
6 :
7 : import (
8 : "cmp"
9 : "fmt"
10 : "io"
11 : "slices"
12 : "sync"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/pebble/objstorage/remote"
18 : "github.com/cockroachdb/pebble/record"
19 : "github.com/cockroachdb/pebble/vfs"
20 : "github.com/cockroachdb/pebble/vfs/atomicfs"
21 : )
22 :
// Catalog is used to manage the on-disk remote object catalog.
//
// The catalog file is a log of records, where each record is an encoded
// VersionEdit. The in-memory state mirrors what replaying that log from an
// empty catalog produces.
type Catalog struct {
	// fs is the filesystem used to open, create and remove catalog files.
	fs vfs.FS
	// dirname is the directory containing the catalog files and the marker.
	dirname string
	// mu bundles the mutex with the mutable state; SetCreatorID and ApplyBatch
	// hold the mutex while reading or changing these fields.
	mu struct {
		sync.Mutex

		// creatorID is the creator ID loaded from the catalog file or recorded
		// through SetCreatorID.
		creatorID objstorage.CreatorID
		// objects is the current set of catalog objects, keyed by FileNum.
		objects map[base.DiskFileNum]RemoteObjectMetadata

		// marker is the handle returned by atomicfs.LocateMarker; it is moved to
		// the new filename whenever a new catalog file is created.
		marker *atomicfs.Marker

		// catalogFile is the catalog file currently open for writing, or nil if
		// no file is open.
		catalogFile vfs.File
		// catalogRecWriter writes records to catalogFile; it is set and cleared
		// together with catalogFile.
		catalogRecWriter *record.Writer

		// rotationHelper is told about every record written and is consulted to
		// decide whether to switch to a new catalog file once the current one
		// reaches rotateFileSize.
		rotationHelper record.RotationHelper

		// catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
		// it is the filename of the last catalog file.
		catalogFilename string
	}
}
48 :
// RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
//
// Values of this type are added to the catalog through Batch.AddObject and
// are returned by Open in CatalogContents.Objects.
type RemoteObjectMetadata struct {
	// FileNum is the identifier for the object within the context of a single DB
	// instance. It is the key under which the catalog tracks the object.
	FileNum base.DiskFileNum
	// FileType is the type of the object. Only certain FileTypes are possible.
	FileType base.FileType
	// CreatorID identifies the DB instance that originally created the object.
	CreatorID objstorage.CreatorID
	// CreatorFileNum is the identifier for the object within the context of the
	// DB instance that originally created the object.
	CreatorFileNum base.DiskFileNum
	// CleanupMethod indicates the method for cleaning up unused shared objects.
	CleanupMethod objstorage.SharedCleanupMethod
	// Locator identifies a remote.Storage implementation.
	Locator remote.Locator
	// CustomObjectName (if it is set) overrides the object name that is normally
	// derived from the CreatorID and CreatorFileNum.
	CustomObjectName string
}
69 :
const (
	// catalogFilenameBase is the prefix of every catalog filename; see
	// makeCatalogFilename.
	catalogFilenameBase = "REMOTE-OBJ-CATALOG"
	// catalogMarkerName is the marker name passed to atomicfs.LocateMarker.
	catalogMarkerName = "remote-obj-catalog"

	// We create a new file when the size exceeds 1MB (and some other conditions
	// hold; see record.RotationHelper).
	rotateFileSize = 1024 * 1024 // 1MB
)
78 :
// CatalogContents contains the remote objects in the catalog, as returned by
// Open.
type CatalogContents struct {
	// CreatorID, if it is set.
	CreatorID objstorage.CreatorID
	// Objects lists the metadata of every object in the catalog. Open returns
	// it sorted by FileNum.
	Objects []RemoteObjectMetadata
}
85 :
86 : // Open creates a Catalog and loads any existing catalog file, returning the
87 : // creator ID (if it is set) and the contents.
88 1 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
89 1 : c := &Catalog{
90 1 : fs: fs,
91 1 : dirname: dirname,
92 1 : }
93 1 : c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
94 1 :
95 1 : var err error
96 1 : c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
97 1 : if err != nil {
98 0 : return nil, CatalogContents{}, err
99 0 : }
100 : // If the filename is empty, there is no existing catalog.
101 1 : if c.mu.catalogFilename != "" {
102 1 : if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
103 0 : return nil, CatalogContents{}, err
104 0 : }
105 1 : if err := c.mu.marker.RemoveObsolete(); err != nil {
106 0 : return nil, CatalogContents{}, err
107 0 : }
108 : // TODO(radu): remove obsolete catalog files.
109 : }
110 1 : res := CatalogContents{
111 1 : CreatorID: c.mu.creatorID,
112 1 : Objects: make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
113 1 : }
114 1 : for _, meta := range c.mu.objects {
115 1 : res.Objects = append(res.Objects, meta)
116 1 : }
117 : // Sort the objects so the function is deterministic.
118 1 : slices.SortFunc(res.Objects, func(a, b RemoteObjectMetadata) int {
119 1 : return cmp.Compare(a.FileNum, b.FileNum)
120 1 : })
121 1 : return c, res, nil
122 : }
123 :
124 : // SetCreatorID sets the creator ID. If it is already set, it must match.
125 1 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
126 1 : if !id.IsSet() {
127 0 : return errors.AssertionFailedf("attempt to unset CreatorID")
128 0 : }
129 :
130 1 : c.mu.Lock()
131 1 : defer c.mu.Unlock()
132 1 :
133 1 : if c.mu.creatorID.IsSet() {
134 0 : if c.mu.creatorID != id {
135 0 : return errors.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
136 0 : }
137 0 : return nil
138 : }
139 :
140 1 : ve := VersionEdit{CreatorID: id}
141 1 : if err := c.writeToCatalogFileLocked(&ve); err != nil {
142 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog")
143 0 : }
144 1 : c.mu.creatorID = id
145 1 : return nil
146 : }
147 :
148 : // Close any open files.
149 1 : func (c *Catalog) Close() error {
150 1 : return c.closeCatalogFile()
151 1 : }
152 :
153 1 : func (c *Catalog) closeCatalogFile() error {
154 1 : if c.mu.catalogFile == nil {
155 1 : return nil
156 1 : }
157 1 : err1 := c.mu.catalogRecWriter.Close()
158 1 : err2 := c.mu.catalogFile.Close()
159 1 : c.mu.catalogRecWriter = nil
160 1 : c.mu.catalogFile = nil
161 1 : if err1 != nil {
162 0 : return err1
163 0 : }
164 1 : return err2
165 : }
166 :
// Batch is used to perform multiple object additions/deletions at once.
//
// A Batch is filled through AddObject and DeleteObject and then handed to
// Catalog.ApplyBatch. The zero value is an empty batch.
type Batch struct {
	// ve accumulates the additions (NewObjects) and removals (DeletedObjects)
	// that ApplyBatch writes to the catalog file as a single record.
	ve VersionEdit
}
171 :
172 : // AddObject adds a new object to the batch.
173 : //
174 : // The given FileNum must be new - it must not match that of any object that was
175 : // ever in the catalog.
176 1 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
177 1 : b.ve.NewObjects = append(b.ve.NewObjects, meta)
178 1 : }
179 :
180 : // DeleteObject adds an object removal to the batch.
181 1 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
182 1 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
183 1 : }
184 :
185 : // Reset clears the batch.
186 1 : func (b *Batch) Reset() {
187 1 : b.ve.NewObjects = b.ve.NewObjects[:0]
188 1 : b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
189 1 : }
190 :
191 : // IsEmpty returns true if the batch is empty.
192 1 : func (b *Batch) IsEmpty() bool {
193 1 : return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
194 1 : }
195 :
196 : // Copy returns a copy of the Batch.
197 1 : func (b *Batch) Copy() Batch {
198 1 : var res Batch
199 1 : if len(b.ve.NewObjects) > 0 {
200 1 : res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
201 1 : copy(res.ve.NewObjects, b.ve.NewObjects)
202 1 : }
203 1 : if len(b.ve.DeletedObjects) > 0 {
204 1 : res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
205 1 : copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
206 1 : }
207 1 : return res
208 : }
209 :
210 : // Append merges two batches.
211 0 : func (b *Batch) Append(other Batch) {
212 0 : b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
213 0 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
214 0 : }
215 :
216 : // ApplyBatch applies a batch of updates; returns after the change is stably
217 : // recorded on storage.
218 1 : func (c *Catalog) ApplyBatch(b Batch) error {
219 1 : c.mu.Lock()
220 1 : defer c.mu.Unlock()
221 1 :
222 1 : // Sanity checks.
223 1 : toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
224 1 : exists := func(n base.DiskFileNum) bool {
225 1 : _, ok := c.mu.objects[n]
226 1 : if !ok {
227 1 : _, ok = toAdd[n]
228 1 : }
229 1 : return ok
230 : }
231 1 : for _, meta := range b.ve.NewObjects {
232 1 : if exists(meta.FileNum) {
233 0 : return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
234 0 : }
235 1 : toAdd[meta.FileNum] = struct{}{}
236 : }
237 1 : for _, n := range b.ve.DeletedObjects {
238 1 : if !exists(n) {
239 0 : return errors.AssertionFailedf("deleting non-existent object %s", n)
240 0 : }
241 : }
242 :
243 1 : if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
244 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog")
245 0 : }
246 :
247 : // Add new objects before deleting any objects. This allows for cases where
248 : // the same batch adds and deletes an object.
249 1 : for _, meta := range b.ve.NewObjects {
250 1 : c.mu.objects[meta.FileNum] = meta
251 1 : }
252 1 : for _, n := range b.ve.DeletedObjects {
253 1 : delete(c.mu.objects, n)
254 1 : }
255 :
256 1 : return nil
257 : }
258 :
259 1 : func (c *Catalog) loadFromCatalogFile(filename string) error {
260 1 : catalogPath := c.fs.PathJoin(c.dirname, filename)
261 1 : f, err := c.fs.Open(catalogPath)
262 1 : if err != nil {
263 0 : return errors.Wrapf(
264 0 : err, "pebble: could not open remote object catalog file %q for DB %q",
265 0 : errors.Safe(filename), c.dirname,
266 0 : )
267 0 : }
268 1 : defer f.Close()
269 1 : rr := record.NewReader(f, 0 /* logNum */)
270 1 : for {
271 1 : r, err := rr.Next()
272 1 : if err == io.EOF || record.IsInvalidRecord(err) {
273 1 : break
274 : }
275 1 : if err != nil {
276 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
277 0 : errors.Safe(filename))
278 0 : }
279 1 : var ve VersionEdit
280 1 : if err := ve.Decode(r); err != nil {
281 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
282 0 : errors.Safe(filename))
283 0 : }
284 : // Apply the version edit to the current state.
285 1 : if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
286 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
287 0 : errors.Safe(filename))
288 0 : }
289 : }
290 1 : return nil
291 : }
292 :
293 : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
294 : // Creates a new file if this is the first write.
295 1 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
296 1 : c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
297 1 : snapshotSize := int64(len(c.mu.objects))
298 1 :
299 1 : var shouldRotate bool
300 1 : if c.mu.catalogFile == nil {
301 1 : shouldRotate = true
302 1 : } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
303 0 : shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
304 0 : }
305 :
306 1 : if shouldRotate {
307 1 : if c.mu.catalogFile != nil {
308 0 : if err := c.closeCatalogFile(); err != nil {
309 0 : return err
310 0 : }
311 : }
312 1 : if err := c.createNewCatalogFileLocked(); err != nil {
313 0 : return err
314 0 : }
315 1 : c.mu.rotationHelper.Rotate(snapshotSize)
316 : }
317 1 : return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
318 : }
319 :
320 1 : func makeCatalogFilename(iter uint64) string {
321 1 : return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
322 1 : }
323 :
324 : // createNewCatalogFileLocked creates a new catalog file, populates it with the
325 : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
326 1 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
327 1 : if c.mu.catalogFile != nil {
328 0 : return errors.AssertionFailedf("catalogFile already open")
329 0 : }
330 1 : filename := makeCatalogFilename(c.mu.marker.NextIter())
331 1 : filepath := c.fs.PathJoin(c.dirname, filename)
332 1 : file, err := c.fs.Create(filepath)
333 1 : if err != nil {
334 0 : return err
335 0 : }
336 1 : recWriter := record.NewWriter(file)
337 1 : err = func() error {
338 1 : // Create a VersionEdit that gets us from an empty catalog to the current state.
339 1 : var ve VersionEdit
340 1 : ve.CreatorID = c.mu.creatorID
341 1 : ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
342 1 : for _, meta := range c.mu.objects {
343 1 : ve.NewObjects = append(ve.NewObjects, meta)
344 1 : }
345 1 : if err := writeRecord(&ve, file, recWriter); err != nil {
346 0 : return err
347 0 : }
348 :
349 : // Move the marker to the new filename. Move handles syncing the data
350 : // directory as well.
351 1 : if err := c.mu.marker.Move(filename); err != nil {
352 0 : return errors.Wrap(err, "moving marker")
353 0 : }
354 :
355 1 : return nil
356 : }()
357 :
358 1 : if err != nil {
359 0 : _ = recWriter.Close()
360 0 : _ = file.Close()
361 0 : _ = c.fs.Remove(filepath)
362 0 : return err
363 0 : }
364 :
365 : // Remove any previous file (ignoring any error).
366 1 : if c.mu.catalogFilename != "" {
367 1 : _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
368 1 : }
369 :
370 1 : c.mu.catalogFile = file
371 1 : c.mu.catalogRecWriter = recWriter
372 1 : c.mu.catalogFilename = filename
373 1 : return nil
374 : }
375 :
376 1 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
377 1 : w, err := recWriter.Next()
378 1 : if err != nil {
379 0 : return err
380 0 : }
381 1 : if err := ve.Encode(w); err != nil {
382 0 : return err
383 0 : }
384 1 : if err := recWriter.Flush(); err != nil {
385 0 : return err
386 0 : }
387 1 : return file.Sync()
388 : }
|