Line data Source code
1 : #include "fd_checkpt.h"
2 :
3 : int
4 0 : fd_checkpt_frame_style_is_supported( int frame_style ) {
5 0 : int supported;
6 0 : supported = (frame_style==FD_CHECKPT_FRAME_STYLE_RAW);
7 0 : # if FD_HAS_LZ4
8 0 : supported |= (frame_style==FD_CHECKPT_FRAME_STYLE_LZ4);
9 0 : # endif
10 0 : return supported;
11 0 : }
12 :
13 : char const *
14 0 : fd_checkpt_strerror( int err ) {
15 0 : switch( err ) {
16 0 : case FD_CHECKPT_SUCCESS: return "success";
17 0 : case FD_CHECKPT_ERR_INVAL: return "bad input args";
18 0 : case FD_CHECKPT_ERR_UNSUP: return "unsupported on this target";
19 0 : case FD_CHECKPT_ERR_IO: return "io error";
20 0 : case FD_CHECKPT_ERR_COMP: return "compression error";
21 0 : default: break;
22 0 : }
23 0 : return "unknown";
24 0 : }
25 :
26 : #if FD_HAS_LZ4
27 : #include <lz4.h>
28 :
29 : /* fd_checkpt_private_lz4 compresses the ubuf_usz byte size memory
30 : region pointed to by ubuf into the cbuf_max memory region pointed to
31 : by cbuf using the given lz4 compressor. Assumes lz4, ubuf and cbuf
32 : are valid. On success, returns the compressed size (will be in
33 : [4,cbuf_max]). The ubuf passed to this should not be modified
34 : until the given lz4 stream is reset / closed or there has been an
35 : additional 64 KiB passed to the stream. On failure, returns 0 and
36 : retains no interest in ubuf. In, either case, this retains no
37 : interest in cbuf on return.
38 :
39 : _gbuf, gbuf_sz, gbuf_thresh, _gbuf_cursor specify the small buf
40 : gather ring state. It is detailed below. */
41 :
42 : static ulong
43 : fd_checkpt_private_lz4( LZ4_stream_t * lz4,
44 : void const * _ubuf,
45 : ulong ubuf_usz,
46 : void * _cbuf,
47 : ulong cbuf_max,
48 : void * _gbuf,
49 : ulong gbuf_sz,
50 : ulong gbuf_thresh,
51 0 : ulong * _gbuf_cursor ) {
52 0 : char * cbuf = (char *) _cbuf;
53 0 : char const * ubuf = (char const *)_ubuf;
54 :
55 : /* Verify ubuf_usz is in [1,LZ4_MAX_INPUT_SIZE] and cbuf_max is large
56 : enough to store a header and a non-trivial compressed body. */
57 :
58 0 : if( FD_UNLIKELY( !((1UL<=ubuf_usz) & (ubuf_usz<=(ulong)LZ4_MAX_INPUT_SIZE)) ) ) {
59 0 : FD_LOG_WARNING(( "bad ubuf_usz" ));
60 0 : return 0UL;
61 0 : }
62 :
63 0 : if( FD_UNLIKELY( cbuf_max<4UL ) ) {
64 0 : FD_LOG_WARNING(( "not enough room to compress" ));
65 0 : return 0UL;
66 0 : }
67 :
68 : /* Small ubuf gather optimization. Though the LZ4 streaming API looks
69 : like it is designed for scatter/gather operation, the
70 : implementation under the hood is heavily optimized for the case
71 : when incoming data buffers are stored in a ring buffer (basically,
72 : the compression dictionary has a size of the most recent 64 KiB of
73 : _contiguous_ _in_ _memory_ buffers passed to it ... see
74 : lz4-1.9.4@lz4/lib/lz4.c:2636-2665 for an example).
75 :
76 : When a buffer >> 64 KiB is checkpointed, it will be compressed as
77 : CHUNK_USZ sequential chunks contiguous in memory. So outside of
78 : minor startup effects (where the initial dictionary might not be as
79 : large as it could have been), this case is optimal.
80 :
81 : But when lots of disjoint tiny buffers << 64 KiB are checkpointed,
82 : LZ4 is constantly reseting its dictionary to only use the most
83 : recently previously compressed (tiny) buffer. This case is
84 : suboptimal.
85 :
86 : At the same time, we don't want to use a ring buffer because that
87 : would imply an extra copy when compressing large data. This is a
88 : complete waste because that case was already optimal. And this is
89 : the most important case for high performance.
90 :
91 : Below, if the incoming buffer to compress is large enough
92 : (>thresh), we compress it in place as it will be optimal as before.
93 :
94 : If not, we first copy it into a gather buffer and have LZ4 compress
95 : out of the gather buffer location. Then, when compressing lots of
96 : tiny buffer disjoint buffers, it will appear to LZ4 as though they
97 : were contiguous in memory and LZ4 will handle that optimally too.
98 :
99 : Then our dictionary size is optimal in both asymptotic regimes and
100 : we are still zero copy in the important case of compressing large
101 : data. The dictionary will also be reasonable when toggling
102 : frequently between asymptotic regimes, as often happens in
103 : checkpointing (small metadata checkpts/large checkpt/small metadata
104 : checkpts/large data checkpt/...).
105 :
106 : This is also necessary to satisfy fd_checkpt_data's and
107 : fd_checkpt_meta's API guarantees. The lz4 streaming API requires
108 : the most recent 64KiB of uncompressed bytes to be unmodified and in
109 : the same place when called. If FD_CHECKPT_META_MAX<=thresh<=64KiB,
110 : the copying into the gather buffer here and out of the scatter
111 : buffer on restore means the unmodified-in-place part of the
112 : requirement can be satified even if the user passes a temporary
113 : buffer and immediately modifies/frees it on return. That is, the
114 : checkpt/restore will be prompt and retain no interest in buf on
115 : return.
116 :
117 : We also have to make the gather/scatter buffers large enough to
118 : satify the most-recent-64KiB part of the requirement. Suppose we
119 : have only been compressing small buffers and we are trying to
120 : compress a thresh byte buffer when only thresh-1 bytes of gather
121 : buffer space remains. Since we wrap at buffer granularity, we will
122 : need to put thresh bytes at the head of the buffer. To ensure this
123 : doesn't clobber any of the 64 KiB previously compressed bytes, we
124 : need a gather buffer at least:
125 :
126 : thresh + 64KiB + thresh-1 = 2 thresh + 64 KiB - 1
127 :
128 : in size. Larger is fine. Smaller will violate this part of the
129 : requirement.
130 :
131 : We do the corresponding in the restore and the restore
132 : configuration must match our checkpt configuration exactly in order
133 : to keep the dictionaries on both sides synchronized.
134 :
135 : TL;DR We store small buffers into a gather ring at buffer
136 : granularity for better compression and compress large buffers in
137 : place for extra performance due to the details of how LZ4 stream
138 : APIs are implemented We also do this to support immediate use and
139 : reuse of the metadata checkpt/restores buffers. */
140 :
141 0 : int is_small = ubuf_usz<=gbuf_thresh;
142 0 : if( is_small ) { /* app dependent branch prob */
143 0 : ulong gbuf_cursor = *_gbuf_cursor;
144 0 : if( (gbuf_sz-gbuf_cursor)<ubuf_usz ) gbuf_cursor = 0UL; /* cmov */
145 0 : ubuf = (char *)_gbuf + gbuf_cursor;
146 0 : *_gbuf_cursor = gbuf_cursor + ubuf_usz;
147 0 : memcpy( (char *)ubuf, _ubuf, ubuf_usz );
148 0 : }
149 :
150 : /* Compress ubuf into cbuf, leaving space for the header. Compression
151 : will fail if there is no room to the resulting compressed size into
152 : the header as we clamp the capacity to 2^24-1. */
153 :
154 0 : ulong ubuf_csz_max = fd_ulong_min( cbuf_max-3UL, (1UL<<24)-1UL ); /* In [1,2^24) */
155 :
156 0 : int _ubuf_csz = LZ4_compress_fast_continue( lz4, ubuf, cbuf+3UL, (int)ubuf_usz, (int)ubuf_csz_max, 1 /* default */ );
157 0 : if( FD_UNLIKELY( _ubuf_csz<=0 ) ) {
158 0 : FD_LOG_WARNING(( "LZ4_compress_fast_continue error (%i)", _ubuf_csz ));
159 0 : return 0UL;
160 0 : }
161 :
162 0 : ulong ubuf_csz = (ulong)_ubuf_csz;
163 0 : if( FD_UNLIKELY( ubuf_csz>ubuf_csz_max ) ) {
164 0 : FD_LOG_WARNING(( "unexpected compressed size" ));
165 0 : return 0UL;
166 0 : }
167 :
168 : /* Write compressed size we obtained into the header as a 24-bit
169 : little endian unsigned integer. This need to do this is a
170 : limitation of how the recent LZ4 APIs (>=1.9) work. */
171 :
172 0 : cbuf[0] = (char)( ubuf_csz & 255UL);
173 0 : cbuf[1] = (char)((ubuf_csz>> 8) & 255UL);
174 0 : cbuf[2] = (char)((ubuf_csz>>16) & 255UL);
175 :
176 0 : return ubuf_csz + 3UL;
177 0 : }
178 : #endif
179 :
180 : fd_checkpt_t *
181 : fd_checkpt_init_stream( void * mem,
182 : int fd,
183 : void * wbuf,
184 0 : ulong wbuf_sz ) {
185 :
186 : /* Check input args */
187 :
188 0 : if( FD_UNLIKELY( !mem ) ) {
189 0 : FD_LOG_WARNING(( "NULL mem" ));
190 0 : return NULL;
191 0 : }
192 :
193 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
194 0 : FD_LOG_WARNING(( "misaligned mem" ));
195 0 : return NULL;
196 0 : }
197 :
198 0 : if( FD_UNLIKELY( fd<0 ) ) {
199 0 : FD_LOG_WARNING(( "bad fd" ));
200 0 : return NULL;
201 0 : }
202 :
203 0 : if( FD_UNLIKELY( !wbuf ) ) {
204 0 : FD_LOG_WARNING(( "NULL wbuf" ));
205 0 : return NULL;
206 0 : }
207 :
208 0 : if( FD_UNLIKELY( wbuf_sz<FD_CHECKPT_WBUF_MIN ) ) {
209 0 : FD_LOG_WARNING(( "wbuf_sz too small" ));
210 0 : return NULL;
211 0 : }
212 :
213 : /* Create the compressor */
214 :
215 0 : # if FD_HAS_LZ4
216 0 : LZ4_stream_t * lz4 = LZ4_createStream();
217 0 : if( FD_UNLIKELY( !lz4 ) ) {
218 0 : FD_LOG_WARNING(( "lz4 error" ));
219 0 : return NULL;
220 0 : }
221 : # else
222 : void * lz4 = NULL;
223 : # endif
224 :
225 : /* Init the checkpt */
226 :
227 0 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
228 :
229 0 : checkpt->fd = fd; /* streaming mode */
230 0 : checkpt->frame_style = 0; /* not in frame */
231 0 : checkpt->lz4 = (void *)lz4;
232 0 : checkpt->gbuf_cursor = 0UL;
233 0 : checkpt->off = 0UL;
234 0 : checkpt->wbuf.mem = (uchar *)wbuf;
235 0 : checkpt->wbuf.sz = wbuf_sz;
236 0 : checkpt->wbuf.used = 0UL;
237 :
238 0 : return checkpt;
239 0 : }
240 :
241 : fd_checkpt_t *
242 : fd_checkpt_init_mmio( void * mem,
243 : void * mmio,
244 0 : ulong mmio_sz ) {
245 :
246 : /* Check input args */
247 :
248 0 : if( FD_UNLIKELY( !mem ) ) {
249 0 : FD_LOG_WARNING(( "NULL mem" ));
250 0 : return NULL;
251 0 : }
252 :
253 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, FD_CHECKPT_ALIGN ) ) ) {
254 0 : FD_LOG_WARNING(( "misaligned mem" ));
255 0 : return NULL;
256 0 : }
257 :
258 0 : if( FD_UNLIKELY( (!mmio) & (!!mmio_sz) ) ) {
259 0 : FD_LOG_WARNING(( "NULL mmio" ));
260 0 : return NULL;
261 0 : }
262 :
263 : /* Create the compressor */
264 :
265 0 : # if FD_HAS_LZ4
266 0 : LZ4_stream_t * lz4 = LZ4_createStream();
267 0 : if( FD_UNLIKELY( !lz4 ) ) {
268 0 : FD_LOG_WARNING(( "lz4 error" ));
269 0 : return NULL;
270 0 : }
271 : # else
272 : void * lz4 = NULL;
273 : # endif
274 :
275 : /* Init the checkpt */
276 :
277 0 : fd_checkpt_t * checkpt = (fd_checkpt_t *)mem;
278 :
279 0 : checkpt->fd = -1; /* mmio mode */
280 0 : checkpt->frame_style = 0; /* not in frame */
281 0 : checkpt->lz4 = (void *)lz4;
282 0 : checkpt->gbuf_cursor = 0UL;
283 0 : checkpt->off = 0UL;
284 0 : checkpt->mmio.mem = (uchar *)mmio;
285 0 : checkpt->mmio.sz = mmio_sz;
286 :
287 0 : return checkpt;
288 0 : }
289 :
290 : void *
291 0 : fd_checkpt_fini( fd_checkpt_t * checkpt ) {
292 :
293 0 : if( FD_UNLIKELY( !checkpt ) ) {
294 0 : FD_LOG_WARNING(( "NULL checkpt" ));
295 0 : return NULL;
296 0 : }
297 :
298 0 : if( FD_UNLIKELY( fd_checkpt_in_frame( checkpt ) ) ) {
299 0 : FD_LOG_WARNING(( "in a frame" ));
300 0 : checkpt->frame_style = -1; /* failed */
301 0 : return NULL;
302 0 : }
303 :
304 0 : # if FD_HAS_LZ4
305 :
306 : /* Note: Though this this doesn't seem to be officially documented,
307 : the lz4-1.9.4@lz4/lib/lz4.c:1575) suggests that this always returns
308 : 0. That is, 0 is success and non-zero is failure. */
309 :
310 0 : if( FD_UNLIKELY( LZ4_freeStream( (LZ4_stream_t *)checkpt->lz4 ) ) )
311 0 : FD_LOG_WARNING(( "LZ4 freeStream error, attempting to continue" ));
312 :
313 0 : # endif
314 :
315 0 : return (void *)checkpt;
316 0 : }
317 :
318 : int
319 : fd_checkpt_open_advanced( fd_checkpt_t * checkpt,
320 : int frame_style,
321 0 : ulong * _off ) {
322 :
323 0 : if( FD_UNLIKELY( !checkpt ) ) {
324 0 : FD_LOG_WARNING(( "NULL checkpt" ));
325 0 : return FD_CHECKPT_ERR_INVAL;
326 0 : }
327 :
328 0 : if( FD_UNLIKELY( !fd_checkpt_can_open( checkpt ) ) ) {
329 0 : FD_LOG_WARNING(( "in a frame or failed" ));
330 0 : checkpt->frame_style = -1; /* failed */
331 0 : return FD_CHECKPT_ERR_INVAL;
332 0 : }
333 :
334 0 : if( FD_UNLIKELY( !_off ) ) {
335 0 : FD_LOG_WARNING(( "NULL _off" ));
336 0 : checkpt->frame_style = -1; /* failed */
337 0 : return FD_CHECKPT_ERR_INVAL;
338 0 : }
339 :
340 0 : frame_style = fd_int_if( !!frame_style, frame_style, FD_CHECKPT_FRAME_STYLE_DEFAULT );
341 :
342 0 : switch( frame_style ) {
343 :
344 0 : case FD_CHECKPT_FRAME_STYLE_RAW: {
345 0 : break;
346 0 : }
347 :
348 0 : # if FD_HAS_LZ4
349 0 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
350 0 : LZ4_resetStream_fast( (LZ4_stream_t *)checkpt->lz4 ); /* Note: no error code for this API */
351 0 : checkpt->gbuf_cursor = 0UL;
352 0 : break;
353 0 : }
354 0 : # endif
355 :
356 0 : default: {
357 0 : FD_LOG_WARNING(( "unsupported frame_style" ));
358 0 : checkpt->frame_style = -1; /* failed */
359 0 : return FD_CHECKPT_ERR_UNSUP;
360 0 : }
361 :
362 0 : }
363 :
364 0 : checkpt->frame_style = frame_style;
365 :
366 0 : *_off = checkpt->off;
367 0 : return FD_CHECKPT_SUCCESS;
368 0 : }
369 :
370 : int
371 : fd_checkpt_close_advanced( fd_checkpt_t * checkpt,
372 0 : ulong * _off ) {
373 :
374 0 : if( FD_UNLIKELY( !checkpt ) ) {
375 0 : FD_LOG_WARNING(( "NULL checkpt" ));
376 0 : return FD_CHECKPT_ERR_INVAL;
377 0 : }
378 :
379 0 : if( FD_UNLIKELY( !fd_checkpt_in_frame( checkpt ) ) ) {
380 0 : FD_LOG_WARNING(( "not in a frame" ));
381 0 : checkpt->frame_style = -1; /* failed */
382 0 : return FD_CHECKPT_ERR_INVAL;
383 0 : }
384 :
385 0 : if( FD_UNLIKELY( !_off ) ) {
386 0 : FD_LOG_WARNING(( "NULL _off" ));
387 0 : checkpt->frame_style = -1; /* failed */
388 0 : return FD_CHECKPT_ERR_INVAL;
389 0 : }
390 :
391 0 : ulong off = checkpt->off;
392 :
393 0 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
394 :
395 : /* Nothing to do */
396 :
397 0 : } else { /* streaming mode */
398 :
399 : /* Flush out all pending bytes for this frame */
400 :
401 0 : ulong wbuf_used = checkpt->wbuf.used;
402 :
403 0 : if( FD_LIKELY( wbuf_used ) ) {
404 :
405 0 : ulong wsz;
406 0 : int err = fd_io_write( checkpt->fd, checkpt->wbuf.mem, wbuf_used, wbuf_used, &wsz );
407 0 : if( FD_UNLIKELY( err ) ) {
408 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
409 0 : checkpt->frame_style = -1; /* failed */
410 0 : return FD_CHECKPT_ERR_IO;
411 0 : }
412 :
413 0 : off += wsz;
414 0 : if( FD_UNLIKELY( off<wsz ) ) {
415 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
416 0 : checkpt->frame_style = -1; /* failed */
417 0 : return FD_CHECKPT_ERR_IO;
418 0 : }
419 :
420 0 : }
421 :
422 0 : checkpt->wbuf.used = 0UL;
423 :
424 0 : }
425 :
426 0 : checkpt->off = off;
427 0 : checkpt->frame_style = 0; /* not in frame */
428 :
429 0 : *_off = off;
430 0 : return FD_CHECKPT_SUCCESS;
431 0 : }
432 :
433 : static int
434 : fd_checkpt_private_buf( fd_checkpt_t * checkpt,
435 : void const * buf,
436 : ulong sz,
437 0 : ulong max ) {
438 :
439 0 : if( FD_UNLIKELY( !checkpt ) ) {
440 0 : FD_LOG_WARNING(( "NULL checkpt" ));
441 0 : return FD_CHECKPT_ERR_INVAL;
442 0 : }
443 :
444 0 : if( FD_UNLIKELY( !fd_checkpt_in_frame( checkpt ) ) ) {
445 0 : FD_LOG_WARNING(( "not in a frame" ));
446 0 : checkpt->frame_style = -1; /* failed */
447 0 : return FD_CHECKPT_ERR_INVAL;
448 0 : }
449 :
450 0 : if( FD_UNLIKELY( !sz ) ) return FD_CHECKPT_SUCCESS; /* nothing to do */
451 :
452 0 : if( FD_UNLIKELY( sz>max ) ) {
453 0 : FD_LOG_WARNING(( "sz too large" ));
454 0 : checkpt->frame_style = -1; /* failed */
455 0 : return FD_CHECKPT_ERR_INVAL;
456 0 : }
457 :
458 0 : if( FD_UNLIKELY( !buf ) ) {
459 0 : FD_LOG_WARNING(( "NULL buf with non-zero sz" ));
460 0 : checkpt->frame_style = -1; /* failed */
461 0 : return FD_CHECKPT_ERR_INVAL;
462 0 : }
463 :
464 0 : ulong off = checkpt->off;
465 :
466 0 : switch( checkpt->frame_style ) {
467 :
468 0 : case FD_CHECKPT_FRAME_STYLE_RAW: {
469 :
470 0 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode (app dependent branch prob) */
471 :
472 0 : if( FD_UNLIKELY( sz > (checkpt->mmio.sz-off) ) ) {
473 0 : FD_LOG_WARNING(( "mmio_sz too small" ));
474 0 : checkpt->frame_style = -1; /* failed */
475 0 : return FD_CHECKPT_ERR_IO;
476 0 : }
477 :
478 0 : memcpy( checkpt->mmio.mem + off, buf, sz );
479 :
480 0 : off += sz; /* at most mmio.sz */
481 :
482 0 : } else { /* streaming mode */
483 :
484 0 : ulong wbuf_used = checkpt->wbuf.used;
485 :
486 0 : ulong wsz_max = wbuf_used + sz;
487 0 : if( FD_UNLIKELY( wsz_max<sz ) ) {
488 0 : FD_LOG_WARNING(( "sz overflow" ));
489 0 : checkpt->frame_style = -1; /* failed */
490 0 : return FD_CHECKPT_ERR_IO;
491 0 : }
492 :
493 0 : int err = fd_io_buffered_write( checkpt->fd, buf, sz, checkpt->wbuf.mem, checkpt->wbuf.sz, &wbuf_used );
494 0 : if( FD_UNLIKELY( err ) ) {
495 0 : FD_LOG_WARNING(( "fd_io_buffered_write failed (%i-%s)", err, fd_io_strerror( err ) ));
496 0 : checkpt->frame_style = -1; /* failed */
497 0 : return FD_CHECKPT_ERR_IO;
498 0 : }
499 :
500 0 : if( FD_UNLIKELY( wsz_max<wbuf_used ) ) {
501 0 : FD_LOG_WARNING(( "unexpected buffered write size" ));
502 0 : checkpt->frame_style = -1; /* failed */
503 0 : return FD_CHECKPT_ERR_IO;
504 0 : }
505 :
506 0 : ulong wsz = wsz_max - wbuf_used;
507 :
508 0 : off += wsz;
509 0 : if( FD_UNLIKELY( off<wsz ) ) {
510 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
511 0 : checkpt->frame_style = -1; /* failed */
512 0 : return FD_CHECKPT_ERR_IO;
513 0 : }
514 :
515 0 : checkpt->wbuf.used = wbuf_used;
516 :
517 0 : }
518 :
519 0 : break;
520 0 : }
521 :
522 0 : # if FD_HAS_LZ4
523 0 : case FD_CHECKPT_FRAME_STYLE_LZ4: {
524 0 : LZ4_stream_t * lz4 = (LZ4_stream_t *)checkpt->lz4;
525 :
526 0 : if( fd_checkpt_is_mmio( checkpt ) ) { /* mmio mode, app dependent branch prob */
527 :
528 0 : uchar * mmio = checkpt->mmio.mem;
529 0 : ulong mmio_sz = checkpt->mmio.sz;
530 :
531 0 : uchar const * chunk = (uchar const *)buf;
532 0 : do {
533 0 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
534 :
535 0 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, mmio + off, mmio_sz - off,
536 0 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_META_MAX,
537 0 : &checkpt->gbuf_cursor ); /* logs details */
538 0 : if( FD_UNLIKELY( !chunk_csz ) ) {
539 0 : checkpt->frame_style = -1; /* failed */
540 0 : return FD_CHECKPT_ERR_COMP;
541 0 : }
542 :
543 0 : off += chunk_csz; /* at most mmio_sz */
544 :
545 0 : chunk += chunk_usz;
546 0 : sz -= chunk_usz;
547 0 : } while( sz );
548 :
549 0 : } else { /* streaming mode */
550 :
551 0 : int fd = checkpt->fd;
552 0 : uchar * wbuf = checkpt->wbuf.mem;
553 0 : ulong wbuf_sz = checkpt->wbuf.sz;
554 0 : ulong wbuf_used = checkpt->wbuf.used;
555 :
556 0 : uchar const * chunk = (uchar const *)buf;
557 0 : do {
558 0 : ulong chunk_usz = fd_ulong_min( sz, FD_CHECKPT_PRIVATE_CHUNK_USZ_MAX );
559 :
560 : /* If we are not guaranteed to have enough room in the write
561 : buffer to hold the compressed chunk, flush it to make room. */
562 :
563 0 : ulong chunk_csz_max = FD_CHECKPT_PRIVATE_CSZ_MAX( chunk_usz );
564 0 : ulong wbuf_free = wbuf_sz - wbuf_used;
565 0 : if( FD_UNLIKELY( chunk_csz_max > wbuf_free ) ) {
566 :
567 0 : ulong wsz;
568 0 : int err = fd_io_write( fd, wbuf, wbuf_used, wbuf_used, &wsz );
569 0 : if( FD_UNLIKELY( err ) ) {
570 0 : FD_LOG_WARNING(( "fd_io_write failed (%i-%s)", err, fd_io_strerror( err ) ));
571 0 : checkpt->frame_style = -1; /* failed */
572 0 : return FD_CHECKPT_ERR_IO;
573 0 : }
574 :
575 0 : off += wsz;
576 0 : if( FD_UNLIKELY( off<wsz ) ) {
577 0 : FD_LOG_WARNING(( "checkpt sz overflow" ));
578 0 : checkpt->frame_style = -1; /* failed */
579 0 : return FD_CHECKPT_ERR_IO;
580 0 : }
581 :
582 0 : wbuf_used = 0UL;
583 0 : wbuf_free = wbuf_sz; /* >= WBUF_MIN >= CSZ_MAX( CHUNK_USZ_MAX ) >= CSZ_MAX( chunk_usz ) */
584 :
585 0 : }
586 :
587 : /* At this point, wbuf_free >= chunk_csz_max */
588 :
589 0 : ulong chunk_csz = fd_checkpt_private_lz4( lz4, chunk, chunk_usz, wbuf + wbuf_used, wbuf_free,
590 0 : checkpt->gbuf, FD_CHECKPT_PRIVATE_GBUF_SZ, FD_CHECKPT_META_MAX,
591 0 : &checkpt->gbuf_cursor ); /* logs details */
592 0 : if( FD_UNLIKELY( !chunk_csz ) ) {
593 0 : checkpt->frame_style = -1; /* failed */
594 0 : return FD_CHECKPT_ERR_COMP;
595 0 : }
596 :
597 0 : wbuf_used += chunk_csz;
598 :
599 0 : chunk += chunk_usz;
600 0 : sz -= chunk_usz;
601 :
602 0 : } while( sz );
603 :
604 0 : checkpt->wbuf.used = wbuf_used;
605 :
606 0 : }
607 :
608 0 : break;
609 0 : }
610 0 : # endif
611 :
612 0 : default: { /* never get here */
613 0 : FD_LOG_WARNING(( "unsupported frame style" ));
614 0 : checkpt->frame_style = -1; /* failed */
615 0 : return FD_CHECKPT_ERR_UNSUP;
616 0 : }
617 :
618 0 : }
619 :
620 0 : checkpt->off = off;
621 0 : return FD_CHECKPT_SUCCESS;
622 0 : }
623 :
624 : int
625 : fd_checkpt_meta( fd_checkpt_t * checkpt,
626 : void const * buf,
627 0 : ulong sz ) {
628 0 : return fd_checkpt_private_buf( checkpt, buf, sz, FD_CHECKPT_META_MAX );
629 0 : }
630 :
631 : int
632 : fd_checkpt_data( fd_checkpt_t * checkpt,
633 : void const * buf,
634 0 : ulong sz ) {
635 : /* TODO: optimize sz <= META_MAX better? */
636 0 : return fd_checkpt_private_buf( checkpt, buf, sz, ULONG_MAX );
637 0 : }
|