Line data Source code
1 : #include "fd_vinyl_io.h"
2 :
3 : #include <errno.h>
4 : #include <unistd.h>
5 :
/* fd_vinyl_io_bd_rd_t is the bd backend's private view of a read
   request.  This file casts fd_vinyl_io_rd_t pointers to and from this
   type (see fd_vinyl_io_bd_read and fd_vinyl_io_bd_poll), so the
   leading fields must stay in lockstep with fd_vinyl_io_rd_t.  The
   trailing next field is the link used to chain requests into the bd's
   singly linked pending read queue. */

struct fd_vinyl_io_bd_rd;
typedef struct fd_vinyl_io_bd_rd fd_vinyl_io_bd_rd_t;

struct fd_vinyl_io_bd_rd {
  ulong                 ctx;  /* Must mirror fd_vinyl_io_rd_t */
  ulong                 seq;  /* " */
  void *                dst;  /* " */
  ulong                 sz;   /* " */
  fd_vinyl_io_bd_rd_t * next; /* Next element in bd rd queue */
};
16 :
/* fd_vinyl_io_bd_t is the state of a block device backed vinyl io.
   base is the first member so that a fd_vinyl_io_t pointer handed to
   the implementation functions below can be cast directly to a
   fd_vinyl_io_bd_t pointer.  The scratch pad is not a named member; it
   occupies the spad_max bytes immediately after the structure and is
   addressed as (uchar *)(bd+1). */

struct fd_vinyl_io_bd {
  fd_vinyl_io_t              base[1];
  int                        dev_fd;       /* File descriptor of block device */
  ulong                      dev_sync;     /* Offset to block that holds bstream sync (BLOCK_SZ multiple) */
  ulong                      dev_base;     /* Offset to first block (BLOCK_SZ multiple) */
  ulong                      dev_sz;       /* Block store byte size (BLOCK_SZ multiple) */
  fd_vinyl_io_bd_rd_t *      rd_head;      /* Pointer to queue head */
  fd_vinyl_io_bd_rd_t **     rd_tail_next; /* Pointer to queue &tail->next or &rd_head if empty. */
  fd_vinyl_bstream_block_t   sync[1];      /* In-memory copy of the sync block (rewritten by sync, loaded on resume) */
  /* spad_max bytes follow */
};

typedef struct fd_vinyl_io_bd fd_vinyl_io_bd_t;
30 :
31 : static inline void
32 : bd_read( fd_vinyl_io_bd_t * bd,
33 : ulong off,
34 : void * buf,
35 0 : ulong sz ) {
36 0 : int fd = bd->dev_fd;
37 0 : bd->base->file_read_cnt++;
38 0 : bd->base->file_read_tot_sz += sz;
39 0 : ssize_t ssz = pread( fd, buf, sz, (off_t)off );
40 0 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
41 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
42 0 : /**/ FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
43 0 : }
44 :
45 : static inline void
46 : bd_write( fd_vinyl_io_bd_t * bd,
47 : ulong off,
48 : void const * buf,
49 0 : ulong sz ) {
50 0 : int fd = bd->dev_fd;
51 0 : bd->base->file_write_cnt++;
52 0 : bd->base->file_write_tot_sz += sz;
53 0 : ssize_t ssz = pwrite( fd, buf, sz, (off_t)off );
54 0 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
55 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
56 0 : else FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
57 0 : }
58 :
59 : static void
60 : fd_vinyl_io_bd_read_imm( fd_vinyl_io_t * io,
61 : ulong seq0,
62 : void * _dst,
63 0 : ulong sz ) {
64 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
65 :
66 : /* If this is a request to read nothing, succeed immediately. If
67 : this is a request to read outside the bstream's past, fail. */
68 :
69 0 : if( FD_UNLIKELY( !sz ) ) return;
70 :
71 0 : uchar * dst = (uchar *)_dst;
72 0 : ulong seq1 = seq0 + sz;
73 :
74 0 : ulong seq_past = bd->base->seq_past;
75 0 : ulong seq_present = bd->base->seq_present;
76 :
77 0 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
78 0 : int bad_dst = !dst;
79 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
80 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
81 :
82 0 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
83 0 : FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
84 0 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
85 0 : bad_seq ? "misaligned seq" :
86 0 : bad_dst ? "NULL dst" :
87 0 : bad_sz ? "misaligned sz" :
88 0 : "not in past" ));
89 :
90 : /* At this point, we have a valid read request. Map seq0 into the
91 : bstream store. Read the lesser of sz bytes or until the store end.
92 : If we hit the store end with more to go, wrap around and finish the
93 : read at the store start. */
94 :
95 0 : ulong dev_base = bd->dev_base;
96 0 : ulong dev_sz = bd->dev_sz;
97 :
98 0 : ulong dev_off = seq0 % dev_sz;
99 :
100 0 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
101 0 : bd_read( bd, dev_base + dev_off, dst, rsz );
102 0 : sz -= rsz;
103 :
104 0 : if( FD_UNLIKELY( sz ) ) bd_read( bd, dev_base, dst + rsz, sz );
105 0 : }
106 :
107 : static void
108 : fd_vinyl_io_bd_read( fd_vinyl_io_t * io,
109 0 : fd_vinyl_io_rd_t * _rd ) {
110 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *) io; /* Note: io must be non-NULL to have even been called */
111 0 : fd_vinyl_io_bd_rd_t * rd = (fd_vinyl_io_bd_rd_t *)_rd;
112 :
113 0 : rd->next = NULL;
114 0 : *bd->rd_tail_next = rd;
115 0 : bd->rd_tail_next = &rd->next;
116 :
117 0 : ulong seq0 = rd->seq;
118 0 : uchar * dst = (uchar *)rd->dst;
119 0 : ulong sz = rd->sz;
120 :
121 : /* If this is a request to read nothing, succeed immediately. If
122 : this is a request to read outside the bstream's past, fail. */
123 :
124 0 : if( FD_UNLIKELY( !sz ) ) return;
125 :
126 0 : ulong seq1 = seq0 + sz;
127 :
128 0 : ulong seq_past = bd->base->seq_past;
129 0 : ulong seq_present = bd->base->seq_present;
130 :
131 0 : int bad_seq = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
132 0 : int bad_dst = !dst;
133 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
134 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
135 :
136 0 : if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
137 0 : FD_LOG_CRIT(( "bstream read [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
138 0 : seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
139 0 : bad_seq ? "misaligned seq" :
140 0 : bad_dst ? "NULL dst" :
141 0 : bad_sz ? "misaligned sz" :
142 0 : "not in past" ));
143 :
144 : /* At this point, we have a valid read request. Map seq0 into the
145 : bstream store. Read the lesser of sz bytes or until the store end.
146 : If we hit the store end with more to go, wrap around and finish the
147 : read at the store start. */
148 :
149 0 : ulong dev_base = bd->dev_base;
150 0 : ulong dev_sz = bd->dev_sz;
151 :
152 0 : ulong dev_off = seq0 % dev_sz;
153 :
154 0 : ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
155 0 : bd_read( bd, dev_base + dev_off, dst, rsz );
156 0 : sz -= rsz;
157 :
158 0 : if( FD_UNLIKELY( sz ) ) bd_read( bd, dev_base, dst + rsz, sz );
159 0 : }
160 :
161 : static int
162 : fd_vinyl_io_bd_poll( fd_vinyl_io_t * io,
163 : fd_vinyl_io_rd_t ** _rd,
164 0 : int flags ) {
165 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t * )io; /* Note: io must be non-NULL to have even been called */
166 0 : (void)flags;
167 :
168 0 : fd_vinyl_io_bd_rd_t * rd = bd->rd_head;
169 :
170 0 : if( FD_UNLIKELY( !rd ) ) {
171 0 : *_rd = NULL;
172 0 : return FD_VINYL_ERR_EMPTY;
173 0 : }
174 :
175 0 : fd_vinyl_io_bd_rd_t ** rd_tail_next = bd->rd_tail_next;
176 0 : fd_vinyl_io_bd_rd_t * rd_next = rd->next;
177 :
178 0 : bd->rd_head = rd_next;
179 0 : bd->rd_tail_next = fd_ptr_if( !!rd_next, rd_tail_next, &bd->rd_head );
180 :
181 0 : *_rd = (fd_vinyl_io_rd_t *)rd;
182 0 : return FD_VINYL_SUCCESS;
183 0 : }
184 :
185 : static ulong
186 : fd_vinyl_io_bd_append( fd_vinyl_io_t * io,
187 : void const * _src,
188 0 : ulong sz ) {
189 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
190 0 : uchar const * src = (uchar const *)_src;
191 :
192 : /* Validate the input args. */
193 :
194 0 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
195 0 : ulong seq_ancient = bd->base->seq_ancient;
196 0 : ulong dev_base = bd->dev_base;
197 0 : ulong dev_sz = bd->dev_sz;
198 :
199 0 : int bad_src = !src;
200 0 : int bad_align = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
201 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
202 0 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
203 :
204 0 : if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
205 0 : FD_LOG_CRIT(( bad_src ? "NULL src" :
206 0 : bad_align ? "misaligned src" :
207 0 : bad_sz ? "misaligned sz" :
208 0 : "device full" ));
209 :
210 : /* At this point, we appear to have a valid append request. Map it to
211 : the bstream (updating seq_future) and map it to the device. Then
212 : write the lesser of sz bytes or until the store end. If we hit the
213 : store end with more to go, wrap around and finish the write at the
214 : store start. */
215 :
216 0 : ulong seq = seq_future;
217 0 : bd->base->seq_future = seq + sz;
218 :
219 0 : ulong dev_off = seq % dev_sz;
220 :
221 0 : ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
222 0 : bd_write( bd, dev_base + dev_off, src, wsz );
223 0 : sz -= wsz;
224 0 : if( sz ) bd_write( bd, dev_base, src + wsz, sz );
225 :
226 0 : return seq;
227 0 : }
228 :
229 : static int
230 : fd_vinyl_io_bd_commit( fd_vinyl_io_t * io,
231 0 : int flags ) {
232 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
233 0 : (void)flags;
234 :
235 0 : bd->base->seq_present = bd->base->seq_future;
236 0 : bd->base->spad_used = 0UL;
237 :
238 0 : return FD_VINYL_SUCCESS;
239 0 : }
240 :
241 : static ulong
242 : fd_vinyl_io_bd_hint( fd_vinyl_io_t * io,
243 0 : ulong sz ) {
244 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
245 :
246 0 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
247 0 : ulong seq_ancient = bd->base->seq_ancient;
248 0 : ulong dev_sz = bd->dev_sz;
249 :
250 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
251 0 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
252 :
253 0 : if( FD_UNLIKELY( bad_sz | bad_capacity ) ) FD_LOG_CRIT(( bad_sz ? "misaligned sz" : "device full" ));
254 :
255 0 : return bd->base->seq_future;
256 0 : }
257 :
258 : static void *
259 : fd_vinyl_io_bd_alloc( fd_vinyl_io_t * io,
260 : ulong sz,
261 0 : int flags ) {
262 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
263 :
264 0 : ulong spad_max = bd->base->spad_max;
265 0 : ulong spad_used = bd->base->spad_used; if( FD_UNLIKELY( !sz ) ) return ((uchar *)(bd+1)) + spad_used;
266 :
267 0 : int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
268 0 : int bad_sz = sz > spad_max;
269 :
270 0 : if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));
271 :
272 0 : if( FD_UNLIKELY( sz > (spad_max - spad_used ) ) ) {
273 0 : if( FD_UNLIKELY( fd_vinyl_io_bd_commit( io, flags ) ) ) return NULL;
274 0 : spad_used = 0UL;
275 0 : }
276 :
277 0 : bd->base->spad_used = spad_used + sz;
278 :
279 0 : return ((uchar *)(bd+1)) + spad_used;
280 0 : }
281 :
282 : static ulong
283 : fd_vinyl_io_bd_copy( fd_vinyl_io_t * io,
284 : ulong seq_src0,
285 0 : ulong sz ) {
286 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
287 :
288 : /* Validate the input args */
289 :
290 0 : ulong seq_ancient = bd->base->seq_ancient;
291 0 : ulong seq_past = bd->base->seq_past;
292 0 : ulong seq_present = bd->base->seq_present;
293 0 : ulong seq_future = bd->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
294 0 : ulong spad_max = bd->base->spad_max;
295 0 : ulong spad_used = bd->base->spad_used;
296 0 : ulong dev_base = bd->dev_base;
297 0 : ulong dev_sz = bd->dev_sz;
298 :
299 0 : ulong seq_src1 = seq_src0 + sz;
300 :
301 0 : int bad_past = !( fd_vinyl_seq_le( seq_past, seq_src0 ) &
302 0 : fd_vinyl_seq_lt( seq_src0, seq_src1 ) &
303 0 : fd_vinyl_seq_le( seq_src1, seq_present ) );
304 0 : int bad_src = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
305 0 : int bad_sz = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
306 0 : int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));
307 :
308 0 : if( FD_UNLIKELY( bad_past | bad_src | bad_sz | bad_capacity ) )
309 0 : FD_LOG_CRIT(( bad_past ? "src is not in the past" :
310 0 : bad_src ? "misaligned src_seq" :
311 0 : bad_sz ? "misaligned sz" :
312 0 : "device full" ));
313 :
314 : /* At this point, we appear to have a valid copy request. Get
315 : buffer space from the scratch pad (committing as necessary). */
316 :
317 0 : if( FD_UNLIKELY( sz>(spad_max-spad_used) ) ) {
318 0 : fd_vinyl_io_bd_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
319 0 : spad_used = 0UL;
320 0 : }
321 :
322 0 : uchar * buf = (uchar *)(bd+1) + spad_used;
323 0 : ulong buf_max = spad_max - spad_used;
324 :
325 : /* Map the dst to the bstream (updating seq_future) and map the src
326 : and dst regions onto the device. Then copy as much as we can at a
327 : time, handling device wrap around and copy buffering space. */
328 :
329 0 : ulong seq = seq_future;
330 0 : bd->base->seq_future = seq + sz;
331 :
332 0 : ulong seq_dst0 = seq;
333 :
334 0 : for(;;) {
335 0 : ulong src_off = seq_src0 % dev_sz;
336 0 : ulong dst_off = seq_dst0 % dev_sz;
337 0 : ulong csz = fd_ulong_min( fd_ulong_min( sz, buf_max ), fd_ulong_min( dev_sz - src_off, dev_sz - dst_off ) );
338 :
339 0 : bd_read ( bd, dev_base + src_off, buf, csz );
340 0 : bd_write( bd, dev_base + dst_off, buf, csz );
341 :
342 0 : sz -= csz;
343 0 : if( !sz ) break;
344 :
345 0 : seq_src0 += csz;
346 0 : seq_dst0 += csz;
347 0 : }
348 :
349 0 : return seq;
350 0 : }
351 :
352 : static void
353 : fd_vinyl_io_bd_forget( fd_vinyl_io_t * io,
354 0 : ulong seq ) {
355 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
356 :
357 : /* Validate input arguments. Note that we don't allow forgetting into
358 : the future even when we have no uncommitted blocks because the
359 : resulting [seq_ancient,seq_future) might contain blocks that were
360 : never written (which might not be an issue practically but it would
361 : be a bit strange for something to try to scan starting from
362 : seq_ancient and discover unwritten blocks). */
363 :
364 0 : ulong seq_past = bd->base->seq_past;
365 0 : ulong seq_present = bd->base->seq_present;
366 0 : ulong seq_future = bd->base->seq_future;
367 :
368 0 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
369 0 : int bad_dir = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
370 0 : int bad_read = !!bd->rd_head;
371 0 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
372 :
373 0 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
374 0 : FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
375 0 : seq, seq_past, seq_present, seq_present-seq_past,
376 0 : bad_seq ? "misaligned seq" :
377 0 : bad_dir ? "seq out of bounds" :
378 0 : bad_read ? "reads in progress" :
379 0 : "appends/copies in progress" ));
380 :
381 0 : bd->base->seq_past = seq;
382 0 : }
383 :
384 : static void
385 : fd_vinyl_io_bd_rewind( fd_vinyl_io_t * io,
386 0 : ulong seq ) {
387 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
388 :
389 : /* Validate input argments. Unlike forgot, we do allow rewinding to
390 : before seq_ancient as the region of sequence space reported to the
391 : caller as written is still accurate. */
392 :
393 0 : ulong seq_ancient = bd->base->seq_ancient;
394 0 : ulong seq_past = bd->base->seq_past;
395 0 : ulong seq_present = bd->base->seq_present;
396 0 : ulong seq_future = bd->base->seq_future;
397 :
398 0 : int bad_seq = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
399 0 : int bad_dir = fd_vinyl_seq_gt( seq, seq_present );
400 0 : int bad_read = !!bd->rd_head;
401 0 : int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );
402 :
403 0 : if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
404 0 : FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
405 0 : bad_seq ? "misaligned seq" :
406 0 : bad_dir ? "seq after seq_present" :
407 0 : bad_read ? "reads in progress" :
408 0 : "appends/copies in progress" ));
409 :
410 0 : bd->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
411 0 : bd->base->seq_past = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past ), seq_past, seq );
412 0 : bd->base->seq_present = seq;
413 0 : bd->base->seq_future = seq;
414 0 : }
415 :
416 : static int
417 : fd_vinyl_io_bd_sync( fd_vinyl_io_t * io,
418 0 : int flags ) {
419 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
420 0 : (void)flags;
421 :
422 0 : ulong seed = bd->base->seed;
423 0 : ulong seq_past = bd->base->seq_past;
424 0 : ulong seq_present = bd->base->seq_present;
425 :
426 0 : ulong dev_sync = bd->dev_sync;
427 :
428 0 : fd_vinyl_bstream_block_t * block = bd->sync;
429 :
430 : /* block->sync.ctl current (static) */
431 0 : block->sync.seq_past = seq_past;
432 0 : block->sync.seq_present = seq_present;
433 : /* block->sync.info_sz current (static) */
434 : /* block->sync.info current (static) */
435 :
436 0 : block->sync.hash_trail = 0UL;
437 0 : block->sync.hash_blocks = 0UL;
438 0 : fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */
439 :
440 0 : bd_write( bd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
441 :
442 0 : bd->base->seq_ancient = seq_past;
443 :
444 0 : return FD_VINYL_SUCCESS;
445 0 : }
446 :
447 : static void *
448 0 : fd_vinyl_io_bd_fini( fd_vinyl_io_t * io ) {
449 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)io; /* Note: io must be non-NULL to have even been called */
450 :
451 0 : ulong seq_present = bd->base->seq_present;
452 0 : ulong seq_future = bd->base->seq_future;
453 :
454 0 : if( FD_UNLIKELY( bd->rd_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
455 0 : if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommited blocks" ));
456 :
457 0 : return io;
458 0 : }
459 :
/* fd_vinyl_io_bd_impl is the table of bd implementation functions that
   fd_vinyl_io_bd_init installs as the io's impl.  The initializer is
   positional, so the order of the entries below must match the member
   order of fd_vinyl_io_impl_t (declared outside this file). */

static fd_vinyl_io_impl_t fd_vinyl_io_bd_impl[1] = { {
  fd_vinyl_io_bd_read_imm,
  fd_vinyl_io_bd_read,
  fd_vinyl_io_bd_poll,
  fd_vinyl_io_bd_append,
  fd_vinyl_io_bd_commit,
  fd_vinyl_io_bd_hint,
  fd_vinyl_io_bd_alloc,
  fd_vinyl_io_bd_copy,
  fd_vinyl_io_bd_forget,
  fd_vinyl_io_bd_rewind,
  fd_vinyl_io_bd_sync,
  fd_vinyl_io_bd_fini
} };
474 :
/* The bd structure must have exactly block alignment.  NOTE(review):
   presumably this is what keeps the scratch pad at (uchar *)(bd+1)
   block aligned, since append requires block aligned source buffers --
   confirm against the fd_vinyl_io API documentation. */

FD_STATIC_ASSERT( alignof(fd_vinyl_io_bd_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );
476 :
477 : ulong
478 0 : fd_vinyl_io_bd_align( void ) {
479 0 : return alignof(fd_vinyl_io_bd_t);
480 0 : }
481 :
482 : ulong
483 0 : fd_vinyl_io_bd_footprint( ulong spad_max ) {
484 0 : if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
485 0 : return 0UL;
486 0 : return sizeof(fd_vinyl_io_bd_t) + spad_max;
487 0 : }
488 :
489 : fd_vinyl_io_t *
490 : fd_vinyl_io_bd_init( void * mem,
491 : ulong spad_max,
492 : int dev_fd,
493 : int reset,
494 : void const * info,
495 : ulong info_sz,
496 0 : ulong io_seed ) {
497 0 : fd_vinyl_io_bd_t * bd = (fd_vinyl_io_bd_t *)mem;
498 :
499 0 : if( FD_UNLIKELY( !bd ) ) {
500 0 : FD_LOG_WARNING(( "NULL mem" ));
501 0 : return NULL;
502 0 : }
503 :
504 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)bd, fd_vinyl_io_bd_align() ) ) ) {
505 0 : FD_LOG_WARNING(( "misaligned mem" ));
506 0 : return NULL;
507 0 : }
508 :
509 0 : ulong footprint = fd_vinyl_io_bd_footprint( spad_max );
510 0 : if( FD_UNLIKELY( !footprint ) ) {
511 0 : FD_LOG_WARNING(( "bad spad_max" ));
512 0 : return NULL;
513 0 : }
514 :
515 0 : off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
516 0 : if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
517 0 : FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
518 0 : return NULL;
519 0 : }
520 0 : ulong dev_sz = (ulong)_dev_sz;
521 :
522 : ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ /* sync block, move block, closing partition */
523 0 : + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */
524 :
525 0 : int too_small = dev_sz < dev_sz_min;
526 0 : int too_large = dev_sz > (ulong)LONG_MAX;
527 0 : int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
528 :
529 0 : if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
530 0 : FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
531 0 : too_large ? "too large" :
532 0 : "not a block size multiple" ));
533 0 : return NULL;
534 0 : }
535 :
536 0 : if( reset ) {
537 0 : if( FD_UNLIKELY( !info ) ) info_sz = 0UL;
538 0 : if( FD_UNLIKELY( info_sz>FD_VINYL_BSTREAM_SYNC_INFO_MAX ) ) {
539 0 : FD_LOG_WARNING(( "info_sz too large" ));
540 0 : return NULL;
541 0 : }
542 0 : }
543 :
544 0 : memset( bd, 0, footprint );
545 :
546 0 : bd->base->type = FD_VINYL_IO_TYPE_BD;
547 :
548 : /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
549 : below */
550 :
551 0 : bd->base->spad_max = spad_max;
552 0 : bd->base->spad_used = 0UL;
553 0 : bd->base->impl = fd_vinyl_io_bd_impl;
554 :
555 0 : bd->dev_fd = dev_fd;
556 0 : bd->dev_sync = 0UL; /* Use the beginning of the file for the sync block */
557 0 : bd->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ; /* Use the rest for the actual bstream store (at least 3.5 KiB) */
558 0 : bd->dev_sz = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
559 :
560 0 : bd->rd_head = NULL;
561 0 : bd->rd_tail_next = &bd->rd_head;
562 :
563 : /* Note that [seq_ancient,seq_future) (cyclic) contains at most dev_sz
564 : bytes, bstream's antiquity, past and present are subsets of this
565 : range and dev_sz is less than 2^63 given the above (practically
566 : much much less). As such, differences between two ordered bstream
567 : sequence numbers (e.g. ulong sz = seq_a - seq_b where a is
568 : logically not before b) will "just work" regardless of wrapping
569 : and/or amount of data stored. */
570 :
571 : /* FIXME: Consider having the sync block on a completely separate
572 : device (to reduce seeking when syncing). */
573 :
574 0 : fd_vinyl_bstream_block_t * block = bd->sync;
575 :
576 0 : if( reset ) {
577 :
578 : /* We are starting a new bstream. Write the initial sync block. */
579 :
580 0 : bd->base->seed = io_seed;
581 0 : bd->base->seq_ancient = 0UL;
582 0 : bd->base->seq_past = 0UL;
583 0 : bd->base->seq_present = 0UL;
584 0 : bd->base->seq_future = 0UL;
585 :
586 0 : memset( block, 0, FD_VINYL_BSTREAM_BLOCK_SZ ); /* bulk zero */
587 :
588 0 : block->sync.ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_SYNC, 0, FD_VINYL_VAL_MAX );
589 : //block->sync.seq_past = ...; /* init by sync */
590 : //block->sync.seq_present = ...; /* init by sync */
591 0 : block->sync.info_sz = info_sz;
592 0 : if( info_sz ) memcpy( block->sync.info, info, info_sz );
593 : //block->sync.hash_trail = ...; /* init by sync */
594 : //block->sync.hash_blocks = ...; /* init by sync */
595 :
596 0 : int err = fd_vinyl_io_bd_sync( bd->base, FD_VINYL_IO_FLAG_BLOCKING ); /* logs details */
597 0 : if( FD_UNLIKELY( err ) ) {
598 0 : FD_LOG_WARNING(( "sync block write failed (%i-%s)", err, fd_vinyl_strerror( err ) ));
599 0 : return NULL;
600 0 : }
601 :
602 0 : } else {
603 :
604 : /* We are resuming an existing bstream. Read and validate the
605 : bstream's sync block. */
606 :
607 0 : bd_read( bd, bd->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */
608 :
609 0 : int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
610 0 : int version = fd_vinyl_bstream_ctl_style( block->sync.ctl );
611 0 : ulong val_max = fd_vinyl_bstream_ctl_sz ( block->sync.ctl );
612 0 : ulong seq_past = block->sync.seq_past;
613 0 : ulong seq_present = block->sync.seq_present;
614 0 : /**/ info_sz = block->sync.info_sz; // overrides user info_sz
615 0 : /**/ info = block->sync.info; // overrides user info
616 0 : /**/ io_seed = block->sync.hash_trail; // overrides user io_seed
617 :
618 0 : int bad_type = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
619 0 : int bad_version = (version != 0);
620 0 : int bad_val_max = (val_max != FD_VINYL_VAL_MAX);
621 0 : int bad_seq_past = !fd_ulong_is_aligned( seq_past, FD_VINYL_BSTREAM_BLOCK_SZ );
622 0 : int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
623 0 : int bad_info_sz = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
624 0 : int bad_past_order = fd_vinyl_seq_gt( seq_past, seq_present );
625 0 : int bad_past_sz = ((seq_present-seq_past) > bd->dev_sz);
626 :
627 0 : if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
628 0 : bad_past_order | bad_past_sz ) ) {
629 0 : FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
630 0 : bad_type ? "unexpected type" :
631 0 : bad_version ? "unexpected version" :
632 0 : bad_val_max ? "unexpected max pair value decoded byte size" :
633 0 : bad_seq_past ? "unaligned seq_past" :
634 0 : bad_seq_present ? "unaligned seq_present" :
635 0 : bad_info_sz ? "unexpected info size" :
636 0 : bad_past_order ? "unordered seq_past and seq_present" :
637 0 : "past size larger than bstream store" ));
638 0 : return NULL;
639 0 : }
640 :
641 0 : if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
642 0 : FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
643 0 : return NULL;
644 0 : }
645 :
646 0 : bd->base->seed = io_seed;
647 0 : bd->base->seq_ancient = seq_past;
648 0 : bd->base->seq_past = seq_past;
649 0 : bd->base->seq_present = seq_present;
650 0 : bd->base->seq_future = seq_present;
651 :
652 0 : }
653 :
654 0 : FD_LOG_INFO(( "IO config"
655 0 : "\n\ttype bd"
656 0 : "\n\tspad_max %lu bytes"
657 0 : "\n\tdev_sz %lu bytes"
658 0 : "\n\treset %i"
659 0 : "\n\tinfo \"%s\" (info_sz %lu%s)"
660 0 : "\n\tio_seed 0x%016lx%s",
661 0 : spad_max, dev_sz, reset,
662 0 : info ? (char const *)info : "", info_sz, reset ? "" : ", discovered",
663 0 : io_seed, reset ? "" : " (discovered)" ));
664 :
665 0 : return bd->base;
666 0 : }
|