/src/h2o/lib/common/memcached.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <errno.h> |
23 | | #include <inttypes.h> |
24 | | #include <unistd.h> |
25 | | #include "yrmcds.h" |
26 | | #include "h2o/linklist.h" |
27 | | #include "h2o/memcached.h" |
28 | | #include "h2o/rand.h" |
29 | | #include "h2o/string_.h" |
30 | | |
31 | | struct st_h2o_memcached_context_t { |
32 | | pthread_mutex_t mutex; |
33 | | pthread_cond_t cond; |
34 | | h2o_linklist_t pending; |
35 | | size_t num_threads_connected; |
36 | | char *host; |
37 | | uint16_t port; |
38 | | int text_protocol; |
39 | | h2o_iovec_t prefix; |
40 | | }; |
41 | | |
42 | | struct st_h2o_memcached_conn_t { |
43 | | h2o_memcached_context_t *ctx; |
44 | | yrmcds yrmcds; |
45 | | pthread_mutex_t mutex; |
46 | | h2o_linklist_t inflight; |
47 | | int writer_exit_requested; |
48 | | }; |
49 | | |
50 | | enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE }; |
51 | | |
52 | | struct st_h2o_memcached_req_t { |
53 | | enum en_h2o_memcached_req_type_t type; |
54 | | h2o_linklist_t pending; |
55 | | h2o_linklist_t inflight; |
56 | | union { |
57 | | struct { |
58 | | h2o_multithread_receiver_t *receiver; |
59 | | h2o_multithread_message_t message; |
60 | | h2o_memcached_get_cb cb; |
61 | | void *cb_data; |
62 | | int value_is_encoded; |
63 | | h2o_iovec_t value; |
64 | | uint32_t serial; |
65 | | } get; |
66 | | struct { |
67 | | h2o_iovec_t value; |
68 | | uint32_t expiration; |
69 | | } set; |
70 | | } data; |
71 | | struct { |
72 | | size_t len; |
73 | | char base[1]; |
74 | | } key; |
75 | | }; |
76 | | |
77 | | static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key, |
78 | | int encode_key) |
79 | 0 | { |
80 | 0 | h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len + |
81 | 0 | (encode_key ? (key.len + 2) / 3 * 4 + 1 : key.len)); |
82 | 0 | req->type = type; |
83 | 0 | req->pending = (h2o_linklist_t){NULL}; |
84 | 0 | req->inflight = (h2o_linklist_t){NULL}; |
85 | 0 | memset(&req->data, 0, sizeof(req->data)); |
86 | 0 | if (ctx->prefix.len != 0) |
87 | 0 | memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len); |
88 | 0 | req->key.len = ctx->prefix.len; |
89 | 0 | if (encode_key) { |
90 | 0 | req->key.len += h2o_base64_encode(req->key.base + req->key.len, key.base, key.len, 1); |
91 | 0 | } else { |
92 | 0 | memcpy(req->key.base + req->key.len, key.base, key.len); |
93 | 0 | req->key.len += key.len; |
94 | 0 | } |
95 | 0 | return req; |
96 | 0 | } |
97 | | |
98 | | static void free_req(h2o_memcached_req_t *req) |
99 | 0 | { |
100 | 0 | assert(!h2o_linklist_is_linked(&req->pending)); |
101 | 0 | switch (req->type) { |
102 | 0 | case REQ_TYPE_GET: |
103 | 0 | assert(!h2o_linklist_is_linked(&req->data.get.message.link)); |
104 | 0 | h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); |
105 | 0 | free(req->data.get.value.base); |
106 | 0 | break; |
107 | 0 | case REQ_TYPE_SET: |
108 | 0 | h2o_mem_set_secure(req->data.set.value.base, 0, req->data.set.value.len); |
109 | 0 | free(req->data.set.value.base); |
110 | 0 | break; |
111 | 0 | case REQ_TYPE_DELETE: |
112 | 0 | break; |
113 | 0 | default: |
114 | 0 | assert(!"FIXME"); |
115 | 0 | break; |
116 | 0 | } |
117 | 0 | free(req); |
118 | 0 | } |
119 | | |
120 | | static void discard_req(h2o_memcached_req_t *req) |
121 | 0 | { |
122 | 0 | switch (req->type) { |
123 | 0 | case REQ_TYPE_GET: |
124 | 0 | h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); |
125 | 0 | break; |
126 | 0 | default: |
127 | 0 | free_req(req); |
128 | 0 | break; |
129 | 0 | } |
130 | 0 | } |
131 | | |
132 | | static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial) |
133 | 0 | { |
134 | 0 | h2o_memcached_req_t *req; |
135 | |
|
136 | 0 | pthread_mutex_lock(&conn->mutex); |
137 | |
|
138 | 0 | if (conn->yrmcds.text_mode) { |
139 | | /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */ |
140 | 0 | if (!h2o_linklist_is_empty(&conn->inflight)) { |
141 | 0 | req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next); |
142 | 0 | assert(req->type == REQ_TYPE_GET); |
143 | 0 | if (req->data.get.serial == serial) |
144 | 0 | goto Found; |
145 | 0 | } |
146 | 0 | } else { |
147 | | /* in binary mode, responses are received out-of-order (and we would only recieve responses for GET) */ |
148 | 0 | h2o_linklist_t *node; |
149 | 0 | for (node = conn->inflight.next; node != &conn->inflight; node = node->next) { |
150 | 0 | req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node); |
151 | 0 | assert(req->type == REQ_TYPE_GET); |
152 | 0 | if (req->data.get.serial == serial) |
153 | 0 | goto Found; |
154 | 0 | } |
155 | 0 | } |
156 | | |
157 | | /* not found */ |
158 | 0 | pthread_mutex_unlock(&conn->mutex); |
159 | 0 | return NULL; |
160 | | |
161 | 0 | Found: |
162 | 0 | h2o_linklist_unlink(&req->inflight); |
163 | 0 | pthread_mutex_unlock(&conn->mutex); |
164 | 0 | return req; |
165 | 0 | } |
166 | | |
167 | | static void *writer_main(void *_conn) |
168 | 0 | { |
169 | 0 | struct st_h2o_memcached_conn_t *conn = _conn; |
170 | 0 | yrmcds_error err; |
171 | |
|
172 | 0 | pthread_mutex_lock(&conn->ctx->mutex); |
173 | |
|
174 | 0 | while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) { |
175 | 0 | while (!h2o_linklist_is_empty(&conn->ctx->pending)) { |
176 | 0 | h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next); |
177 | 0 | h2o_linklist_unlink(&req->pending); |
178 | 0 | pthread_mutex_unlock(&conn->ctx->mutex); |
179 | |
|
180 | 0 | switch (req->type) { |
181 | 0 | case REQ_TYPE_GET: |
182 | 0 | pthread_mutex_lock(&conn->mutex); |
183 | 0 | h2o_linklist_insert(&conn->inflight, &req->inflight); |
184 | 0 | pthread_mutex_unlock(&conn->mutex); |
185 | 0 | if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK) |
186 | 0 | goto Error; |
187 | 0 | break; |
188 | 0 | case REQ_TYPE_SET: |
189 | 0 | err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0, |
190 | 0 | req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL); |
191 | 0 | discard_req(req); |
192 | 0 | if (err != YRMCDS_OK) |
193 | 0 | goto Error; |
194 | 0 | break; |
195 | 0 | case REQ_TYPE_DELETE: |
196 | 0 | err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL); |
197 | 0 | discard_req(req); |
198 | 0 | if (err != YRMCDS_OK) |
199 | 0 | goto Error; |
200 | 0 | break; |
201 | 0 | default: |
202 | 0 | h2o_error_printf("[lib/common/memcached.c] unknown type:%d\n", (int)req->type); |
203 | 0 | err = YRMCDS_NOT_IMPLEMENTED; |
204 | 0 | goto Error; |
205 | 0 | } |
206 | | |
207 | 0 | pthread_mutex_lock(&conn->ctx->mutex); |
208 | 0 | } |
209 | 0 | pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex); |
210 | 0 | } |
211 | | |
212 | 0 | pthread_mutex_unlock(&conn->ctx->mutex); |
213 | 0 | return NULL; |
214 | | |
215 | 0 | Error: |
216 | 0 | h2o_error_printf("[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err)); |
217 | | /* doc says the call can be used to interrupt yrmcds_recv */ |
218 | 0 | yrmcds_shutdown(&conn->yrmcds); |
219 | |
|
220 | 0 | return NULL; |
221 | 0 | } |
222 | | |
223 | | static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds) |
224 | 0 | { |
225 | 0 | size_t failcnt; |
226 | 0 | yrmcds_error err; |
227 | |
|
228 | 0 | for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) { |
229 | 0 | if (failcnt == 0) { |
230 | 0 | h2o_error_printf("[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host, |
231 | 0 | ctx->port, yrmcds_strerror(err)); |
232 | 0 | } |
233 | 0 | ++failcnt; |
234 | 0 | usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */ |
235 | 0 | } |
236 | | /* connected */ |
237 | 0 | if (ctx->text_protocol) |
238 | 0 | yrmcds_text_mode(yrmcds); |
239 | 0 | h2o_error_printf("[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port); |
240 | 0 | } |
241 | | |
242 | | static void reader_main(h2o_memcached_context_t *ctx) |
243 | 0 | { |
244 | 0 | struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0}; |
245 | 0 | pthread_t writer_thread; |
246 | 0 | yrmcds_response resp; |
247 | 0 | yrmcds_error err; |
248 | 0 | int ret; |
249 | | |
250 | | /* connect to server and start the writer thread */ |
251 | 0 | connect_to_server(conn.ctx, &conn.yrmcds); |
252 | 0 | if ((ret = pthread_create(&writer_thread, NULL, writer_main, &conn)) != 0) { |
253 | 0 | char buf[128]; |
254 | 0 | h2o_fatal("pthread_create: %s", h2o_strerror_r(ret, buf, sizeof(buf))); |
255 | 0 | } |
256 | | |
257 | 0 | pthread_mutex_lock(&conn.ctx->mutex); |
258 | 0 | ++conn.ctx->num_threads_connected; |
259 | 0 | pthread_mutex_unlock(&conn.ctx->mutex); |
260 | | |
261 | | /* receive data until an error occurs */ |
262 | 0 | while (1) { |
263 | 0 | if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) { |
264 | 0 | h2o_error_printf("[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err)); |
265 | 0 | break; |
266 | 0 | } |
267 | 0 | h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial); |
268 | 0 | if (req == NULL) { |
269 | 0 | if (conn.yrmcds.text_mode) |
270 | 0 | continue; |
271 | 0 | h2o_error_printf("[lib/common/memcached.c] received unexpected serial\n"); |
272 | 0 | break; |
273 | 0 | } |
274 | 0 | if (resp.status == YRMCDS_STATUS_OK) { |
275 | 0 | req->data.get.value = h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len); |
276 | 0 | memcpy(req->data.get.value.base, resp.data, resp.data_len); |
277 | 0 | h2o_mem_set_secure((void *)resp.data, 0, resp.data_len); |
278 | 0 | } |
279 | 0 | h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); |
280 | 0 | } |
281 | | |
282 | | /* send error to all the reqs in-flight */ |
283 | 0 | pthread_mutex_lock(&conn.mutex); |
284 | 0 | while (!h2o_linklist_is_empty(&conn.inflight)) { |
285 | 0 | h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next); |
286 | 0 | h2o_linklist_unlink(&req->inflight); |
287 | 0 | assert(req->type == REQ_TYPE_GET); |
288 | 0 | h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message); |
289 | 0 | } |
290 | 0 | pthread_mutex_unlock(&conn.mutex); |
291 | | |
292 | | /* stop the writer thread */ |
293 | 0 | __sync_add_and_fetch(&conn.writer_exit_requested, 1); |
294 | 0 | pthread_mutex_lock(&conn.ctx->mutex); |
295 | 0 | pthread_cond_broadcast(&conn.ctx->cond); |
296 | 0 | pthread_mutex_unlock(&conn.ctx->mutex); |
297 | 0 | pthread_join(writer_thread, NULL); |
298 | | |
299 | | /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */ |
300 | 0 | pthread_mutex_lock(&conn.ctx->mutex); |
301 | 0 | if (--conn.ctx->num_threads_connected == 0) { |
302 | 0 | while (!h2o_linklist_is_empty(&conn.ctx->pending)) { |
303 | 0 | h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next); |
304 | 0 | h2o_linklist_unlink(&req->pending); |
305 | 0 | discard_req(req); |
306 | 0 | } |
307 | 0 | } |
308 | 0 | pthread_mutex_unlock(&conn.ctx->mutex); |
309 | | |
310 | | /* close the connection */ |
311 | 0 | yrmcds_close(&conn.yrmcds); |
312 | 0 | } |
313 | | |
314 | | static void *thread_main(void *_ctx) |
315 | 0 | { |
316 | 0 | h2o_memcached_context_t *ctx = _ctx; |
317 | |
|
318 | 0 | while (1) |
319 | 0 | reader_main(ctx); |
320 | 0 | return NULL; |
321 | 0 | } |
322 | | |
323 | | static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) |
324 | 0 | { |
325 | 0 | pthread_mutex_lock(&ctx->mutex); |
326 | |
|
327 | 0 | if (ctx->num_threads_connected != 0) { |
328 | 0 | h2o_linklist_insert(&ctx->pending, &req->pending); |
329 | 0 | pthread_cond_signal(&ctx->cond); |
330 | 0 | } else { |
331 | 0 | discard_req(req); |
332 | 0 | } |
333 | |
|
334 | 0 | pthread_mutex_unlock(&ctx->mutex); |
335 | 0 | } |
336 | | |
337 | | void h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages) |
338 | 0 | { |
339 | 0 | while (!h2o_linklist_is_empty(messages)) { |
340 | 0 | h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next); |
341 | 0 | h2o_linklist_unlink(&req->data.get.message.link); |
342 | 0 | assert(req->type == REQ_TYPE_GET); |
343 | 0 | if (req->data.get.cb != NULL) { |
344 | 0 | if (req->data.get.value_is_encoded && req->data.get.value.len != 0) { |
345 | 0 | h2o_iovec_t decoded = h2o_decode_base64url(NULL, req->data.get.value.base, req->data.get.value.len); |
346 | 0 | h2o_mem_set_secure(req->data.get.value.base, 0, req->data.get.value.len); |
347 | 0 | free(req->data.get.value.base); |
348 | 0 | req->data.get.value = decoded; |
349 | 0 | } |
350 | 0 | req->data.get.cb(req->data.get.value, req->data.get.cb_data); |
351 | 0 | } |
352 | 0 | free_req(req); |
353 | 0 | } |
354 | 0 | } |
355 | | |
356 | | h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key, |
357 | | h2o_memcached_get_cb cb, void *cb_data, int flags) |
358 | 0 | { |
359 | 0 | h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); |
360 | 0 | req->data.get.receiver = receiver; |
361 | 0 | req->data.get.cb = cb; |
362 | 0 | req->data.get.cb_data = cb_data; |
363 | 0 | req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0; |
364 | 0 | dispatch(ctx, req); |
365 | 0 | return req; |
366 | 0 | } |
367 | | |
368 | | void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req) |
369 | 0 | { |
370 | 0 | int do_free = 0; |
371 | |
|
372 | 0 | pthread_mutex_lock(&ctx->mutex); |
373 | 0 | req->data.get.cb = NULL; |
374 | 0 | if (h2o_linklist_is_linked(&req->pending)) { |
375 | 0 | h2o_linklist_unlink(&req->pending); |
376 | 0 | do_free = 1; |
377 | 0 | } |
378 | 0 | pthread_mutex_unlock(&ctx->mutex); |
379 | |
|
380 | 0 | if (do_free) |
381 | 0 | free_req(req); |
382 | 0 | } |
383 | | |
384 | | void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags) |
385 | 0 | { |
386 | 0 | h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); |
387 | 0 | if ((flags & H2O_MEMCACHED_ENCODE_VALUE) != 0) { |
388 | 0 | req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1); |
389 | 0 | req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1); |
390 | 0 | } else { |
391 | 0 | req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len); |
392 | 0 | memcpy(req->data.set.value.base, value.base, value.len); |
393 | 0 | } |
394 | 0 | req->data.set.expiration = expiration; |
395 | 0 | dispatch(ctx, req); |
396 | 0 | } |
397 | | |
398 | | void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags) |
399 | 0 | { |
400 | 0 | h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_DELETE, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0); |
401 | 0 | dispatch(ctx, req); |
402 | 0 | } |
403 | | |
404 | | h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads, |
405 | | const char *prefix) |
406 | 0 | { |
407 | 0 | h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx)); |
408 | |
|
409 | 0 | pthread_mutex_init(&ctx->mutex, NULL); |
410 | 0 | pthread_cond_init(&ctx->cond, NULL); |
411 | 0 | h2o_linklist_init_anchor(&ctx->pending); |
412 | 0 | ctx->num_threads_connected = 0; |
413 | 0 | ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base; |
414 | 0 | ctx->port = port; |
415 | 0 | ctx->text_protocol = text_protocol; |
416 | 0 | ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX); |
417 | |
|
418 | 0 | { /* start the threads */ |
419 | 0 | pthread_t tid; |
420 | 0 | pthread_attr_t attr; |
421 | 0 | size_t i; |
422 | 0 | pthread_attr_init(&attr); |
423 | 0 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); |
424 | 0 | for (i = 0; i != num_threads; ++i) |
425 | 0 | h2o_multithread_create_thread(&tid, &attr, thread_main, ctx); |
426 | 0 | pthread_attr_destroy(&attr); |
427 | 0 | } |
428 | |
|
429 | 0 | return ctx; |
430 | 0 | } |