Line data Source code
1 : #define _GNU_SOURCE
2 : #include "utils/fd_ssarchive.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_sshttp.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 : #include "../../waltz/openssl/fd_openssl_tile.h"
9 :
10 : #include <sys/mman.h> /* memfd_create */
11 : #include <errno.h>
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 : #include <sys/socket.h>
15 :
16 : #include "generated/fd_snapld_tile_seccomp.h"
17 :
18 : #define NAME "snapld"
19 :
/* While a download is in progress, each 10 second window must receive
   at least min_download_speed_mibs * 10 MiB of data, otherwise the
   download is cancelled.  This catches extremely slow downloads where
   it may take a long while to reach the 100 MiB batch at which the
   download speed is otherwise measured. */
24 0 : #define FD_SNAPLD_DOWNLOAD_WINDOW_NS (10L*1000L*1000L*1000L) /* 10 seconds */
25 :
26 : /* The snapld tile is responsible for loading data from the local file
27 : or from an HTTP/TCP connection and sending it to the snapdc tile
28 : for later decompression. */
29 :
30 : typedef struct fd_snapld_tile {
31 :
32 : struct {
33 : char path[ PATH_MAX ];
34 : uint min_download_speed_mibs;
35 : } config;
36 :
37 : int state;
38 : ulong pending_ctrl_sig;
39 : int load_full;
40 : int load_file;
41 : int sent_meta;
42 :
43 : ulong bytes_in_batch;
44 : double download_speed_mibs;
45 : long start_batch;
46 : long end_batch;
47 :
48 : ulong bytes_in_window;
49 : ulong min_bytes_in_window;
50 : long window_deadline;
51 :
52 : int local_full_fd;
53 : int local_incr_fd;
54 : int sockfd;
55 :
56 : int is_https;
57 :
58 : fd_sshttp_t * sshttp;
59 :
60 : struct {
61 : void const * base;
62 : } in_rd;
63 :
64 : struct {
65 : fd_wksp_t * mem;
66 : ulong chunk0;
67 : ulong wmark;
68 : ulong chunk;
69 : ulong mtu;
70 : } out_dc;
71 :
72 : } fd_snapld_tile_t;
73 :
74 : static ulong
75 0 : scratch_align( void ) {
76 0 : return fd_ulong_max( alignof(fd_snapld_tile_t), fd_sshttp_align() );
77 0 : }
78 :
79 : static ulong
80 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
81 0 : ulong l = FD_LAYOUT_INIT;
82 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
83 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
84 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
85 0 : return FD_LAYOUT_FINI( l, scratch_align() );
86 0 : }
87 :
88 : FD_FN_CONST static inline ulong
89 0 : loose_footprint( fd_topo_tile_t const * tile ) {
90 0 : (void)tile;
91 : /* Leftover space for OpenSSL allocations */
92 0 : return 1UL<<26UL; /* 64 MiB */
93 0 : }
94 :
95 : static void
96 : privileged_init( fd_topo_t * topo,
97 0 : fd_topo_tile_t * tile ) {
98 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
99 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
100 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
101 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
102 :
103 0 : #if FD_HAS_OPENSSL
104 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
105 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
106 0 : fd_ossl_tile_init( alloc );
107 0 : #endif
108 :
109 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
110 0 : FD_TEST( ctx->sshttp );
111 :
112 0 : ulong full_slot = ULONG_MAX;
113 0 : ulong incr_slot = ULONG_MAX;
114 0 : int full_is_zstd = 0;
115 0 : int incr_is_zstd = 0;
116 0 : char full_path[ PATH_MAX ] = { 0 };
117 0 : char incr_path[ PATH_MAX ] = { 0 };
118 0 : uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ] = { 0 };
119 0 : uchar incr_snapshot_hash[ FD_HASH_FOOTPRINT ] = { 0 };
120 0 : ctx->local_full_fd = -1;
121 0 : ctx->local_incr_fd = -1;
122 : /* fd_ssarchive_latest_pair needs to be invoked here, irrespective
123 : of whether snapct may do the same, because this information is
124 : needed here during privileged_init. */
125 0 : if( FD_LIKELY( -1!=fd_ssarchive_latest_pair( tile->snapld.snapshots_path,
126 0 : tile->snapld.incremental_snapshots,
127 0 : &full_slot, &incr_slot,
128 0 : full_path, incr_path,
129 0 : &full_is_zstd, &incr_is_zstd,
130 0 : full_snapshot_hash, incr_snapshot_hash ) ) ) {
131 0 : FD_TEST( full_slot!=ULONG_MAX );
132 :
133 0 : ctx->local_full_fd = open( full_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
134 0 : if( FD_UNLIKELY( -1==ctx->local_full_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
135 :
136 0 : if( FD_LIKELY( incr_slot!=ULONG_MAX ) ) {
137 0 : ctx->local_incr_fd = open( incr_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
138 0 : if( FD_UNLIKELY( -1==ctx->local_incr_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", incr_path, errno, fd_io_strerror( errno ) ));
139 0 : }
140 0 : }
141 :
142 : /* Create a temporary file descriptor for our socket file descriptor.
143 : It is closed later in unprivileged init so that the sandbox sees
144 : an existent file descriptor. */
145 0 : ctx->sockfd = memfd_create( "snapld.sockfd", 0 );
146 0 : if( FD_UNLIKELY( -1==ctx->sockfd ) ) FD_LOG_ERR(( "memfd_create() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
147 0 : }
148 :
149 : static ulong
150 : populate_allowed_fds( fd_topo_t const * topo,
151 : fd_topo_tile_t const * tile,
152 : ulong out_fds_cnt,
153 0 : int * out_fds ) {
154 0 : if( FD_UNLIKELY( out_fds_cnt<4UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
155 :
156 0 : ulong out_cnt = 0;
157 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
158 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
159 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd();
160 0 : }
161 :
162 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
163 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
164 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
165 0 : if( FD_LIKELY( -1!=ctx->local_full_fd ) ) out_fds[ out_cnt++ ] = ctx->local_full_fd;
166 0 : if( FD_LIKELY( -1!=ctx->local_incr_fd ) ) out_fds[ out_cnt++ ] = ctx->local_incr_fd;
167 0 : out_fds[ out_cnt++ ] = ctx->sockfd;
168 :
169 0 : return out_cnt;
170 0 : }
171 :
172 : static ulong
173 : populate_allowed_seccomp( fd_topo_t const * topo,
174 : fd_topo_tile_t const * tile,
175 : ulong out_cnt,
176 0 : struct sock_filter * out ) {
177 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
178 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
179 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
180 :
181 0 : populate_sock_filter_policy_fd_snapld_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_full_fd, (uint)ctx->local_incr_fd, (uint)ctx->sockfd );
182 0 : return sock_filter_policy_fd_snapld_tile_instr_cnt;
183 0 : }
184 :
185 : static void
186 : unprivileged_init( fd_topo_t * topo,
187 0 : fd_topo_tile_t * tile ) {
188 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
189 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
190 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
191 :
192 0 : fd_memcpy( ctx->config.path, tile->snapld.snapshots_path, PATH_MAX );
193 0 : ctx->config.min_download_speed_mibs = tile->snapld.min_download_speed_mibs;
194 :
195 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
196 0 : ctx->pending_ctrl_sig = 0UL;
197 :
198 0 : ctx->download_speed_mibs = 0.0;
199 0 : ctx->bytes_in_batch = 0UL;
200 0 : ctx->start_batch = 0L;
201 0 : ctx->end_batch = 0L;
202 0 : ctx->bytes_in_window = 0UL;
203 0 : ctx->window_deadline = LONG_MAX;
204 0 : ctx->min_bytes_in_window = ((ulong)ctx->config.min_download_speed_mibs * (FD_SNAPLD_DOWNLOAD_WINDOW_NS / (ulong)1e9))<<20UL;
205 :
206 0 : FD_TEST( tile->in_cnt==1UL );
207 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
208 0 : FD_TEST( 0==strcmp( in_link->name, "snapct_ld" ) );
209 0 : ctx->in_rd.base = fd_topo_obj_wksp_base( topo, in_link->dcache_obj_id );
210 :
211 0 : FD_TEST( tile->out_cnt==1UL );
212 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ 0 ] ];
213 0 : FD_TEST( 0==strcmp( out_link->name, "snapld_dc" ) );
214 0 : ctx->out_dc.mem = fd_topo_obj_wksp_base( topo, out_link->dcache_obj_id );
215 0 : ctx->out_dc.chunk0 = fd_dcache_compact_chunk0( ctx->out_dc.mem, out_link->dcache );
216 0 : ctx->out_dc.wmark = fd_dcache_compact_wmark ( ctx->out_dc.mem, out_link->dcache, out_link->mtu );
217 0 : ctx->out_dc.chunk = ctx->out_dc.chunk0;
218 0 : ctx->out_dc.mtu = out_link->mtu;
219 :
220 : /* We can only close the temporary socket file descriptor after
221 : entering the sandbox because the sandbox checks all file
222 : descriptors are existent. */
223 0 : if( -1==close( ctx->sockfd ) ) FD_LOG_ERR((" close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
224 0 : }
225 :
226 : static int
227 0 : should_shutdown( fd_snapld_tile_t * ctx ) {
228 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
229 0 : }
230 :
231 : static void
232 0 : metrics_write( fd_snapld_tile_t * ctx ) {
233 0 : #if FD_HAS_OPENSSL
234 0 : FD_MCNT_SET( SNAPLD, SSL_ALLOC_ERRORS, fd_ossl_alloc_errors );
235 0 : #endif
236 0 : FD_MGAUGE_SET( SNAPLD, STATE, (ulong)(ctx->state) );
237 0 : }
238 :
239 : static void
240 : transition_malformed( fd_snapld_tile_t * ctx,
241 0 : fd_stem_context_t * stem ) {
242 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
243 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
244 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
245 0 : }
246 :
247 : static int
248 : check_download_progress( fd_snapld_tile_t * ctx,
249 : fd_stem_context_t * stem,
250 : int downloading,
251 0 : long now ) {
252 0 : if( FD_UNLIKELY( ctx->window_deadline==LONG_MAX && downloading ) ) {
253 0 : ctx->window_deadline = now + FD_SNAPLD_DOWNLOAD_WINDOW_NS;
254 0 : ctx->bytes_in_window = 0UL;
255 0 : }
256 :
257 0 : if( FD_UNLIKELY( now>ctx->window_deadline ) ) {
258 0 : if( FD_UNLIKELY( ctx->bytes_in_window<ctx->min_bytes_in_window ) ) {
259 : /* cancel the download if the download progress speed in the last
260 : window is less than the minimum download speed. */
261 0 : double download_speed_mibs = (double)ctx->bytes_in_window / (double)(FD_SNAPLD_DOWNLOAD_WINDOW_NS / 1e9) / (double)(1<<20UL);
262 0 : FD_LOG_WARNING(( "download progress of %.2f MiB/s in the last %lu seconds for %s snapshot "
263 0 : "is below the minimum download speed %u MiB/s, cancelling download",
264 0 : download_speed_mibs, FD_SNAPLD_DOWNLOAD_WINDOW_NS / (ulong)1e9,
265 0 : ctx->load_full ? "full" : "incremental", ctx->config.min_download_speed_mibs ));
266 0 : transition_malformed(ctx, stem );
267 0 : fd_sshttp_cancel( ctx->sshttp );
268 0 : return -1;
269 0 : }
270 0 : ctx->window_deadline = now + FD_SNAPLD_DOWNLOAD_WINDOW_NS;
271 0 : ctx->bytes_in_window = 0UL;
272 0 : }
273 0 : return 0;
274 0 : }
275 :
276 : static void
277 : after_credit( fd_snapld_tile_t * ctx,
278 : fd_stem_context_t * stem,
279 : int * opt_poll_in FD_PARAM_UNUSED,
280 0 : int * charge_busy ) {
281 0 : if( FD_UNLIKELY( ctx->pending_ctrl_sig ) ) {
282 0 : FD_TEST( !ctx->load_file && ctx->is_https );
283 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
284 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
285 0 : fd_stem_publish( stem, 0UL, ctx->pending_ctrl_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
286 0 : ctx->pending_ctrl_sig = 0UL;
287 0 : return;
288 0 : } else FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
289 0 : }
290 :
291 0 : if( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) {
292 0 : fd_log_sleep( (long)1e6 );
293 0 : return;
294 0 : }
295 :
296 0 : uchar * out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
297 :
298 0 : if( ctx->load_file ) {
299 0 : long result = read( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, out, ctx->out_dc.mtu );
300 0 : if( FD_UNLIKELY( result<=0L ) ) {
301 0 : if( result==0L ) {
302 0 : FD_LOG_NOTICE(( "finished reading %s snapshot from local file", ctx->load_full ? "full" : "incremental" ));
303 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
304 0 : } else if( FD_UNLIKELY( errno!=EAGAIN && errno!=EINTR ) ) {
305 0 : FD_LOG_WARNING(( "read() failed on %s snapshot file (%i-%s)", ctx->load_full ? "full" : "incremental", errno, fd_io_strerror( errno ) ));
306 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
307 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
308 0 : return; /* verbose return */
309 0 : }
310 0 : } else {
311 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, (ulong)result, 0UL, 0UL, 0UL );
312 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, (ulong)result, ctx->out_dc.chunk0, ctx->out_dc.wmark );
313 0 : *charge_busy = 1;
314 0 : return; /* verbose return */
315 0 : }
316 0 : } else {
317 0 : int downloading = 0;
318 0 : ulong data_len = ctx->out_dc.mtu;
319 0 : long now = fd_log_wallclock();
320 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, &downloading, now );
321 0 : switch( result ) {
322 0 : case FD_SSHTTP_ADVANCE_AGAIN:
323 0 : if( FD_UNLIKELY( -1==check_download_progress( ctx, stem, downloading, now ) ) ) break;
324 0 : break;
325 0 : case FD_SSHTTP_ADVANCE_DATA: {
326 0 : ctx->bytes_in_window += data_len;
327 0 : if( FD_UNLIKELY( -1==check_download_progress( ctx, stem, downloading, now ) ) ) break;
328 0 : if( FD_UNLIKELY( !ctx->sent_meta ) ) {
329 : /* On the first DATA return, the HTTP headers are available
330 : for use. We need to send this metadata downstream, but
331 : need to do so before any data frags. So, we copy any data
332 : we received with the headers (if any) to the next dcache
333 : chunk and then publish both in order. */
334 0 : ctx->start_batch = fd_log_wallclock();
335 0 : ctx->sent_meta = 1;
336 0 : fd_ssctrl_meta_t * meta = (fd_ssctrl_meta_t *)out;
337 0 : ulong next_chunk = fd_dcache_compact_next( ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), ctx->out_dc.chunk0, ctx->out_dc.wmark );
338 0 : memmove( fd_chunk_to_laddr( ctx->out_dc.mem, next_chunk ), out, data_len );
339 0 : meta->total_sz = fd_sshttp_content_len( ctx->sshttp );
340 0 : if( FD_UNLIKELY( meta->total_sz==ULONG_MAX ) ) {
341 0 : FD_LOG_ERR(( "HTTP response for %s snapshot is missing Content-Length header", ctx->load_full ? "full" : "incremental" ));
342 0 : }
343 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_META, ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), 0UL, 0UL, 0UL );
344 0 : ctx->out_dc.chunk = next_chunk;
345 0 : }
346 0 : if( FD_LIKELY( data_len!=0UL ) ) {
347 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, data_len, 0UL, 0UL, 0UL );
348 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, data_len, ctx->out_dc.chunk0, ctx->out_dc.wmark );
349 0 : ctx->bytes_in_batch += data_len;
350 :
351 : /* measure download speed every 100 MiB */
352 0 : if(ctx->bytes_in_batch>=100<<20UL) {
353 0 : ctx->end_batch = fd_log_wallclock();
354 : /* as a precaution, make sure elapsed_batch is positive
355 : and larger than zero (to avoid division by zero). */
356 0 : long elapsed_batch = fd_long_if( ctx->end_batch > ctx->start_batch, ctx->end_batch - ctx->start_batch, 1L );
357 : /* download speed in MiB/s = bytes/nanoseconds * 1e9/(1 second) * 1/(1MiB = 1<<20UL) = 1e9/(1024*1024) ~= 954 */
358 0 : ctx->download_speed_mibs = (double)(ctx->bytes_in_batch*954) / (double)elapsed_batch;
359 0 : if( FD_UNLIKELY( ctx->download_speed_mibs<ctx->config.min_download_speed_mibs ) ) {
360 : /* cancel the snapshot load if the download speed is less
361 : than the minimum download speed. */
362 0 : FD_LOG_WARNING(( "download speed %.2f MiB/s on a batch of %lu MiB for %s snapshot is below the minimum threshold %.2f MiB/s. "
363 0 : "cancelling snapshot download",
364 0 : ctx->download_speed_mibs, ctx->bytes_in_batch>>20UL, ctx->load_full ? "full" : "incremental",
365 0 : (double)(ctx->config.min_download_speed_mibs) ));
366 0 : transition_malformed(ctx, stem );
367 0 : }
368 0 : ctx->start_batch = ctx->end_batch;
369 0 : ctx->bytes_in_batch = 0UL;
370 0 : }
371 0 : }
372 0 : *charge_busy = 1;
373 0 : break;
374 0 : }
375 0 : case FD_SSHTTP_ADVANCE_DONE:
376 0 : FD_LOG_NOTICE(( "finished downloading %s snapshot", ctx->load_full ? "full" : "incremental" ));
377 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
378 0 : break;
379 0 : case FD_SSHTTP_ADVANCE_ERROR:
380 0 : FD_LOG_WARNING(( "HTTP advance error during %s snapshot download, entering error state",
381 0 : ctx->load_full ? "full" : "incremental" ));
382 0 : transition_malformed( ctx, stem );
383 0 : fd_sshttp_cancel( ctx->sshttp );
384 0 : break;
385 0 : default: FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d for %s snapshot",
386 0 : result, ctx->load_full ? "full" : "incremental" ));
387 0 : }
388 0 : }
389 0 : }
390 :
391 : static int
392 : returnable_frag( fd_snapld_tile_t * ctx,
393 : ulong in_idx FD_PARAM_UNUSED,
394 : ulong seq FD_PARAM_UNUSED,
395 : ulong sig,
396 : ulong chunk,
397 : ulong sz,
398 : ulong ctl FD_PARAM_UNUSED,
399 : ulong tsorig FD_PARAM_UNUSED,
400 : ulong tspub FD_PARAM_UNUSED,
401 0 : fd_stem_context_t * stem ) {
402 0 : FD_TEST( !ctx->pending_ctrl_sig );
403 :
404 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
405 : /* Control messages move along the snapshot load pipeline. Since
406 : error conditions can be triggered by any tile in the pipeline,
407 : it is possible to be in error state and still receive otherwise
408 : valid messages. Only a fail message can revert this. */
409 0 : return 0;
410 0 : };
411 :
412 0 : int forward_msg = 1;
413 :
414 0 : switch( sig ) {
415 :
416 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
417 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
418 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
419 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
420 0 : FD_TEST( sz==sizeof(fd_ssctrl_init_t) && sz<=ctx->out_dc.mtu );
421 0 : fd_ssctrl_init_t const * msg_in = fd_chunk_to_laddr_const( ctx->in_rd.base, chunk );
422 0 : ctx->load_full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
423 0 : ctx->load_file = msg_in->file;
424 0 : ctx->sent_meta = 0;
425 0 : ctx->is_https = msg_in->is_https;
426 :
427 0 : ctx->window_deadline = LONG_MAX;
428 0 : ctx->bytes_in_window = 0UL;
429 0 : long now = fd_log_wallclock();
430 0 : if( ctx->load_file ) {
431 0 : if( FD_UNLIKELY( 0!=lseek( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, 0, SEEK_SET ) ) )
432 0 : FD_LOG_ERR(( "lseek(0) failed on %s snapshot file (%i-%s)",
433 0 : ctx->load_full ? "full" : "incremental", errno, fd_io_strerror( errno ) ));
434 0 : } else {
435 0 : if( ctx->load_full ) fd_sshttp_init( ctx->sshttp, msg_in->addr, msg_in->hostname, msg_in->is_https, msg_in->path, msg_in->path_len, now );
436 0 : else fd_sshttp_init( ctx->sshttp, msg_in->addr, msg_in->hostname, msg_in->is_https, msg_in->path, msg_in->path_len, now );
437 0 : }
438 0 : fd_ssctrl_init_t * msg_out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
439 0 : fd_memcpy( msg_out, msg_in, sz );
440 0 : fd_stem_publish( stem, 0UL, sig, ctx->out_dc.chunk, sz, 0UL, 0UL, 0UL );
441 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, ctx->out_dc.mtu, ctx->out_dc.chunk0, ctx->out_dc.wmark );
442 0 : forward_msg = 0; // we are forwarding the control message in the `fd_sstrl_init_t` message
443 0 : break;
444 0 : }
445 :
446 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
447 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ) ) {
448 : /* snapld should be in the finishing state when reading from a
449 : file or downloading from http. It is only allowed to still
450 : be in progress for shutting down an https connection. Save
451 : the sig here and send the message when snapld is in the
452 : finishing state. */
453 0 : FD_TEST( ctx->is_https );
454 0 : ctx->pending_ctrl_sig = sig;
455 0 : forward_msg = 0;
456 0 : break;
457 0 : }
458 0 : else FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
459 0 : break;
460 0 : }
461 :
462 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
463 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
464 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
465 0 : break;
466 0 : }
467 :
468 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
469 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
470 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
471 0 : break;
472 0 : }
473 :
474 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
475 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
476 0 : fd_sshttp_cancel( ctx->sshttp );
477 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
478 0 : break;
479 :
480 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
481 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
482 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
483 0 : break;
484 0 : }
485 :
486 : /* FD_SNAPSHOT_MSG_DATA is not possible */
487 0 : default: {
488 0 : FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%lu)",
489 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
490 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
491 0 : break;
492 0 : }
493 0 : }
494 :
495 : /* Forward the control message down the pipeline */
496 0 : if( FD_LIKELY( forward_msg ) ) {
497 0 : fd_stem_publish( stem, 0UL, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
498 0 : }
499 :
500 0 : return 0;
501 0 : }
502 :
/* Burst budget: up to two frags from after_credit (e.g. META followed
   by DATA) plus one from returnable_frag. */
504 0 : #define STEM_BURST 3UL
505 :
506 0 : #define STEM_LAZY 1000L
507 :
508 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapld_tile_t
509 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapld_tile_t)
510 :
511 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
512 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
513 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
514 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
515 :
516 : #include "../../disco/stem/fd_stem.c"
517 :
518 : fd_topo_run_tile_t fd_tile_snapld = {
519 : .name = NAME,
520 : .populate_allowed_seccomp = populate_allowed_seccomp,
521 : .populate_allowed_fds = populate_allowed_fds,
522 : .scratch_align = scratch_align,
523 : .scratch_footprint = scratch_footprint,
524 : .loose_footprint = loose_footprint,
525 : .privileged_init = privileged_init,
526 : .unprivileged_init = unprivileged_init,
527 : .run = stem_run,
528 : .keep_host_networking = 1,
529 : .allow_connect = 1,
530 : .rlimit_file_cnt = 5UL, /* stderr, log, http, full/incr local files */
531 : };
532 :
533 : #undef NAME
|