/src/unit/src/nxt_http_websocket.c
Line  | Count  | Source  | 
1  |  |  | 
2  |  | /*  | 
3  |  |  * Copyright (C) NGINX, Inc.  | 
4  |  |  */  | 
5  |  |  | 
6  |  | #include <nxt_main.h>  | 
7  |  | #include <nxt_router.h>  | 
8  |  | #include <nxt_http.h>  | 
9  |  | #include <nxt_router_request.h>  | 
10  |  | #include <nxt_port_memory_int.h>  | 
11  |  | #include <nxt_websocket.h>  | 
12  |  | #include <nxt_websocket_header.h>  | 
13  |  |  | 
14  |  |  | 
15  |  | static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);  | 
16  |  | static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,  | 
17  |  |     void *data);  | 
18  |  |  | 
19  |  |  | 
20  |  | const nxt_http_request_state_t  nxt_http_websocket  | 
21  |  |     nxt_aligned(64) =  | 
22  |  | { | 
23  |  |     .ready_handler = nxt_http_websocket_client,  | 
24  |  |     .error_handler = nxt_http_websocket_error_handler,  | 
25  |  | };  | 
26  |  |  | 
27  |  |  | 
28  |  | static void  | 
29  |  | nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)  | 
30  | 0  | { | 
31  | 0  |     size_t                  frame_size, used_size, copy_size, buf_free_size;  | 
32  | 0  |     size_t                  chunk_copy_size;  | 
33  | 0  |     nxt_buf_t               *out, *buf, **out_tail, *b, *next;  | 
34  | 0  |     nxt_int_t               res;  | 
35  | 0  |     nxt_http_request_t      *r;  | 
36  | 0  |     nxt_request_rpc_data_t  *req_rpc_data;  | 
37  | 0  |     nxt_websocket_header_t  *wsh;  | 
38  |  | 
  | 
39  | 0  |     r = obj;  | 
40  | 0  |     req_rpc_data = r->req_rpc_data;  | 
41  |  | 
  | 
42  | 0  |     if (nxt_slow_path(req_rpc_data == NULL)) { | 
43  | 0  |         nxt_debug(task, "websocket client frame for destroyed request");  | 
44  |  | 
  | 
45  | 0  |         return;  | 
46  | 0  |     }  | 
47  |  |  | 
48  | 0  |     nxt_debug(task, "http websocket client frame");  | 
49  |  | 
  | 
50  | 0  |     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;  | 
51  |  | 
  | 
52  | 0  |     frame_size = nxt_websocket_frame_header_size(wsh)  | 
53  | 0  |                   + nxt_websocket_frame_payload_len(wsh);  | 
54  |  | 
  | 
55  | 0  |     buf = NULL;  | 
56  | 0  |     buf_free_size = 0;  | 
57  | 0  |     out = NULL;  | 
58  | 0  |     out_tail = &out;  | 
59  |  | 
  | 
60  | 0  |     b = r->ws_frame;  | 
61  |  | 
  | 
62  | 0  |     while (b != NULL && frame_size > 0) { | 
63  | 0  |         used_size = nxt_buf_mem_used_size(&b->mem);  | 
64  | 0  |         copy_size = nxt_min(used_size, frame_size);  | 
65  |  | 
  | 
66  | 0  |         while (copy_size > 0) { | 
67  | 0  |             if (buf == NULL || buf_free_size == 0) { | 
68  | 0  |                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);  | 
69  |  | 
  | 
70  | 0  |                 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,  | 
71  | 0  |                                             buf_free_size);  | 
72  |  | 
  | 
73  | 0  |                 *out_tail = buf;  | 
74  | 0  |                 out_tail = &buf->next;  | 
75  | 0  |             }  | 
76  |  | 
  | 
77  | 0  |             chunk_copy_size = nxt_min(buf_free_size, copy_size);  | 
78  |  | 
  | 
79  | 0  |             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,  | 
80  | 0  |                                        chunk_copy_size);  | 
81  |  | 
  | 
82  | 0  |             copy_size -= chunk_copy_size;  | 
83  | 0  |             b->mem.pos += chunk_copy_size;  | 
84  | 0  |             buf_free_size -= chunk_copy_size;  | 
85  | 0  |         }  | 
86  |  | 
  | 
87  | 0  |         frame_size -= copy_size;  | 
88  | 0  |         next = b->next;  | 
89  | 0  |         b->next = NULL;  | 
90  |  | 
  | 
91  | 0  |         if (nxt_buf_mem_used_size(&b->mem) == 0) { | 
92  | 0  |             nxt_work_queue_add(&task->thread->engine->fast_work_queue,  | 
93  | 0  |                                b->completion_handler, task, b, b->parent);  | 
94  |  | 
  | 
95  | 0  |             r->ws_frame = next;  | 
96  | 0  |         }  | 
97  |  | 
  | 
98  | 0  |         b = next;  | 
99  | 0  |     }  | 
100  |  | 
  | 
101  | 0  |     res = nxt_port_socket_write(task, req_rpc_data->app_port,  | 
102  | 0  |                                 NXT_PORT_MSG_WEBSOCKET, -1,  | 
103  | 0  |                                 req_rpc_data->stream,  | 
104  | 0  |                                 task->thread->engine->port->id, out);  | 
105  | 0  |     if (nxt_slow_path(res != NXT_OK)) { | 
106  |  |         // TODO: handle  | 
107  | 0  |     }  | 
108  |  | 
  | 
109  | 0  |     b = r->ws_frame;  | 
110  |  | 
  | 
111  | 0  |     if (b != NULL) { | 
112  | 0  |         used_size = nxt_buf_mem_used_size(&b->mem);  | 
113  |  | 
  | 
114  | 0  |         if (used_size > 0) { | 
115  | 0  |             nxt_memmove(b->mem.start, b->mem.pos, used_size);  | 
116  |  | 
  | 
117  | 0  |             b->mem.pos = b->mem.start;  | 
118  | 0  |             b->mem.free = b->mem.start + used_size;  | 
119  | 0  |         }  | 
120  | 0  |     }  | 
121  |  | 
  | 
122  | 0  |     nxt_http_request_ws_frame_start(task, r, r->ws_frame);  | 
123  | 0  | }  | 
124  |  |  | 
125  |  |  | 
126  |  | static void  | 
127  |  | nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)  | 
128  | 0  | { | 
129  | 0  |     nxt_http_request_t      *r;  | 
130  | 0  |     nxt_request_rpc_data_t  *req_rpc_data;  | 
131  |  | 
  | 
132  | 0  |     nxt_debug(task, "http websocket error handler");  | 
133  |  | 
  | 
134  | 0  |     r = obj;  | 
135  | 0  |     req_rpc_data = r->req_rpc_data;  | 
136  |  | 
  | 
137  | 0  |     if (req_rpc_data == NULL) { | 
138  | 0  |         nxt_debug(task, "  req_rpc_data is NULL");  | 
139  | 0  |         goto close_handler;  | 
140  | 0  |     }  | 
141  |  |  | 
142  | 0  |     if (req_rpc_data->app_port == NULL) { | 
143  | 0  |         nxt_debug(task, "  app_port is NULL");  | 
144  | 0  |         goto close_handler;  | 
145  | 0  |     }  | 
146  |  |  | 
147  | 0  |     (void) nxt_port_socket_write(task, req_rpc_data->app_port,  | 
148  | 0  |                                  NXT_PORT_MSG_WEBSOCKET_LAST,  | 
149  | 0  |                                  -1, req_rpc_data->stream,  | 
150  | 0  |                                  task->thread->engine->port->id, NULL);  | 
151  |  | 
  | 
152  | 0  | close_handler:  | 
153  |  | 
  | 
154  | 0  |     nxt_http_request_close_handler(task, obj, data);  | 
155  | 0  | }  |