/src/gpac/src/utils/rmt_ws.c
Line | Count | Source |
1 | | #include <gpac/tools.h> |
2 | | #include <gpac/thread.h> |
3 | | #include <gpac/list.h> |
4 | | #include <gpac/bitstream.h> |
5 | | #include <gpac/network.h> |
6 | | #include <gpac/download.h> |
7 | | #include <gpac/base_coding.h> |
8 | | |
9 | | #ifndef GPAC_DISABLE_RMTWS |
10 | | |
11 | | |
12 | | // Global settings |
13 | | static RMT_Settings g_Settings; |
14 | | static Bool g_SettingsInitialized = GF_FALSE; |
15 | | |
16 | | static const char websocket_guid[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
17 | | |
18 | | |
19 | | struct RMT_WS { |
20 | | |
21 | | // The main server thread |
22 | | GF_Thread* thread; |
23 | | |
24 | | // Should we stop the thread? |
25 | | Bool should_stop; |
26 | | |
27 | | }; |
28 | | |
29 | | |
30 | | |
31 | | GF_DownloadSession *gf_dm_sess_new_server(GF_DownloadManager *dm, GF_Socket *server, void *ssl_ctx, gf_dm_user_io user_io, void *usr_cbk, Bool async, GF_Err *e); |
32 | | void gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value); |
33 | | GF_Err gf_dm_sess_send_reply(GF_DownloadSession *sess, u32 reply_code, const char *response_body, u32 body_len, Bool no_body); |
34 | | void gf_dm_sess_clear_headers(GF_DownloadSession *sess); |
35 | | void gf_dm_sess_set_header(GF_DownloadSession *sess, const char *name, const char *value); |
36 | | GF_Err dm_sess_write(GF_DownloadSession *session, const u8 *buffer, u32 size); |
37 | | GF_Err gf_dm_read_data(GF_DownloadSession *sess, char *data, u32 data_size, u32 *out_read); |
38 | | void httpout_format_date(u64 time, char szDate[200], Bool for_listing); |
39 | | |
40 | | #ifdef GPAC_HAS_SSL |
41 | | |
42 | | void *gf_ssl_new(void *ssl_server_ctx, GF_Socket *client_sock, GF_Err *e); |
43 | | void *gf_ssl_server_context_new(const char *cert, const char *key); |
44 | | void gf_ssl_server_context_del(void *ssl_server_ctx); |
45 | | Bool gf_ssl_init_lib(); |
46 | | |
47 | | #endif |
48 | | |
49 | | enum { |
50 | | RMT_WEBSOCKET_CONTINUATION = 0, |
51 | | RMT_WEBSOCKET_TEXT = 1, |
52 | | RMT_WEBSOCKET_BINARY = 2, |
53 | | RMT_WEBSOCKET_CLOSE = 8, |
54 | | RMT_WEBSOCKET_PING = 9, |
55 | | RMT_WEBSOCKET_PONG = 10, |
56 | | }; |
57 | | |
58 | | static char RMT_WEBSOCKET_PING_MSG[2] = { 0x89, 0x00 }; |
59 | | |
60 | | struct __rmt_serverctx { |
61 | | |
62 | | GF_Socket* server_sock; |
63 | | |
64 | | GF_SockGroup* sg; |
65 | | |
66 | | void *ssl_ctx; |
67 | | |
68 | | GF_DownloadManager* dm_sess; |
69 | | |
70 | | GF_List* active_clients; |
71 | | |
72 | | }; |
73 | | |
74 | | struct __rmt_clientctx { |
75 | | |
76 | | GF_Socket* client_sock; |
77 | | RMT_ServerCtx* ctx; |
78 | | GF_DownloadSession* http_sess; |
79 | | |
80 | | char peer_address[GF_MAX_IP_NAME_LEN + 16]; |
81 | | char buffer[1024]; |
82 | | |
83 | | Bool is_ws; |
84 | | u64 last_active_time; |
85 | | u64 last_ping_time; |
86 | | Bool should_close; |
87 | | |
88 | | rmt_client_on_data_cbk on_data_cbk; |
89 | | void* on_data_cbk_task; |
90 | | |
91 | | rmt_client_on_del_cbk on_del_cbk; |
92 | | void* on_del_cbk_task; |
93 | | |
94 | | }; |
95 | | |
96 | | GF_EXPORT |
97 | 0 | const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) { |
98 | 0 | if (client) |
99 | 0 | return client->peer_address; |
100 | 0 | return NULL; |
101 | 0 | } |
102 | | |
103 | | GF_EXPORT |
104 | 0 | void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) { |
105 | 0 | if (client) |
106 | 0 | return client->on_data_cbk_task; |
107 | 0 | return NULL; |
108 | 0 | } |
109 | | |
110 | | GF_EXPORT |
111 | 0 | void gf_rmt_set_on_new_client_cbk(void *task, rmt_on_new_client_cbk cbk) { |
112 | 0 | RMT_Settings *rmtcfg = gf_rmt_get_settings(); |
113 | 0 | if (rmtcfg) { |
114 | 0 | rmtcfg->on_new_client_cbk = cbk; |
115 | 0 | rmtcfg->on_new_client_cbk_task = task; |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | GF_EXPORT |
120 | 0 | void* gf_rmt_get_on_new_client_task() { |
121 | 0 | return g_Settings.on_new_client_cbk_task; |
122 | 0 | } |
123 | | |
124 | | GF_EXPORT |
125 | 0 | void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) { |
126 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_rmt_client_set_on_del_cbk client %p task %p cbk %p\n", client, task, cbk)); |
127 | 0 | if (client) { |
128 | 0 | client->on_del_cbk = cbk; |
129 | 0 | client->on_del_cbk_task = task; |
130 | 0 | } |
131 | 0 | } |
132 | | |
133 | | GF_EXPORT |
134 | 0 | void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) { |
135 | 0 | if (client) |
136 | 0 | return client->on_del_cbk_task; |
137 | 0 | return NULL; |
138 | 0 | } |
139 | | |
140 | 0 | void rmt_clientctx_del(RMT_ClientCtx* client) { |
141 | |
|
142 | 0 | if (!client) return; |
143 | | |
144 | 0 | GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] closing client %s\n", client->peer_address)); |
145 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_clientctx_del client %p del_cbk %p\n", client, client ? client->on_del_cbk : NULL)); |
146 | |
|
147 | 0 | if (client->on_del_cbk) { |
148 | 0 | client->on_del_cbk(client->on_del_cbk_task); |
149 | 0 | } |
150 | |
|
151 | 0 | if (client->client_sock) { |
152 | 0 | if (client->ctx && client->ctx->sg) |
153 | 0 | gf_sk_group_unregister(client->ctx->sg, client->client_sock); |
154 | |
|
155 | 0 | if (!client->http_sess) |
156 | 0 | gf_sk_del(client->client_sock); //socket deleted by gf_dm_sess_del() if http_sess has been created |
157 | 0 | client->client_sock = NULL; |
158 | 0 | } |
159 | |
|
160 | 0 | if (client->http_sess) { |
161 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("deleting http_sess %p\n", client->http_sess)); |
162 | 0 | gf_dm_sess_del(client->http_sess); |
163 | 0 | client->http_sess = NULL; |
164 | 0 | } |
165 | |
|
166 | 0 | client->ctx = NULL; |
167 | 0 | client->on_data_cbk = NULL; |
168 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d rmt_clientctx_del client %p task %p\n", __FILE__, __LINE__, client, client->on_data_cbk_task)); |
169 | |
|
170 | 0 | if (client->on_data_cbk_task) gf_free(client->on_data_cbk_task); |
171 | 0 | client->on_data_cbk_task = NULL; |
172 | |
|
173 | 0 | if (client->on_del_cbk_task) gf_free(client->on_del_cbk_task); |
174 | 0 | client->on_del_cbk_task = NULL; |
175 | |
|
176 | 0 | gf_free(client); |
177 | 0 | } |
178 | | |
179 | 0 | void rmt_serverctx_reset(RMT_ServerCtx* ctx) { |
180 | 0 | if (ctx->active_clients) { |
181 | 0 | while (gf_list_count(ctx->active_clients)) { |
182 | 0 | RMT_ClientCtx* client = gf_list_pop_back(ctx->active_clients); |
183 | 0 | rmt_clientctx_del(client); |
184 | 0 | client = NULL; |
185 | 0 | } |
186 | 0 | gf_list_del(ctx->active_clients); |
187 | 0 | ctx->active_clients = NULL; |
188 | 0 | } |
189 | |
|
190 | 0 | if (ctx->server_sock) { |
191 | 0 | if (ctx->sg) |
192 | 0 | gf_sk_group_unregister(ctx->sg, ctx->server_sock); |
193 | |
|
194 | 0 | gf_sk_del(ctx->server_sock); |
195 | 0 | ctx->server_sock = NULL; |
196 | 0 | } |
197 | |
|
198 | 0 | if (ctx->sg) { gf_sk_group_del(ctx->sg); ctx->sg = NULL; } |
199 | |
|
200 | 0 | if (ctx->dm_sess) { gf_dm_del(ctx->dm_sess); ctx->dm_sess = NULL; } |
201 | |
|
202 | 0 | if (ctx->ssl_ctx) { |
203 | 0 | #ifdef GPAC_HAS_SSL |
204 | 0 | gf_ssl_server_context_del(ctx->ssl_ctx); |
205 | | #else |
206 | | gf_free(ctx->ssl_ctx); |
207 | | #endif |
208 | 0 | ctx->ssl_ctx = NULL; |
209 | 0 | } |
210 | | |
211 | |
|
212 | 0 | } |
213 | | |
214 | 0 | void rmt_close_client(RMT_ClientCtx* client) { |
215 | 0 | if (!client) return; |
216 | 0 | RMT_ServerCtx* ctx = client->ctx; |
217 | 0 | if (ctx && ctx->active_clients) { |
218 | 0 | gf_list_del_item(ctx->active_clients, client); |
219 | 0 | } |
220 | 0 | rmt_clientctx_del(client); |
221 | 0 | } |
222 | | |
223 | 0 | GF_Err rmt_send_reply(GF_DownloadSession* http_sess, int responseCode, char* response_body, char* content_type) { |
224 | |
|
225 | 0 | u32 body_size = 0; |
226 | 0 | char szFmt[100]; |
227 | 0 | char szDate[200]; |
228 | |
|
229 | 0 | httpout_format_date(gf_net_get_utc(), szDate, GF_FALSE); |
230 | 0 | gf_dm_sess_set_header(http_sess, "Date", szDate); |
231 | 0 | gf_dm_sess_set_header(http_sess, "Server", gf_gpac_version()); |
232 | |
|
233 | 0 | if (response_body) { |
234 | 0 | body_size = (u32) strlen(response_body); |
235 | 0 | gf_dm_sess_set_header(http_sess, "Content-Type", content_type ? content_type : "text/html"); |
236 | 0 | sprintf(szFmt, "%d", body_size); |
237 | 0 | gf_dm_sess_set_header(http_sess, "Content-Length", szFmt); |
238 | |
|
239 | 0 | } |
240 | |
|
241 | 0 | return gf_dm_sess_send_reply(http_sess, responseCode, response_body, response_body ? (u32) strlen(response_body) : 0, (response_body==NULL)); |
242 | |
|
243 | 0 | } |
244 | | |
245 | 0 | static void rmt_on_http_session_data(void *usr_cbk, GF_NETIO_Parameter *parameter) { |
246 | |
|
247 | 0 | RMT_ClientCtx* client_ctx = (RMT_ClientCtx*) usr_cbk; |
248 | 0 | if (!client_ctx) { |
249 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] sess io on null session\n")); |
250 | 0 | return; |
251 | 0 | } |
252 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_on_http_session_data on peer %s param %p msg_type %d \nerror %s \ndata %p \nsize %d \nname %p \nvalue %p \nreply %d\n", |
253 | 0 | client_ctx->peer_address, |
254 | 0 | parameter, |
255 | 0 | parameter->msg_type, |
256 | 0 | gf_error_to_string(parameter->error), |
257 | 0 | parameter->data, |
258 | 0 | parameter->size, |
259 | 0 | parameter->name, |
260 | 0 | parameter->value, |
261 | 0 | parameter->reply |
262 | 0 | )); |
263 | |
|
264 | 0 | client_ctx->last_active_time = gf_sys_clock_high_res(); |
265 | |
|
266 | 0 | const char* durl = gf_dm_sess_get_resource_name(parameter->sess); |
267 | 0 | const char* ua = gf_dm_sess_get_header(parameter->sess, "User-Agent"); |
268 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("requested url: %s with ua %s\n", durl, ua)); |
269 | 0 | GF_Err e; |
270 | |
|
271 | 0 | if (parameter->msg_type != GF_NETIO_PARSE_REPLY) { |
272 | 0 | if (parameter->size) { |
273 | 0 | strncat(client_ctx->buffer, parameter->data, parameter->size); |
274 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("session data is now: %s\n", client_ctx->buffer)); |
275 | 0 | } |
276 | 0 | } |
277 | |
|
278 | 0 | if (parameter->msg_type == GF_NETIO_PARSE_REPLY) { |
279 | |
|
280 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("final buffer before response: %s\n", client_ctx->buffer)); |
281 | |
|
282 | 0 | GF_DownloadSession* http_sess = parameter->sess ; |
283 | |
|
284 | 0 | if (!client_ctx->is_ws) { |
285 | 0 | const char* ws_header_version = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Version"); |
286 | 0 | const char* ws_header_key = gf_dm_sess_get_header(http_sess, "Sec-WebSocket-Key"); |
287 | |
|
288 | 0 | if (!ws_header_version || strcmp(ws_header_version, "13") || !ws_header_key) { |
289 | |
|
290 | 0 | gf_dm_sess_clear_headers(http_sess); |
291 | 0 | gf_dm_sess_set_header(http_sess, "Connection", "close"); |
292 | |
|
293 | 0 | rmt_send_reply(http_sess, 404, "only ws connections are accepted", NULL); |
294 | 0 | client_ctx->should_close = GF_TRUE; |
295 | 0 | return; |
296 | |
|
297 | 0 | } |
298 | 0 | else { |
299 | |
|
300 | 0 | char* resp_key = gf_strdup(ws_header_key); |
301 | 0 | gf_dynstrcat(&resp_key, websocket_guid, NULL); |
302 | |
|
303 | 0 | u32 resp_key_len = (u32) strlen(resp_key); |
304 | 0 | if (resp_key_len < 1) |
305 | 0 | return; |
306 | | |
307 | | |
308 | 0 | u8 hash[GF_SHA1_DIGEST_SIZE]; |
309 | 0 | gf_sha1_csum( resp_key, resp_key_len, hash ); |
310 | |
|
311 | 0 | u32 end_b64 = gf_base64_encode(hash, GF_SHA1_DIGEST_SIZE, resp_key, resp_key_len); |
312 | 0 | resp_key[resp_key_len-1] = 0; |
313 | 0 | if (end_b64 < resp_key_len) { |
314 | 0 | resp_key[end_b64] = 0; |
315 | 0 | } else { |
316 | 0 | return; |
317 | 0 | } |
318 | | |
319 | 0 | gf_dm_sess_clear_headers(http_sess); |
320 | |
|
321 | 0 | gf_dm_sess_set_header(http_sess, "Connection", "Upgrade"); |
322 | 0 | gf_dm_sess_set_header(http_sess, "Upgrade", "websocket"); |
323 | 0 | gf_dm_sess_set_header(http_sess, "Sec-WebSocket-Accept", resp_key); |
324 | |
|
325 | 0 | e = rmt_send_reply(http_sess, 101, NULL, NULL); |
326 | |
|
327 | 0 | if (e==GF_OK) { |
328 | 0 | client_ctx->is_ws = GF_TRUE; |
329 | |
|
330 | 0 | if (g_Settings.on_new_client_cbk && g_Settings.on_new_client_cbk_task) { |
331 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("%s:%d calling on new client cb %p with client %p\n", __FILE__,__LINE__, g_Settings.on_new_client_cbk, client_ctx)); |
332 | 0 | g_Settings.on_new_client_cbk(g_Settings.on_new_client_cbk_task, client_ctx); |
333 | 0 | } |
334 | 0 | } |
335 | 0 | gf_free(resp_key); |
336 | 0 | } |
337 | 0 | } else { |
338 | 0 | GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("this should be a ws message right?\n")); |
339 | 0 | } |
340 | | |
341 | | |
342 | | |
343 | 0 | memset(&client_ctx->buffer, 0, 1024); |
344 | 0 | } |
345 | | |
346 | |
|
347 | 0 | } |
348 | | |
349 | | |
350 | 0 | GF_Err rmt_create_server(RMT_ServerCtx* ctx) { |
351 | |
|
352 | 0 | GF_Err e = GF_OK; |
353 | |
|
354 | 0 | rmt_serverctx_reset(ctx); |
355 | |
|
356 | 0 | if (g_Settings.cert && g_Settings.pkey) { |
357 | 0 | #ifdef GPAC_HAS_SSL |
358 | 0 | if (!gf_file_exists(g_Settings.cert)) { |
359 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Certificate file %s not found\n", g_Settings.cert)); |
360 | 0 | return GF_IO_ERR; |
361 | 0 | } |
362 | 0 | if (!gf_file_exists(g_Settings.pkey)) { |
363 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Private key file %s not found\n", g_Settings.pkey)); |
364 | 0 | return GF_IO_ERR; |
365 | 0 | } |
366 | 0 | if (gf_ssl_init_lib()) { |
367 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to initialize OpenSSL library\n")); |
368 | 0 | return GF_IO_ERR; |
369 | 0 | } |
370 | | |
371 | 0 | Bool prev_noh2 = gf_opts_get_bool("core", "no-h2"); |
372 | 0 | if (!prev_noh2) |
373 | 0 | gf_opts_set_key("core", "no-h2", "1"); |
374 | |
|
375 | 0 | ctx->ssl_ctx = gf_ssl_server_context_new(g_Settings.cert, g_Settings.pkey); |
376 | |
|
377 | 0 | if (!prev_noh2) |
378 | 0 | gf_opts_set_key("core", "no-h2", "0"); |
379 | |
|
380 | 0 | if (!ctx->ssl_ctx) return GF_IO_ERR; |
381 | | #else |
382 | | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] TLS key/certificate set but GPAC compiled without TLS support\n")); |
383 | | return GF_NOT_SUPPORTED; |
384 | | |
385 | | #endif |
386 | 0 | } |
387 | | |
388 | 0 | ctx->sg = gf_sk_group_new(); |
389 | |
|
390 | 0 | ctx->server_sock = gf_sk_new(GF_SOCK_TYPE_TCP); |
391 | 0 | e = gf_sk_bind( ctx->server_sock, |
392 | 0 | g_Settings.limit_connections_to_localhost ? "127.0.0.1" : "0.0.0.0", |
393 | 0 | g_Settings.port, |
394 | 0 | NULL, 0, GF_SOCK_REUSE_PORT ); |
395 | |
|
396 | 0 | if (!e) e = gf_sk_listen(ctx->server_sock, 0); |
397 | 0 | if (e) { |
398 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] failed to start server on port %d: %s\n", g_Settings.port, gf_error_to_string(e) )); |
399 | 0 | return e; |
400 | 0 | } |
401 | | |
402 | 0 | gf_sk_group_register(ctx->sg, ctx->server_sock); |
403 | |
|
404 | 0 | gf_sk_server_mode(ctx->server_sock, GF_TRUE); |
405 | 0 | GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] Server running on port %d\n", g_Settings.port)); |
406 | |
|
407 | 0 | Bool prev_noh2 = gf_opts_get_bool("core", "no-h2"); |
408 | 0 | if (!prev_noh2) |
409 | 0 | gf_opts_set_key("core", "no-h2", "1"); |
410 | 0 | ctx->dm_sess = gf_dm_new(NULL); |
411 | 0 | if (!prev_noh2) |
412 | 0 | gf_opts_set_key("core", "no-h2", "0"); |
413 | 0 | ctx->active_clients = gf_list_new(); |
414 | |
|
415 | 0 | return e; |
416 | |
|
417 | 0 | } |
418 | | |
419 | | GF_EXPORT |
420 | 0 | void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) { |
421 | 0 | if (client) { |
422 | 0 | client->on_data_cbk = cbk; |
423 | 0 | client->on_data_cbk_task = task; |
424 | 0 | } |
425 | 0 | } |
426 | | |
427 | 0 | GF_Err rmt_server_handle_new_client(RMT_ServerCtx* ctx) { |
428 | 0 | GF_Err e = GF_OK; |
429 | |
|
430 | 0 | void *ssl_c = NULL; |
431 | |
|
432 | 0 | RMT_ClientCtx* new_client; |
433 | 0 | GF_SAFEALLOC(new_client, RMT_ClientCtx); |
434 | |
|
435 | 0 | new_client->ctx = ctx; |
436 | |
|
437 | 0 | e = gf_sk_accept(ctx->server_sock, &new_client->client_sock); |
438 | 0 | gf_sk_group_register(ctx->sg, new_client->client_sock); |
439 | |
|
440 | 0 | u32 port; |
441 | 0 | gf_sk_get_remote_address_port(new_client->client_sock, new_client->peer_address, &port); |
442 | 0 | sprintf(new_client->peer_address + strlen(new_client->peer_address), ":%d", port); //TDOO: size check |
443 | 0 | GF_LOG(GF_LOG_INFO, GF_LOG_RMTWS, ("[RMT] connected to remote peer %s\n", new_client->peer_address)); |
444 | | |
445 | |
|
446 | 0 | #ifdef GPAC_HAS_SSL |
447 | 0 | if (ctx->ssl_ctx) { |
448 | 0 | ssl_c = gf_ssl_new(ctx->ssl_ctx, new_client->client_sock, &e); |
449 | 0 | if (e) { |
450 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT] Failed to create TLS session from %s: %s\n", new_client->peer_address, gf_error_to_string(e) )); |
451 | 0 | rmt_clientctx_del(new_client); |
452 | 0 | return e; |
453 | 0 | } |
454 | 0 | } |
455 | 0 | #endif |
456 | | |
457 | 0 | new_client->http_sess = gf_dm_sess_new_server(ctx->dm_sess, new_client->client_sock, ssl_c, rmt_on_http_session_data, new_client, GF_TRUE, &e); |
458 | |
|
459 | 0 | new_client->is_ws = GF_FALSE; |
460 | 0 | new_client->last_active_time = gf_sys_clock_high_res(); |
461 | 0 | new_client->last_ping_time = gf_sys_clock_high_res(); |
462 | |
|
463 | 0 | gf_list_add(ctx->active_clients, new_client); |
464 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("adding active client socket %p %s\n", new_client, new_client->peer_address)); |
465 | |
|
466 | 0 | return e; |
467 | 0 | } |
468 | | |
469 | 0 | GF_Err rmt_client_send_payload(RMT_ClientCtx* client, const u8* payload, u64 size, Bool is_binary) { |
470 | |
|
471 | 0 | GF_Err e = GF_OK; |
472 | |
|
473 | 0 | GF_BitStream* respbs = gf_bs_new(NULL, size+10, GF_BITSTREAM_WRITE_DYN); |
474 | 0 | gf_bs_write_int(respbs, 1, 1); //FIN=1 |
475 | 0 | gf_bs_write_int(respbs, 0, 3); //RSV=0 |
476 | 0 | gf_bs_write_int(respbs, is_binary ? RMT_WEBSOCKET_BINARY : RMT_WEBSOCKET_TEXT, 4); //opcode=text |
477 | 0 | gf_bs_write_int(respbs, 0, 1); //masked=0 |
478 | |
|
479 | 0 | if (size < 126) |
480 | 0 | gf_bs_write_int(respbs, (s32)size, 7); |
481 | 0 | else if (size < 65536) { |
482 | 0 | gf_bs_write_int(respbs, 126, 7); |
483 | 0 | gf_bs_write_long_int(respbs, size, 16); |
484 | 0 | } else { |
485 | 0 | gf_bs_write_long_int(respbs, 127, 7); |
486 | 0 | gf_bs_write_long_int(respbs, size, 64); |
487 | 0 | } |
488 | |
|
489 | 0 | gf_bs_write_data(respbs, payload, (u32)size); |
490 | |
|
491 | 0 | u8* respbuf=NULL; |
492 | 0 | u32 respsize=0; |
493 | 0 | gf_bs_get_content(respbs, &respbuf, &respsize); |
494 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send respbuf of size %d: %.*s\n", respsize, respsize, respbuf)); |
495 | 0 | e = dm_sess_write(client->http_sess, respbuf, respsize); |
496 | |
|
497 | 0 | gf_bs_del(respbs); |
498 | 0 | gf_free(respbuf); |
499 | |
|
500 | 0 | return e; |
501 | |
|
502 | 0 | } |
503 | | |
504 | 0 | GF_Err rmt_client_handle_ws_payload(RMT_ClientCtx* client, u8* payload, u64 size, Bool is_binary) { |
505 | |
|
506 | 0 | if (client->on_data_cbk && client->on_data_cbk_task) { |
507 | 0 | client->on_data_cbk(client->on_data_cbk_task, payload, size, is_binary); |
508 | 0 | } |
509 | |
|
510 | 0 | return GF_OK; |
511 | | |
512 | | // if (size) |
513 | | // payload[0] = 'A'; |
514 | | |
515 | | // return rmt_client_send_payload(client, payload, size, is_binary); |
516 | |
|
517 | 0 | } |
518 | | |
519 | 0 | GF_Err rmt_client_handle_ws_frame(RMT_ClientCtx* client, GF_BitStream* bs) { |
520 | |
|
521 | 0 | GF_Err e = GF_OK; |
522 | |
|
523 | 0 | if (gf_bs_available(bs) < 2) return GF_IO_ERR; |
524 | | |
525 | 0 | u32 FIN = gf_bs_read_int(bs, 1); //TODO: handle fragmented? |
526 | 0 | /*u32 RSV =*/ gf_bs_read_int(bs, 3); |
527 | 0 | u32 opcode = gf_bs_read_int(bs, 4); |
528 | 0 | u32 masked = gf_bs_read_int(bs, 1); |
529 | 0 | u64 payload_size = gf_bs_read_int(bs, 7); |
530 | 0 | if (payload_size == 126) { |
531 | 0 | if (gf_bs_available(bs) < 2) return GF_IO_ERR; |
532 | 0 | payload_size = gf_bs_read_int(bs, 16); |
533 | 0 | } |
534 | 0 | else if (payload_size == 127) { |
535 | 0 | if (gf_bs_available(bs) < 8) return GF_IO_ERR; |
536 | 0 | payload_size = gf_bs_read_long_int(bs, 64); |
537 | 0 | } |
538 | 0 | if (gf_bs_available(bs) < 4) return GF_IO_ERR; |
539 | 0 | char masking_key[4] = {0}; |
540 | 0 | if (masked) { |
541 | 0 | gf_bs_read_data(bs, (u8*)&masking_key, 4); |
542 | 0 | } |
543 | |
|
544 | 0 | u8* extra_payload = NULL; |
545 | 0 | u32 extra_read = 0; |
546 | |
|
547 | 0 | if (payload_size + gf_bs_get_position(bs) > gf_bs_get_size(bs)) { |
548 | 0 | u64 extra_size = payload_size + gf_bs_get_position(bs) - gf_bs_get_size(bs) ; |
549 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("buffer too small for payload_size %llu bs_pos %u bs_size %u => extra_size %llu\n", payload_size, gf_bs_get_position(bs), gf_bs_get_size(bs), extra_size)); |
550 | 0 | extra_payload = gf_malloc( sizeof(u8) * extra_size ); |
551 | |
|
552 | 0 | e = GF_OK; |
553 | 0 | while (!e && extra_read < extra_size) { |
554 | 0 | u32 new_read = 0; |
555 | 0 | e = gf_dm_read_data(client->http_sess, extra_payload + extra_read, (u32)(extra_size-extra_read), &new_read); |
556 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("extra gf_dm_read_data => %d e=%s\n",extra_read, gf_error_to_string(e))); |
557 | 0 | extra_read += new_read; |
558 | 0 | } |
559 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("extra gf_dm_read_data => %d e=%s\n",extra_read, gf_error_to_string(e))); |
560 | 0 | } |
561 | |
|
562 | 0 | u8* unmasked_payload = gf_malloc( payload_size * sizeof(u8) + 1); // add 1 to add null to get c string |
563 | 0 | int i=0; |
564 | 0 | for (i=0; i<payload_size && gf_bs_available(bs); i++) { |
565 | 0 | unmasked_payload[i] = (u8) ( gf_bs_read_u8(bs) ^ masking_key[i%4] ); |
566 | 0 | } |
567 | |
|
568 | 0 | if (extra_payload) { |
569 | 0 | for (u32 j=0; j<extra_read; j++) { |
570 | 0 | unmasked_payload[i] = (u8) ( extra_payload[j] ^ masking_key[i%4] ); |
571 | 0 | i++; |
572 | 0 | } |
573 | 0 | } |
574 | |
|
575 | 0 | unmasked_payload[i] = 0; |
576 | |
|
577 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("parsed ws message: FIN %d \n opcode %d \n masked %d \n payload_size %d \n masking_key %.*s \n unmasked_payload \n", |
578 | 0 | FIN, |
579 | 0 | opcode, |
580 | 0 | masked, |
581 | 0 | payload_size, |
582 | 0 | 4, masking_key |
583 | 0 | )); |
584 | |
|
585 | 0 | switch(opcode) { |
586 | | |
587 | 0 | case RMT_WEBSOCKET_CLOSE: |
588 | 0 | client->should_close = GF_TRUE; |
589 | 0 | break; |
590 | | |
591 | 0 | case RMT_WEBSOCKET_TEXT: |
592 | 0 | case RMT_WEBSOCKET_BINARY: |
593 | |
|
594 | 0 | e = rmt_client_handle_ws_payload(client, unmasked_payload, payload_size, (opcode==RMT_WEBSOCKET_BINARY)); |
595 | |
|
596 | 0 | break; |
597 | | |
598 | | |
599 | 0 | case RMT_WEBSOCKET_PING:; |
600 | |
|
601 | 0 | GF_BitStream* respbs = gf_bs_new(NULL, 0, GF_BITSTREAM_WRITE_DYN); |
602 | 0 | gf_bs_write_int(respbs, 1, 1); //FIN=1 |
603 | 0 | gf_bs_write_int(respbs, 0, 3); //RSV=0 |
604 | 0 | gf_bs_write_int(respbs, RMT_WEBSOCKET_PONG, 4); //opcode=pong |
605 | 0 | gf_bs_write_int(respbs, 0, 1); //masked=0 |
606 | 0 | gf_bs_write_int(respbs, (s32)payload_size, 7); |
607 | 0 | gf_bs_write_data(respbs, unmasked_payload, (u32)payload_size); |
608 | |
|
609 | 0 | u8* respbuf=NULL; |
610 | 0 | u32 respsize=0; |
611 | 0 | gf_bs_get_content(respbs, &respbuf, &respsize); |
612 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("ready to send PONG respbuf of size %d: %.*s\n", respsize, respsize, respbuf)); |
613 | 0 | e = dm_sess_write(client->http_sess, respbuf, respsize); |
614 | |
|
615 | 0 | gf_bs_del(respbs); |
616 | 0 | gf_free(respbuf); |
617 | |
|
618 | 0 | break; |
619 | | |
620 | | |
621 | 0 | case RMT_WEBSOCKET_CONTINUATION: // not supported yet |
622 | 0 | case RMT_WEBSOCKET_PONG: // last active timer already reset |
623 | 0 | default: |
624 | 0 | break; |
625 | 0 | } |
626 | | |
627 | | |
628 | 0 | gf_free(unmasked_payload); |
629 | 0 | if (extra_payload) |
630 | 0 | gf_free(extra_payload); |
631 | |
|
632 | 0 | return e; |
633 | 0 | } |
634 | | |
635 | 0 | GF_Err rmt_client_handle_event(RMT_ClientCtx* client) { |
636 | 0 | GF_Err e = GF_OK; |
637 | |
|
638 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("select ready for socket %p %s\n", client, client->peer_address)); |
639 | |
|
640 | 0 | if (!client->is_ws) { |
641 | 0 | e = gf_dm_sess_process(client->http_sess); |
642 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] gf_dm_sess_process: %s\n", gf_error_to_string(e))); |
643 | 0 | } |
644 | 0 | else { |
645 | 0 | u8 buffer[1024]; //TODO: what size here?? do something like extra_payload?? |
646 | 0 | u32 read = 0; |
647 | 0 | e = gf_dm_read_data(client->http_sess, buffer, 1024, &read); |
648 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("gf_dm_read_data => %d sHTTP %.*s\n", read, read, buffer)); |
649 | |
|
650 | 0 | client->last_active_time = gf_sys_clock_high_res(); |
651 | |
|
652 | 0 | GF_BitStream* bs = gf_bs_new(buffer, read, GF_BITSTREAM_READ); |
653 | |
|
654 | 0 | while (e==GF_OK && gf_bs_available(bs)) { |
655 | |
|
656 | 0 | e = rmt_client_handle_ws_frame(client, bs); |
657 | |
|
658 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_client_handle_ws_frame() returned %s bs has avail %d \n", gf_errno_str(e), gf_bs_available(bs))); |
659 | |
|
660 | 0 | } |
661 | | |
662 | |
|
663 | 0 | gf_bs_del(bs); |
664 | | |
665 | |
|
666 | 0 | } |
667 | | |
668 | |
|
669 | 0 | return e; |
670 | 0 | } |
671 | | |
672 | 0 | GF_Err rmt_client_send_ping(RMT_ClientCtx* client) { |
673 | |
|
674 | 0 | GF_Err e = dm_sess_write(client->http_sess, RMT_WEBSOCKET_PING_MSG, 2); |
675 | 0 | return e; |
676 | |
|
677 | 0 | } |
678 | | |
679 | | GF_EXPORT |
680 | 0 | GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) { |
681 | |
|
682 | 0 | return rmt_client_send_payload(client, (const u8*) msg, size, is_binary); |
683 | |
|
684 | 0 | } |
685 | | |
686 | 0 | Bool rmt_client_should_close(RMT_ClientCtx* client) { |
687 | | |
688 | | // check valid object |
689 | 0 | if (!client || !client->client_sock) { |
690 | 0 | GF_LOG(GF_LOG_WARNING, GF_LOG_RMTWS, ("[RMT] weird dead session in rmt\n")); |
691 | 0 | return GF_TRUE; |
692 | 0 | } |
693 | | |
694 | 0 | if (client->should_close) { |
695 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s because request done\n", client, client->peer_address)); |
696 | 0 | return GF_TRUE; |
697 | 0 | } |
698 | | |
699 | | // check disconnection |
700 | 0 | GF_Err e = gf_sk_probe(client->client_sock); |
701 | 0 | if (e==GF_IP_CONNECTION_CLOSED) { |
702 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s\n", client, client->peer_address)); |
703 | 0 | return GF_TRUE; |
704 | 0 | } |
705 | | |
706 | | // check timeout |
707 | 0 | if (g_Settings.timeout_secs) { |
708 | 0 | u32 diff_sec = (u32) (gf_sys_clock_high_res() - client->last_active_time)/1000000; |
709 | 0 | if ( diff_sec > g_Settings.timeout_secs ) { |
710 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Disconnected socket %p %s for timeout \n", client, client->peer_address)); |
711 | 0 | return GF_TRUE; |
712 | 0 | } |
713 | 0 | } |
714 | | |
715 | 0 | return GF_FALSE; |
716 | |
|
717 | 0 | } |
718 | | |
719 | 0 | GF_Err rmt_server_wait_for_event(RMT_ServerCtx* ctx) { |
720 | |
|
721 | 0 | GF_Err e = gf_sk_group_select(ctx->sg, 10, GF_SK_SELECT_BOTH); |
722 | |
|
723 | 0 | if (e==GF_OK) { |
724 | | |
725 | | // event on server socket = new client |
726 | 0 | if (gf_sk_group_sock_is_set(ctx->sg, ctx->server_sock, GF_SK_SELECT_READ)) { |
727 | |
|
728 | 0 | e = rmt_server_handle_new_client(ctx); |
729 | 0 | if (e) |
730 | 0 | return e; |
731 | |
|
732 | 0 | } |
733 | | |
734 | | // event on one the active client |
735 | 0 | else { |
736 | | |
737 | | // check active sessions |
738 | 0 | u32 count = gf_list_count(ctx->active_clients); |
739 | 0 | for (u32 i=0; i<count; i++) { |
740 | 0 | RMT_ClientCtx* client = gf_list_get(ctx->active_clients, i); |
741 | |
|
742 | 0 | if (rmt_client_should_close(client)) { |
743 | |
|
744 | 0 | gf_list_rem(ctx->active_clients, i); |
745 | 0 | i--; |
746 | 0 | count--; |
747 | 0 | rmt_clientctx_del(client); |
748 | 0 | client = NULL; |
749 | 0 | continue; |
750 | 0 | } |
751 | | |
752 | 0 | if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_WRITE)) { |
753 | | |
754 | | // check if we should send ping |
755 | 0 | if (g_Settings.ping_secs) { |
756 | |
|
757 | 0 | u32 diff_sec = (u32) (gf_sys_clock_high_res() - client->last_ping_time)/1000000; |
758 | |
|
759 | 0 | if ( diff_sec > g_Settings.ping_secs ) { |
760 | |
|
761 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("Sending PING on client %p %s\n", client, client->peer_address)); |
762 | 0 | e = rmt_client_send_ping(client); |
763 | 0 | if (!e) { |
764 | 0 | client->last_ping_time = gf_sys_clock_high_res(); |
765 | 0 | } |
766 | 0 | continue; |
767 | 0 | } |
768 | 0 | } |
769 | |
|
770 | 0 | } |
771 | | |
772 | | // handle received event |
773 | 0 | if (gf_sk_group_sock_is_set(client->ctx->sg, client->client_sock, GF_SK_SELECT_READ)) { |
774 | |
|
775 | 0 | e = rmt_client_handle_event(client); |
776 | |
|
777 | 0 | } |
778 | 0 | } |
779 | |
|
780 | 0 | } |
781 | |
|
782 | 0 | } |
783 | | |
784 | 0 | gf_sleep(g_Settings.msSleepBetweenServerUpdates); |
785 | 0 | return e; |
786 | |
|
787 | 0 | } |
788 | | |
789 | 0 | static u32 rmt_ws_thread_main(void* par) { |
790 | |
|
791 | 0 | RMT_WS* rmt = (RMT_WS*)par; |
792 | |
|
793 | 0 | RMT_ServerCtx* ctx; |
794 | 0 | GF_SAFEALLOC(ctx, RMT_ServerCtx); |
795 | |
|
796 | 0 | GF_Err e; |
797 | |
|
798 | 0 | while (!rmt->should_stop) { |
799 | | |
800 | | // create socket if not exist |
801 | 0 | if (!ctx->server_sock) { |
802 | |
|
803 | 0 | e = rmt_create_server(ctx); |
804 | 0 | if (e) { |
805 | 0 | if (ctx) { |
806 | 0 | rmt_serverctx_reset(ctx); |
807 | 0 | gf_free(ctx); |
808 | 0 | ctx = NULL; |
809 | 0 | } |
810 | 0 | return e; |
811 | 0 | } |
812 | | |
813 | 0 | continue; |
814 | 0 | } |
815 | | |
816 | 0 | e = rmt_server_wait_for_event(ctx); |
817 | |
|
818 | 0 | } |
819 | | |
820 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("[RMT] thread main request exit! should cleanup\n")); |
821 | 0 | rmt_serverctx_reset(ctx); |
822 | 0 | gf_free(ctx); |
823 | 0 | ctx = NULL; |
824 | 0 | return GF_OK; |
825 | |
|
826 | 0 | } |
827 | | |
828 | | |
829 | | |
830 | | RMT_Settings* gf_rmt_get_settings() |
831 | 0 | { |
832 | | // Default-initialize on first call |
833 | 0 | if( g_SettingsInitialized == GF_FALSE ) { |
834 | |
|
835 | 0 | g_Settings.port = 6363; |
836 | 0 | g_Settings.timeout_secs = 20; |
837 | 0 | g_Settings.ping_secs = 7; |
838 | |
|
839 | 0 | g_Settings.limit_connections_to_localhost = GF_FALSE; |
840 | 0 | g_Settings.msSleepBetweenServerUpdates = 50; |
841 | |
|
842 | 0 | g_Settings.on_new_client_cbk = NULL; |
843 | 0 | g_Settings.on_new_client_cbk_task = NULL; |
844 | |
|
845 | 0 | g_Settings.cert = NULL; |
846 | 0 | g_Settings.pkey = NULL; |
847 | |
|
848 | 0 | g_SettingsInitialized = GF_TRUE; |
849 | 0 | } |
850 | |
|
851 | 0 | return &g_Settings; |
852 | 0 | } |
853 | | |
854 | | |
855 | 0 | void rmt_ws_del(RMT_WS* rmt) { |
856 | |
|
857 | 0 | if (rmt && rmt->thread) { |
858 | | |
859 | |
|
860 | 0 | rmt->should_stop = GF_TRUE; |
861 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() calling stop...\n")); |
862 | |
|
863 | 0 | gf_th_stop(rmt->thread); |
864 | |
|
865 | 0 | GF_LOG(GF_LOG_DEBUG, GF_LOG_RMTWS, ("rmt_ws_del() stop returned thread status is now %d\n", gf_th_status(rmt->thread))); |
866 | |
|
867 | 0 | gf_th_del(rmt->thread); |
868 | |
|
869 | 0 | rmt->thread = NULL; |
870 | |
|
871 | 0 | } |
872 | |
|
873 | 0 | gf_free(rmt); |
874 | 0 | rmt = NULL; |
875 | |
|
876 | 0 | } |
877 | | |
878 | 0 | RMT_WS* rmt_ws_new() { |
879 | 0 | RMT_WS* rmt = NULL; |
880 | |
|
881 | 0 | GF_SAFEALLOC(rmt, RMT_WS); |
882 | 0 | if (!rmt) { |
883 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable init rmt thread\n")); |
884 | 0 | return NULL; |
885 | 0 | } |
886 | | |
887 | | |
888 | | // Default-initialise if user has not set values |
889 | 0 | gf_rmt_get_settings(); |
890 | |
|
891 | 0 | rmt->thread = gf_th_new("rmt_ws_main_th"); |
892 | 0 | rmt->should_stop = GF_FALSE; |
893 | |
|
894 | 0 | GF_Err e = gf_th_run(rmt->thread, rmt_ws_thread_main, rmt); |
895 | 0 | if (e != GF_OK) { |
896 | 0 | GF_LOG(GF_LOG_ERROR, GF_LOG_RMTWS, ("[RMT_WS] unable to start websocket thread: %s\n", gf_error_to_string(e))); |
897 | 0 | rmt_ws_del(rmt); |
898 | 0 | rmt = NULL; |
899 | 0 | } |
900 | |
|
901 | 0 | return rmt; |
902 | | |
903 | |
|
904 | 0 | } |
905 | | |
906 | | #else //GPAC_DISABLE_RMTWS |
907 | | |
908 | | void gf_rmt_set_on_new_client_cbk(void *task, rmt_on_new_client_cbk cbk) { |
909 | | |
910 | | } |
911 | | void* gf_rmt_get_on_new_client_task() { |
912 | | return NULL; |
913 | | } |
914 | | |
915 | | const char* gf_rmt_get_peer_address(RMT_ClientCtx* client) { |
916 | | return NULL; |
917 | | } |
918 | | GF_Err gf_rmt_client_send_to_ws(RMT_ClientCtx* client, const char* msg, u64 size, Bool is_binary) { |
919 | | return GF_NOT_SUPPORTED; |
920 | | } |
921 | | |
922 | | void gf_rmt_client_set_on_data_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_data_cbk cbk) { |
923 | | |
924 | | } |
925 | | void* gf_rmt_client_get_on_data_task(RMT_ClientCtx* client) { |
926 | | return NULL; |
927 | | } |
928 | | |
929 | | void gf_rmt_client_set_on_del_cbk(RMT_ClientCtx* client, void* task, rmt_client_on_del_cbk cbk) { |
930 | | |
931 | | } |
932 | | void* gf_rmt_client_get_on_del_task(RMT_ClientCtx* client) { |
933 | | return NULL; |
934 | | } |
935 | | |
936 | | #endif |