Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "cmp"
9 : "context"
10 : "io"
11 : "os"
12 : "slices"
13 : "sync"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/errors/oserror"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/sharedcache"
23 : "github.com/cockroachdb/pebble/objstorage/remote"
24 : "github.com/cockroachdb/pebble/vfs"
25 : )
26 :
// provider is the implementation of objstorage.Provider.
type provider struct {
	// st holds the settings the provider was opened with.
	st Settings

	// fsDir is the open handle for the local directory st.FSDirName. It is
	// closed (and set to nil) by Close.
	fsDir vfs.File

	// tracer is only set when objiotracing.Enabled is true.
	tracer *objiotracing.Tracer

	// remote holds the state of the remote subsystem (for example the cache
	// whose metrics are reported by Metrics).
	remote remoteSubsystem

	// mu protects the fields it contains.
	mu struct {
		sync.RWMutex

		remote struct {
			// catalogBatch accumulates remote object creations and deletions until
			// Sync is called.
			catalogBatch remoteobjcat.Batch

			// storageObjects maps a locator to a remote.Storage.
			// NOTE(review): presumably a cache of Storage instances obtained from
			// the StorageFactory -- confirm against the remote subsystem code.
			storageObjects map[remote.Locator]remote.Storage
		}

		// localObjectsChanged is set if non-remote objects were created or deleted
		// but Sync was not yet called.
		localObjectsChanged bool

		// knownObjects maintains information about objects that are known to the provider.
		// It is initialized with the list of files in the manifest when we open a DB.
		knownObjects map[base.DiskFileNum]objstorage.ObjectMetadata

		// protectedObjects are objects that cannot be unreferenced because they
		// have outstanding SharedObjectBackingHandles. The value is a count of
		// outstanding handles.
		protectedObjects map[base.DiskFileNum]int
	}
}
61 :
// Compile-time check that *provider implements objstorage.Provider.
var _ objstorage.Provider = (*provider)(nil)
63 :
// Settings that must be specified when creating the provider.
type Settings struct {
	// Logger is used by the provider for reporting; for example, OpenForReading
	// calls Logger.Fatalf when an object that must exist is unknown.
	Logger base.Logger

	// Local filesystem configuration.
	//
	// FS is the filesystem holding local objects and FSDirName is the directory
	// within FS that is opened when the provider is created.
	FS        vfs.FS
	FSDirName string

	// FSDirInitialListing is a listing of FSDirName at the time of calling Open.
	//
	// This is an optional optimization to avoid double listing on Open when the
	// higher layer already has a listing. When nil, we obtain the listing on
	// Open.
	FSDirInitialListing []string

	// FSCleaner cleans obsolete files from the local filesystem.
	//
	// The default cleaner uses the DeleteCleaner.
	FSCleaner base.Cleaner

	// NoSyncOnClose decides whether the implementation will enforce a
	// close-time synchronization (e.g., fdatasync() or sync_file_range())
	// on files it writes to. Setting this to true removes the guarantee for a
	// sync on close. Some implementations can still issue a non-blocking sync.
	NoSyncOnClose bool

	// BytesPerSync enables periodic syncing of files in order to smooth out
	// writes to disk. This option does not provide any persistence guarantee, but
	// is used to avoid latency spikes if the OS automatically decides to write
	// out a large chunk of dirty filesystem buffers.
	BytesPerSync int

	// Fields here are set only if the provider is to support remote objects
	// (experimental).
	Remote struct {
		// StorageFactory is the factory for remote.Storage instances.
		// NOTE(review): presumably used to resolve a remote.Locator to its
		// Storage -- confirm against the remote subsystem code.
		StorageFactory remote.StorageFactory

		// If CreateOnShared is non-zero, sstables are created on remote storage using
		// the CreateOnSharedLocator (when the PreferSharedStorage create option is
		// true).
		CreateOnShared        remote.CreateOnSharedStrategy
		CreateOnSharedLocator remote.Locator

		// CacheSizeBytes is the size of the on-disk block cache for objects
		// on remote storage. If it is 0, no cache is used.
		CacheSizeBytes int64

		// CacheBlockSize is the block size of the cache; if 0, the default of 32KB is used.
		CacheBlockSize int

		// ShardingBlockSize is the size of a shard block. The cache is split into contiguous
		// ShardingBlockSize units. The units are distributed across multiple independent shards
		// of the cache, via a hash(offset) modulo num shards operation. The cache replacement
		// policies operate at the level of shard, not whole cache. This is done to reduce lock
		// contention.
		//
		// If ShardingBlockSize is 0, the default of 1 MB is used.
		ShardingBlockSize int64

		// CacheShardCount is the number of independent shards the cache leverages.
		// Each shard is the same size, and a hash of filenum & offset maps a read to
		// a certain shard. If set to 0, 2*runtime.GOMAXPROCS is used as the shard
		// count.
		CacheShardCount int

		// TODO(radu): allow the cache to live on another FS/location (e.g. to use
		// instance-local SSD).
	}
}
132 :
133 : // DefaultSettings initializes default settings (with no remote storage),
134 : // suitable for tests and tools.
135 0 : func DefaultSettings(fs vfs.FS, dirName string) Settings {
136 0 : return Settings{
137 0 : Logger: base.DefaultLogger,
138 0 : FS: fs,
139 0 : FSDirName: dirName,
140 0 : FSCleaner: base.DeleteCleaner{},
141 0 : NoSyncOnClose: false,
142 0 : BytesPerSync: 512 * 1024, // 512KB
143 0 : }
144 0 : }
145 :
146 : // Open creates the provider.
147 1 : func Open(settings Settings) (objstorage.Provider, error) {
148 1 : // Note: we can't just `return open(settings)` because in an error case we
149 1 : // would return (*provider)(nil) which is not objstorage.Provider(nil).
150 1 : p, err := open(settings)
151 1 : if err != nil {
152 0 : return nil, err
153 0 : }
154 1 : return p, nil
155 : }
156 :
157 1 : func open(settings Settings) (p *provider, _ error) {
158 1 : fsDir, err := settings.FS.OpenDir(settings.FSDirName)
159 1 : if err != nil {
160 0 : return nil, err
161 0 : }
162 :
163 1 : defer func() {
164 1 : if p == nil {
165 0 : fsDir.Close()
166 0 : }
167 : }()
168 :
169 1 : p = &provider{
170 1 : st: settings,
171 1 : fsDir: fsDir,
172 1 : }
173 1 : p.mu.knownObjects = make(map[base.DiskFileNum]objstorage.ObjectMetadata)
174 1 : p.mu.protectedObjects = make(map[base.DiskFileNum]int)
175 1 :
176 1 : if objiotracing.Enabled {
177 0 : p.tracer = objiotracing.Open(settings.FS, settings.FSDirName)
178 0 : }
179 :
180 : // Add local FS objects.
181 1 : if err := p.vfsInit(); err != nil {
182 0 : return nil, err
183 0 : }
184 :
185 : // Initialize remote subsystem (if configured) and add remote objects.
186 1 : if err := p.remoteInit(); err != nil {
187 0 : return nil, err
188 0 : }
189 :
190 1 : return p, nil
191 : }
192 :
193 : // Close is part of the objstorage.Provider interface.
194 1 : func (p *provider) Close() error {
195 1 : err := p.sharedClose()
196 1 : if p.fsDir != nil {
197 1 : err = firstError(err, p.fsDir.Close())
198 1 : p.fsDir = nil
199 1 : }
200 1 : if objiotracing.Enabled {
201 0 : if p.tracer != nil {
202 0 : p.tracer.Close()
203 0 : p.tracer = nil
204 0 : }
205 : }
206 1 : return err
207 : }
208 :
209 : // OpenForReading opens an existing object.
210 : func (p *provider) OpenForReading(
211 : ctx context.Context,
212 : fileType base.FileType,
213 : fileNum base.DiskFileNum,
214 : opts objstorage.OpenOptions,
215 1 : ) (objstorage.Readable, error) {
216 1 : meta, err := p.Lookup(fileType, fileNum)
217 1 : if err != nil {
218 0 : if opts.MustExist {
219 0 : p.st.Logger.Fatalf("%v", err)
220 0 : }
221 0 : return nil, err
222 : }
223 :
224 1 : var r objstorage.Readable
225 1 : if !meta.IsRemote() {
226 1 : r, err = p.vfsOpenForReading(ctx, fileType, fileNum, opts)
227 1 : } else {
228 1 : r, err = p.remoteOpenForReading(ctx, meta, opts)
229 1 : if err != nil && p.isNotExistError(meta, err) {
230 0 : // Wrap the error so that IsNotExistError functions properly.
231 0 : err = errors.Mark(err, os.ErrNotExist)
232 0 : }
233 : }
234 1 : if err != nil {
235 0 : return nil, err
236 0 : }
237 1 : if objiotracing.Enabled {
238 0 : r = p.tracer.WrapReadable(ctx, r, fileNum)
239 0 : }
240 1 : return r, nil
241 : }
242 :
243 : // Create creates a new object and opens it for writing.
244 : //
245 : // The object is not guaranteed to be durable (accessible in case of crashes)
246 : // until Sync is called.
247 : func (p *provider) Create(
248 : ctx context.Context,
249 : fileType base.FileType,
250 : fileNum base.DiskFileNum,
251 : opts objstorage.CreateOptions,
252 1 : ) (w objstorage.Writable, meta objstorage.ObjectMetadata, err error) {
253 1 : if opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone {
254 1 : w, meta, err = p.sharedCreate(ctx, fileType, fileNum, p.st.Remote.CreateOnSharedLocator, opts)
255 1 : } else {
256 1 : w, meta, err = p.vfsCreate(ctx, fileType, fileNum)
257 1 : }
258 1 : if err != nil {
259 0 : err = errors.Wrapf(err, "creating object %s", fileNum)
260 0 : return nil, objstorage.ObjectMetadata{}, err
261 0 : }
262 1 : p.addMetadata(meta)
263 1 : if objiotracing.Enabled {
264 0 : w = p.tracer.WrapWritable(ctx, w, fileNum)
265 0 : }
266 1 : return w, meta, nil
267 : }
268 :
269 : // Remove removes an object.
270 : //
271 : // Note that if the object is remote, the object is only (conceptually) removed
272 : // from this provider. If other providers have references on the remote object,
273 : // it will not be removed.
274 : //
275 : // The object is not guaranteed to be durably removed until Sync is called.
276 1 : func (p *provider) Remove(fileType base.FileType, fileNum base.DiskFileNum) error {
277 1 : meta, err := p.Lookup(fileType, fileNum)
278 1 : if err != nil {
279 0 : return err
280 0 : }
281 :
282 1 : if !meta.IsRemote() {
283 1 : err = p.vfsRemove(fileType, fileNum)
284 1 : } else {
285 1 : // TODO(radu): implement remote object removal (i.e. deref).
286 1 : err = p.sharedUnref(meta)
287 1 : if err != nil && p.isNotExistError(meta, err) {
288 0 : // Wrap the error so that IsNotExistError functions properly.
289 0 : err = errors.Mark(err, os.ErrNotExist)
290 0 : }
291 : }
292 1 : if err != nil && !p.IsNotExistError(err) {
293 0 : // We want to be able to retry a Remove, so we keep the object in our list.
294 0 : // TODO(radu): we should mark the object as "zombie" and not allow any other
295 0 : // operations.
296 0 : return errors.Wrapf(err, "removing object %s", fileNum)
297 0 : }
298 :
299 1 : p.removeMetadata(fileNum)
300 1 : return err
301 : }
302 :
303 0 : func (p *provider) isNotExistError(meta objstorage.ObjectMetadata, err error) bool {
304 0 : if meta.Remote.Storage != nil {
305 0 : return meta.Remote.Storage.IsNotExistError(err)
306 0 : }
307 0 : return oserror.IsNotExist(err)
308 : }
309 :
310 : // IsNotExistError is part of the objstorage.Provider interface.
311 1 : func (p *provider) IsNotExistError(err error) bool {
312 1 : // We use errors.Mark(err, os.ErrNotExist) for not-exist errors coming from
313 1 : // remote.Storage.
314 1 : return oserror.IsNotExist(err)
315 1 : }
316 :
317 : // Sync flushes the metadata from creation or removal of objects since the last Sync.
318 1 : func (p *provider) Sync() error {
319 1 : if err := p.vfsSync(); err != nil {
320 0 : return err
321 0 : }
322 1 : if err := p.sharedSync(); err != nil {
323 0 : return err
324 0 : }
325 1 : return nil
326 : }
327 :
328 : // LinkOrCopyFromLocal creates a new object that is either a copy of a given
329 : // local file or a hard link (if the new object is created on the same FS, and
330 : // if the FS supports it).
331 : //
332 : // The object is not guaranteed to be durable (accessible in case of crashes)
333 : // until Sync is called.
334 : func (p *provider) LinkOrCopyFromLocal(
335 : ctx context.Context,
336 : srcFS vfs.FS,
337 : srcFilePath string,
338 : dstFileType base.FileType,
339 : dstFileNum base.DiskFileNum,
340 : opts objstorage.CreateOptions,
341 1 : ) (objstorage.ObjectMetadata, error) {
342 1 : shared := opts.PreferSharedStorage && p.st.Remote.CreateOnShared != remote.CreateOnSharedNone
343 1 : if !shared && srcFS == p.st.FS {
344 1 : // Wrap the normal filesystem with one which wraps newly created files with
345 1 : // vfs.NewSyncingFile.
346 1 : fs := vfs.NewSyncingFS(p.st.FS, vfs.SyncingFileOptions{
347 1 : NoSyncOnClose: p.st.NoSyncOnClose,
348 1 : BytesPerSync: p.st.BytesPerSync,
349 1 : })
350 1 : dstPath := p.vfsPath(dstFileType, dstFileNum)
351 1 : if err := vfs.LinkOrCopy(fs, srcFilePath, dstPath); err != nil {
352 0 : return objstorage.ObjectMetadata{}, err
353 0 : }
354 :
355 1 : meta := objstorage.ObjectMetadata{
356 1 : DiskFileNum: dstFileNum,
357 1 : FileType: dstFileType,
358 1 : }
359 1 : p.addMetadata(meta)
360 1 : return meta, nil
361 : }
362 : // Create the object and copy the data.
363 1 : w, meta, err := p.Create(ctx, dstFileType, dstFileNum, opts)
364 1 : if err != nil {
365 0 : return objstorage.ObjectMetadata{}, err
366 0 : }
367 1 : f, err := srcFS.Open(srcFilePath, vfs.SequentialReadsOption)
368 1 : if err != nil {
369 0 : return objstorage.ObjectMetadata{}, err
370 0 : }
371 1 : defer f.Close()
372 1 : buf := make([]byte, 64*1024)
373 1 : for {
374 1 : n, readErr := f.Read(buf)
375 1 : if readErr != nil && readErr != io.EOF {
376 0 : w.Abort()
377 0 : return objstorage.ObjectMetadata{}, readErr
378 0 : }
379 :
380 1 : if n > 0 {
381 1 : if err := w.Write(buf[:n]); err != nil {
382 0 : w.Abort()
383 0 : return objstorage.ObjectMetadata{}, err
384 0 : }
385 : }
386 :
387 1 : if readErr == io.EOF {
388 1 : break
389 : }
390 : }
391 1 : if err := w.Finish(); err != nil {
392 0 : return objstorage.ObjectMetadata{}, err
393 0 : }
394 1 : return meta, nil
395 : }
396 :
397 : // Lookup is part of the objstorage.Provider interface.
398 : func (p *provider) Lookup(
399 : fileType base.FileType, fileNum base.DiskFileNum,
400 1 : ) (objstorage.ObjectMetadata, error) {
401 1 : p.mu.RLock()
402 1 : defer p.mu.RUnlock()
403 1 : meta, ok := p.mu.knownObjects[fileNum]
404 1 : if !ok {
405 0 : return objstorage.ObjectMetadata{}, errors.Wrapf(
406 0 : os.ErrNotExist,
407 0 : "file %s (type %d) unknown to the objstorage provider",
408 0 : fileNum, errors.Safe(fileType),
409 0 : )
410 0 : }
411 1 : if meta.FileType != fileType {
412 0 : return objstorage.ObjectMetadata{}, errors.AssertionFailedf(
413 0 : "file %s type mismatch (known type %d, expected type %d)",
414 0 : fileNum, errors.Safe(meta.FileType), errors.Safe(fileType),
415 0 : )
416 0 : }
417 1 : return meta, nil
418 : }
419 :
420 : // Path is part of the objstorage.Provider interface.
421 1 : func (p *provider) Path(meta objstorage.ObjectMetadata) string {
422 1 : if !meta.IsRemote() {
423 1 : return p.vfsPath(meta.FileType, meta.DiskFileNum)
424 1 : }
425 1 : return p.remotePath(meta)
426 : }
427 :
428 : // Size returns the size of the object.
429 1 : func (p *provider) Size(meta objstorage.ObjectMetadata) (int64, error) {
430 1 : if !meta.IsRemote() {
431 1 : return p.vfsSize(meta.FileType, meta.DiskFileNum)
432 1 : }
433 1 : return p.remoteSize(meta)
434 : }
435 :
436 : // List is part of the objstorage.Provider interface.
437 1 : func (p *provider) List() []objstorage.ObjectMetadata {
438 1 : p.mu.RLock()
439 1 : defer p.mu.RUnlock()
440 1 : res := make([]objstorage.ObjectMetadata, 0, len(p.mu.knownObjects))
441 1 : for _, meta := range p.mu.knownObjects {
442 1 : res = append(res, meta)
443 1 : }
444 1 : slices.SortFunc(res, func(a, b objstorage.ObjectMetadata) int {
445 1 : return cmp.Compare(a.DiskFileNum, b.DiskFileNum)
446 1 : })
447 1 : return res
448 : }
449 :
450 : // Metrics is part of the objstorage.Provider interface.
451 1 : func (p *provider) Metrics() sharedcache.Metrics {
452 1 : if p.remote.cache != nil {
453 1 : return p.remote.cache.Metrics()
454 1 : }
455 1 : return sharedcache.Metrics{}
456 : }
457 :
458 1 : func (p *provider) addMetadata(meta objstorage.ObjectMetadata) {
459 1 : if invariants.Enabled {
460 1 : meta.AssertValid()
461 1 : }
462 1 : p.mu.Lock()
463 1 : defer p.mu.Unlock()
464 1 : p.mu.knownObjects[meta.DiskFileNum] = meta
465 1 : if meta.IsRemote() {
466 1 : p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
467 1 : FileNum: meta.DiskFileNum,
468 1 : FileType: meta.FileType,
469 1 : CreatorID: meta.Remote.CreatorID,
470 1 : CreatorFileNum: meta.Remote.CreatorFileNum,
471 1 : Locator: meta.Remote.Locator,
472 1 : CleanupMethod: meta.Remote.CleanupMethod,
473 1 : CustomObjectName: meta.Remote.CustomObjectName,
474 1 : })
475 1 : } else {
476 1 : p.mu.localObjectsChanged = true
477 1 : }
478 : }
479 :
480 1 : func (p *provider) removeMetadata(fileNum base.DiskFileNum) {
481 1 : p.mu.Lock()
482 1 : defer p.mu.Unlock()
483 1 :
484 1 : meta, ok := p.mu.knownObjects[fileNum]
485 1 : if !ok {
486 0 : return
487 0 : }
488 1 : delete(p.mu.knownObjects, fileNum)
489 1 : if meta.IsRemote() {
490 1 : p.mu.remote.catalogBatch.DeleteObject(fileNum)
491 1 : } else {
492 1 : p.mu.localObjectsChanged = true
493 1 : }
494 : }
495 :
496 : // protectObject prevents the unreferencing of a remote object until
497 : // unprotectObject is called.
498 0 : func (p *provider) protectObject(fileNum base.DiskFileNum) {
499 0 : p.mu.Lock()
500 0 : defer p.mu.Unlock()
501 0 : p.mu.protectedObjects[fileNum] = p.mu.protectedObjects[fileNum] + 1
502 0 : }
503 :
504 0 : func (p *provider) unprotectObject(fileNum base.DiskFileNum) {
505 0 : p.mu.Lock()
506 0 : defer p.mu.Unlock()
507 0 : v := p.mu.protectedObjects[fileNum]
508 0 : if invariants.Enabled && v == 0 {
509 0 : panic("invalid protection count")
510 : }
511 0 : if v > 1 {
512 0 : p.mu.protectedObjects[fileNum] = v - 1
513 0 : } else {
514 0 : delete(p.mu.protectedObjects, fileNum)
515 0 : // TODO(radu): check if the object is still in knownObject; if not, unref it
516 0 : // now.
517 0 : }
518 : }
519 :
520 1 : func (p *provider) isProtected(fileNum base.DiskFileNum) bool {
521 1 : p.mu.Lock()
522 1 : defer p.mu.Unlock()
523 1 : return p.mu.protectedObjects[fileNum] > 0
524 1 : }
|