Coverage Report

Created: 2025-12-05 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/gpac/src/utils/rmt_ws.c
Line
Count
Source
1
#include <gpac/tools.h>
2
#include <gpac/thread.h>
3
#include <gpac/list.h>
4
#include <gpac/bitstream.h>
5
#include <gpac/network.h>
6
#include <gpac/download.h>
7
#include <gpac/base_coding.h>
8
9
#ifndef GPAC_DISABLE_RMTWS
10
11
12
// Global settings
13
static RMT_Settings g_Settings;
14
static Bool g_SettingsInitialized = GF_FALSE;
15
16
static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
17
18
19
struct RMT_WS {
20
21
    // The main server thread
22
    GF_Thread* thread;
23
24
    // Should we stop the thread?
25
    Bool should_stop;
26
27
};
28
29
30
31
GF_DownloadSession *gf_dm_sess_new_server(GF_DownloadManager *dm, GF_Socket *server, void *ssl_ctx, gf_dm_user_io user_io, void *usr_cbk, Bool async, GF_Err *e);
32
void  gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value);
33
GF_Err gf_dm_sess_send_reply(GF_DownloadSession *sess, u32 reply_code, const char *response_body, u32 body_len, Bool no_body);
34
void gf_dm_sess_clear_headers(GF_DownloadSession *sess);
35
void  gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value);
36
GF_Err dm_sess_write(GF_DownloadSession *session, const u8 *buffer, u32 size);
37
GF_Err gf_dm_read_data(GF_DownloadSession *sess, char *data, u32 data_size, u32 *out_read);
38
void httpout_format_date(u64 time, char szDate[200], Bool for_listing);
39
40
#ifdef GPAC_HAS_SSL
41
42
void *gf_ssl_new(void *ssl_server_ctx, GF_Socket *client_sock, GF_Err *e);
43
void *gf_ssl_server_context_new(const char *cert, const char *key);
44
void gf_ssl_server_context_del(void *ssl_server_ctx);
45
Bool gf_ssl_init_lib();
46
47
#endif
48
49
enum {
50
    RMT_WEBSOCKET_CONTINUATION = 0,
51
    RMT_WEBSOCKET_TEXT = 1,
52
    RMT_WEBSOCKET_BINARY = 2,
53
    RMT_WEBSOCKET_CLOSE = 8,
54
    RMT_WEBSOCKET_PING = 9,
55
    RMT_WEBSOCKET_PONG = 10,
56
};
57
58
static char RMT_WEBSOCKET_PING_MSG[2] = { 0x89, 0x00 };
59
60
struct __rmt_serverctx {
61
62
    GF_Socket* server_sock;
63
64
    GF_SockGroup* sg;
65
66
    void *ssl_ctx;
67
68
    GF_DownloadManager* dm_sess;
69
70
    GF_List* active_clients;
71
72
};
73
74
struct __rmt_clientctx {
75
76
    GF_Socket* client_sock;
77
    RMT_ServerCtx* ctx;
78
    GF_DownloadSession* http_sess;
79
80
    char peer_address[GF_MAX_IP_NAME_LEN + 16];
81
    char buffer[1024];
82
83
    Bool is_ws;
84
    u64 last_active_time;
85
    u64 last_ping_time;
86
    Bool should_close;
87
88
    rmt_client_on_data_cbk on_data_cbk;
89
    void* on_data_cbk_task;
90
91
    rmt_client_on_del_cbk on_del_cbk;
92
    void* on_del_cbk_task;
93
94
};
95
96
GF_EXPORT
97
0
const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) {
98
0
    if (client)
99
0
        return client->peer_address;
100
0
    return NULL;
101
0
}
102
103
GF_EXPORT
104
0
void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) {
105
0
    if (client)
106
0
        return client->on_data_cbk_task;
107
0
    return NULL;
108
0
}
109
110
GF_EXPORT
111
0
void gf_rmt_set_on_new_client_cbk(void *task, rmt_on_new_client_cbk cbk) {
112
0
    RMT_Settings *rmtcfg = gf_rmt_get_settings();
113
0
    if (rmtcfg) {
114
0
        rmtcfg->on_new_client_cbk = cbk;
115
0
        rmtcfg->on_new_client_cbk_task = task;
116
0
    }
117
0
}
118
119
GF_EXPORT
120
0
void* gf_rmt_get_on_new_client_task() {
121
0
    return g_Settings.on_new_client_cbk_task;
122
0
}
123
124
GF_EXPORT
125
0
void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) {
126
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_rmt_client_set_on_del_cbk client %p task %p cbk %p\n", client, task, cbk));
127
0
    if (client) {
128
0
        client->on_del_cbk = cbk;
129
0
        client->on_del_cbk_task = task;
130
0
    }
131
0
}
132
133
GF_EXPORT
134
0
void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) {
135
0
    if (client)
136
0
        return client->on_del_cbk_task;
137
0
    return NULL;
138
0
}
139
140
0
void rmt_clientctx_del(RMT_ClientCtx* client) {
141
142
0
    if (!client) return;
143
144
0
    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] closing client %s\n", client->peer_address));
145
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_clientctx_del client %p del_cbk %p\n", client, client ? client->on_del_cbk : NULL));
146
147
0
    if (client->on_del_cbk) {
148
0
        client->on_del_cbk(client->on_del_cbk_task);
149
0
    }
150
151
0
    if (client->client_sock) {
152
0
        if (client->ctx && client->ctx->sg)
153
0
            gf_sk_group_unregister(client->ctx->sg, client->client_sock);
154
155
0
        if (!client->http_sess)
156
0
            gf_sk_del(client->client_sock); //socket deleted by gf_dm_sess_del() if http_sess has been created
157
0
        client->client_sock = NULL;
158
0
    }
159
160
0
    if (client->http_sess) {
161
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("deleting http_sess %p\n", client->http_sess));
162
0
        gf_dm_sess_del(client->http_sess);
163
0
        client->http_sess = NULL;
164
0
    }
165
166
0
    client->ctx = NULL;
167
0
    client->on_data_cbk = NULL;
168
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d rmt_clientctx_del client %p task %p\n", __FILE__, __LINE__, client, client->on_data_cbk_task));
169
170
0
    if (client->on_data_cbk_task) gf_free(client->on_data_cbk_task);
171
0
    client->on_data_cbk_task = NULL;
172
173
0
    if (client->on_del_cbk_task) gf_free(client->on_del_cbk_task);
174
0
    client->on_del_cbk_task = NULL;
175
176
0
    gf_free(client);
177
0
}
178
179
0
void rmt_serverctx_reset(RMT_ServerCtx* ctx) {
180
0
  if (ctx->active_clients) {
181
0
    while (gf_list_count(ctx->active_clients)) {
182
0
      RMT_ClientCtx* client = gf_list_pop_back(ctx->active_clients);
183
0
            rmt_clientctx_del(client);
184
0
            client = NULL;
185
0
    }
186
0
    gf_list_del(ctx->active_clients);
187
0
    ctx->active_clients = NULL;
188
0
  }
189
190
0
    if (ctx->server_sock) {
191
0
        if (ctx->sg)
192
0
            gf_sk_group_unregister(ctx->sg, ctx->server_sock);
193
194
0
        gf_sk_del(ctx->server_sock);
195
0
        ctx->server_sock = NULL;
196
0
    }
197
198
0
    if (ctx->sg) { gf_sk_group_del(ctx->sg); ctx->sg = NULL;  }
199
200
0
    if (ctx->dm_sess) { gf_dm_del(ctx->dm_sess); ctx->dm_sess = NULL; }
201
202
0
    if (ctx->ssl_ctx) {
203
0
#ifdef GPAC_HAS_SSL
204
0
    gf_ssl_server_context_del(ctx->ssl_ctx);
205
#else
206
        gf_free(ctx->ssl_ctx);
207
#endif
208
0
        ctx->ssl_ctx = NULL;
209
0
    }
210
211
212
0
}
213
214
0
void rmt_close_client(RMT_ClientCtx* client) {
215
0
    if (!client) return;
216
0
    RMT_ServerCtx* ctx = client->ctx;
217
0
    if (ctx && ctx->active_clients) {
218
0
        gf_list_del_item(ctx->active_clients, client);
219
0
    }
220
0
    rmt_clientctx_del(client);
221
0
}
222
223
0
GF_Err rmt_send_reply(GF_DownloadSession* http_sess, int responseCode, char* response_body, char* content_type) {
224
225
0
        u32 body_size = 0;
226
0
        char szFmt[100];
227
0
        char szDate[200];
228
229
0
      httpout_format_date(gf_net_get_utc(), szDate, GF_FALSE);
230
0
      gf_dm_sess_set_header(http_sess, "Date", szDate);
231
0
        gf_dm_sess_set_header(http_sess, "Server", gf_gpac_version());
232
233
0
        if (response_body) {
234
0
            body_size = (u32) strlen(response_body);
235
0
            gf_dm_sess_set_header(http_sess, "Content-Type", content_type ? content_type : "text/html");
236
0
            sprintf(szFmt, "%d", body_size);
237
0
            gf_dm_sess_set_header(http_sess, "Content-Length", szFmt);
238
239
0
        }
240
241
0
        return gf_dm_sess_send_reply(http_sess, responseCode, response_body, response_body ? (u32) strlen(response_body) : 0, (response_body==NULL));
242
243
0
}
244
245
0
static void rmt_on_http_session_data(void *usr_cbk, GF_NETIO_Parameter *parameter) {
246
247
0
    RMT_ClientCtx* client_ctx = (RMT_ClientCtx*) usr_cbk;
248
0
    if (!client_ctx) {
249
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] sess io on null session\n"));
250
0
        return;
251
0
    }
252
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_on_http_session_data on peer %s param %p msg_type %d \nerror %s \ndata %p \nsize %d \nname %p \nvalue %p \nreply %d\n",
253
0
                    client_ctx->peer_address,
254
0
                    parameter,
255
0
                    parameter->msg_type,
256
0
                    gf_error_to_string(parameter->error),
257
0
                    parameter->data,
258
0
                    parameter->size,
259
0
                    parameter->name,
260
0
                    parameter->value,
261
0
                    parameter->reply
262
0
            ));
263
264
0
    client_ctx->last_active_time = gf_sys_clock_high_res();
265
266
0
    const char* durl = gf_dm_sess_get_resource_name(parameter->sess);
267
0
    const char* ua = gf_dm_sess_get_header(parameter->sess, "User-Agent");
268
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("requested url: %s with ua %s\n", durl, ua));
269
0
    GF_Err e;
270
271
0
    if (parameter->msg_type != GF_NETIO_PARSE_REPLY) {
272
0
        if (parameter->size) {
273
0
            strncat(client_ctx->buffer, parameter->data, parameter->size);
274
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("session data is now: %s\n", client_ctx->buffer));
275
0
        }
276
0
    }
277
278
0
    if (parameter->msg_type == GF_NETIO_PARSE_REPLY) {
279
280
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("final buffer before response: %s\n", client_ctx->buffer));
281
282
0
        GF_DownloadSession* http_sess = parameter->sess ;
283
284
0
        if (!client_ctx->is_ws) {
285
0
            const char* ws_header_version = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Version");
286
0
            const char* ws_header_key = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Key");
287
288
0
            if (!ws_header_version || strcmp(ws_header_version, "13") || !ws_header_key) {
289
290
0
                gf_dm_sess_clear_headers(http_sess);
291
0
                gf_dm_sess_set_header(http_sess, "Connection", "close");
292
293
0
                rmt_send_reply(http_sess, 404, "only ws connections are accepted", NULL);
294
0
                client_ctx->should_close = GF_TRUE;
295
0
                return;
296
297
0
            }
298
0
            else {
299
300
0
                char* resp_key = gf_strdup(ws_header_key);
301
0
                gf_dynstrcat(&resp_key, websocket_guid, NULL);
302
303
0
                u32 resp_key_len = (u32) strlen(resp_key);
304
0
                if (resp_key_len < 1)
305
0
                    return;
306
307
308
0
                u8 hash[GF_SHA1_DIGEST_SIZE];
309
0
                gf_sha1_csum( resp_key, resp_key_len, hash );
310
311
0
                u32 end_b64 = gf_base64_encode(hash, GF_SHA1_DIGEST_SIZE, resp_key, resp_key_len);
312
0
                resp_key[resp_key_len-1] = 0;
313
0
                if (end_b64 < resp_key_len) {
314
0
                    resp_key[end_b64] = 0;
315
0
                } else {
316
0
                    return;
317
0
                }
318
319
0
              gf_dm_sess_clear_headers(http_sess);
320
321
0
              gf_dm_sess_set_header(http_sess, "Connection", "Upgrade");
322
0
                gf_dm_sess_set_header(http_sess, "Upgrade", "websocket");
323
0
                gf_dm_sess_set_header(http_sess, "Sec-WebSocket-Accept", resp_key);
324
325
0
                e = rmt_send_reply(http_sess, 101, NULL, NULL);
326
327
0
                if (e==GF_OK) {
328
0
                    client_ctx->is_ws = GF_TRUE;
329
330
0
                    if (g_Settings.on_new_client_cbk && g_Settings.on_new_client_cbk_task) {
331
0
                        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d calling on new client cb %p with client %p\n", __FILE__,__LINE__, g_Settings.on_new_client_cbk, client_ctx));
332
0
                        g_Settings.on_new_client_cbk(g_Settings.on_new_client_cbk_task, client_ctx);
333
0
                    }
334
0
                }
335
0
                gf_free(resp_key);
336
0
            }
337
0
        } else {
338
0
            GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("this should be a ws message right?\n"));
339
0
        }
340
341
342
343
0
        memset(&client_ctx->buffer, 0, 1024);
344
0
    }
345
346
347
0
}
348
349
350
0
GF_Err rmt_create_server(RMT_ServerCtx* ctx) {
351
352
0
    GF_Err e = GF_OK;
353
354
0
    rmt_serverctx_reset(ctx);
355
356
0
    if (g_Settings.cert && g_Settings.pkey) {
357
0
#ifdef GPAC_HAS_SSL
358
0
    if (!gf_file_exists(g_Settings.cert)) {
359
0
      GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Certificate file %s not found\n", g_Settings.cert));
360
0
      return GF_IO_ERR;
361
0
    }
362
0
    if (!gf_file_exists(g_Settings.pkey)) {
363
0
      GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Private key file %s not found\n", g_Settings.pkey));
364
0
      return GF_IO_ERR;
365
0
    }
366
0
    if (gf_ssl_init_lib()) {
367
0
      GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to initialize OpenSSL library\n"));
368
0
      return GF_IO_ERR;
369
0
    }
370
371
0
        Bool prev_noh2 = gf_opts_get_bool("core", "no-h2");
372
0
        if (!prev_noh2)
373
0
            gf_opts_set_key("core", "no-h2", "1");
374
375
0
        ctx->ssl_ctx = gf_ssl_server_context_new(g_Settings.cert, g_Settings.pkey);
376
377
0
        if (!prev_noh2)
378
0
            gf_opts_set_key("core", "no-h2", "0");
379
380
0
    if (!ctx->ssl_ctx) return GF_IO_ERR;
381
#else
382
    GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] TLS key/certificate set but GPAC compiled without TLS support\n"));
383
    return GF_NOT_SUPPORTED;
384
385
#endif
386
0
    }
387
388
0
    ctx->sg = gf_sk_group_new();
389
390
0
    ctx->server_sock = gf_sk_new(GF_SOCK_TYPE_TCP);
391
0
    e = gf_sk_bind( ctx->server_sock,
392
0
                    g_Settings.limit_connections_to_localhost ? "127.0.0.1" : "0.0.0.0",
393
0
                    g_Settings.port,
394
0
                    NULL, 0, GF_SOCK_REUSE_PORT );
395
396
0
    if (!e) e = gf_sk_listen(ctx->server_sock, 0);
397
0
    if (e) {
398
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to start server on port %d: %s\n", g_Settings.port, gf_error_to_string(e) ));
399
0
        return e;
400
0
    }
401
402
0
    gf_sk_group_register(ctx->sg, ctx->server_sock);
403
404
0
    gf_sk_server_mode(ctx->server_sock, GF_TRUE);
405
0
    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] Server running on port %d\n", g_Settings.port));
406
407
0
    Bool prev_noh2 = gf_opts_get_bool("core", "no-h2");
408
0
    if (!prev_noh2)
409
0
        gf_opts_set_key("core", "no-h2", "1");
410
0
    ctx->dm_sess = gf_dm_new(NULL);
411
0
    if (!prev_noh2)
412
0
        gf_opts_set_key("core", "no-h2", "0");
413
0
    ctx->active_clients = gf_list_new();
414
415
0
    return e;
416
417
0
}
418
419
GF_EXPORT
420
0
void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) {
421
0
    if (client) {
422
0
        client->on_data_cbk = cbk;
423
0
        client->on_data_cbk_task = task;
424
0
    }
425
0
}
426
427
0
GF_Err rmt_server_handle_new_client(RMT_ServerCtx* ctx) {
428
0
    GF_Err e = GF_OK;
429
430
0
    void *ssl_c = NULL;
431
432
0
    RMT_ClientCtx* new_client;
433
0
    GF_SAFEALLOC(new_client, RMT_ClientCtx);
434
435
0
    new_client->ctx = ctx;
436
437
0
    e = gf_sk_accept(ctx->server_sock, &new_client->client_sock);
438
0
    gf_sk_group_register(ctx->sg, new_client->client_sock);
439
440
0
    u32 port;
441
0
    gf_sk_get_remote_address_port(new_client->client_sock, new_client->peer_address, &port);
442
0
    sprintf(new_client->peer_address + strlen(new_client->peer_address), ":%d", port); //TDOO: size check
443
0
    GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] connected to remote peer %s\n",  new_client->peer_address));
444
445
446
0
#ifdef GPAC_HAS_SSL
447
0
  if (ctx->ssl_ctx) {
448
0
    ssl_c = gf_ssl_new(ctx->ssl_ctx, new_client->client_sock, &e);
449
0
    if (e) {
450
0
      GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to create TLS session from %s: %s\n", new_client->peer_address, gf_error_to_string(e) ));
451
0
      rmt_clientctx_del(new_client);
452
0
      return e;
453
0
    }
454
0
  }
455
0
#endif
456
457
0
    new_client->http_sess = gf_dm_sess_new_server(ctx->dm_sess, new_client->client_sock, ssl_c, rmt_on_http_session_data, new_client, GF_TRUE, &e);
458
459
0
    new_client->is_ws = GF_FALSE;
460
0
    new_client->last_active_time = gf_sys_clock_high_res();
461
0
    new_client->last_ping_time = gf_sys_clock_high_res();
462
463
0
    gf_list_add(ctx->active_clients, new_client);
464
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("adding active client socket %p %s\n",  new_client, new_client->peer_address));
465
466
0
    return e;
467
0
}
468
469
0
GF_Err rmt_client_send_payload(RMT_ClientCtx* client, const u8* payload, u64 size, Bool is_binary) {
470
471
0
    GF_Err e = GF_OK;
472
473
0
    GF_BitStream* respbs = gf_bs_new(NULL, size+10, GF_BITSTREAM_WRITE_DYN);
474
0
    gf_bs_write_int(respbs, 1, 1); //FIN=1
475
0
    gf_bs_write_int(respbs, 0, 3); //RSV=0
476
0
    gf_bs_write_int(respbs, is_binary ? RMT_WEBSOCKET_BINARY : RMT_WEBSOCKET_TEXT, 4); //opcode=text
477
0
    gf_bs_write_int(respbs, 0, 1); //masked=0
478
479
0
    if (size < 126)
480
0
        gf_bs_write_int(respbs, (s32)size, 7);
481
0
    else if (size < 65536) {
482
0
        gf_bs_write_int(respbs, 126, 7);
483
0
        gf_bs_write_long_int(respbs, size, 16);
484
0
    } else {
485
0
        gf_bs_write_long_int(respbs, 127, 7);
486
0
        gf_bs_write_long_int(respbs, size, 64);
487
0
    }
488
489
0
    gf_bs_write_data(respbs, payload, (u32)size);
490
491
0
    u8* respbuf=NULL;
492
0
    u32 respsize=0;
493
0
    gf_bs_get_content(respbs, &respbuf, &respsize);
494
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send respbuf of size %d: %.*s\n", respsize, respsize, respbuf));
495
0
    e = dm_sess_write(client->http_sess, respbuf, respsize);
496
497
0
    gf_bs_del(respbs);
498
0
    gf_free(respbuf);
499
500
0
    return e;
501
502
0
}
503
504
0
GF_Err rmt_client_handle_ws_payload(RMT_ClientCtx* client, u8* payload, u64 size, Bool is_binary) {
505
506
0
    if (client->on_data_cbk && client->on_data_cbk_task) {
507
0
        client->on_data_cbk(client->on_data_cbk_task, payload, size, is_binary);
508
0
    }
509
510
0
    return GF_OK;
511
512
    // if (size)
513
    //     payload[0] = 'A';
514
515
    // return rmt_client_send_payload(client, payload, size, is_binary);
516
517
0
}
518
519
0
GF_Err rmt_client_handle_ws_frame(RMT_ClientCtx* client, GF_BitStream* bs) {
520
521
0
    GF_Err e = GF_OK;
522
523
0
    if (gf_bs_available(bs) < 2) return GF_IO_ERR;
524
525
0
    u32 FIN = gf_bs_read_int(bs, 1); //TODO: handle fragmented?
526
0
    /*u32 RSV =*/ gf_bs_read_int(bs, 3);
527
0
    u32 opcode = gf_bs_read_int(bs, 4);
528
0
    u32 masked = gf_bs_read_int(bs, 1);
529
0
    u64 payload_size = gf_bs_read_int(bs, 7);
530
0
    if (payload_size == 126) {
531
0
        if (gf_bs_available(bs) < 2) return GF_IO_ERR;
532
0
        payload_size = gf_bs_read_int(bs, 16);
533
0
    }
534
0
    else if (payload_size == 127) {
535
0
        if (gf_bs_available(bs) < 8) return GF_IO_ERR;
536
0
        payload_size = gf_bs_read_long_int(bs, 64);
537
0
    }
538
0
    if (gf_bs_available(bs) < 4) return GF_IO_ERR;
539
0
    char masking_key[4] = {0};
540
0
    if (masked) {
541
0
        gf_bs_read_data(bs, (u8*)&masking_key, 4);
542
0
    }
543
544
0
    u8* extra_payload = NULL;
545
0
    u32 extra_read = 0;
546
547
0
    if (payload_size + gf_bs_get_position(bs) > gf_bs_get_size(bs)) {
548
0
        u64 extra_size = payload_size + gf_bs_get_position(bs) - gf_bs_get_size(bs) ;
549
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("buffer too small for payload_size %llu bs_pos %u bs_size %u => extra_size %llu\n", payload_size, gf_bs_get_position(bs), gf_bs_get_size(bs), extra_size));
550
0
        extra_payload = gf_malloc( sizeof(u8) * extra_size );
551
552
0
        e = GF_OK;
553
0
        while (!e && extra_read < extra_size) {
554
0
            u32 new_read = 0;
555
0
            e = gf_dm_read_data(client->http_sess, extra_payload + extra_read, (u32)(extra_size-extra_read), &new_read);
556
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("extra gf_dm_read_data => %d e=%s\n",extra_read, gf_error_to_string(e)));
557
0
            extra_read += new_read;
558
0
        }
559
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("extra gf_dm_read_data => %d e=%s\n",extra_read, gf_error_to_string(e)));
560
0
    }
561
562
0
    u8* unmasked_payload = gf_malloc( payload_size * sizeof(u8) + 1); // add 1 to add null to get c string
563
0
    int i=0;
564
0
    for (i=0; i<payload_size && gf_bs_available(bs); i++) {
565
0
        unmasked_payload[i] = (u8) ( gf_bs_read_u8(bs) ^ masking_key[i%4] );
566
0
    }
567
568
0
    if (extra_payload) {
569
0
        for (u32 j=0; j<extra_read; j++) {
570
0
            unmasked_payload[i] = (u8) ( extra_payload[j] ^ masking_key[i%4] );
571
0
            i++;
572
0
        }
573
0
    }
574
575
0
    unmasked_payload[i] = 0;
576
577
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("parsed ws message: FIN %d \n opcode %d \n masked %d \n payload_size %d \n masking_key %.*s \n unmasked_payload \n",
578
0
        FIN,
579
0
        opcode,
580
0
        masked,
581
0
        payload_size,
582
0
        4, masking_key
583
0
    ));
584
585
0
    switch(opcode) {
586
587
0
        case RMT_WEBSOCKET_CLOSE:
588
0
            client->should_close = GF_TRUE;
589
0
            break;
590
591
0
        case RMT_WEBSOCKET_TEXT:
592
0
        case RMT_WEBSOCKET_BINARY:
593
594
0
            e = rmt_client_handle_ws_payload(client, unmasked_payload, payload_size, (opcode==RMT_WEBSOCKET_BINARY));
595
596
0
            break;
597
598
599
0
        case RMT_WEBSOCKET_PING:;
600
601
0
            GF_BitStream* respbs = gf_bs_new(NULL, 0, GF_BITSTREAM_WRITE_DYN);
602
0
            gf_bs_write_int(respbs, 1, 1); //FIN=1
603
0
            gf_bs_write_int(respbs, 0, 3); //RSV=0
604
0
            gf_bs_write_int(respbs, RMT_WEBSOCKET_PONG, 4); //opcode=pong
605
0
            gf_bs_write_int(respbs, 0, 1); //masked=0
606
0
            gf_bs_write_int(respbs, (s32)payload_size, 7);
607
0
            gf_bs_write_data(respbs, unmasked_payload, (u32)payload_size);
608
609
0
            u8* respbuf=NULL;
610
0
            u32 respsize=0;
611
0
            gf_bs_get_content(respbs, &respbuf, &respsize);
612
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send PONG respbuf of size %d: %.*s\n", respsize, respsize, respbuf));
613
0
            e = dm_sess_write(client->http_sess, respbuf, respsize);
614
615
0
            gf_bs_del(respbs);
616
0
            gf_free(respbuf);
617
618
0
            break;
619
620
621
0
        case RMT_WEBSOCKET_CONTINUATION:    // not supported yet
622
0
        case RMT_WEBSOCKET_PONG:            // last active timer already reset
623
0
        default:
624
0
            break;
625
0
    }
626
627
628
0
    gf_free(unmasked_payload);
629
0
    if (extra_payload)
630
0
        gf_free(extra_payload);
631
632
0
    return e;
633
0
}
634
635
0
GF_Err rmt_client_handle_event(RMT_ClientCtx* client) {
636
0
    GF_Err e = GF_OK;
637
638
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("select ready for socket %p %s\n",  client, client->peer_address));
639
640
0
    if (!client->is_ws) {
641
0
        e = gf_dm_sess_process(client->http_sess);
642
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] gf_dm_sess_process: %s\n",  gf_error_to_string(e)));
643
0
    }
644
0
    else {
645
0
        u8 buffer[1024]; //TODO: what size here?? do something like extra_payload??
646
0
        u32 read = 0;
647
0
        e = gf_dm_read_data(client->http_sess, buffer, 1024, &read);
648
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_dm_read_data => %d sHTTP %.*s\n", read, read, buffer));
649
650
0
        client->last_active_time = gf_sys_clock_high_res();
651
652
0
        GF_BitStream* bs = gf_bs_new(buffer, read, GF_BITSTREAM_READ);
653
654
0
        while (e==GF_OK && gf_bs_available(bs)) {
655
656
0
            e = rmt_client_handle_ws_frame(client, bs);
657
658
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_client_handle_ws_frame() returned %s bs has avail %d \n", gf_errno_str(e), gf_bs_available(bs)));
659
660
0
        }
661
662
663
0
        gf_bs_del(bs);
664
665
666
0
    }
667
668
669
0
    return e;
670
0
}
671
672
0
GF_Err rmt_client_send_ping(RMT_ClientCtx* client) {
673
674
0
    GF_Err e = dm_sess_write(client->http_sess, RMT_WEBSOCKET_PING_MSG, 2);
675
0
    return e;
676
677
0
}
678
679
GF_EXPORT
680
0
GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) {
681
682
0
    return rmt_client_send_payload(client, (const u8*) msg, size, is_binary);
683
684
0
}
685
686
0
Bool rmt_client_should_close(RMT_ClientCtx* client) {
687
688
    // check valid object
689
0
    if (!client || !client->client_sock) {
690
0
        GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("[RMT] weird dead session in rmt\n"));
691
0
        return GF_TRUE;
692
0
    }
693
694
0
    if (client->should_close) {
695
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s because request done\n",  client, client->peer_address));
696
0
        return GF_TRUE;
697
0
    }
698
699
    // check disconnection
700
0
    GF_Err e = gf_sk_probe(client->client_sock);
701
0
    if (e==GF_IP_CONNECTION_CLOSED) {
702
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s\n",  client, client->peer_address));
703
0
        return GF_TRUE;
704
0
    }
705
706
    // check timeout
707
0
    if (g_Settings.timeout_secs) {
708
0
        u32 diff_sec = (u32) (gf_sys_clock_high_res() - client->last_active_time)/1000000;
709
0
        if ( diff_sec > g_Settings.timeout_secs ) {
710
0
            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s for timeout \n",  client, client->peer_address));
711
0
            return GF_TRUE;
712
0
        }
713
0
    }
714
715
0
    return GF_FALSE;
716
717
0
}
718
719
0
GF_Err rmt_server_wait_for_event(RMT_ServerCtx* ctx) {
720
721
0
    GF_Err e = gf_sk_group_select(ctx->sg, 10, GF_SK_SELECT_BOTH);
722
723
0
    if (e==GF_OK) {
724
725
        // event on server socket = new client
726
0
        if (gf_sk_group_sock_is_set(ctx->sg, ctx->server_sock, GF_SK_SELECT_READ)) {
727
728
0
            e = rmt_server_handle_new_client(ctx);
729
0
            if (e)
730
0
                return e;
731
732
0
        }
733
734
        // event on one the active client
735
0
        else {
736
737
            // check active sessions
738
0
        u32 count = gf_list_count(ctx->active_clients);
739
0
        for (u32 i=0; i<count; i++) {
740
0
          RMT_ClientCtx* client = gf_list_get(ctx->active_clients, i);
741
742
0
                if (rmt_client_should_close(client)) {
743
744
0
                    gf_list_rem(ctx->active_clients, i);
745
0
                    i--;
746
0
                    count--;
747
0
                    rmt_clientctx_del(client);
748
0
                    client = NULL;
749
0
                    continue;
750
0
                }
751
752
0
                if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_WRITE)) {
753
754
                    // check if we should send ping
755
0
                    if (g_Settings.ping_secs) {
756
757
0
                        u32 diff_sec = (u32) (gf_sys_clock_high_res() - client->last_ping_time)/1000000;
758
759
0
                        if ( diff_sec > g_Settings.ping_secs ) {
760
761
0
                            GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Sending PING on client %p %s\n",  client, client->peer_address));
762
0
                            e = rmt_client_send_ping(client);
763
0
                            if (!e) {
764
0
                                client->last_ping_time = gf_sys_clock_high_res();
765
0
                            }
766
0
                            continue;
767
0
                        }
768
0
                    }
769
770
0
                }
771
772
                // handle received event
773
0
                if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_READ)) {
774
775
0
                    e = rmt_client_handle_event(client);
776
777
0
                }
778
0
            }
779
780
0
        }
781
782
0
    }
783
784
0
    gf_sleep(g_Settings.msSleepBetweenServerUpdates);
785
0
    return e;
786
787
0
}
788
789
0
static u32 rmt_ws_thread_main(void* par) {
790
791
0
    RMT_WS* rmt = (RMT_WS*)par;
792
793
0
    RMT_ServerCtx* ctx;
794
0
    GF_SAFEALLOC(ctx, RMT_ServerCtx);
795
796
0
    GF_Err e;
797
798
0
    while (!rmt->should_stop) {
799
800
        // create socket if not exist
801
0
        if (!ctx->server_sock) {
802
803
0
            e = rmt_create_server(ctx);
804
0
            if (e) {
805
0
                if (ctx) {
806
0
                    rmt_serverctx_reset(ctx);
807
0
                    gf_free(ctx);
808
0
                    ctx = NULL;
809
0
                }
810
0
                return e;
811
0
            }
812
813
0
            continue;
814
0
        }
815
816
0
        e = rmt_server_wait_for_event(ctx);
817
818
0
    }
819
820
0
    GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] thread main request exit! should cleanup\n"));
821
0
    rmt_serverctx_reset(ctx);
822
0
    gf_free(ctx);
823
0
    ctx = NULL;
824
0
    return GF_OK;
825
826
0
}
827
828
829
830
RMT_Settings* gf_rmt_get_settings()
831
0
{
832
    // Default-initialize on first call
833
0
    if( g_SettingsInitialized == GF_FALSE ) {
834
835
0
        g_Settings.port = 6363;
836
0
        g_Settings.timeout_secs = 20;
837
0
        g_Settings.ping_secs = 7;
838
839
0
        g_Settings.limit_connections_to_localhost = GF_FALSE;
840
0
        g_Settings.msSleepBetweenServerUpdates = 50;
841
842
0
        g_Settings.on_new_client_cbk = NULL;
843
0
        g_Settings.on_new_client_cbk_task = NULL;
844
845
0
        g_Settings.cert = NULL;
846
0
        g_Settings.pkey = NULL;
847
848
0
        g_SettingsInitialized = GF_TRUE;
849
0
    }
850
851
0
    return &g_Settings;
852
0
}
853
854
855
0
void rmt_ws_del(RMT_WS* rmt) {
856
857
0
    if (rmt && rmt->thread) {
858
859
860
0
        rmt->should_stop = GF_TRUE;
861
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() calling stop...\n"));
862
863
0
        gf_th_stop(rmt->thread);
864
865
0
        GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() stop returned thread status is now %d\n", gf_th_status(rmt->thread)));
866
867
0
        gf_th_del(rmt->thread);
868
869
0
        rmt->thread = NULL;
870
871
0
    }
872
873
0
    gf_free(rmt);
874
0
    rmt = NULL;
875
876
0
}
877
878
0
RMT_WS* rmt_ws_new() {
879
0
    RMT_WS* rmt = NULL;
880
881
0
    GF_SAFEALLOC(rmt, RMT_WS);
882
0
    if (!rmt) {
883
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable init rmt thread\n"));
884
0
        return NULL;
885
0
    }
886
887
888
    // Default-initialise if user has not set values
889
0
    gf_rmt_get_settings();
890
891
0
    rmt->thread = gf_th_new("rmt_ws_main_th");
892
0
    rmt->should_stop = GF_FALSE;
893
894
0
    GF_Err e = gf_th_run(rmt->thread, rmt_ws_thread_main, rmt);
895
0
    if (e != GF_OK) {
896
0
        GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable to start websocket thread: %s\n", gf_error_to_string(e)));
897
0
        rmt_ws_del(rmt);
898
0
        rmt = NULL;
899
0
    }
900
901
0
    return rmt;
902
903
904
0
}
905
906
#else //GPAC_DISABLE_RMTWS
907
908
void gf_rmt_set_on_new_client_cbk(void *task, rmt_on_new_client_cbk cbk) {
909
910
}
911
void* gf_rmt_get_on_new_client_task() {
912
    return NULL;
913
}
914
915
const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) {
916
    return NULL;
917
}
918
GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) {
919
    return GF_NOT_SUPPORTED;
920
}
921
922
void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) {
923
924
}
925
void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) {
926
    return NULL;
927
}
928
929
void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) {
930
931
}
932
void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) {
933
    return NULL;
934
}
935
936
#endif