Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package objstorageprovider
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "io"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/objstorage"
15 : "github.com/cockroachdb/pebble/objstorage/remote"
16 : )
17 :
18 : const (
19 : tagCreatorID = 1
20 : tagCreatorFileNum = 2
21 : tagCleanupMethod = 3
22 : // tagRefCheckID encodes the information for a ref marker that needs to be
23 : // checked when attaching this object to another provider. This is set to the
24 : // creator ID and FileNum for the provider that encodes the backing, and
25 : // allows the "target" provider to check that the "source" provider kept its
26 : // reference on the object alive.
27 : tagRefCheckID = 4
28 : // tagLocator encodes the remote.Locator; if absent the locator is "". It is
29 : // followed by the locator string length and the locator string.
30 : tagLocator = 5
31 : // tagCustomObjectName encodes a custom object name (if present). It is followed by the
32 : // custom name string length and the string.
33 : tagCustomObjectName = 6
34 :
35 : // Any new tags that don't have the tagNotSafeToIgnoreMask bit set must be
36 : // followed by the length of the data (so they can be skipped).
37 :
38 : // Any new tags that have the tagNotSafeToIgnoreMask bit set cause errors if
39 : // they are encountered by earlier code that doesn't know the tag.
40 : tagNotSafeToIgnoreMask = 64
41 : )
42 :
43 : func (p *provider) encodeRemoteObjectBacking(
44 : meta *objstorage.ObjectMetadata,
45 2 : ) (objstorage.RemoteObjectBacking, error) {
46 2 : if !meta.IsRemote() {
47 1 : return nil, errors.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
48 1 : }
49 :
50 2 : buf := make([]byte, 0, binary.MaxVarintLen64*4)
51 2 : buf = binary.AppendUvarint(buf, tagCreatorID)
52 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorID))
53 2 : // TODO(radu): encode file type as well?
54 2 : buf = binary.AppendUvarint(buf, tagCreatorFileNum)
55 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CreatorFileNum))
56 2 : buf = binary.AppendUvarint(buf, tagCleanupMethod)
57 2 : buf = binary.AppendUvarint(buf, uint64(meta.Remote.CleanupMethod))
58 2 : if meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
59 2 : buf = binary.AppendUvarint(buf, tagRefCheckID)
60 2 : buf = binary.AppendUvarint(buf, uint64(p.remote.shared.creatorID))
61 2 : buf = binary.AppendUvarint(buf, uint64(meta.DiskFileNum))
62 2 : }
63 2 : if meta.Remote.Locator != "" {
64 1 : buf = binary.AppendUvarint(buf, tagLocator)
65 1 : buf = encodeString(buf, string(meta.Remote.Locator))
66 1 : }
67 2 : if meta.Remote.CustomObjectName != "" {
68 1 : buf = binary.AppendUvarint(buf, tagCustomObjectName)
69 1 : buf = encodeString(buf, meta.Remote.CustomObjectName)
70 1 : }
71 2 : return buf, nil
72 : }
73 :
74 : type remoteObjectBackingHandle struct {
75 : backing objstorage.RemoteObjectBacking
76 : fileNum base.DiskFileNum
77 : p *provider
78 : }
79 :
80 2 : func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
81 2 : if s.backing == nil {
82 1 : return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
83 1 : }
84 2 : return s.backing, nil
85 : }
86 :
87 1 : func (s *remoteObjectBackingHandle) Close() {
88 1 : if s.backing != nil {
89 1 : s.backing = nil
90 1 : s.p.unprotectObject(s.fileNum)
91 1 : }
92 : }
93 :
94 : var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
95 :
96 : // RemoteObjectBacking is part of the objstorage.Provider interface.
97 : func (p *provider) RemoteObjectBacking(
98 : meta *objstorage.ObjectMetadata,
99 2 : ) (objstorage.RemoteObjectBackingHandle, error) {
100 2 : backing, err := p.encodeRemoteObjectBacking(meta)
101 2 : if err != nil {
102 1 : return nil, err
103 1 : }
104 2 : p.protectObject(meta.DiskFileNum)
105 2 : return &remoteObjectBackingHandle{
106 2 : backing: backing,
107 2 : fileNum: meta.DiskFileNum,
108 2 : p: p,
109 2 : }, nil
110 : }
111 :
112 : // CreateExternalObjectBacking is part of the objstorage.Provider interface.
113 : func (p *provider) CreateExternalObjectBacking(
114 : locator remote.Locator, objName string,
115 1 : ) (objstorage.RemoteObjectBacking, error) {
116 1 : var meta objstorage.ObjectMetadata
117 1 : meta.Remote.Locator = locator
118 1 : meta.Remote.CustomObjectName = objName
119 1 : meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
120 1 : return p.encodeRemoteObjectBacking(&meta)
121 1 : }
122 :
123 : type decodedBacking struct {
124 : meta objstorage.ObjectMetadata
125 : // refToCheck is set only when meta.Remote.CleanupMethod is SharedRefTracking.
126 : refToCheck struct {
127 : creatorID objstorage.CreatorID
128 : fileNum base.DiskFileNum
129 : }
130 : }
131 :
132 : // decodeRemoteObjectBacking decodes the remote object metadata.
133 : //
134 : // Note that the meta.Remote.Storage field is not set.
135 : func decodeRemoteObjectBacking(
136 : fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
137 2 : ) (decodedBacking, error) {
138 2 : var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
139 2 : var locator, customObjName string
140 2 : br := bytes.NewReader(buf)
141 2 : for {
142 2 : tag, err := binary.ReadUvarint(br)
143 2 : if err == io.EOF {
144 2 : break
145 : }
146 2 : if err != nil {
147 0 : return decodedBacking{}, err
148 0 : }
149 2 : switch tag {
150 2 : case tagCreatorID:
151 2 : creatorID, err = binary.ReadUvarint(br)
152 :
153 2 : case tagCreatorFileNum:
154 2 : creatorFileNum, err = binary.ReadUvarint(br)
155 :
156 2 : case tagCleanupMethod:
157 2 : cleanupMethod, err = binary.ReadUvarint(br)
158 :
159 2 : case tagRefCheckID:
160 2 : refCheckCreatorID, err = binary.ReadUvarint(br)
161 2 : if err == nil {
162 2 : refCheckFileNum, err = binary.ReadUvarint(br)
163 2 : }
164 :
165 1 : case tagLocator:
166 1 : locator, err = decodeString(br)
167 :
168 1 : case tagCustomObjectName:
169 1 : customObjName, err = decodeString(br)
170 :
171 1 : default:
172 1 : // Ignore unknown tags, unless they're not safe to ignore.
173 1 : if tag&tagNotSafeToIgnoreMask != 0 {
174 1 : return decodedBacking{}, errors.Newf("unknown tag %d", tag)
175 1 : }
176 1 : var dataLen uint64
177 1 : dataLen, err = binary.ReadUvarint(br)
178 1 : if err == nil {
179 1 : _, err = br.Seek(int64(dataLen), io.SeekCurrent)
180 1 : }
181 : }
182 2 : if err != nil {
183 0 : return decodedBacking{}, err
184 0 : }
185 : }
186 2 : if customObjName == "" {
187 2 : if creatorID == 0 {
188 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
189 0 : }
190 2 : if creatorFileNum == 0 {
191 0 : return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
192 0 : }
193 : }
194 2 : var res decodedBacking
195 2 : res.meta.DiskFileNum = fileNum
196 2 : res.meta.FileType = fileType
197 2 : res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
198 2 : res.meta.Remote.CreatorFileNum = base.DiskFileNum(creatorFileNum)
199 2 : res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
200 2 : if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
201 2 : if refCheckCreatorID == 0 || refCheckFileNum == 0 {
202 0 : return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
203 0 : }
204 2 : res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
205 2 : res.refToCheck.fileNum = base.DiskFileNum(refCheckFileNum)
206 : }
207 2 : res.meta.Remote.Locator = remote.Locator(locator)
208 2 : res.meta.Remote.CustomObjectName = customObjName
209 2 : return res, nil
210 : }
211 :
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	withLen := binary.AppendUvarint(buf, uint64(len(s)))
	return append(withLen, s...)
}
217 :
// decodeString reads a string written by encodeString: a uvarint byte length
// followed by that many bytes.
//
// An error reading the length is returned unchanged. If the input ends before
// the declared number of bytes has been read, io.ErrUnexpectedEOF is
// returned. The declared length is never trusted for allocation: the buffer
// grows only as bytes are actually read, so a corrupt length cannot trigger a
// huge allocation or a makeslice panic.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}
	// Cap the up-front allocation; longer strings grow through append.
	const maxPrealloc = 4 << 10
	capHint := length
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, capHint)
	for uint64(len(buf)) < length {
		b, err := br.ReadByte()
		if err != nil {
			if err == io.EOF {
				// The length promised more bytes than the input holds.
				err = io.ErrUnexpectedEOF
			}
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
232 :
233 : // AttachRemoteObjects is part of the objstorage.Provider interface.
234 : func (p *provider) AttachRemoteObjects(
235 : objs []objstorage.RemoteObjectToAttach,
236 2 : ) ([]objstorage.ObjectMetadata, error) {
237 2 : decoded := make([]decodedBacking, len(objs))
238 2 : for i, o := range objs {
239 2 : var err error
240 2 : decoded[i], err = decodeRemoteObjectBacking(o.FileType, o.FileNum, o.Backing)
241 2 : if err != nil {
242 0 : return nil, err
243 0 : }
244 2 : decoded[i].meta.Remote.Storage, err = p.ensureStorage(decoded[i].meta.Remote.Locator)
245 2 : if err != nil {
246 1 : return nil, err
247 1 : }
248 : }
249 :
250 : // Create the reference marker objects.
251 : // TODO(radu): parallelize this.
252 2 : for _, d := range decoded {
253 2 : if d.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
254 1 : continue
255 : }
256 2 : if err := p.sharedCreateRef(d.meta); err != nil {
257 0 : // TODO(radu): clean up references previously created in this loop.
258 0 : return nil, err
259 0 : }
260 : // Check the "origin"'s reference.
261 2 : refName := sharedObjectRefName(d.meta, d.refToCheck.creatorID, d.refToCheck.fileNum)
262 2 : if _, err := d.meta.Remote.Storage.Size(refName); err != nil {
263 1 : _ = p.sharedUnref(d.meta)
264 1 : // TODO(radu): clean up references previously created in this loop.
265 1 : if d.meta.Remote.Storage.IsNotExistError(err) {
266 1 : return nil, errors.Errorf("origin marker object %q does not exist;"+
267 1 : " object probably removed from the provider which created the backing", refName)
268 1 : }
269 0 : return nil, errors.Wrapf(err, "checking origin's marker object %s", refName)
270 : }
271 : }
272 :
273 2 : metas := make([]objstorage.ObjectMetadata, len(decoded))
274 2 : for i, d := range decoded {
275 2 : metas[i] = d.meta
276 2 : }
277 :
278 2 : p.mu.Lock()
279 2 : defer p.mu.Unlock()
280 2 : for i := range metas {
281 2 : p.addMetadataLocked(metas[i])
282 2 : }
283 2 : return metas, nil
284 : }
|