Coverage Report

Created: 2026-05-30 07:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/gpac/src/utils/rmt_ws.c
Line
Count
Source
1
#include <gpac/tools.h>
2
#include <gpac/thread.h>
3
#include <gpac/list.h>
4
#include <gpac/bitstream.h>
5
#include <gpac/network.h>
6
#include <gpac/download.h>
7
#include <gpac/base_coding.h>
8
9
#if !defined(GPAC_DISABLE_RMTWS) && !defined(GPAC_DISABLE_NETWORK)
10
11
12
static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
13
14
15
struct RMT_WS {
16
17
    // The main server thread
18
    GF_Thread* thread;
19
20
    // Should we stop the thread?
21
    Bool should_stop;
22
23
    RMT_Settings settings;
24
25
};
26
27
28
29
GF_DownloadSession *gf_dm_sess_new_server(GF_DownloadManager *dm, GF_Socket *server, void *ssl_ctx, gf_dm_user_io user_io, void *usr_cbk, Bool async, GF_Err *e);
30
void  gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value);
31
GF_Err gf_dm_sess_send_reply(GF_DownloadSession *sess, u32 reply_code, const char *response_body, u32 body_len, Bool no_body);
32
void gf_dm_sess_clear_headers(GF_DownloadSession *sess);
33
void  gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value);
34
GF_Err dm_sess_write(GF_DownloadSession *session, const u8 *buffer, u32 size);
35
GF_Err gf_dm_read_data(GF_DownloadSession *sess, char *data, u32 data_size, u32 *out_read);
36
void httpout_format_date(u64 time, char szDate[200], Bool for_listing);
37
38
#ifdef GPAC_HAS_SSL
39
40
void *gf_ssl_new(void *ssl_server_ctx, GF_Socket *client_sock, GF_Err *e);
41
void *gf_ssl_server_context_new(const char *cert, const char *key);
42
void gf_ssl_server_context_del(void *ssl_server_ctx);
43
Bool gf_ssl_init_lib();
44
45
#endif
46
47
// WebSocket frame opcodes (value of the 4-bit opcode field of a frame header)
enum {
    RMT_WEBSOCKET_CONTINUATION = 0x0,
    RMT_WEBSOCKET_TEXT         = 0x1,
    RMT_WEBSOCKET_BINARY       = 0x2,
    RMT_WEBSOCKET_CLOSE        = 0x8,
    RMT_WEBSOCKET_PING         = 0x9,
    RMT_WEBSOCKET_PONG         = 0xA,
};
55
56
static char RMT_WEBSOCKET_PING_MSG[2] = { 0x89, 0x00 };
57
58
struct __rmt_serverctx {
59
60
    RMT_WS* rmt;
61
62
    GF_Socket* server_sock;
63
64
    GF_SockGroup* sg;
65
66
    void *ssl_ctx;
67
68
    GF_DownloadManager* dm_sess;
69
70
    GF_List* active_clients;
71
72
};
73
74
struct __rmt_clientctx {
75
76
    GF_Socket* client_sock;
77
    RMT_ServerCtx* ctx;
78
    GF_DownloadSession* http_sess;
79
80
    char peer_address[GF_MAX_IP_NAME_LEN + 16];
81
    char buffer[1024];
82
83
    Bool is_ws;
84
    u64 last_active_time;
85
    u64 last_ping_time;
86
    Bool should_close;
87
88
    rmt_client_on_data_cbk on_data_cbk;
89
    void* on_data_cbk_task;
90
91
    rmt_client_on_del_cbk on_del_cbk;
92
    void* on_del_cbk_task;
93
94
};
95
96
GF_EXPORT
97
0
const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) {
98
0
    if (client)
99
0
        return client->peer_address;
100
0
    return NULL;
101
0
}
102
103
GF_EXPORT
104
0
void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) {
105
0
    if (client)
106
0
        return client->on_data_cbk_task;
107
0
    return NULL;
108
0
}
109
110
GF_EXPORT
111
0
void gf_rmt_set_on_new_client_cbk(RMT_WS* rmt, void *task, rmt_on_new_client_cbk cbk) {
112
0
    RMT_Settings *rmtcfg = gf_rmt_get_settings(rmt);
113
0
    if (rmtcfg) {
114
0
        rmtcfg->on_new_client_cbk = cbk;
115
0
        rmtcfg->on_new_client_cbk_task = task;
116
0
    }
117
0
}
118
119
GF_EXPORT
120
0
void* gf_rmt_get_on_new_client_task(RMT_WS* rmt) {
121
0
    if (!rmt) return NULL;
122
0
    return rmt->settings.on_new_client_cbk_task;
123
0
}
124
125
GF_EXPORT
126
0
void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) {
127
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_rmt_client_set_on_del_cbk client %p task %p cbk %p\n", client, task, cbk));
128
0
    if (client) {
129
0
        client->on_del_cbk = cbk;
130
0
        client->on_del_cbk_task = task;
131
0
    }
132
0
}
133
134
GF_EXPORT
135
0
void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) {
136
0
    if (client)
137
0
        return client->on_del_cbk_task;
138
0
    return NULL;
139
0
}
140
141
GF_EXPORT
142
0
RMT_WS* gf_rmt_client_get_rmt(RMT_ClientCtx* client) {
143
0
    if (client && client->ctx)
144
0
        return client->ctx->rmt;
145
0
    return NULL;
146
0
}
147
148
149
0
/* Destroys a client: runs its on-delete callback, releases its socket / HTTP session,
 * frees both callback task pointers and finally the client itself.
 * The client is NOT removed from the server list here (see rmt_close_client()). */
void rmt_clientctx_del(RMT_ClientCtx* client) {

    if (!client) return;

    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] closing client %s\n", client->peer_address));
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_clientctx_del client %p del_cbk %p\n", client, client->on_del_cbk));

    // let the owner react before anything is torn down
    if (client->on_del_cbk)
        client->on_del_cbk(client->on_del_cbk_task);

    GF_Socket *sock = client->client_sock;
    if (sock) {
        RMT_ServerCtx *srv = client->ctx;
        if (srv && srv->sg)
            gf_sk_group_unregister(srv->sg, sock);

        // once an HTTP session exists, gf_dm_sess_del() below deletes the socket
        if (!client->http_sess)
            gf_sk_del(sock);
        client->client_sock = NULL;
    }

    if (client->http_sess) {
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("deleting http_sess %p\n", client->http_sess));
        gf_dm_sess_del(client->http_sess);
        client->http_sess = NULL;
    }

    client->ctx = NULL;
    client->on_data_cbk = NULL;
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d rmt_clientctx_del client %p task %p\n", __FILE__, __LINE__, client, client->on_data_cbk_task));

    // the task pointers are released here with gf_free()
    if (client->on_data_cbk_task) {
        gf_free(client->on_data_cbk_task);
        client->on_data_cbk_task = NULL;
    }
    if (client->on_del_cbk_task) {
        gf_free(client->on_del_cbk_task);
        client->on_del_cbk_task = NULL;
    }

    gf_free(client);
}
187
188
0
/* Releases everything held by a server context (clients, sockets, socket group,
 * download manager, TLS context) and leaves every field NULL so the context can be
 * reused by rmt_create_server(). The context itself is not freed.
 * A NULL context is ignored. */
void rmt_serverctx_reset(RMT_ServerCtx* ctx) {
    if (!ctx) return;

    if (ctx->active_clients) {
        // destroy clients first: they unregister themselves from ctx->sg
        while (gf_list_count(ctx->active_clients)) {
            RMT_ClientCtx* client = gf_list_pop_back(ctx->active_clients);
            rmt_clientctx_del(client);
        }
        gf_list_del(ctx->active_clients);
        ctx->active_clients = NULL;
    }

    if (ctx->server_sock) {
        if (ctx->sg)
            gf_sk_group_unregister(ctx->sg, ctx->server_sock);

        gf_sk_del(ctx->server_sock);
        ctx->server_sock = NULL;
    }

    if (ctx->sg) {
        gf_sk_group_del(ctx->sg);
        ctx->sg = NULL;
    }

    if (ctx->dm_sess) {
        gf_dm_del(ctx->dm_sess);
        ctx->dm_sess = NULL;
    }

    if (ctx->ssl_ctx) {
#ifdef GPAC_HAS_SSL
        gf_ssl_server_context_del(ctx->ssl_ctx);
#else
        gf_free(ctx->ssl_ctx);
#endif
        ctx->ssl_ctx = NULL;
    }
}
222
223
0
/* Detaches a client from its server's active list (when it has one) and destroys it. */
void rmt_close_client(RMT_ClientCtx* client) {
    if (!client) return;

    GF_List *clients = client->ctx ? client->ctx->active_clients : NULL;
    if (clients)
        gf_list_del_item(clients, client);

    rmt_clientctx_del(client);
}
231
232
0
/* Sends an HTTP reply on a session.
 * Adds "Date" and "Server" headers; when response_body is given, also adds
 * "Content-Type" (content_type, or "text/html" when NULL) and "Content-Length".
 * With a NULL response_body the reply is sent without body.
 * Returns the result of gf_dm_sess_send_reply(). */
GF_Err rmt_send_reply(GF_DownloadSession* http_sess, int responseCode, char* response_body, char* content_type) {

    u32 body_size = 0;
    char szLen[32];
    char szDate[200];

    httpout_format_date(gf_net_get_utc(), szDate, GF_FALSE);
    gf_dm_sess_set_header(http_sess, "Date", szDate);
    gf_dm_sess_set_header(http_sess, "Server", gf_gpac_version());

    if (response_body) {
        body_size = (u32) strlen(response_body);
        gf_dm_sess_set_header(http_sess, "Content-Type", content_type ? content_type : "text/html");
        // body_size is unsigned: format it as such, with a bounded write
        snprintf(szLen, sizeof(szLen), "%u", body_size);
        gf_dm_sess_set_header(http_sess, "Content-Length", szLen);
    }

    return gf_dm_sess_send_reply(http_sess, responseCode, response_body, body_size, (response_body==NULL));
}
253
254
0
static void rmt_on_http_session_data(void *usr_cbk, GF_NETIO_Parameter *parameter) {
255
256
0
    RMT_ClientCtx* client_ctx = (RMT_ClientCtx*) usr_cbk;
257
0
    if (!client_ctx || !client_ctx->ctx) {
258
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] sess io on null session\n"));
259
0
        return;
260
0
    }
261
0
    RMT_Settings* rmt_settings = gf_rmt_get_settings(client_ctx->ctx->rmt);
262
263
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_on_http_session_data on peer %s param %p msg_type %d \nerror %s \ndata %p \nsize %d \nname %p \nvalue %p \nreply %d\n",
264
0
                    client_ctx->peer_address,
265
0
                    parameter,
266
0
                    parameter->msg_type,
267
0
                    gf_error_to_string(parameter->error),
268
0
                    parameter->data,
269
0
                    parameter->size,
270
0
                    parameter->name,
271
0
                    parameter->value,
272
0
                    parameter->reply
273
0
            ));
274
275
0
    client_ctx->last_active_time = gf_sys_clock_high_res();
276
277
0
    const char* durl = gf_dm_sess_get_resource_name(parameter->sess);
278
0
    const char* ua = gf_dm_sess_get_header(parameter->sess, "User-Agent");
279
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("requested url: %s with ua %s\n", durl, ua));
280
0
    GF_Err e;
281
282
0
    if (parameter->msg_type != GF_NETIO_PARSE_REPLY) {
283
0
        if (parameter->size) {
284
0
            strncat(client_ctx->buffer, parameter->data, parameter->size);
285
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("session data is now: %s\n", client_ctx->buffer));
286
0
        }
287
0
    }
288
289
0
    if (parameter->msg_type == GF_NETIO_PARSE_REPLY) {
290
291
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("final buffer before response: %s\n", client_ctx->buffer));
292
293
0
        GF_DownloadSession* http_sess = parameter->sess ;
294
295
0
        if (!client_ctx->is_ws) {
296
0
            const char* ws_header_version = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Version");
297
0
            const char* ws_header_key = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Key");
298
299
0
            if (!ws_header_version || strcmp(ws_header_version, "13") || !ws_header_key) {
300
301
0
                gf_dm_sess_clear_headers(http_sess);
302
0
                gf_dm_sess_set_header(http_sess, "Connection", "close");
303
304
0
                rmt_send_reply(http_sess, 404, "only ws connections are accepted", NULL);
305
0
                client_ctx->should_close = GF_TRUE;
306
0
                return;
307
308
0
            }
309
0
            else {
310
311
0
                char* resp_key = gf_strdup(ws_header_key);
312
0
                gf_dynstrcat(&resp_key, websocket_guid, NULL);
313
314
0
                u32 resp_key_len = (u32) strlen(resp_key);
315
0
                if (resp_key_len < 1)
316
0
                    return;
317
318
319
0
                u8 hash[GF_SHA1_DIGEST_SIZE];
320
0
                gf_sha1_csum( resp_key, resp_key_len, hash );
321
322
0
                u32 end_b64 = gf_base64_encode(hash, GF_SHA1_DIGEST_SIZE, resp_key, resp_key_len);
323
0
                resp_key[resp_key_len-1] = 0;
324
0
                if (end_b64 < resp_key_len) {
325
0
                    resp_key[end_b64] = 0;
326
0
                } else {
327
0
                    return;
328
0
                }
329
330
0
              gf_dm_sess_clear_headers(http_sess);
331
332
0
              gf_dm_sess_set_header(http_sess, "Connection", "Upgrade");
333
0
                gf_dm_sess_set_header(http_sess, "Upgrade", "websocket");
334
0
                gf_dm_sess_set_header(http_sess, "Sec-WebSocket-Accept", resp_key);
335
336
0
                e = rmt_send_reply(http_sess, 101, NULL, NULL);
337
338
0
                if (e==GF_OK) {
339
0
                    client_ctx->is_ws = GF_TRUE;
340
341
0
                    if (rmt_settings->on_new_client_cbk && rmt_settings->on_new_client_cbk_task) {
342
0
                        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d calling on new client cb %p with client %p\n", __FILE__,__LINE__, rmt_settings->on_new_client_cbk, client_ctx));
343
0
                        rmt_settings->on_new_client_cbk(rmt_settings->on_new_client_cbk_task, client_ctx);
344
0
                    }
345
0
                }
346
0
                gf_free(resp_key);
347
0
            }
348
0
        } else {
349
0
            GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("this should be a ws message right?\n"));
350
0
        }
351
352
353
354
0
        memset(&client_ctx->buffer, 0, 1024);
355
0
    }
356
357
358
0
}
359
360
361
0
/* (Re)creates the listening server for a context.
 * Resets the context, optionally sets up TLS (when both cert and pkey are configured),
 * then creates the socket group, binds/listens on the configured port and creates the
 * download manager and client list.
 * On error the context may hold partially created resources: the caller is expected to
 * call rmt_serverctx_reset(). Returns GF_OK on success. */
GF_Err rmt_create_server(RMT_ServerCtx* ctx) {

    GF_Err e = GF_OK;

    if (!ctx) return GF_BAD_PARAM;

    rmt_serverctx_reset(ctx);

    RMT_Settings* rmt_settings = gf_rmt_get_settings(ctx->rmt);
    if (!rmt_settings) return GF_BAD_PARAM;

    if (rmt_settings->cert && rmt_settings->pkey) {
#ifdef GPAC_HAS_SSL
        if (!gf_file_exists(rmt_settings->cert)) {
            GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Certificate file %s not found\n", rmt_settings->cert));
            return GF_IO_ERR;
        }
        if (!gf_file_exists(rmt_settings->pkey)) {
            GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Private key file %s not found\n", rmt_settings->pkey));
            return GF_IO_ERR;
        }
        if (gf_ssl_init_lib()) {
            GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to initialize OpenSSL library\n"));
            return GF_IO_ERR;
        }

        // force the "no-h2" core option while the TLS context is created, then restore it
        Bool prev_noh2 = gf_opts_get_bool("core", "no-h2");
        if (!prev_noh2)
            gf_opts_set_key("core", "no-h2", "1");

        ctx->ssl_ctx = gf_ssl_server_context_new(rmt_settings->cert, rmt_settings->pkey);

        if (!prev_noh2)
            gf_opts_set_key("core", "no-h2", "0");

        if (!ctx->ssl_ctx) return GF_IO_ERR;
#else
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] TLS key/certificate set but GPAC compiled without TLS support\n"));
        return GF_NOT_SUPPORTED;

#endif
    }

    ctx->sg = gf_sk_group_new();
    ctx->server_sock = gf_sk_new(GF_SOCK_TYPE_TCP);
    if (!ctx->sg || !ctx->server_sock) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to allocate server socket\n"));
        return GF_OUT_OF_MEM;
    }

    e = gf_sk_bind( ctx->server_sock,
                    rmt_settings->limit_connections_to_localhost ? "127.0.0.1" : "0.0.0.0",
                    rmt_settings->port,
                    NULL, 0, GF_SOCK_REUSE_PORT );

    if (!e) e = gf_sk_listen(ctx->server_sock, 0);
    if (e) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to start server on port %d: %s\n", rmt_settings->port, gf_error_to_string(e) ));
        return e;
    }

    gf_sk_group_register(ctx->sg, ctx->server_sock);

    gf_sk_server_mode(ctx->server_sock, GF_TRUE);
    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] Server running on port %d\n", rmt_settings->port));

    // same "no-h2" override while the download manager is created
    Bool prev_noh2 = gf_opts_get_bool("core", "no-h2");
    if (!prev_noh2)
        gf_opts_set_key("core", "no-h2", "1");
    ctx->dm_sess = gf_dm_new(NULL);
    if (!prev_noh2)
        gf_opts_set_key("core", "no-h2", "0");

    ctx->active_clients = gf_list_new();

    if (!ctx->dm_sess || !ctx->active_clients) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to allocate server session state\n"));
        return GF_OUT_OF_MEM;
    }

    return e;
}
431
432
GF_EXPORT
433
0
void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) {
434
0
    if (client) {
435
0
        client->on_data_cbk = cbk;
436
0
        client->on_data_cbk_task = task;
437
0
    }
438
0
}
439
440
0
/* Accepts a pending connection on the server socket and turns it into an active client:
 * registers the socket in the group, records "address:port", optionally creates the TLS
 * session and wraps everything in an HTTP server session.
 * On any failure the partially built client is destroyed and an error is returned;
 * nothing is added to the active list in that case. */
GF_Err rmt_server_handle_new_client(RMT_ServerCtx* ctx) {
    GF_Err e = GF_OK;
    void *ssl_c = NULL;
    RMT_ClientCtx* new_client;

    if (!ctx || !ctx->server_sock) return GF_BAD_PARAM;

    GF_SAFEALLOC(new_client, RMT_ClientCtx);
    if (!new_client) return GF_OUT_OF_MEM;

    new_client->ctx = ctx;

    e = gf_sk_accept(ctx->server_sock, &new_client->client_sock);
    if (e || !new_client->client_sock) {
        if (!e) e = GF_IO_ERR;
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to accept connection: %s\n", gf_error_to_string(e)));
        rmt_clientctx_del(new_client);
        return e;
    }
    gf_sk_group_register(ctx->sg, new_client->client_sock);

    u32 port = 0;
    gf_sk_get_remote_address_port(new_client->client_sock, new_client->peer_address, &port);
    // append ":port" without ever writing past peer_address
    size_t addr_len = strlen(new_client->peer_address);
    if (addr_len < sizeof(new_client->peer_address))
        snprintf(new_client->peer_address + addr_len, sizeof(new_client->peer_address) - addr_len, ":%u", port);
    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] connected to remote peer %s\n",  new_client->peer_address));

#ifdef GPAC_HAS_SSL
    if (ctx->ssl_ctx) {
        ssl_c = gf_ssl_new(ctx->ssl_ctx, new_client->client_sock, &e);
        if (e) {
            GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to create TLS session from %s: %s\n", new_client->peer_address, gf_error_to_string(e) ));
            rmt_clientctx_del(new_client);
            return e;
        }
    }
#endif

    new_client->http_sess = gf_dm_sess_new_server(ctx->dm_sess, new_client->client_sock, ssl_c, rmt_on_http_session_data, new_client, GF_TRUE, &e);
    if (!new_client->http_sess) {
        // without a session the client cannot be served; rmt_clientctx_del() releases the socket
        // NOTE(review): ssl_c (if any) is not released here, no matching free function is declared in this file
        if (!e) e = GF_OUT_OF_MEM;
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to create HTTP session for %s: %s\n", new_client->peer_address, gf_error_to_string(e) ));
        rmt_clientctx_del(new_client);
        return e;
    }

    new_client->is_ws = GF_FALSE;
    new_client->last_active_time = gf_sys_clock_high_res();
    new_client->last_ping_time = gf_sys_clock_high_res();

    gf_list_add(ctx->active_clients, new_client);
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("adding active client socket %p %s\n",  new_client, new_client->peer_address));

    return e;
}
481
482
0
/* Sends one unfragmented, unmasked websocket frame (FIN=1) carrying payload.
 * is_binary selects the BINARY opcode, otherwise TEXT is used.
 * Returns GF_BAD_PARAM for a missing client/session, a NULL payload with non-zero size,
 * or a size that does not fit the 32-bit write API; otherwise the result of dm_sess_write(). */
GF_Err rmt_client_send_payload(RMT_ClientCtx* client, const u8* payload, u64 size, Bool is_binary) {

    GF_Err e = GF_OK;

    if (!client || !client->http_sess) return GF_BAD_PARAM;
    if (!payload && size) return GF_BAD_PARAM;
    // header (up to 10 bytes) + payload must fit the u32 sizes used by the bitstream and
    // session APIs; refusing is better than announcing a length that is then truncated
    if (size > 0xFFFFFFFFULL - 10) return GF_BAD_PARAM;

    GF_BitStream* respbs = gf_bs_new(NULL, size+10, GF_BITSTREAM_WRITE_DYN);
    if (!respbs) return GF_OUT_OF_MEM;

    gf_bs_write_int(respbs, 1, 1); //FIN=1
    gf_bs_write_int(respbs, 0, 3); //RSV=0
    gf_bs_write_int(respbs, is_binary ? RMT_WEBSOCKET_BINARY : RMT_WEBSOCKET_TEXT, 4); //opcode
    gf_bs_write_int(respbs, 0, 1); //masked=0

    // payload length: 7 bits, or 126 + 16 bits, or 127 + 64 bits
    if (size < 126)
        gf_bs_write_int(respbs, (s32)size, 7);
    else if (size < 65536) {
        gf_bs_write_int(respbs, 126, 7);
        gf_bs_write_long_int(respbs, size, 16);
    } else {
        gf_bs_write_long_int(respbs, 127, 7);
        gf_bs_write_long_int(respbs, size, 64);
    }

    if (size)
        gf_bs_write_data(respbs, payload, (u32)size);

    u8* respbuf=NULL;
    u32 respsize=0;
    gf_bs_get_content(respbs, &respbuf, &respsize);
    gf_bs_del(respbs);

    if (!respbuf) return GF_OUT_OF_MEM;

    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send respbuf of size %d: %.*s\n", respsize, respsize, respbuf));
    e = dm_sess_write(client->http_sess, respbuf, respsize);

    gf_free(respbuf);

    return e;
}
516
517
0
/* Hands a received TEXT/BINARY payload to the client's on-data callback.
 * The payload is silently dropped unless both the callback and its task are set.
 * Always returns GF_OK. */
GF_Err rmt_client_handle_ws_payload(RMT_ClientCtx* client, u8* payload, u64 size, Bool is_binary) {

    if (!client->on_data_cbk || !client->on_data_cbk_task)
        return GF_OK;

    client->on_data_cbk(client->on_data_cbk_task, payload, size, is_binary);
    return GF_OK;
}
531
532
0
/* Parses and handles one websocket frame read from bs.
 * When the frame payload extends past the end of bs, the missing bytes are read from the
 * client's session. The payload is unmasked, then:
 *  - CLOSE flags the client for closing,
 *  - TEXT / BINARY are forwarded to rmt_client_handle_ws_payload(),
 *  - PING is answered with a PONG echoing the payload,
 *  - CONTINUATION (not supported yet), PONG and unknown opcodes are ignored.
 * Returns GF_IO_ERR on a truncated or invalid header, on an oversized frame or on an
 * incomplete payload; never delivers a partially received payload. */
GF_Err rmt_client_handle_ws_frame(RMT_ClientCtx* client, GF_BitStream* bs) {

    GF_Err e = GF_OK;
    // largest accepted payload: keeps "size + 1" allocations and the u32 casts below safe
    // NOTE(review): the length comes from the peer, a much lower limit may be desirable
    const u64 max_payload = 0x7FFFFFFE;

    if (!client || !bs) return GF_BAD_PARAM;
    if (gf_bs_available(bs) < 2) return GF_IO_ERR;

    u32 FIN = gf_bs_read_int(bs, 1); //TODO: handle fragmented?
    /*u32 RSV =*/ gf_bs_read_int(bs, 3);
    u32 opcode = gf_bs_read_int(bs, 4);
    u32 masked = gf_bs_read_int(bs, 1);
    u64 payload_size = gf_bs_read_int(bs, 7);
    if (payload_size == 126) {
        if (gf_bs_available(bs) < 2) return GF_IO_ERR;
        payload_size = gf_bs_read_int(bs, 16);
    }
    else if (payload_size == 127) {
        if (gf_bs_available(bs) < 8) return GF_IO_ERR;
        payload_size = gf_bs_read_long_int(bs, 64);
    }

    if (payload_size > max_payload) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] websocket frame too large (%llu bytes), dropping\n", (unsigned long long) payload_size));
        return GF_IO_ERR;
    }
    // control frames (opcode >= 8) carry at most 125 bytes; this also guarantees that the
    // PONG reply below fits a 7-bit length
    if ((opcode & 0x8) && (payload_size > 125)) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] invalid websocket control frame size %llu\n", (unsigned long long) payload_size));
        return GF_IO_ERR;
    }

    // the masking key is only present (and only required) on masked frames
    u8 masking_key[4] = {0};
    if (masked) {
        if (gf_bs_available(bs) < 4) return GF_IO_ERR;
        gf_bs_read_data(bs, masking_key, 4);
    }

    u8* unmasked_payload = gf_malloc( (size_t) payload_size + 1); // add 1 to add null to get c string
    if (!unmasked_payload) return GF_OUT_OF_MEM;

    // part of the payload already present in the bitstream
    u64 filled = 0;
    while ((filled < payload_size) && gf_bs_available(bs)) {
        unmasked_payload[filled] = (u8) ( gf_bs_read_u8(bs) ^ masking_key[filled % 4] );
        filled++;
    }

    // remaining part, read straight from the session and unmasked in place
    if (filled < payload_size) {
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("buffer too small for payload_size %llu bs_pos %llu bs_size %llu => extra_size %llu\n",
            (unsigned long long) payload_size,
            (unsigned long long) gf_bs_get_position(bs),
            (unsigned long long) gf_bs_get_size(bs),
            (unsigned long long) (payload_size - filled)));

        while (!e && (filled < payload_size)) {
            u32 new_read = 0;
            e = gf_dm_read_data(client->http_sess, (char *) (unmasked_payload + filled), (u32)(payload_size - filled), &new_read);
            for (u32 j=0; j<new_read; j++) {
                unmasked_payload[filled] = (u8) ( unmasked_payload[filled] ^ masking_key[filled % 4] );
                filled++;
            }
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("extra gf_dm_read_data => %u e=%s\n", new_read, gf_error_to_string(e)));
        }

        if (filled < payload_size) {
            // never hand a partially filled buffer to the callbacks
            GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] incomplete websocket payload (%llu of %llu bytes): %s\n",
                (unsigned long long) filled, (unsigned long long) payload_size, gf_error_to_string(e)));
            gf_free(unmasked_payload);
            return e ? e : GF_IO_ERR;
        }
    }

    unmasked_payload[payload_size] = 0;

    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("parsed ws message: FIN %d \n opcode %d \n masked %d \n payload_size %llu \n masking_key %.*s \n unmasked_payload \n",
        FIN,
        opcode,
        masked,
        (unsigned long long) payload_size,
        4, (const char *) masking_key
    ));

    switch(opcode) {

        case RMT_WEBSOCKET_CLOSE:
            client->should_close = GF_TRUE;
            break;

        case RMT_WEBSOCKET_TEXT:
        case RMT_WEBSOCKET_BINARY:
            e = rmt_client_handle_ws_payload(client, unmasked_payload, payload_size, (opcode==RMT_WEBSOCKET_BINARY));
            break;

        case RMT_WEBSOCKET_PING: {
            // PONG = FIN + opcode, unmasked 7-bit length, echoed payload (<= 125 bytes, checked above)
            u8 pong[2 + 125];
            u32 pong_size = 2 + (u32) payload_size;
            pong[0] = (u8) (0x80 | RMT_WEBSOCKET_PONG);
            pong[1] = (u8) payload_size;
            if (payload_size)
                memcpy(pong + 2, unmasked_payload, (size_t) payload_size);

            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send PONG respbuf of size %d: %.*s\n", pong_size, pong_size, pong));
            e = dm_sess_write(client->http_sess, pong, pong_size);
            break;
        }

        case RMT_WEBSOCKET_CONTINUATION:    // not supported yet
        case RMT_WEBSOCKET_PONG:            // last active timer already reset
        default:
            break;
    }

    gf_free(unmasked_payload);

    return e;
}
647
648
0
/* Handles a "readable" event on a client socket.
 * Before the websocket upgrade the HTTP session is processed; afterwards up to 1024 bytes
 * are read from the session and parsed as a sequence of websocket frames.
 * Returns the first error met, GF_OK otherwise. */
GF_Err rmt_client_handle_event(RMT_ClientCtx* client) {
    GF_Err e = GF_OK;

    if (!client) return GF_BAD_PARAM;

    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("select ready for socket %p %s\n",  client, client->peer_address));

    if (!client->is_ws) {
        e = gf_dm_sess_process(client->http_sess);
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] gf_dm_sess_process: %s\n",  gf_error_to_string(e)));
        return e;
    }

    u8 buffer[1024]; //TODO: what size here?? do something like extra_payload??
    u32 read = 0;
    e = gf_dm_read_data(client->http_sess, (char *) buffer, (u32) sizeof(buffer), &read);
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_dm_read_data => %d sHTTP %.*s\n", read, read, buffer));

    client->last_active_time = gf_sys_clock_high_res();

    // nothing to parse on a failed or empty read
    if (e || !read) return e;

    GF_BitStream* bs = gf_bs_new(buffer, read, GF_BITSTREAM_READ);
    if (!bs) return GF_OUT_OF_MEM;

    // a single read may contain several frames
    while (e==GF_OK && gf_bs_available(bs)) {

        e = rmt_client_handle_ws_frame(client, bs);

        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_client_handle_ws_frame() returned %s bs has avail %llu \n", gf_error_to_string(e), (unsigned long long) gf_bs_available(bs)));
    }

    gf_bs_del(bs);

    return e;
}
684
685
0
/* Writes the prebuilt 2-byte PING frame on the client's session and returns the write result. */
GF_Err rmt_client_send_ping(RMT_ClientCtx* client) {
    return dm_sess_write(client->http_sess, (const u8 *) RMT_WEBSOCKET_PING_MSG, 2);
}
691
692
GF_EXPORT
693
0
GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) {
694
695
0
    return rmt_client_send_payload(client, (const u8*) msg, size, is_binary);
696
697
0
}
698
699
0
/* Tells whether a client must be dropped. Returns GF_TRUE when:
 *  - the client is invalid (NULL, no socket or no server context),
 *  - it was flagged with should_close,
 *  - its socket probes as GF_IP_CONNECTION_CLOSED,
 *  - a timeout is configured and no activity was seen for more than timeout_secs. */
Bool rmt_client_should_close(RMT_ClientCtx* client) {

    // check valid object
    if (!client || !client->client_sock || !client->ctx) {
        GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("[RMT] weird dead session in rmt\n"));
        return GF_TRUE;
    }
    RMT_Settings* rmt_settings = gf_rmt_get_settings(client->ctx->rmt);

    if (client->should_close) {
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s because request done\n",  client, client->peer_address));
        return GF_TRUE;
    }

    // check disconnection
    GF_Err e = gf_sk_probe(client->client_sock);
    if (e==GF_IP_CONNECTION_CLOSED) {
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s\n",  client, client->peer_address));
        return GF_TRUE;
    }

    // check timeout
    if (rmt_settings && rmt_settings->timeout_secs) {
        // divide the 64-bit microsecond difference first: casting it to u32 before the
        // division would wrap after about 71 minutes of inactivity
        u64 idle_sec = (gf_sys_clock_high_res() - client->last_active_time) / 1000000;
        if ( idle_sec > rmt_settings->timeout_secs ) {
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s for timeout \n",  client, client->peer_address));
            return GF_TRUE;
        }
    }

    return GF_FALSE;
}
732
733
0
/* One pass of the server loop.
 * Waits on the socket group; when the server socket is readable a new client is accepted,
 * otherwise every active client is visited: dead clients are removed and destroyed, a PING
 * is sent when the socket is writable and the ping period elapsed, and readable sockets are
 * processed. The pass always ends with the configured sleep.
 * Returns the last error met (GF_OK if none). */
GF_Err rmt_server_wait_for_event(RMT_ServerCtx* ctx) {

    if (!ctx) return GF_BAD_PARAM;

    RMT_Settings* rmt_settings = gf_rmt_get_settings(ctx->rmt);

    GF_Err e = gf_sk_group_select(ctx->sg, 10, GF_SK_SELECT_BOTH);

    if (e==GF_OK) {

        // event on server socket = new client
        if (gf_sk_group_sock_is_set(ctx->sg, ctx->server_sock, GF_SK_SELECT_READ)) {

            // on failure fall through to the sleep below so that a persistent accept
            // error cannot turn the server thread into a busy loop
            e = rmt_server_handle_new_client(ctx);

        }

        // event on one the active client
        else {

            // the index only advances when the current client is kept
            u32 i = 0;
            while (i < gf_list_count(ctx->active_clients)) {
                RMT_ClientCtx* client = gf_list_get(ctx->active_clients, i);

                if (rmt_client_should_close(client)) {
                    gf_list_rem(ctx->active_clients, i);
                    rmt_clientctx_del(client);
                    continue;
                }
                i++;

                if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_WRITE)) {

                    // check if we should send ping
                    if (rmt_settings && rmt_settings->ping_secs) {

                        // divide before narrowing (see rmt_client_should_close)
                        u64 since_ping_sec = (gf_sys_clock_high_res() - client->last_ping_time) / 1000000;

                        if ( since_ping_sec > rmt_settings->ping_secs ) {

                            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Sending PING on client %p %s\n",  client, client->peer_address));
                            e = rmt_client_send_ping(client);
                            if (!e) {
                                client->last_ping_time = gf_sys_clock_high_res();
                            }
                            continue;
                        }
                    }

                }

                // handle received event
                if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_READ)) {

                    e = rmt_client_handle_event(client);

                }
            }

        }

    }

    if (rmt_settings)
        gf_sleep(rmt_settings->msSleepBetweenServerUpdates);
    return e;
}
804
805
0
static u32 rmt_ws_thread_main(void* par) {
806
807
0
    RMT_WS* rmt = (RMT_WS*)par;
808
809
0
    RMT_ServerCtx* ctx;
810
0
    GF_SAFEALLOC(ctx, RMT_ServerCtx);
811
0
    ctx->rmt = rmt;
812
813
0
    GF_Err e;
814
815
0
    while (!rmt->should_stop) {
816
817
        // create socket if not exist
818
0
        if (!ctx->server_sock) {
819
820
0
            e = rmt_create_server(ctx);
821
0
            if (e) {
822
0
                if (ctx) {
823
0
                    rmt_serverctx_reset(ctx);
824
0
                    gf_free(ctx);
825
0
                    ctx = NULL;
826
0
                }
827
0
                return e;
828
0
            }
829
830
0
            continue;
831
0
        }
832
833
0
        e = rmt_server_wait_for_event(ctx);
834
835
0
    }
836
837
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] thread main request exit! should cleanup\n"));
838
0
    rmt_serverctx_reset(ctx);
839
0
    gf_free(ctx);
840
0
    ctx = NULL;
841
0
    return GF_OK;
842
843
0
}
844
845
846
847
RMT_Settings* gf_rmt_get_settings(RMT_WS* rmt)
{
    // pointer into the service itself (no copy); NULL when no service is given
    return rmt ? &rmt->settings : NULL;
}
853
854
855
0
/* Stops and deletes the server thread (when one exists), then frees the service. */
void rmt_ws_del(RMT_WS* rmt) {

    GF_Thread *th = rmt ? rmt->thread : NULL;

    if (th) {
        // ask the thread loop to exit before stopping the thread
        rmt->should_stop = GF_TRUE;
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() calling stop...\n"));

        gf_th_stop(th);

        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() stop returned thread status is now %d\n", gf_th_status(th)));

        gf_th_del(th);
        rmt->thread = NULL;
    }

    gf_free(rmt);
}
877
878
0
RMT_WS* rmt_ws_new() {
879
0
    RMT_WS* rmt = NULL;
880
881
0
    GF_SAFEALLOC(rmt, RMT_WS);
882
0
    if (!rmt) {
883
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable init rmt thread\n"));
884
0
        return NULL;
885
0
    }
886
887
0
    rmt->settings.port = 6363;
888
0
    rmt->settings.timeout_secs = 20;
889
0
    rmt->settings.ping_secs = 7;
890
891
0
    rmt->settings.limit_connections_to_localhost = GF_FALSE;
892
0
    rmt->settings.msSleepBetweenServerUpdates = 50;
893
894
0
    rmt->settings.on_new_client_cbk = NULL;
895
0
    rmt->settings.on_new_client_cbk_task = NULL;
896
897
0
    rmt->settings.cert = NULL;
898
0
    rmt->settings.pkey = NULL;
899
900
0
    return rmt;
901
0
}
902
903
0
/* Creates and starts the server thread of a service. A NULL service is ignored.
 * If the thread cannot be created or started, the error is logged and the service is
 * destroyed with rmt_ws_del().
 * NOTE(review): in that failure case the caller's pointer is left dangling, since this
 * function has no way to report it; confirm callers do not reuse rmt afterwards. */
void rmt_ws_run(RMT_WS* rmt) {

    if (!rmt) return;

    rmt->thread = gf_th_new("rmt_ws_main_th");
    rmt->should_stop = GF_FALSE;

    // do not try to run a thread that could not be created
    GF_Err e = rmt->thread ? gf_th_run(rmt->thread, rmt_ws_thread_main, rmt) : GF_OUT_OF_MEM;
    if (e != GF_OK) {
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable to start websocket thread: %s\n", gf_error_to_string(e)));
        rmt_ws_del(rmt);
    }
}
916
917
#else //GPAC_DISABLE_RMTWS
918
919
void gf_rmt_set_on_new_client_cbk(RMT_WS* rmt, void *task, rmt_on_new_client_cbk cbk) {
920
921
}
922
void* gf_rmt_get_on_new_client_task(RMT_WS* rmt) {
923
    return NULL;
924
}
925
926
const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) {
927
    return NULL;
928
}
929
GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) {
930
    return GF_NOT_SUPPORTED;
931
}
932
933
void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) {
934
935
}
936
void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) {
937
    return NULL;
938
}
939
940
void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) {
941
942
}
943
void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) {
944
    return NULL;
945
}
946
947
RMT_WS* gf_rmt_client_get_rmt(RMT_ClientCtx* client) {
948
    return NULL;
949
}
950
951
#endif