Line data Source code
1 : /* fd_bundle_client.c steps gRPC related tasks. */
2 :
3 : #define _GNU_SOURCE /* SOL_TCP */
4 : #include "fd_bundle_auth.h"
5 : #include "fd_bundle_tile_private.h"
6 : #include "fd_bundle_tile.h"
7 : #include "proto/block_engine.pb.h"
8 : #include "proto/bundle.pb.h"
9 : #include "proto/packet.pb.h"
10 : #include "../fd_txn_m.h"
11 : #include "../../waltz/h2/fd_h2_conn.h"
12 : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
13 : #include "../../waltz/openssl/fd_openssl.h"
14 : #include "../../ballet/base58/fd_base58.h"
15 : #include "../../ballet/nanopb/pb_decode.h"
16 : #include "../../util/net/fd_ip4.h"
17 :
18 : #include <fcntl.h>
19 : #include <errno.h>
20 : #include <unistd.h> /* close */
21 : #include <poll.h> /* poll */
22 : #include <sys/socket.h> /* socket */
23 : #include <netinet/in.h>
24 : #include <netinet/ip.h>
25 : #include <netinet/tcp.h>
26 :
27 0 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
28 :
29 :
/* fd_bundle_now returns the timestamp the bundle client uses as "now".
   The symbol is weak, so another definition can replace it at link
   time.  This default forwards to fd_log_wallclock(). */

__attribute__((weak)) long
fd_bundle_now( void ) {
  long const ts = fd_log_wallclock();
  return ts;
}
34 :
35 : void
36 0 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
37 0 : if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
38 0 : if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
39 0 : FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
40 0 : }
41 0 : ctx->tcp_sock = -1;
42 0 : ctx->tcp_sock_connected = 0;
43 0 : }
44 0 : ctx->defer_reset = 0;
45 :
46 0 : ctx->builder_info_avail = 0;
47 0 : ctx->builder_info_wait = 0;
48 0 : ctx->packet_subscription_live = 0;
49 0 : ctx->packet_subscription_wait = 0;
50 0 : ctx->bundle_subscription_live = 0;
51 0 : ctx->bundle_subscription_wait = 0;
52 :
53 0 : fd_memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
54 :
55 0 : # if FD_HAS_OPENSSL
56 0 : if( FD_UNLIKELY( ctx->ssl ) ) {
57 0 : SSL_free( ctx->ssl );
58 0 : ctx->ssl = NULL;
59 0 : }
60 0 : # endif
61 :
62 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
63 :
64 0 : fd_bundle_auther_reset( &ctx->auther );
65 0 : fd_grpc_client_reset( ctx->grpc_client );
66 0 : }
67 :
68 : static int
69 : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
70 0 : uint ip4_addr ) {
71 0 : struct sockaddr_in addr = {
72 0 : .sin_family = AF_INET,
73 0 : .sin_addr.s_addr = ip4_addr,
74 0 : .sin_port = fd_ushort_bswap( ctx->server_tcp_port )
75 0 : };
76 0 : int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
77 : /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
78 0 : if( FD_LIKELY( err==-1 ) ) {
79 0 : return errno;
80 0 : }
81 0 : return 0;
82 0 : }
83 :
84 : static int
85 0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
86 0 : int so_err = 0;
87 0 : socklen_t so_err_sz = sizeof(so_err);
88 0 : if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
89 0 : return errno;
90 0 : }
91 0 : return so_err;
92 0 : }
93 :
94 : static void
95 0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
96 0 : fd_bundle_client_reset( ctx );
97 :
98 : /* FIXME IPv6 support */
99 0 : fd_addrinfo_t hints = {0};
100 0 : hints.ai_family = AF_INET;
101 0 : fd_addrinfo_t * res = NULL;
102 0 : uchar scratch[ 4096 ];
103 0 : void * pscratch = scratch;
104 0 : int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
105 0 : if( FD_UNLIKELY( err ) ) {
106 0 : FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
107 0 : fd_bundle_client_reset( ctx );
108 0 : ctx->metrics.transport_fail_cnt++;
109 0 : return;
110 0 : }
111 0 : uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
112 0 : ctx->server_ip4_addr = ip4_addr;
113 :
114 0 : int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
115 0 : if( FD_UNLIKELY( tcp_sock<0 ) ) {
116 0 : FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
117 0 : }
118 0 : ctx->tcp_sock = tcp_sock;
119 :
120 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
121 0 : FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
122 0 : }
123 :
124 0 : int tcp_nodelay = 1;
125 0 : if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
126 0 : FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
127 0 : }
128 :
129 0 : if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
130 0 : FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
131 0 : }
132 :
133 0 : char const * scheme = "http";
134 0 : # if FD_HAS_OPENSSL
135 0 : if( ctx->is_ssl ) scheme = "https";
136 0 : # endif
137 :
138 0 : FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
139 0 : scheme,
140 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
141 0 : (int)ctx->server_sni_len, ctx->server_sni ));
142 :
143 0 : int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
144 : /* FD_LIKELY as EINPROGRESS is expected */
145 0 : if( FD_LIKELY( connect_err ) ) {
146 0 : if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
147 0 : FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
148 0 : FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
149 0 : connect_err, fd_io_strerror( connect_err ) ));
150 0 : fd_bundle_client_reset( ctx );
151 0 : ctx->metrics.transport_fail_cnt++;
152 0 : return;
153 0 : }
154 0 : }
155 :
156 0 : # if FD_HAS_OPENSSL
157 0 : if( ctx->is_ssl ) {
158 0 : BIO * bio = fd_openssl_bio_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
159 0 : if( FD_UNLIKELY( !bio ) ) {
160 0 : FD_LOG_ERR(( "fd_openssl_bio_new_socket failed" ));
161 0 : }
162 :
163 0 : SSL * ssl = SSL_new( ctx->ssl_ctx );
164 0 : if( FD_UNLIKELY( !ssl ) ) {
165 0 : FD_LOG_ERR(( "SSL_new failed" ));
166 0 : }
167 :
168 0 : SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
169 0 : SSL_set_connect_state( ssl );
170 :
171 : /* Indicate to endpoint which server name we want */
172 0 : if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
173 0 : FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
174 0 : }
175 :
176 : /* Enable hostname verification */
177 0 : if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
178 0 : FD_LOG_ERR(( "SSL_set1_host failed" ));
179 0 : }
180 :
181 0 : ctx->ssl = ssl;
182 0 : }
183 0 : # endif /* FD_HAS_OPENSSL */
184 :
185 0 : fd_grpc_client_reset( ctx->grpc_client );
186 0 : fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
187 0 : }
188 :
189 : static int
190 : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
191 0 : int * charge_busy ) {
192 0 : # if FD_HAS_OPENSSL
193 0 : if( ctx->is_ssl ) {
194 0 : return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
195 0 : }
196 0 : # endif /* FD_HAS_OPENSSL */
197 :
198 0 : return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
199 0 : }
200 :
201 : static void
202 0 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
203 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
204 :
205 0 : block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
206 0 : static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
207 0 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
208 0 : ctx->grpc_client,
209 0 : path, sizeof(path)-1,
210 0 : FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
211 0 : &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
212 0 : ctx->auther.access_token, ctx->auther.access_token_sz,
213 0 : 0 /* is_streaming */
214 0 : );
215 0 : if( FD_UNLIKELY( !request ) ) return;
216 0 : fd_grpc_client_deadline_set(
217 0 : request,
218 0 : FD_GRPC_DEADLINE_RX_END,
219 0 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
220 :
221 0 : ctx->builder_info_wait = 1;
222 0 : }
223 :
224 : static void
225 0 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
226 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
227 :
228 0 : block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
229 0 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
230 0 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
231 0 : ctx->grpc_client,
232 0 : path, sizeof(path)-1,
233 0 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
234 0 : &block_engine_SubscribePacketsRequest_msg, &req,
235 0 : ctx->auther.access_token, ctx->auther.access_token_sz,
236 0 : 0 /* is_streaming */
237 0 : );
238 0 : if( FD_UNLIKELY( !request ) ) return;
239 0 : fd_grpc_client_deadline_set(
240 0 : request,
241 0 : FD_GRPC_DEADLINE_HEADER,
242 0 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
243 :
244 0 : ctx->packet_subscription_wait = 1;
245 0 : }
246 :
247 : static void
248 0 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
249 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
250 :
251 0 : block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
252 0 : static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
253 0 : fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
254 0 : ctx->grpc_client,
255 0 : path, sizeof(path)-1,
256 0 : FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
257 0 : &block_engine_SubscribeBundlesRequest_msg, &req,
258 0 : ctx->auther.access_token, ctx->auther.access_token_sz,
259 0 : 0 /* is_streaming */
260 0 : );
261 0 : if( FD_UNLIKELY( !request ) ) return;
262 0 : fd_grpc_client_deadline_set(
263 0 : request,
264 0 : FD_GRPC_DEADLINE_HEADER,
265 0 : fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
266 :
267 0 : ctx->bundle_subscription_wait = 1;
268 0 : }
269 :
270 : void
271 0 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
272 0 : if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
273 0 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
274 0 : if( FD_UNLIKELY( !conn ) ) return; /* no conn */
275 0 : if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
276 0 : fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
277 :
278 0 : if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
279 0 : long now = fd_bundle_now();
280 0 : fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
281 0 : FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
282 0 : }
283 0 : }
284 :
285 : int
286 : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
287 0 : long now ) {
288 : /* Drive auth */
289 0 : if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
290 0 : fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
291 0 : return 1;
292 0 : }
293 0 : if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
294 :
295 : /* Request block builder info */
296 0 : int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
297 0 : if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
298 0 : ( builder_info_expired ) ) &
299 0 : ( !ctx->builder_info_wait ) ) ) {
300 0 : fd_bundle_client_request_builder_info( ctx );
301 0 : return 1;
302 0 : }
303 :
304 : /* Subscribe to packets */
305 0 : if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
306 0 : fd_bundle_client_subscribe_packets( ctx );
307 0 : return 1;
308 0 : }
309 :
310 : /* Subscribe to bundles */
311 0 : if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
312 0 : fd_bundle_client_subscribe_bundles( ctx );
313 0 : return 1;
314 0 : }
315 :
316 : /* Send a PING */
317 0 : if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
318 0 : fd_bundle_client_send_ping( ctx );
319 0 : return 1;
320 0 : }
321 :
322 0 : return 0;
323 0 : }
324 :
325 : static void
326 : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
327 0 : int * charge_busy ) {
328 :
329 : /* Wait for TCP socket to connect */
330 0 : if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
331 0 : if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
332 :
333 0 : struct pollfd pfds[1] = {
334 0 : { .fd = ctx->tcp_sock, .events = POLLOUT }
335 0 : };
336 0 : int poll_res = fd_syscall_poll( pfds, 1, 0 );
337 0 : if( FD_UNLIKELY( poll_res<0 ) ) {
338 0 : FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
339 0 : }
340 0 : if( poll_res==0 ) return;
341 :
342 0 : int connect_result = 0;
343 0 : if( pfds[0].revents & (POLLERR|POLLHUP) ) {
344 0 : connect_result = fd_bundle_client_get_connect_result( ctx );
345 0 : connect_failed:
346 0 : FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
347 0 : fd_bundle_client_reset( ctx );
348 0 : ctx->metrics.transport_fail_cnt++;
349 0 : *charge_busy = 1;
350 0 : return;
351 0 : }
352 0 : if( pfds[0].revents & POLLOUT ) {
353 0 : connect_result = fd_bundle_client_get_connect_result( ctx );
354 0 : if( FD_UNLIKELY( connect_result!=0 ) ) {
355 0 : goto connect_failed;
356 0 : }
357 0 : FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
358 0 : ctx->tcp_sock_connected = 1;
359 0 : *charge_busy = 1;
360 0 : return;
361 0 : }
362 0 : return;
363 0 : }
364 :
365 : /* gRPC conn died? */
366 0 : if( FD_UNLIKELY( !ctx->grpc_client ) ) {
367 0 : long sleep_start;
368 0 : reconnect:
369 0 : sleep_start = fd_bundle_now();
370 0 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
371 0 : long wait_dur = ctx->backoff_until - sleep_start;
372 0 : fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
373 0 : return;
374 0 : }
375 0 : fd_bundle_client_create_conn( ctx );
376 0 : *charge_busy = 1;
377 0 : return;
378 0 : }
379 :
380 : /* Did a HTTP/2 PING time out */
381 0 : long check_ts = ctx->cached_ts = fd_bundle_now();
382 0 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
383 0 : FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
384 0 : (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
385 0 : ctx->keepalive->inflight = 0;
386 0 : ctx->defer_reset = 1;
387 0 : *charge_busy = 1;
388 0 : return;
389 0 : }
390 :
391 : /* Drive I/O, SSL handshake, and any inflight requests */
392 0 : if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
393 0 : fd_bundle_client_reset( ctx );
394 0 : ctx->metrics.transport_fail_cnt++;
395 0 : *charge_busy = 1;
396 0 : return;
397 0 : }
398 :
399 : /* Are we ready to issue a new request? */
400 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
401 0 : long io_ts = fd_bundle_now();
402 0 : if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
403 :
404 0 : *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
405 0 : }
406 :
407 : static void
408 0 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
409 0 : int status = fd_bundle_client_status( ctx );
410 :
411 0 : int const connected_now = ( status==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
412 0 : int const connected_before = ( ctx->bundle_status_logged==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
413 :
414 0 : if( FD_UNLIKELY( connected_now!=connected_before ) ) {
415 0 : long ts = fd_log_wallclock();
416 0 : if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
417 0 : if( connected_now ) {
418 0 : FD_LOG_NOTICE(( "Connected to bundle server" ));
419 0 : } else {
420 0 : FD_LOG_WARNING(( "Disconnected from bundle server" ));
421 0 : }
422 0 : ctx->last_bundle_status_log_nanos = ts;
423 0 : ctx->bundle_status_logged = (uchar)status;
424 0 : }
425 0 : }
426 0 : }
427 :
428 : void
429 : fd_bundle_client_step( fd_bundle_tile_t * ctx,
430 0 : int * charge_busy ) {
431 : /* Edge-trigger logging with rate limiting */
432 0 : fd_bundle_client_step1( ctx, charge_busy );
433 0 : fd_bundle_client_log_status( ctx );
434 0 : }
435 :
436 : void
437 : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
438 0 : long now ) {
439 0 : uint iter = ctx->backoff_iter;
440 0 : if( now < ctx->backoff_reset ) iter = 0U;
441 0 : iter++;
442 :
443 : /* FIXME proper backoff */
444 0 : long wait_ns = (long)2e9;
445 0 : wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
446 :
447 0 : ctx->backoff_until = now + wait_ns;
448 0 : ctx->backoff_reset = now + 2*wait_ns;
449 :
450 0 : ctx->backoff_iter = iter;
451 0 : }
452 :
/* gRPC client callback: connection established.  Only logs; app_ctx
   is unused. */

static void
fd_bundle_client_grpc_conn_established( void * app_ctx ) {
  FD_LOG_INFO(( "Bundle gRPC connection established" ));
  (void)app_ctx;
}
458 :
459 : static void
460 : fd_bundle_client_grpc_conn_dead( void * app_ctx,
461 : uint h2_err,
462 0 : int closed_by ) {
463 0 : fd_bundle_tile_t * ctx = app_ctx;
464 0 : FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
465 0 : closed_by ? "by peer" : "due to error",
466 0 : h2_err, fd_h2_strerror( h2_err ) ));
467 0 : ctx->defer_reset = 1;
468 0 : }
469 :
470 : /* Buffers a bundle transaction for deferred publishing by after_credit. */
471 :
472 : static void
473 : fd_bundle_tile_publish_bundle_txn(
474 : fd_bundle_tile_t * ctx,
475 : void const * txn,
476 : ulong txn_sz, /* <=FD_TXN_MTU */
477 : ulong bundle_txn_cnt,
478 : uint source_ipv4
479 0 : ) {
480 0 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
481 0 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
482 0 : return;
483 0 : }
484 :
485 0 : if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
486 0 : ctx->metrics.backpressure_drop_cnt++;
487 0 : return;
488 0 : }
489 :
490 0 : fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
491 0 : fd_memcpy( entry->payload, txn, txn_sz );
492 0 : entry->payload_sz = (ushort)txn_sz;
493 0 : entry->source_ipv4 = source_ipv4;
494 0 : entry->sig = 1UL;
495 0 : entry->bundle_seq = ctx->bundle_seq;
496 0 : entry->bundle_txn_cnt = bundle_txn_cnt;
497 0 : entry->commission = (uchar)ctx->builder_commission;
498 0 : fd_memcpy( entry->commission_pubkey, ctx->builder_pubkey, 32UL );
499 0 : ctx->metrics.txn_received_cnt++;
500 0 : }
501 :
502 : /* Buffers a regular transaction for deferred publishing by after_credit. */
503 :
504 : static void
505 : fd_bundle_tile_publish_txn(
506 : fd_bundle_tile_t * ctx,
507 : void const * txn,
508 : ulong txn_sz, /* <=FD_TXN_MTU */
509 : uint source_ipv4
510 0 : ) {
511 0 : if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
512 0 : ctx->metrics.backpressure_drop_cnt++;
513 0 : return;
514 0 : }
515 :
516 0 : fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
517 0 : fd_memcpy( entry->payload, txn, txn_sz );
518 0 : entry->payload_sz = (ushort)txn_sz;
519 0 : entry->source_ipv4 = source_ipv4;
520 0 : entry->sig = 0UL;
521 0 : entry->bundle_seq = 0UL;
522 0 : entry->bundle_txn_cnt = 1UL;
523 0 : entry->commission = 0U;
524 0 : fd_memset( entry->commission_pubkey, 0, 32UL );
525 0 : ctx->metrics.txn_received_cnt++;
526 0 : }
527 :
528 : /* Called for each transaction in a bundle. Simply counts up
529 : bundle_txn_cnt, but does not publish anything. */
530 :
531 : static bool
532 : fd_bundle_client_visit_pb_bundle_txn_preflight(
533 : pb_istream_t * istream,
534 : pb_field_t const * field,
535 : void ** arg
536 0 : ) {
537 0 : (void)istream; (void)field;
538 0 : fd_bundle_tile_t * ctx = *arg;
539 0 : ctx->bundle_txn_cnt++;
540 0 : return true;
541 0 : }
542 :
543 : /* Called for each transaction in a bundle. Publishes each transaction
544 : to the tango message bus. */
545 :
546 : static bool
547 : fd_bundle_client_visit_pb_bundle_txn(
548 : pb_istream_t * istream,
549 : pb_field_t const * field,
550 : void ** arg
551 0 : ) {
552 0 : (void)field;
553 0 : fd_bundle_tile_t * ctx = *arg;
554 :
555 0 : packet_Packet packet = packet_Packet_init_default;
556 0 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
557 0 : ctx->metrics.decode_fail_cnt++;
558 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
559 0 : return false;
560 0 : }
561 :
562 0 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
563 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
564 0 : return true;
565 0 : }
566 :
567 0 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
568 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
569 0 : return true;
570 0 : }
571 :
572 0 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
573 0 : fd_bundle_tile_publish_bundle_txn(
574 0 : ctx,
575 0 : packet.data.bytes, packet.data.size,
576 0 : ctx->bundle_txn_cnt,
577 0 : ip4
578 0 : );
579 :
580 0 : return true;
581 0 : }
582 :
583 : static void
584 : fd_bundle_client_sample_rx_delay(
585 : fd_bundle_tile_t * ctx,
586 : google_protobuf_Timestamp const * ts
587 0 : ) {
588 0 : ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
589 0 : fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
590 0 : }
591 :
592 : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
593 :
594 : static bool
595 : fd_bundle_client_visit_pb_bundle_uuid(
596 : pb_istream_t * istream,
597 : pb_field_t const * field,
598 : void ** arg
599 0 : ) {
600 0 : (void)field;
601 0 : fd_bundle_tile_t * ctx = *arg;
602 :
603 : /* Reset bundle state */
604 :
605 0 : ctx->bundle_txn_cnt = 0UL;
606 :
607 : /* Do two decode passes. This is required because we need to know the
608 : number of transactions in a bundle ahead of time. However, due to
609 : the Protobuf wire encoding, we don't know the number of txns that
610 : will come until we've parsed everything.
611 :
612 : First pass: Count number of bundles. */
613 :
614 0 : pb_istream_t peek = *istream;
615 0 : bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
616 0 : bundle.bundle.packets = (pb_callback_t) {
617 0 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
618 0 : .arg = ctx
619 0 : };
620 0 : if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
621 0 : ctx->metrics.decode_fail_cnt++;
622 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
623 0 : return false;
624 0 : }
625 :
626 : /* At this opint, ctx->bundle_txn_cnt is correctly set. Too many txns
627 : is treated as a NOP.
628 :
629 : Second pass: Actually publish bundle packets */
630 :
631 0 : if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
632 :
633 0 : if( FD_UNLIKELY( pending_txn_avail( ctx->pending_txns )<ctx->bundle_txn_cnt ) ) {
634 0 : ctx->metrics.backpressure_drop_cnt += ctx->bundle_txn_cnt;
635 0 : return true;
636 0 : }
637 :
638 0 : ctx->bundle_seq++;
639 0 : bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
640 0 : bundle.bundle.packets = (pb_callback_t) {
641 0 : .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
642 0 : .arg = ctx
643 0 : };
644 :
645 0 : ctx->metrics.bundle_received_cnt++;
646 :
647 0 : if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
648 0 : ctx->metrics.decode_fail_cnt++;
649 0 : FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
650 0 : return false;
651 0 : }
652 :
653 0 : fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
654 :
655 0 : return true;
656 0 : }
657 :
658 : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
659 :
660 : static void
661 : fd_bundle_client_handle_bundle_batch(
662 : fd_bundle_tile_t * ctx,
663 : pb_istream_t * istream
664 0 : ) {
665 0 : if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
666 0 : ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
667 0 : return;
668 0 : }
669 :
670 0 : block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
671 0 : res.bundles = (pb_callback_t) {
672 0 : .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
673 0 : .arg = ctx
674 0 : };
675 0 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
676 0 : ctx->metrics.decode_fail_cnt++;
677 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
678 0 : return;
679 0 : }
680 0 : }
681 :
682 : /* Called for each 'Packet' (a regular transaction) of a
683 : SubscribePacketsResponse. */
684 :
685 : static bool
686 : fd_bundle_client_visit_pb_packet(
687 : pb_istream_t * istream,
688 : pb_field_t const * field,
689 : void ** arg
690 0 : ) {
691 0 : (void)field;
692 0 : fd_bundle_tile_t * ctx = *arg;
693 :
694 0 : packet_Packet packet = packet_Packet_init_default;
695 0 : if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
696 0 : ctx->metrics.decode_fail_cnt++;
697 0 : FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
698 0 : return false;
699 0 : }
700 :
701 0 : if( FD_UNLIKELY( packet.data.size == 0 ) ) {
702 0 : FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
703 0 : return true;
704 0 : }
705 :
706 0 : if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
707 0 : FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
708 0 : return true;
709 0 : }
710 :
711 :
712 0 : uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
713 0 : fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
714 0 : ctx->metrics.packet_received_cnt++;
715 :
716 0 : return true;
717 0 : }
718 :
719 : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
720 :
721 : static void
722 : fd_bundle_client_handle_packet_batch(
723 : fd_bundle_tile_t * ctx,
724 : pb_istream_t * istream
725 0 : ) {
726 0 : block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
727 0 : res.batch.packets = (pb_callback_t) {
728 0 : .funcs.decode = fd_bundle_client_visit_pb_packet,
729 0 : .arg = ctx
730 0 : };
731 0 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
732 0 : ctx->metrics.decode_fail_cnt++;
733 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
734 0 : return;
735 0 : }
736 :
737 0 : fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
738 0 : }
739 :
740 : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
741 : gRPC call. */
742 :
743 : static void
744 : fd_bundle_client_handle_builder_fee_info(
745 : fd_bundle_tile_t * ctx,
746 : pb_istream_t * istream
747 0 : ) {
748 0 : block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
749 0 : if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
750 0 : ctx->metrics.decode_fail_cnt++;
751 0 : FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
752 0 : return;
753 0 : }
754 0 : if( FD_UNLIKELY( res.commission > 100 ) ) {
755 0 : ctx->metrics.decode_fail_cnt++;
756 0 : FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
757 0 : return;
758 0 : }
759 :
760 0 : uchar decoded_builder_pubkey[ 32 ];
761 0 : if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, decoded_builder_pubkey ) ) ) {
762 0 : FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
763 0 : return;
764 0 : }
765 :
766 0 : ctx->builder_commission = (uchar)res.commission; /* Apply update atomically */
767 0 : fd_memcpy( ctx->builder_pubkey, decoded_builder_pubkey, sizeof(ctx->builder_pubkey) );
768 :
769 0 : long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
770 0 : ctx->builder_info_avail = 1;
771 0 : ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
772 0 : }
773 :
774 : static void
775 : fd_bundle_client_grpc_tx_complete(
776 : void * app_ctx,
777 : ulong request_ctx
778 0 : ) {
779 0 : (void)app_ctx; (void)request_ctx;
780 0 : }
781 :
782 : void
783 : fd_bundle_client_grpc_rx_start(
784 : void * app_ctx,
785 : ulong request_ctx
786 0 : ) {
787 0 : fd_bundle_tile_t * ctx = app_ctx;
788 0 : switch( request_ctx ) {
789 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
790 0 : ctx->packet_subscription_live = 1;
791 0 : ctx->packet_subscription_wait = 0;
792 0 : break;
793 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
794 0 : ctx->bundle_subscription_live = 1;
795 0 : ctx->bundle_subscription_wait = 0;
796 0 : break;
797 0 : }
798 0 : }
799 :
800 : void
801 : fd_bundle_client_grpc_rx_msg(
802 : void * app_ctx,
803 : void const * protobuf,
804 : ulong protobuf_sz,
805 : ulong request_ctx
806 0 : ) {
807 0 : fd_bundle_tile_t * ctx = app_ctx;
808 0 : ctx->metrics.proto_received_bytes += protobuf_sz;
809 0 : pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
810 0 : switch( request_ctx ) {
811 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
812 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
813 0 : ctx->metrics.decode_fail_cnt++;
814 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
815 0 : }
816 0 : break;
817 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
818 0 : if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
819 0 : ctx->metrics.decode_fail_cnt++;
820 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
821 0 : }
822 0 : break;
823 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
824 0 : fd_bundle_client_handle_bundle_batch( ctx, &istream );
825 0 : break;
826 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
827 0 : fd_bundle_client_handle_packet_batch( ctx, &istream );
828 0 : break;
829 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
830 0 : fd_bundle_client_handle_builder_fee_info( ctx, &istream );
831 0 : break;
832 0 : default:
833 0 : FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
834 0 : }
835 0 : }
836 :
837 : static void
838 : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
839 0 : ulong request_ctx ) {
840 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
841 0 : switch( request_ctx ) {
842 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
843 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
844 0 : fd_bundle_auther_handle_request_fail( &ctx->auther );
845 0 : break;
846 0 : }
847 0 : }
848 :
849 : void
850 : fd_bundle_client_grpc_rx_end(
851 : void * app_ctx,
852 : ulong request_ctx,
853 : fd_grpc_resp_hdrs_t * resp
854 0 : ) {
855 0 : fd_bundle_tile_t * ctx = app_ctx;
856 0 : if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
857 0 : FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
858 0 : fd_bundle_client_request_failed( ctx, request_ctx );
859 0 : return;
860 0 : }
861 :
862 0 : resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
863 0 : if( !resp->grpc_msg_len ) {
864 0 : fd_memcpy( resp->grpc_msg, "unknown error", 13 );
865 0 : resp->grpc_msg_len = 13;
866 0 : }
867 :
868 0 : switch( request_ctx ) {
869 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
870 0 : ctx->packet_subscription_live = 0;
871 0 : ctx->packet_subscription_wait = 0;
872 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
873 0 : ctx->defer_reset = 1;
874 0 : FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
875 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
876 0 : return;
877 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
878 0 : ctx->bundle_subscription_live = 0;
879 0 : ctx->bundle_subscription_wait = 0;
880 0 : fd_bundle_tile_backoff( ctx, fd_bundle_now() );
881 0 : ctx->defer_reset = 1;
882 0 : FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
883 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
884 0 : return;
885 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
886 0 : ctx->builder_info_wait = 0;
887 0 : break;
888 0 : default:
889 0 : break;
890 0 : }
891 :
892 0 : if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
893 0 : FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
894 0 : resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
895 0 : (int)resp->grpc_msg_len, resp->grpc_msg ));
896 0 : fd_bundle_client_request_failed( ctx, request_ctx );
897 0 : if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
898 0 : resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
899 0 : fd_bundle_auther_reset( &ctx->auther );
900 0 : }
901 0 : return;
902 0 : }
903 0 : }
904 :
905 : void
906 : fd_bundle_client_grpc_rx_timeout(
907 : void * app_ctx,
908 : ulong request_ctx, /* FD_BUNDLE_CLIENT_REQ_{...} */
909 : int deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
910 0 : ) {
911 0 : (void)deadline_kind;
912 0 : FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
913 0 : fd_bundle_tile_t * ctx = app_ctx;
914 0 : ctx->defer_reset = 1;
915 0 : }
916 :
917 : static void
918 0 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
919 0 : fd_bundle_tile_t * ctx = app_ctx;
920 0 : long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
921 0 : if( FD_LIKELY( rtt_sample ) ) {
922 0 : fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
923 0 : FD_LOG_DEBUG(( "Keepalive ACK" ));
924 0 : }
925 0 : ctx->metrics.ping_ack_cnt++;
926 0 : }
927 :
928 : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
929 : .conn_established = fd_bundle_client_grpc_conn_established,
930 : .conn_dead = fd_bundle_client_grpc_conn_dead,
931 : .tx_complete = fd_bundle_client_grpc_tx_complete,
932 : .rx_start = fd_bundle_client_grpc_rx_start,
933 : .rx_msg = fd_bundle_client_grpc_rx_msg,
934 : .rx_end = fd_bundle_client_grpc_rx_end,
935 : .rx_timeout = fd_bundle_client_grpc_rx_timeout,
936 : .ping_ack = fd_bundle_client_grpc_ping_ack,
937 : };
938 :
939 : int
940 0 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
941 0 : if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
942 0 : ( !ctx->grpc_client ) ) ) {
943 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED;
944 0 : }
945 :
946 0 : fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
947 0 : if( FD_UNLIKELY( !conn ) ) {
948 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* no conn */
949 0 : }
950 0 : if( FD_UNLIKELY( conn->flags &
951 0 : ( FD_H2_CONN_FLAGS_DEAD |
952 0 : FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
953 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED;
954 0 : }
955 :
956 0 : if( FD_UNLIKELY( conn->flags &
957 0 : ( FD_H2_CONN_FLAGS_CLIENT_INITIAL |
958 0 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
959 0 : FD_H2_CONN_FLAGS_WAIT_SETTINGS_0 |
960 0 : FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
961 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* connection is not ready */
962 0 : }
963 :
964 0 : if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
965 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not authenticated */
966 0 : }
967 :
968 0 : if( FD_UNLIKELY( ( !ctx->builder_info_avail ) |
969 0 : ( !ctx->packet_subscription_live ) |
970 0 : ( !ctx->bundle_subscription_live ) ) ) {
971 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not fully connected */
972 0 : }
973 :
974 0 : if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
975 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* possible timeout */
976 0 : }
977 :
978 0 : if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
979 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING;
980 0 : }
981 :
982 : /* As far as we know, the bundle connection is alive and well. */
983 0 : return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED;
984 0 : }
985 :
986 : #undef DISCONNECTED
987 : #undef CONNECTING
988 : #undef CONNECTED
989 :
990 : FD_FN_CONST char const *
991 0 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
992 0 : switch( request_ctx ) {
993 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
994 0 : return "GenerateAuthChallenge";
995 0 : case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
996 0 : return "GenerateAuthTokens";
997 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
998 0 : return "SubscribePackets";
999 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
1000 0 : return "SubscribeBundles";
1001 0 : case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
1002 0 : return "GetBlockBuilderFeeInfo";
1003 0 : default:
1004 0 : return "unknown";
1005 0 : }
1006 0 : }
|