Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "io"
11 : "os"
12 : "slices"
13 : "sync"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/errors/oserror"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
23 : "github.com/cockroachdb/pebble/objstorage/remote"
24 : "github.com/cockroachdb/pebble/vfs"
25 : )
26 :
// provider is the implementation of objstorage.Provider.
type provider struct {
	// st holds the settings the provider was opened with.
	st Settings

	// fsDir is the directory handle for st.FSDirName. It is opened in open()
	// and closed (and reset to nil) by Close.
	fsDir vfs.File

	// tracer is only set when objiotracing.Enabled is true.
	tracer *objiotracing.Tracer

	// remote holds the state of the remote-object subsystem that is not
	// guarded by mu.
	remote remoteSubsystem

	// mu guards the fields inside this struct.
	mu struct {
		sync.RWMutex

		// remote holds remote-subsystem state that must be accessed under mu
		// (e.g. the pending catalog batch).
		remote remoteLockedState

		// localObjectsChanged is set if non-remote objects were created or deleted
		// but Sync was not yet called.
		localObjectsChanged bool

		// knownObjects maintains information about objects that are known to the provider.
		// It is initialized with the list of files in the manifest when we open a DB.
		knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata

		// protectedObjects are objects that cannot be unreferenced because they
		// have outstanding SharedObjectBackingHandles (CheckpointState also adds a
		// protection for each checkpointed object). The value is a count of
		// outstanding protections; entries are removed when the count drops to
		// zero.
		protectedObjects map[base.DiskFileNum]int
	}
}

// Compile-time check that *provider implements objstorage.Provider.
var _ objstorage.Provider = (*provider)(nil)
57 :
// Settings that must be specified when creating the provider.
type Settings struct {
	// Logger receives the provider's log output. For example, OpenForReading
	// reports a fatal error through it when OpenOptions.MustExist is set and
	// the object is unknown.
	Logger base.Logger

	// Local filesystem configuration.
	FS        vfs.FS
	FSDirName string

	// FSDirInitialListing is a listing of FSDirName at the time of calling Open.
	//
	// This is an optional optimization to avoid double listing on Open when the
	// higher layer already has a listing. When nil, we obtain the listing on
	// Open.
	FSDirInitialListing []string

	// FSCleaner cleans obsolete files from the local filesystem.
	//
	// DefaultSettings sets this to base.DeleteCleaner{}.
	FSCleaner base.Cleaner

	// NoSyncOnClose decides whether the implementation will enforce a
	// close-time synchronization (e.g., fdatasync() or sync_file_range())
	// on files it writes to. Setting this to true removes the guarantee for a
	// sync on close. Some implementations can still issue a non-blocking sync.
	NoSyncOnClose bool

	// BytesPerSync enables periodic syncing of files in order to smooth out
	// writes to disk. This option does not provide any persistence guarantee, but
	// is used to avoid latency spikes if the OS automatically decides to write
	// out a large chunk of dirty filesystem buffers.
	BytesPerSync int

	// Fields here are set only if the provider is to support remote objects
	// (experimental).
	Remote struct {
		StorageFactory remote.StorageFactory

		// If CreateOnShared is not remote.CreateOnSharedNone, sstables are
		// created on remote storage using the CreateOnSharedLocator (when the
		// PreferSharedStorage create option is true).
		CreateOnShared        remote.CreateOnSharedStrategy
		CreateOnSharedLocator remote.Locator

		// CacheSizeBytes is the size of the on-disk block cache for objects
		// on remote storage. If it is 0, no cache is used.
		CacheSizeBytes int64

		// CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
		CacheBlockSize int

		// ShardingBlockSize is the size of a shard block. The cache is split into contiguous
		// ShardingBlockSize units. The units are distributed across multiple independent shards
		// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
		// policies operate at the level of shard, not whole cache. This is done to reduce lock
		// contention.
		//
		// If ShardingBlockSize is 0, the default of 1 MB is used.
		ShardingBlockSize int64

		// The number of independent shards the cache leverages. Each shard is the same size,
		// and a hash of filenum & offset maps a read to a certain shard. If set to 0,
		// 2*runtime.GOMAXPROCS is used as the shard count.
		CacheShardCount int

		// TODO(radu): allow the cache to live on another FS/location (e.g. to use
		// instance-local SSD).
	}
}
126 :
127 : // DefaultSettings initializes default settings (with no remote storage),
128 : // suitable for tests and tools.
129 1 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
130 1 : return Settings{
131 1 : Logger: base.DefaultLogger,
132 1 : FS: fs,
133 1 : FSDirName: dirName,
134 1 : FSCleaner: base.DeleteCleaner{},
135 1 : NoSyncOnClose: false,
136 1 : BytesPerSync: 512 * 1024, // 512KB
137 1 : }
138 1 : }
139 :
140 : // Open creates the provider.
141 2 : func Open(settings Settings) (objstorage.Provider, error) {
142 2 : // Note: we can't just `return open(settings)` because in an error case we
143 2 : // would return (*provider)(nil) which is not objstorage.Provider(nil).
144 2 : p, err := open(settings)
145 2 : if err != nil {
146 1 : return nil, err
147 1 : }
148 2 : return p, nil
149 : }
150 :
151 2 : func open(settings Settings) (p *provider, _ error) {
152 2 : fsDir, err := settings.FS.OpenDir(settings.FSDirName)
153 2 : if err != nil {
154 1 : return nil, err
155 1 : }
156 :
157 2 : defer func() {
158 2 : if p == nil {
159 0 : fsDir.Close()
160 0 : }
161 : }()
162 :
163 2 : p = &provider{
164 2 : st: settings,
165 2 : fsDir: fsDir,
166 2 : }
167 2 : p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
168 2 : p.mu.protectedObjects = make(map[base.DiskFileNum]int)
169 2 :
170 2 : if objiotracing.Enabled {
171 0 : p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
172 0 : }
173 :
174 : // Add local FS objects.
175 2 : if err := p.vfsInit(); err != nil {
176 0 : return nil, err
177 0 : }
178 :
179 : // Initialize remote subsystem (if configured) and add remote objects.
180 2 : if err := p.remoteInit(); err != nil {
181 0 : return nil, err
182 0 : }
183 :
184 2 : return p, nil
185 : }
186 :
187 : // Close is part of the objstorage.Provider interface.
188 2 : func (p *provider) Close() error {
189 2 : err := p.sharedClose()
190 2 : if p.fsDir != nil {
191 2 : err = firstError(err, p.fsDir.Close())
192 2 : p.fsDir = nil
193 2 : }
194 2 : if objiotracing.Enabled {
195 0 : if p.tracer != nil {
196 0 : p.tracer.Close()
197 0 : p.tracer = nil
198 0 : }
199 : }
200 2 : return err
201 : }
202 :
203 : // OpenForReading opens an existing object.
204 : func (p *provider) OpenForReading(
205 : ctx context.Context,
206 : fileType base.FileType,
207 : fileNum base.DiskFileNum,
208 : opts objstorage.OpenOptions,
209 2 : ) (objstorage.Readable, error) {
210 2 : meta, err := p.Lookup(fileType, fileNum)
211 2 : if err != nil {
212 1 : if opts.MustExist {
213 0 : p.st.Logger.Fatalf("%v", err)
214 0 : }
215 1 : return nil, err
216 : }
217 :
218 2 : var r objstorage.Readable
219 2 : if !meta.IsRemote() {
220 2 : r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
221 2 : } else {
222 2 : r, err = p.remoteOpenForReading(ctx, meta, opts)
223 2 : if err != nil && p.isNotExistError(meta, err) {
224 1 : // Wrap the error so that IsNotExistError functions properly.
225 1 : err = errors.Mark(err, os.ErrNotExist)
226 1 : }
227 : }
228 2 : if err != nil {
229 1 : return nil, err
230 1 : }
231 2 : if objiotracing.Enabled {
232 0 : r = p.tracer.WrapReadable(ctx, r, fileNum)
233 0 : }
234 2 : return r, nil
235 : }
236 :
237 : // Create creates a new object and opens it for writing.
238 : //
239 : // The object is not guaranteed to be durable (accessible in case of crashes)
240 : // until Sync is called.
241 : func (p *provider) Create(
242 : ctx context.Context,
243 : fileType base.FileType,
244 : fileNum base.DiskFileNum,
245 : opts objstorage.CreateOptions,
246 2 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
247 2 : if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
248 2 : w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
249 2 : } else {
250 2 : var category vfs.DiskWriteCategory
251 2 : if opts.WriteCategory != "" {
252 2 : category = opts.WriteCategory
253 2 : } else {
254 2 : category = vfs.WriteCategoryUnspecified
255 2 : }
256 2 : w, meta, err = p.vfsCreate(ctx, fileType, fileNum, category)
257 : }
258 2 : if err != nil {
259 1 : err = errors.Wrapf(err, "creating object %s", fileNum)
260 1 : return nil, objstorage.ObjectMetadata{}, err
261 1 : }
262 2 : p.addMetadata(meta)
263 2 : if objiotracing.Enabled {
264 0 : w = p.tracer.WrapWritable(ctx, w, fileNum)
265 0 : }
266 2 : return w, meta, nil
267 : }
268 :
269 : // Remove removes an object.
270 : //
271 : // Note that if the object is remote, the object is only (conceptually) removed
272 : // from this provider. If other providers have references on the remote object,
273 : // it will not be removed.
274 : //
275 : // The object is not guaranteed to be durably removed until Sync is called.
276 2 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
277 2 : meta, err := p.Lookup(fileType, fileNum)
278 2 : if err != nil {
279 1 : return err
280 1 : }
281 :
282 2 : if !meta.IsRemote() {
283 2 : err = p.vfsRemove(fileType, fileNum)
284 2 : } else {
285 2 : // TODO(radu): implement remote object removal (i.e. deref).
286 2 : err = p.sharedUnref(meta)
287 2 : if err != nil && p.isNotExistError(meta, err) {
288 0 : // Wrap the error so that IsNotExistError functions properly.
289 0 : err = errors.Mark(err, os.ErrNotExist)
290 0 : }
291 : }
292 2 : if err != nil && !p.IsNotExistError(err) {
293 1 : // We want to be able to retry a Remove, so we keep the object in our list.
294 1 : // TODO(radu): we should mark the object as "zombie" and not allow any other
295 1 : // operations.
296 1 : return errors.Wrapf(err, "removing object %s", fileNum)
297 1 : }
298 :
299 2 : p.removeMetadata(fileNum)
300 2 : return err
301 : }
302 :
303 1 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
304 1 : if meta.Remote.Storage != nil {
305 1 : return meta.Remote.Storage.IsNotExistError(err)
306 1 : }
307 0 : return oserror.IsNotExist(err)
308 : }
309 :
310 : // IsNotExistError is part of the objstorage.Provider interface.
311 2 : func (p *provider) IsNotExistError(err error) bool {
312 2 : // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
313 2 : // remote.Storage.
314 2 : return oserror.IsNotExist(err)
315 2 : }
316 :
317 : // Sync flushes the metadata from creation or removal of objects since the last Sync.
318 2 : func (p *provider) Sync() error {
319 2 : if err := p.vfsSync(); err != nil {
320 1 : return err
321 1 : }
322 2 : if err := p.sharedSync(); err != nil {
323 0 : return err
324 0 : }
325 2 : return nil
326 : }
327 :
328 : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
329 : // local file or a hard link (if the new object is created on the same FS, and
330 : // if the FS supports it).
331 : //
332 : // The object is not guaranteed to be durable (accessible in case of crashes)
333 : // until Sync is called.
334 : func (p *provider) LinkOrCopyFromLocal(
335 : ctx context.Context,
336 : srcFS vfs.FS,
337 : srcFilePath string,
338 : dstFileType base.FileType,
339 : dstFileNum base.DiskFileNum,
340 : opts objstorage.CreateOptions,
341 2 : ) (objstorage.ObjectMetadata, error) {
342 2 : shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
343 2 : if !shared && srcFS == p.st.FS {
344 2 : // Wrap the normal filesystem with one which wraps newly created files with
345 2 : // vfs.NewSyncingFile.
346 2 : fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
347 2 : NoSyncOnClose: p.st.NoSyncOnClose,
348 2 : BytesPerSync: p.st.BytesPerSync,
349 2 : })
350 2 : dstPath := p.vfsPath(dstFileType, dstFileNum)
351 2 : if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
352 1 : return objstorage.ObjectMetadata{}, err
353 1 : }
354 :
355 2 : meta := objstorage.ObjectMetadata{
356 2 : DiskFileNum: dstFileNum,
357 2 : FileType: dstFileType,
358 2 : }
359 2 : p.addMetadata(meta)
360 2 : return meta, nil
361 : }
362 : // Create the object and copy the data.
363 2 : w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
364 2 : if err != nil {
365 0 : return objstorage.ObjectMetadata{}, err
366 0 : }
367 2 : f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
368 2 : if err != nil {
369 0 : return objstorage.ObjectMetadata{}, err
370 0 : }
371 2 : defer f.Close()
372 2 : buf := make([]byte, 64*1024)
373 2 : for {
374 2 : n, readErr := f.Read(buf)
375 2 : if readErr != nil && readErr != io.EOF {
376 0 : w.Abort()
377 0 : return objstorage.ObjectMetadata{}, readErr
378 0 : }
379 :
380 2 : if n > 0 {
381 2 : if err := w.Write(buf[:n]); err != nil {
382 0 : w.Abort()
383 0 : return objstorage.ObjectMetadata{}, err
384 0 : }
385 : }
386 :
387 2 : if readErr == io.EOF {
388 2 : break
389 : }
390 : }
391 2 : if err := w.Finish(); err != nil {
392 0 : return objstorage.ObjectMetadata{}, err
393 0 : }
394 2 : return meta, nil
395 : }
396 :
397 : // Lookup is part of the objstorage.Provider interface.
398 : func (p *provider) Lookup(
399 : fileType base.FileType, fileNum base.DiskFileNum,
400 2 : ) (objstorage.ObjectMetadata, error) {
401 2 : p.mu.RLock()
402 2 : defer p.mu.RUnlock()
403 2 : meta, ok := p.mu.knownObjects[fileNum]
404 2 : if !ok {
405 2 : return objstorage.ObjectMetadata{}, errors.Wrapf(
406 2 : os.ErrNotExist,
407 2 : "file %s (type %d) unknown to the objstorage provider",
408 2 : fileNum, errors.Safe(fileType),
409 2 : )
410 2 : }
411 2 : if meta.FileType != fileType {
412 0 : return objstorage.ObjectMetadata{}, base.AssertionFailedf(
413 0 : "file %s type mismatch (known type %d, expected type %d)",
414 0 : fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
415 0 : )
416 0 : }
417 2 : return meta, nil
418 : }
419 :
420 : // Path is part of the objstorage.Provider interface.
421 2 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
422 2 : if !meta.IsRemote() {
423 2 : return p.vfsPath(meta.FileType, meta.DiskFileNum)
424 2 : }
425 2 : return p.remotePath(meta)
426 : }
427 :
428 : // Size returns the size of the object.
429 2 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
430 2 : if !meta.IsRemote() {
431 2 : return p.vfsSize(meta.FileType, meta.DiskFileNum)
432 2 : }
433 2 : return p.remoteSize(meta)
434 : }
435 :
436 : // List is part of the objstorage.Provider interface.
437 2 : func (p *provider) List() []objstorage.ObjectMetadata {
438 2 : p.mu.RLock()
439 2 : defer p.mu.RUnlock()
440 2 : res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
441 2 : for _, meta := range p.mu.knownObjects {
442 2 : res = append(res, meta)
443 2 : }
444 2 : slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
445 2 : return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
446 2 : })
447 2 : return res
448 : }
449 :
450 : // Metrics is part of the objstorage.Provider interface.
451 2 : func (p *provider) Metrics() sharedcache.Metrics {
452 2 : if p.remote.cache != nil {
453 1 : return p.remote.cache.Metrics()
454 1 : }
455 2 : return sharedcache.Metrics{}
456 : }
457 :
458 : // CheckpointState is part of the objstorage.Provider interface.
459 : func (p *provider) CheckpointState(
460 : fs vfs.FS, dir string, fileType base.FileType, fileNums []base.DiskFileNum,
461 1 : ) error {
462 1 : p.mu.Lock()
463 1 : defer p.mu.Unlock()
464 1 : for i := range fileNums {
465 1 : if _, ok := p.mu.knownObjects[fileNums[i]]; !ok {
466 0 : return errors.Wrapf(
467 0 : os.ErrNotExist,
468 0 : "file %s (type %d) unknown to the objstorage provider",
469 0 : fileNums[i], errors.Safe(fileType),
470 0 : )
471 0 : }
472 : // Prevent this object from deletion, at least for the life of this instance.
473 1 : p.mu.protectedObjects[fileNums[i]] = p.mu.protectedObjects[fileNums[i]] + 1
474 : }
475 :
476 1 : if p.remote.catalog != nil {
477 1 : return p.remote.catalog.Checkpoint(fs, dir)
478 1 : }
479 0 : return nil
480 : }
481 :
482 2 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
483 2 : p.mu.Lock()
484 2 : defer p.mu.Unlock()
485 2 : p.addMetadataLocked(meta)
486 2 : }
487 :
488 2 : func (p *provider) addMetadataLocked(meta objstorage.ObjectMetadata) {
489 2 : if invariants.Enabled {
490 2 : meta.AssertValid()
491 2 : }
492 2 : p.mu.knownObjects[meta.DiskFileNum] = meta
493 2 : if meta.IsRemote() {
494 2 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
495 2 : FileNum: meta.DiskFileNum,
496 2 : FileType: meta.FileType,
497 2 : CreatorID: meta.Remote.CreatorID,
498 2 : CreatorFileNum: meta.Remote.CreatorFileNum,
499 2 : Locator: meta.Remote.Locator,
500 2 : CleanupMethod: meta.Remote.CleanupMethod,
501 2 : CustomObjectName: meta.Remote.CustomObjectName,
502 2 : })
503 2 : if meta.IsExternal() {
504 2 : p.mu.remote.addExternalObject(meta)
505 2 : }
506 2 : } else {
507 2 : p.mu.localObjectsChanged = true
508 2 : }
509 : }
510 :
511 2 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
512 2 : p.mu.Lock()
513 2 : defer p.mu.Unlock()
514 2 :
515 2 : meta, ok := p.mu.knownObjects[fileNum]
516 2 : if !ok {
517 0 : return
518 0 : }
519 2 : delete(p.mu.knownObjects, fileNum)
520 2 : if meta.IsExternal() {
521 2 : p.mu.remote.removeExternalObject(meta)
522 2 : }
523 2 : if meta.IsRemote() {
524 2 : p.mu.remote.catalogBatch.DeleteObject(fileNum)
525 2 : } else {
526 2 : p.mu.localObjectsChanged = true
527 2 : }
528 : }
529 :
530 : // protectObject prevents the unreferencing of a remote object until
531 : // unprotectObject is called.
532 2 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
533 2 : p.mu.Lock()
534 2 : defer p.mu.Unlock()
535 2 : p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
536 2 : }
537 :
538 1 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
539 1 : p.mu.Lock()
540 1 : defer p.mu.Unlock()
541 1 : v := p.mu.protectedObjects[fileNum]
542 1 : if invariants.Enabled && v == 0 {
543 0 : panic("invalid protection count")
544 : }
545 1 : if v > 1 {
546 0 : p.mu.protectedObjects[fileNum] = v - 1
547 1 : } else {
548 1 : delete(p.mu.protectedObjects, fileNum)
549 1 : // TODO(radu): check if the object is still in knownObject; if not, unref it
550 1 : // now.
551 1 : }
552 : }
553 :
554 2 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
555 2 : p.mu.Lock()
556 2 : defer p.mu.Unlock()
557 2 : return p.mu.protectedObjects[fileNum] > 0
558 2 : }
|