Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package remoteobjcat
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "sort"
11 : "sync"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/objstorage/remote"
17 : "github.com/cockroachdb/pebble/record"
18 : "github.com/cockroachdb/pebble/vfs"
19 : "github.com/cockroachdb/pebble/vfs/atomicfs"
20 : )
21 :
22 : // Catalog is used to manage the on-disk remote object catalog.
23 : //
24 : // The catalog file is a log of records, where each record is an encoded
25 : // VersionEdit.
26 : type Catalog struct {
27 : fs vfs.FS
28 : dirname string
29 : mu struct {
30 : sync.Mutex
31 :
32 : creatorID objstorage.CreatorID
33 : objects map[base.DiskFileNum]RemoteObjectMetadata
34 :
35 : marker *atomicfs.Marker
36 :
37 : catalogFile vfs.File
38 : catalogRecWriter *record.Writer
39 :
40 : rotationHelper record.RotationHelper
41 :
42 : // catalogFilename is the filename of catalogFile when catalogFile != nil, otherwise
43 : // it is the filename of the last catalog file.
44 : catalogFilename string
45 : }
46 : }
47 :
48 : // RemoteObjectMetadata encapsulates the data stored in the catalog file for each object.
49 : type RemoteObjectMetadata struct {
50 : // FileNum is the identifier for the object within the context of a single DB
51 : // instance.
52 : FileNum base.DiskFileNum
53 : // FileType is the type of the object. Only certain FileTypes are possible.
54 : FileType base.FileType
55 : // CreatorID identifies the DB instance that originally created the object.
56 : CreatorID objstorage.CreatorID
57 : // CreatorFileNum is the identifier for the object within the context of the
58 : // DB instance that originally created the object.
59 : CreatorFileNum base.DiskFileNum
60 : // CleanupMethod indicates the method for cleaning up unused shared objects.
61 : CleanupMethod objstorage.SharedCleanupMethod
62 : // Locator identifies a remote.Storage implementation.
63 : Locator remote.Locator
64 : // CustomObjectName (if it is set) overrides the object name that is normally
65 : // derived from the CreatorID and CreatorFileNum.
66 : CustomObjectName string
67 : }
68 :
const (
	// catalogFilenameBase is the prefix shared by all catalog filenames (see
	// makeCatalogFilename).
	catalogFilenameBase = "REMOTE-OBJ-CATALOG"
	// catalogMarkerName is the name handed to atomicfs when locating the
	// marker for the current catalog file.
	catalogMarkerName = "remote-obj-catalog"

	// rotateFileSize is the size (1MB) beyond which a new catalog file is
	// started, provided the other conditions checked by record.RotationHelper
	// also hold.
	rotateFileSize = 1 << 20
)
77 :
78 : // CatalogContents contains the remote objects in the catalog.
79 : type CatalogContents struct {
80 : // CreatorID, if it is set.
81 : CreatorID objstorage.CreatorID
82 : Objects []RemoteObjectMetadata
83 : }
84 :
85 : // Open creates a Catalog and loads any existing catalog file, returning the
86 : // creator ID (if it is set) and the contents.
87 2 : func Open(fs vfs.FS, dirname string) (*Catalog, CatalogContents, error) {
88 2 : c := &Catalog{
89 2 : fs: fs,
90 2 : dirname: dirname,
91 2 : }
92 2 : c.mu.objects = make(map[base.DiskFileNum]RemoteObjectMetadata)
93 2 :
94 2 : var err error
95 2 : c.mu.marker, c.mu.catalogFilename, err = atomicfs.LocateMarker(fs, dirname, catalogMarkerName)
96 2 : if err != nil {
97 0 : return nil, CatalogContents{}, err
98 0 : }
99 : // If the filename is empty, there is no existing catalog.
100 2 : if c.mu.catalogFilename != "" {
101 2 : if err := c.loadFromCatalogFile(c.mu.catalogFilename); err != nil {
102 0 : return nil, CatalogContents{}, err
103 0 : }
104 2 : if err := c.mu.marker.RemoveObsolete(); err != nil {
105 0 : return nil, CatalogContents{}, err
106 0 : }
107 : // TODO(radu): remove obsolete catalog files.
108 : }
109 2 : res := CatalogContents{
110 2 : CreatorID: c.mu.creatorID,
111 2 : Objects: make([]RemoteObjectMetadata, 0, len(c.mu.objects)),
112 2 : }
113 2 : for _, meta := range c.mu.objects {
114 2 : res.Objects = append(res.Objects, meta)
115 2 : }
116 : // Sort the objects so the function is deterministic.
117 2 : sort.Slice(res.Objects, func(i, j int) bool {
118 2 : return res.Objects[i].FileNum.FileNum() < res.Objects[j].FileNum.FileNum()
119 2 : })
120 2 : return c, res, nil
121 : }
122 :
123 : // SetCreatorID sets the creator ID. If it is already set, it must match.
124 2 : func (c *Catalog) SetCreatorID(id objstorage.CreatorID) error {
125 2 : if !id.IsSet() {
126 0 : return errors.AssertionFailedf("attempt to unset CreatorID")
127 0 : }
128 :
129 2 : c.mu.Lock()
130 2 : defer c.mu.Unlock()
131 2 :
132 2 : if c.mu.creatorID.IsSet() {
133 1 : if c.mu.creatorID != id {
134 1 : return errors.AssertionFailedf("attempt to change CreatorID from %s to %s", c.mu.creatorID, id)
135 1 : }
136 1 : return nil
137 : }
138 :
139 2 : ve := VersionEdit{CreatorID: id}
140 2 : if err := c.writeToCatalogFileLocked(&ve); err != nil {
141 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog: %v", err)
142 0 : }
143 2 : c.mu.creatorID = id
144 2 : return nil
145 : }
146 :
147 : // Close any open files.
148 2 : func (c *Catalog) Close() error {
149 2 : return c.closeCatalogFile()
150 2 : }
151 :
152 2 : func (c *Catalog) closeCatalogFile() error {
153 2 : if c.mu.catalogFile == nil {
154 2 : return nil
155 2 : }
156 2 : err1 := c.mu.catalogRecWriter.Close()
157 2 : err2 := c.mu.catalogFile.Close()
158 2 : c.mu.catalogRecWriter = nil
159 2 : c.mu.catalogFile = nil
160 2 : if err1 != nil {
161 0 : return err1
162 0 : }
163 2 : return err2
164 : }
165 :
166 : // Batch is used to perform multiple object additions/deletions at once.
167 : type Batch struct {
168 : ve VersionEdit
169 : }
170 :
171 : // AddObject adds a new object to the batch.
172 : //
173 : // The given FileNum must be new - it must not match that of any object that was
174 : // ever in the catalog.
175 2 : func (b *Batch) AddObject(meta RemoteObjectMetadata) {
176 2 : b.ve.NewObjects = append(b.ve.NewObjects, meta)
177 2 : }
178 :
179 : // DeleteObject adds an object removal to the batch.
180 2 : func (b *Batch) DeleteObject(fileNum base.DiskFileNum) {
181 2 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, fileNum)
182 2 : }
183 :
184 : // Reset clears the batch.
185 2 : func (b *Batch) Reset() {
186 2 : b.ve.NewObjects = b.ve.NewObjects[:0]
187 2 : b.ve.DeletedObjects = b.ve.DeletedObjects[:0]
188 2 : }
189 :
190 : // IsEmpty returns true if the batch is empty.
191 2 : func (b *Batch) IsEmpty() bool {
192 2 : return len(b.ve.NewObjects) == 0 && len(b.ve.DeletedObjects) == 0
193 2 : }
194 :
195 : // Copy returns a copy of the Batch.
196 2 : func (b *Batch) Copy() Batch {
197 2 : var res Batch
198 2 : if len(b.ve.NewObjects) > 0 {
199 2 : res.ve.NewObjects = make([]RemoteObjectMetadata, len(b.ve.NewObjects))
200 2 : copy(res.ve.NewObjects, b.ve.NewObjects)
201 2 : }
202 2 : if len(b.ve.DeletedObjects) > 0 {
203 2 : res.ve.DeletedObjects = make([]base.DiskFileNum, len(b.ve.DeletedObjects))
204 2 : copy(res.ve.DeletedObjects, b.ve.DeletedObjects)
205 2 : }
206 2 : return res
207 : }
208 :
209 : // Append merges two batches.
210 0 : func (b *Batch) Append(other Batch) {
211 0 : b.ve.NewObjects = append(b.ve.NewObjects, other.ve.NewObjects...)
212 0 : b.ve.DeletedObjects = append(b.ve.DeletedObjects, other.ve.DeletedObjects...)
213 0 : }
214 :
215 : // ApplyBatch applies a batch of updates; returns after the change is stably
216 : // recorded on storage.
217 2 : func (c *Catalog) ApplyBatch(b Batch) error {
218 2 : c.mu.Lock()
219 2 : defer c.mu.Unlock()
220 2 :
221 2 : // Sanity checks.
222 2 : toAdd := make(map[base.DiskFileNum]struct{}, len(b.ve.NewObjects))
223 2 : exists := func(n base.DiskFileNum) bool {
224 2 : _, ok := c.mu.objects[n]
225 2 : if !ok {
226 2 : _, ok = toAdd[n]
227 2 : }
228 2 : return ok
229 : }
230 2 : for _, meta := range b.ve.NewObjects {
231 2 : if exists(meta.FileNum) {
232 1 : return errors.AssertionFailedf("adding existing object %s", meta.FileNum)
233 1 : }
234 2 : toAdd[meta.FileNum] = struct{}{}
235 : }
236 2 : for _, n := range b.ve.DeletedObjects {
237 2 : if !exists(n) {
238 1 : return errors.AssertionFailedf("deleting non-existent object %s", n)
239 1 : }
240 : }
241 :
242 2 : if err := c.writeToCatalogFileLocked(&b.ve); err != nil {
243 0 : return errors.Wrapf(err, "pebble: could not write to remote object catalog: %v", err)
244 0 : }
245 :
246 : // Add new objects before deleting any objects. This allows for cases where
247 : // the same batch adds and deletes an object.
248 2 : for _, meta := range b.ve.NewObjects {
249 2 : c.mu.objects[meta.FileNum] = meta
250 2 : }
251 2 : for _, n := range b.ve.DeletedObjects {
252 2 : delete(c.mu.objects, n)
253 2 : }
254 :
255 2 : return nil
256 : }
257 :
258 2 : func (c *Catalog) loadFromCatalogFile(filename string) error {
259 2 : catalogPath := c.fs.PathJoin(c.dirname, filename)
260 2 : f, err := c.fs.Open(catalogPath)
261 2 : if err != nil {
262 0 : return errors.Wrapf(
263 0 : err, "pebble: could not open remote object catalog file %q for DB %q",
264 0 : errors.Safe(filename), c.dirname,
265 0 : )
266 0 : }
267 2 : defer f.Close()
268 2 : rr := record.NewReader(f, 0 /* logNum */)
269 2 : for {
270 2 : r, err := rr.Next()
271 2 : if err == io.EOF || record.IsInvalidRecord(err) {
272 2 : break
273 : }
274 2 : if err != nil {
275 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
276 0 : errors.Safe(filename))
277 0 : }
278 2 : var ve VersionEdit
279 2 : if err := ve.Decode(r); err != nil {
280 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
281 0 : errors.Safe(filename))
282 0 : }
283 : // Apply the version edit to the current state.
284 2 : if err := ve.Apply(&c.mu.creatorID, c.mu.objects); err != nil {
285 0 : return errors.Wrapf(err, "pebble: error when loading remote object catalog file %q",
286 0 : errors.Safe(filename))
287 0 : }
288 : }
289 2 : return nil
290 : }
291 :
292 : // writeToCatalogFileLocked writes a VersionEdit to the catalog file.
293 : // Creates a new file if this is the first write.
294 2 : func (c *Catalog) writeToCatalogFileLocked(ve *VersionEdit) error {
295 2 : c.mu.rotationHelper.AddRecord(int64(len(ve.NewObjects) + len(ve.DeletedObjects)))
296 2 : snapshotSize := int64(len(c.mu.objects))
297 2 :
298 2 : var shouldRotate bool
299 2 : if c.mu.catalogFile == nil {
300 2 : shouldRotate = true
301 2 : } else if c.mu.catalogRecWriter.Size() >= rotateFileSize {
302 1 : shouldRotate = c.mu.rotationHelper.ShouldRotate(snapshotSize)
303 1 : }
304 :
305 2 : if shouldRotate {
306 2 : if c.mu.catalogFile != nil {
307 1 : if err := c.closeCatalogFile(); err != nil {
308 0 : return err
309 0 : }
310 : }
311 2 : if err := c.createNewCatalogFileLocked(); err != nil {
312 0 : return err
313 0 : }
314 2 : c.mu.rotationHelper.Rotate(snapshotSize)
315 : }
316 2 : return writeRecord(ve, c.mu.catalogFile, c.mu.catalogRecWriter)
317 : }
318 :
319 2 : func makeCatalogFilename(iter uint64) string {
320 2 : return fmt.Sprintf("%s-%06d", catalogFilenameBase, iter)
321 2 : }
322 :
323 : // createNewCatalogFileLocked creates a new catalog file, populates it with the
324 : // current catalog and sets c.mu.catalogFile and c.mu.catalogRecWriter.
325 2 : func (c *Catalog) createNewCatalogFileLocked() (outErr error) {
326 2 : if c.mu.catalogFile != nil {
327 0 : return errors.AssertionFailedf("catalogFile already open")
328 0 : }
329 2 : filename := makeCatalogFilename(c.mu.marker.NextIter())
330 2 : filepath := c.fs.PathJoin(c.dirname, filename)
331 2 : file, err := c.fs.Create(filepath)
332 2 : if err != nil {
333 0 : return err
334 0 : }
335 2 : recWriter := record.NewWriter(file)
336 2 : err = func() error {
337 2 : // Create a VersionEdit that gets us from an empty catalog to the current state.
338 2 : var ve VersionEdit
339 2 : ve.CreatorID = c.mu.creatorID
340 2 : ve.NewObjects = make([]RemoteObjectMetadata, 0, len(c.mu.objects))
341 2 : for _, meta := range c.mu.objects {
342 2 : ve.NewObjects = append(ve.NewObjects, meta)
343 2 : }
344 2 : if err := writeRecord(&ve, file, recWriter); err != nil {
345 0 : return err
346 0 : }
347 :
348 : // Move the marker to the new filename. Move handles syncing the data
349 : // directory as well.
350 2 : if err := c.mu.marker.Move(filename); err != nil {
351 0 : return errors.Wrap(err, "moving marker")
352 0 : }
353 :
354 2 : return nil
355 : }()
356 :
357 2 : if err != nil {
358 0 : _ = recWriter.Close()
359 0 : _ = file.Close()
360 0 : _ = c.fs.Remove(filepath)
361 0 : return err
362 0 : }
363 :
364 : // Remove any previous file (ignoring any error).
365 2 : if c.mu.catalogFilename != "" {
366 2 : _ = c.fs.Remove(c.fs.PathJoin(c.dirname, c.mu.catalogFilename))
367 2 : }
368 :
369 2 : c.mu.catalogFile = file
370 2 : c.mu.catalogRecWriter = recWriter
371 2 : c.mu.catalogFilename = filename
372 2 : return nil
373 : }
374 :
375 2 : func writeRecord(ve *VersionEdit, file vfs.File, recWriter *record.Writer) error {
376 2 : w, err := recWriter.Next()
377 2 : if err != nil {
378 0 : return err
379 0 : }
380 2 : if err := ve.Encode(w); err != nil {
381 0 : return err
382 0 : }
383 2 : if err := recWriter.Flush(); err != nil {
384 0 : return err
385 0 : }
386 2 : return file.Sync()
387 : }
|