Coverage Report

Created: 2023-06-07 06:20

/src/h2o/lib/common/http1client.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014 DeNA Co., Ltd.
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <arpa/inet.h>
23
#include <fcntl.h>
24
#include <netdb.h>
25
#include <netinet/in.h>
26
#include <sys/socket.h>
27
#include <sys/types.h>
28
#include <sys/un.h>
29
#include "picohttpparser.h"
30
#include "h2o/httpclient.h"
31
#include "h2o/token.h"
32
33
#if !H2O_USE_LIBUV && defined(__linux__)
34
#define USE_PIPE_READER 1
35
#else
36
#define USE_PIPE_READER 0
37
#endif
38
39
enum enum_h2o_http1client_stream_state {
40
    STREAM_STATE_HEAD,
41
    STREAM_STATE_BODY,
42
    STREAM_STATE_CLOSED,
43
};
44
45
struct st_h2o_http1client_t {
46
    h2o_httpclient_t super;
47
    h2o_socket_t *sock;
48
    struct {
49
        enum enum_h2o_http1client_stream_state req;
50
        enum enum_h2o_http1client_stream_state res;
51
    } state;
52
    h2o_url_t *_origin;
53
    int _method_is_head;
54
    int _do_keepalive;
55
    union {
56
        struct {
57
            size_t bytesleft;
58
        } content_length;
59
        struct {
60
            struct phr_chunked_decoder decoder;
61
            size_t bytes_decoded_in_buf;
62
        } chunked;
63
    } _body_decoder;
64
    h2o_socket_cb reader;
65
    h2o_httpclient_proceed_req_cb proceed_req;
66
    /**
67
     * buffer used to hold chunk headers of a request body; the size is SIZE_MAX in hex + CRLF + '\0'
68
     */
69
    char _chunk_len_str[(sizeof(H2O_UINT64_LONGEST_HEX_STR) - 1) + 2 + 1];
70
    /**
71
     * buffer used to retain request body that is inflight
72
     */
73
    struct {
74
        h2o_buffer_t *buf;
75
        int is_end_stream;
76
    } body_buf;
77
    /**
78
     * `on_body_piped` is non-NULL iff used
79
     */
80
    h2o_httpclient_pipe_reader_t pipe_reader;
81
    /**
82
     * maintain the number of bytes being already processed on the associated socket
83
     */
84
    uint64_t _socket_bytes_processed;
85
    unsigned _is_chunked : 1;
86
    unsigned _seen_at_least_one_chunk : 1;
87
    unsigned _delay_free : 1;
88
    unsigned _app_prefers_pipe_reader : 1;
89
};
90
91
static void on_body_to_pipe(h2o_socket_t *_sock, const char *err);
92
93
static void req_body_send(struct st_h2o_http1client_t *client);
94
static void update_read_state(struct st_h2o_http1client_t *client);
95
96
static void close_client(struct st_h2o_http1client_t *client)
97
618
{
98
618
    if (client->sock != NULL) {
99
618
        if (client->super.connpool != NULL && client->_do_keepalive && client->super.connpool->socketpool->timeout > 0) {
100
            /* we do not send pipelined requests, and thus can trash all the received input at the end of the request */
101
0
            h2o_buffer_consume(&client->sock->input, client->sock->input->size);
102
0
            h2o_socketpool_return(client->super.connpool->socketpool, client->sock);
103
618
        } else {
104
618
            h2o_socket_close(client->sock);
105
618
        }
106
618
    }
107
618
    if (h2o_timer_is_linked(&client->super._timeout))
108
0
        h2o_timer_unlink(&client->super._timeout);
109
618
    if (client->body_buf.buf != NULL)
110
0
        h2o_buffer_dispose(&client->body_buf.buf);
111
618
    if (!client->_delay_free)
112
618
        free(client);
113
618
}
114
115
static void close_response(struct st_h2o_http1client_t *client)
116
564
{
117
564
    assert(client->state.res == STREAM_STATE_CLOSED);
118
564
    if (client->state.req == STREAM_STATE_CLOSED) {
119
548
        close_client(client);
120
548
    } else {
121
16
        h2o_socket_read_stop(client->sock);
122
16
    }
123
564
}
124
125
static h2o_httpclient_body_cb call_on_head(struct st_h2o_http1client_t *client, const char *errstr, h2o_httpclient_on_head_t *args)
126
618
{
127
618
    assert(!client->_delay_free);
128
0
    client->_delay_free = 1;
129
618
    h2o_httpclient_body_cb cb = client->super._cb.on_head(&client->super, errstr, args);
130
618
    client->_delay_free = 0;
131
618
    return cb;
132
618
}
133
134
static int call_on_body(struct st_h2o_http1client_t *client, const char *errstr)
135
1.03k
{
136
1.03k
    assert(!client->_delay_free);
137
0
    client->_delay_free = 1;
138
1.03k
    int ret =
139
1.03k
        (client->reader == on_body_to_pipe ? client->pipe_reader.on_body_piped : client->super._cb.on_body)(&client->super, errstr);
140
1.03k
    client->_delay_free = 0;
141
1.03k
    return ret;
142
1.03k
}
143
144
static void call_proceed_req(struct st_h2o_http1client_t *client, const char *errstr)
145
0
{
146
0
    assert(!client->_delay_free);
147
0
    client->_delay_free = 1;
148
0
    client->proceed_req(&client->super, errstr);
149
0
    client->_delay_free = 0;
150
0
}
151
152
static void on_error(struct st_h2o_http1client_t *client, const char *errstr)
153
70
{
154
70
    switch (client->state.res) {
155
0
    case STREAM_STATE_HEAD:
156
0
        call_on_head(client, errstr, NULL);
157
0
        break;
158
54
    case STREAM_STATE_BODY:
159
54
        call_on_body(client, errstr);
160
54
        break;
161
16
    case STREAM_STATE_CLOSED:
162
16
        if (client->proceed_req != NULL)
163
0
            call_proceed_req(client, errstr);
164
16
        break;
165
70
    }
166
70
    close_client(client);
167
70
}
168
169
static void on_body_timeout(h2o_timer_t *entry)
170
0
{
171
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
172
0
    on_error(client, h2o_httpclient_error_io_timeout);
173
0
}
174
175
static void on_body_until_close(h2o_socket_t *sock, const char *err)
176
976
{
177
976
    struct st_h2o_http1client_t *client = sock->data;
178
179
976
    h2o_timer_unlink(&client->super._timeout);
180
181
976
    if (err != NULL) {
182
461
        client->state.res = STREAM_STATE_CLOSED;
183
461
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
184
461
        call_on_body(client, h2o_httpclient_error_is_eos);
185
461
        close_response(client);
186
461
        return;
187
461
    }
188
515
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
189
515
    client->_socket_bytes_processed = sock->bytes_read;
190
191
515
    client->super.bytes_read.body += size;
192
515
    client->super.bytes_read.total += size;
193
194
515
    if (size != 0) {
195
515
        if (call_on_body(client, NULL) != 0) {
196
0
            close_client(client);
197
0
            return;
198
0
        }
199
515
        update_read_state(client);
200
515
    }
201
515
}
202
203
static void on_body_content_length(h2o_socket_t *sock, const char *err)
204
0
{
205
0
    struct st_h2o_http1client_t *client = sock->data;
206
207
0
    h2o_timer_unlink(&client->super._timeout);
208
209
0
    if (err != NULL) {
210
0
        on_error(client, h2o_httpclient_error_io);
211
0
        return;
212
0
    }
213
0
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
214
0
    client->_socket_bytes_processed = sock->bytes_read;
215
216
0
    client->super.bytes_read.body += size;
217
0
    client->super.bytes_read.total += size;
218
219
0
    if (size != 0 || client->_body_decoder.content_length.bytesleft == 0) {
220
0
        int ret;
221
0
        if (client->_body_decoder.content_length.bytesleft <= size) {
222
0
            if (client->_body_decoder.content_length.bytesleft < size) {
223
                /* remove the trailing garbage from buf, and disable keepalive */
224
0
                client->sock->input->size -= size - client->_body_decoder.content_length.bytesleft;
225
0
                client->_do_keepalive = 0;
226
0
            }
227
0
            client->_body_decoder.content_length.bytesleft = 0;
228
0
            client->state.res = STREAM_STATE_CLOSED;
229
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
230
0
        } else {
231
0
            client->_body_decoder.content_length.bytesleft -= size;
232
0
        }
233
0
        ret = call_on_body(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL);
234
0
        if (client->state.res == STREAM_STATE_CLOSED) {
235
0
            close_response(client);
236
0
            return;
237
0
        } else if (ret != 0) {
238
0
            client->_do_keepalive = 0;
239
0
            close_client(client);
240
0
            return;
241
0
        }
242
0
    }
243
244
0
#if USE_PIPE_READER
245
0
    if (client->pipe_reader.on_body_piped != NULL) {
246
0
        h2o_socket_dont_read(client->sock, 1);
247
0
        client->reader = on_body_to_pipe;
248
0
    }
249
0
#endif
250
0
    update_read_state(client);
251
0
}
252
253
void on_body_to_pipe(h2o_socket_t *_sock, const char *err)
254
0
{
255
0
#if USE_PIPE_READER
256
0
    struct st_h2o_http1client_t *client = _sock->data;
257
258
0
    h2o_timer_unlink(&client->super._timeout);
259
0
    h2o_socket_read_stop(client->sock);
260
261
0
    if (err != NULL) {
262
0
        on_error(client, h2o_httpclient_error_io);
263
0
        return;
264
0
    }
265
266
0
    ssize_t bytes_read;
267
0
    while ((bytes_read = splice(h2o_socket_get_fd(client->sock), NULL, client->pipe_reader.fd, NULL,
268
0
                                client->_body_decoder.content_length.bytesleft, SPLICE_F_NONBLOCK)) == -1 &&
269
0
           errno == EINTR)
270
0
        ;
271
0
    if (bytes_read == -1 && errno == EAGAIN) {
272
0
        update_read_state(client);
273
0
        return;
274
0
    }
275
0
    if (bytes_read <= 0) {
276
0
        on_error(client, h2o_httpclient_error_io);
277
0
        return;
278
0
    }
279
280
0
    client->_socket_bytes_processed += bytes_read;
281
0
    client->sock->bytes_read += bytes_read;
282
0
    client->super.bytes_read.body += bytes_read;
283
0
    client->super.bytes_read.total += bytes_read;
284
285
0
    client->_body_decoder.content_length.bytesleft -= bytes_read;
286
0
    if (client->_body_decoder.content_length.bytesleft == 0) {
287
0
        client->state.res = STREAM_STATE_CLOSED;
288
0
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
289
0
        h2o_socket_dont_read(client->sock, 0);
290
0
    }
291
292
0
    int ret = call_on_body(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL);
293
294
0
    if (client->state.res == STREAM_STATE_CLOSED) {
295
0
        close_response(client);
296
0
    } else if (ret != 0) {
297
0
        client->_do_keepalive = 0;
298
0
        close_client(client);
299
0
    }
300
#else
301
    h2o_fatal("%s cannot be called", __FUNCTION__);
302
#endif
303
0
}
304
305
static void on_body_chunked(h2o_socket_t *sock, const char *err)
306
0
{
307
0
    struct st_h2o_http1client_t *client = sock->data;
308
0
    h2o_buffer_t *inbuf;
309
310
0
    h2o_timer_unlink(&client->super._timeout);
311
312
0
    if (err != NULL) {
313
0
        if (err == h2o_socket_error_closed && !phr_decode_chunked_is_in_data(&client->_body_decoder.chunked.decoder) &&
314
0
            client->_seen_at_least_one_chunk) {
315
            /*
316
             * if the peer closed after a full chunk, treat this
317
             * as if the transfer had complete, browsers appear to ignore
318
             * a missing 0\r\n chunk
319
             */
320
0
            client->_do_keepalive = 0;
321
0
            client->state.res = STREAM_STATE_CLOSED;
322
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
323
0
            call_on_body(client, h2o_httpclient_error_is_eos);
324
0
            close_response(client);
325
0
        } else {
326
0
            on_error(client, h2o_httpclient_error_io);
327
0
        }
328
0
        return;
329
0
    }
330
0
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
331
0
    client->_socket_bytes_processed = sock->bytes_read;
332
333
0
    client->super.bytes_read.body += size;
334
0
    client->super.bytes_read.total += size;
335
336
0
    inbuf = client->sock->input;
337
0
    if (size != 0) {
338
0
        const char *errstr;
339
0
        int cb_ret;
340
0
        size_t newsz = size;
341
342
0
        switch (phr_decode_chunked(&client->_body_decoder.chunked.decoder, inbuf->bytes + inbuf->size - newsz, &newsz)) {
343
0
        case -1: /* error */
344
0
            newsz = size;
345
0
            client->_do_keepalive = 0;
346
0
            errstr = h2o_httpclient_error_http1_parse_failed;
347
0
            break;
348
0
        case -2: /* incomplete */
349
0
            errstr = NULL;
350
0
            break;
351
0
        default: /* complete, with garbage on tail; should disable keepalive */
352
0
            client->_do_keepalive = 0;
353
        /* fallthru */
354
0
        case 0: /* complete */
355
0
            client->state.res = STREAM_STATE_CLOSED;
356
0
            errstr = h2o_httpclient_error_is_eos;
357
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
358
0
            break;
359
0
        }
360
0
        inbuf->size -= size - newsz;
361
0
        if (inbuf->size > 0)
362
0
            client->_seen_at_least_one_chunk = 1;
363
0
        cb_ret = call_on_body(client, errstr);
364
0
        if (client->state.res == STREAM_STATE_CLOSED) {
365
0
            close_response(client);
366
0
            return;
367
0
        } else if (errstr != NULL) {
368
0
            close_client(client);
369
0
            return;
370
0
        } else if (cb_ret != 0) {
371
0
            client->_do_keepalive = 0;
372
0
            close_client(client);
373
0
            return;
374
0
        }
375
0
        update_read_state(client);
376
0
    }
377
0
}
378
379
static void on_head_timeout(h2o_timer_t *entry)
380
0
{
381
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
382
0
    on_error(client, h2o_httpclient_error_io_timeout);
383
0
}
384
385
static void on_head(h2o_socket_t *sock, const char *err)
386
618
{
387
618
    struct st_h2o_http1client_t *client = sock->data;
388
618
    int minor_version, version, http_status, rlen;
389
618
    const char *msg;
390
618
#define MAX_HEADERS 100
391
618
    h2o_header_t *headers;
392
618
    h2o_iovec_t *header_names;
393
618
    size_t msg_len, num_headers, i;
394
618
    h2o_socket_cb reader;
395
396
618
    h2o_timer_unlink(&client->super._timeout);
397
398
618
    if (err != NULL) {
399
0
        on_error(client, h2o_httpclient_error_io);
400
0
        return;
401
0
    }
402
403
    /* revert max read size to 1MB now that we have received the first chunk, presumably carrying all the response headers */
404
618
#if USE_PIPE_READER
405
618
    if (client->_app_prefers_pipe_reader)
406
0
        h2o_evloop_socket_set_max_read_size(client->sock, h2o_evloop_socket_max_read_size);
407
618
#endif
408
409
618
    client->super._timeout.cb = on_head_timeout;
410
411
618
    headers = h2o_mem_alloc_pool(client->super.pool, *headers, MAX_HEADERS);
412
618
    header_names = h2o_mem_alloc_pool(client->super.pool, *header_names, MAX_HEADERS);
413
414
    /* continue parsing the responses until we see a final one */
415
618
    while (1) {
416
        /* parse response */
417
618
        struct phr_header src_headers[MAX_HEADERS];
418
618
        num_headers = MAX_HEADERS;
419
618
        rlen = phr_parse_response(sock->input->bytes, sock->input->size, &minor_version, &http_status, &msg, &msg_len, src_headers,
420
618
                                  &num_headers, 0);
421
618
        switch (rlen) {
422
0
        case -1: /* error */
423
0
            on_error(client, h2o_httpclient_error_http1_parse_failed);
424
0
            return;
425
0
        case -2: /* incomplete */
426
0
            h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
427
0
            return;
428
618
        }
429
430
618
        client->super.bytes_read.header += rlen;
431
618
        client->super.bytes_read.total += rlen;
432
433
618
        version = 0x100 | (minor_version != 0);
434
435
        /* fill-in the headers */
436
1.23k
        for (i = 0; i != num_headers; ++i) {
437
618
            if (src_headers[i].name_len == 0) {
438
                /* reject multiline header */
439
0
                on_error(client, h2o_httpclient_error_http1_line_folding);
440
0
                return;
441
0
            }
442
618
            const h2o_token_t *token;
443
618
            char *orig_name = h2o_strdup(client->super.pool, src_headers[i].name, src_headers[i].name_len).base;
444
618
            h2o_strtolower((char *)src_headers[i].name, src_headers[i].name_len);
445
618
            token = h2o_lookup_token(src_headers[i].name, src_headers[i].name_len);
446
618
            if (token != NULL) {
447
618
                headers[i].name = (h2o_iovec_t *)&token->buf;
448
618
            } else {
449
0
                header_names[i] = h2o_iovec_init(src_headers[i].name, src_headers[i].name_len);
450
0
                headers[i].name = &header_names[i];
451
0
            }
452
618
            headers[i].value = h2o_iovec_init(src_headers[i].value, src_headers[i].value_len);
453
618
            headers[i].orig_name = orig_name;
454
618
            headers[i].flags = (h2o_header_flags_t){0};
455
618
        }
456
457
618
        if (!(100 <= http_status && http_status <= 199 && http_status != 101))
458
618
            break;
459
460
0
        if (client->super.informational_cb != NULL &&
461
0
            client->super.informational_cb(&client->super, version, http_status, h2o_iovec_init(msg, msg_len), headers,
462
0
                                           num_headers) != 0) {
463
0
            close_client(client);
464
0
            return;
465
0
        }
466
0
        h2o_buffer_consume(&client->sock->input, rlen);
467
0
        if (client->sock->input->size == 0) {
468
0
            h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
469
0
            return;
470
0
        }
471
0
    }
472
473
    /* recognize hop-by-hop response headers */
474
618
    reader = on_body_until_close;
475
618
    if (!h2o_httpclient__tunnel_is_ready(&client->super, http_status)) {
476
618
        client->_do_keepalive = minor_version >= 1;
477
1.23k
        for (i = 0; i != num_headers; ++i) {
478
618
            if (headers[i].name == &H2O_TOKEN_CONNECTION->buf) {
479
618
                if (h2o_contains_token(headers[i].value.base, headers[i].value.len, H2O_STRLIT("keep-alive"), ',')) {
480
0
                    client->_do_keepalive = 1;
481
618
                } else {
482
618
                    client->_do_keepalive = 0;
483
618
                }
484
618
            } else if (headers[i].name == &H2O_TOKEN_TRANSFER_ENCODING->buf) {
485
0
                if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("chunked"))) {
486
                    /* precond: _body_decoder.chunked is zero-filled */
487
0
                    client->_body_decoder.chunked.decoder.consume_trailer = 1;
488
0
                    reader = on_body_chunked;
489
0
                } else if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("identity"))) {
490
                    /* continue */
491
0
                } else {
492
0
                    on_error(client, h2o_httpclient_error_http1_unexpected_transfer_encoding);
493
0
                    return;
494
0
                }
495
0
            } else if (headers[i].name == &H2O_TOKEN_CONTENT_LENGTH->buf) {
496
0
                if ((client->_body_decoder.content_length.bytesleft = h2o_strtosize(headers[i].value.base, headers[i].value.len)) ==
497
0
                    SIZE_MAX) {
498
0
                    on_error(client, h2o_httpclient_error_invalid_content_length);
499
0
                    return;
500
0
                }
501
0
                if (reader != on_body_chunked)
502
0
                    reader = on_body_content_length;
503
0
            }
504
618
        }
505
618
    }
506
507
618
    client->state.res = STREAM_STATE_BODY;
508
618
    client->super.timings.response_start_at = h2o_gettimeofday(client->super.ctx->loop);
509
510
    /* RFC 2616 4.4 */
511
618
    if (client->_method_is_head || http_status == 204 || http_status == 304) {
512
103
        client->state.res = STREAM_STATE_CLOSED;
513
103
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
514
515
    } else {
515
        /* close the connection if impossible to determine the end of the response (RFC 7230 3.3.3) */
516
515
        if (reader == on_body_until_close)
517
515
            client->_do_keepalive = 0;
518
515
    }
519
520
618
    h2o_httpclient_on_head_t on_head = {
521
618
        .version = version,
522
618
        .status = http_status,
523
618
        .msg = h2o_iovec_init(msg, msg_len),
524
618
        .headers = headers,
525
618
        .num_headers = num_headers,
526
618
        .header_requires_dup = 1,
527
618
    };
528
618
#if USE_PIPE_READER
529
    /* If there is no less than 64KB of data to be read from the socket, offer the application the opportunity to use pipe for
530
     * transferring the content zero-copy. As switching to pipe involves the cost of creating a pipe (and disposing it when the
531
     * request is complete), we adopt this margin of 64KB, which offers clear improvement (5%) on 9th-gen Intel Core. */
532
618
    if (client->_app_prefers_pipe_reader && reader == on_body_content_length &&
533
618
        client->sock->input->size + 65536 <= client->_body_decoder.content_length.bytesleft)
534
0
        on_head.pipe_reader = &client->pipe_reader;
535
618
#endif
536
537
    /* call the callback */
538
618
    client->super._cb.on_body =
539
618
        call_on_head(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL, &on_head);
540
541
618
    if (client->state.res == STREAM_STATE_CLOSED) {
542
103
        close_response(client);
543
103
        return;
544
515
    } else if (client->super._cb.on_body == NULL) {
545
0
        client->_do_keepalive = 0;
546
0
        close_client(client);
547
0
        return;
548
0
    }
549
550
515
    h2o_buffer_consume(&sock->input, rlen);
551
515
    client->_socket_bytes_processed = client->sock->bytes_read - client->sock->input->size;
552
553
515
    client->super._timeout.cb = on_body_timeout;
554
515
    h2o_socket_read_start(sock, reader);
555
515
    reader(client->sock, 0);
556
557
515
#undef MAX_HEADERS
558
515
}
559
560
static void on_head_first_byte_timeout(h2o_timer_t *entry)
561
0
{
562
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
563
0
    on_error(client, h2o_httpclient_error_first_byte_timeout);
564
0
}
565
566
static void on_whole_request_sent(h2o_socket_t *sock, const char *err)
567
618
{
568
618
    struct st_h2o_http1client_t *client = sock->data;
569
570
618
    h2o_timer_unlink(&client->super._timeout);
571
572
618
    if (err != NULL) {
573
70
        on_error(client, h2o_httpclient_error_io);
574
70
        return;
575
70
    }
576
577
548
    client->state.req = STREAM_STATE_CLOSED;
578
548
    client->super.timings.request_end_at = h2o_gettimeofday(client->super.ctx->loop);
579
580
548
    if (client->super.upgrade_to != NULL) {
581
        /* TODO use shutdown(2) to signal the peer that our send side has been closed, but continue reading on the receive side. */
582
0
        on_error(client, client->state.res < STREAM_STATE_BODY ? h2o_httpclient_error_io : h2o_httpclient_error_is_eos);
583
548
    } else {
584
548
        switch (client->state.res) {
585
548
        case STREAM_STATE_HEAD:
586
548
            client->super._timeout.cb = on_head_first_byte_timeout;
587
548
            h2o_timer_link(client->super.ctx->loop, client->super.ctx->first_byte_timeout, &client->super._timeout);
588
548
            break;
589
0
        case STREAM_STATE_BODY:
590
0
            break;
591
0
        case STREAM_STATE_CLOSED:
592
0
            close_client(client);
593
0
            break;
594
548
        }
595
548
    }
596
548
}
597
598
static void req_body_send_complete(h2o_socket_t *sock, const char *err)
599
0
{
600
0
    struct st_h2o_http1client_t *client = sock->data;
601
602
0
    h2o_buffer_consume(&client->body_buf.buf, client->body_buf.buf->size);
603
604
0
    if (err != NULL) {
605
0
        on_whole_request_sent(client->sock, err);
606
0
        return;
607
0
    }
608
609
0
    int is_end_stream = client->body_buf.is_end_stream;
610
611
0
    call_proceed_req(client, NULL);
612
613
0
    if (is_end_stream)
614
0
        on_whole_request_sent(client->sock, NULL);
615
0
}
616
617
/**
618
 * Encodes data. `bufs` must have at least 4 elements of space.
619
 */
620
static size_t req_body_send_prepare(struct st_h2o_http1client_t *client, h2o_iovec_t *bufs, size_t *bytes)
621
0
{
622
0
    size_t bufcnt = 0;
623
0
    *bytes = 0;
624
625
0
    if (client->_is_chunked) {
626
0
        if (client->body_buf.buf->size != 0) {
627
            /* build chunk header */
628
0
            bufs[bufcnt].base = client->_chunk_len_str;
629
0
            bufs[bufcnt].len =
630
0
                snprintf(client->_chunk_len_str, sizeof(client->_chunk_len_str), "%zx\r\n", client->body_buf.buf->size);
631
0
            *bytes += bufs[bufcnt].len;
632
0
            ++bufcnt;
633
            /* append chunk body */
634
0
            bufs[bufcnt++] = h2o_iovec_init(client->body_buf.buf->bytes, client->body_buf.buf->size);
635
0
            *bytes += client->body_buf.buf->size;
636
            /* append CRLF */
637
0
            bufs[bufcnt++] = h2o_iovec_init("\r\n", 2);
638
0
            *bytes += 2;
639
0
        }
640
0
        if (client->body_buf.is_end_stream) {
641
0
            static const h2o_iovec_t terminator = {H2O_STRLIT("0\r\n\r\n")};
642
0
            bufs[bufcnt++] = terminator;
643
0
            *bytes += terminator.len;
644
0
        }
645
0
    } else if (client->body_buf.buf->size != 0) {
646
0
        bufs[bufcnt++] = h2o_iovec_init(client->body_buf.buf->bytes, client->body_buf.buf->size);
647
0
        *bytes += client->body_buf.buf->size;
648
0
    }
649
650
0
    return bufcnt;
651
0
}
652
653
static void req_body_send(struct st_h2o_http1client_t *client)
654
0
{
655
0
    h2o_iovec_t bufs[4];
656
0
    size_t bytes, bufcnt = req_body_send_prepare(client, bufs, &bytes);
657
658
0
    h2o_timer_unlink(&client->super._timeout);
659
660
0
    h2o_socket_write(client->sock, bufs, bufcnt, req_body_send_complete);
661
0
    client->super.bytes_written.body += bytes;
662
0
    client->super.bytes_written.total += bytes;
663
664
0
    h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
665
0
}
666
667
static int do_write_req(h2o_httpclient_t *_client, h2o_iovec_t chunk, int is_end_stream)
668
0
{
669
0
    struct st_h2o_http1client_t *client = (struct st_h2o_http1client_t *)_client;
670
671
0
    assert(chunk.len != 0 || is_end_stream);
672
0
    assert(!h2o_socket_is_writing(client->sock));
673
0
    assert(client->body_buf.buf->size == 0);
674
675
    /* store given content to buffer */
676
0
    if (chunk.len != 0) {
677
0
        if (!h2o_buffer_try_append(&client->body_buf.buf, chunk.base, chunk.len))
678
0
            return -1;
679
0
    }
680
0
    client->body_buf.is_end_stream = is_end_stream;
681
682
    /* check if the connection has to be closed for correct framing */
683
0
    if (client->state.res == STREAM_STATE_CLOSED)
684
0
        client->_do_keepalive = 0;
685
686
0
    req_body_send(client);
687
688
0
    return 0;
689
0
}
690
691
static void on_send_timeout(h2o_timer_t *entry)
692
0
{
693
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
694
0
    on_error(client, h2o_httpclient_error_io_timeout);
695
0
}
696
697
static h2o_iovec_t build_request(struct st_h2o_http1client_t *client, h2o_iovec_t method, const h2o_url_t *url,
698
                                 h2o_iovec_t connection, const h2o_header_t *headers, size_t num_headers)
699
618
{
700
618
    h2o_iovec_t buf;
701
618
    size_t offset = 0;
702
703
618
    buf.len = method.len + url->path.len + url->authority.len + 512;
704
618
    buf.base = h2o_mem_alloc_pool(client->super.pool, char, buf.len);
705
706
618
#define RESERVE(sz)                                                                                                                \
707
240k
    do {                                                                                                                           \
708
240k
        size_t required = offset + sz + 4 /* for "\r\n\r\n" */;                                                                    \
709
240k
        if (required > buf.len) {                                                                                                  \
710
1.20k
            do {                                                                                                                   \
711
1.20k
                buf.len *= 2;                                                                                                      \
712
1.20k
            } while (required > buf.len);                                                                                          \
713
1.15k
            char *newp = h2o_mem_alloc_pool(client->super.pool, char, buf.len);                                                    \
714
1.15k
            memcpy(newp, buf.base, offset);                                                                                        \
715
1.15k
            buf.base = newp;                                                                                                       \
716
1.15k
        }                                                                                                                          \
717
240k
    } while (0)
718
618
#define APPEND(s, l)                                                                                                               \
719
484k
    do {                                                                                                                           \
720
484k
        h2o_memcpy(buf.base + offset, (s), (l));                                                                                   \
721
484k
        offset += (l);                                                                                                             \
722
484k
    } while (0)
723
1.23k
#define APPEND_STRLIT(lit) APPEND((lit), sizeof(lit) - 1)
724
618
#define APPEND_HEADER(h)                                                                                                           \
725
240k
    do {                                                                                                                           \
726
240k
        RESERVE((h)->name->len + (h)->value.len + 4);                                                                              \
727
240k
        APPEND((h)->orig_name ? (h)->orig_name : (h)->name->base, (h)->name->len);                                                 \
728
240k
        buf.base[offset++] = ':';                                                                                                  \
729
240k
        buf.base[offset++] = ' ';                                                                                                  \
730
240k
        APPEND((h)->value.base, (h)->value.len);                                                                                   \
731
240k
        buf.base[offset++] = '\r';                                                                                                 \
732
240k
        buf.base[offset++] = '\n';                                                                                                 \
733
240k
    } while (0)
734
735
618
    APPEND(method.base, method.len);
736
618
    buf.base[offset++] = ' ';
737
618
    if (client->super.upgrade_to == h2o_httpclient_upgrade_to_connect) {
738
0
        if (h2o_memis(method.base, method.len, H2O_STRLIT("CONNECT-UDP"))) {
739
0
            APPEND_STRLIT("masque://");
740
0
            APPEND(url->authority.base, url->authority.len);
741
0
            APPEND_STRLIT("/");
742
0
        } else {
743
0
            APPEND(url->authority.base, url->authority.len);
744
0
        }
745
618
    } else {
746
618
        APPEND(url->path.base, url->path.len);
747
618
    }
748
618
    APPEND_STRLIT(" HTTP/1.1\r\nhost: ");
749
618
    APPEND(url->authority.base, url->authority.len);
750
618
    buf.base[offset++] = '\r';
751
618
    buf.base[offset++] = '\n';
752
618
    assert(offset <= buf.len);
753
754
618
    if (connection.base != NULL) {
755
618
        h2o_header_t h = (h2o_header_t){&H2O_TOKEN_CONNECTION->buf, NULL, connection};
756
618
        APPEND_HEADER(&h);
757
618
    }
758
759
618
    if (num_headers != 0) {
760
240k
        for (const h2o_header_t *h = headers, *h_end = h + num_headers; h != h_end; ++h)
761
239k
            APPEND_HEADER(h);
762
618
    }
763
764
618
    APPEND_STRLIT("\r\n");
765
766
    /* set the length */
767
618
    assert(offset <= buf.len);
768
0
    buf.len = offset;
769
770
618
    return buf;
771
772
618
#undef RESERVE
773
618
#undef APPEND
774
618
#undef APPEND_STRLIT
775
618
}
776
777
static void start_request(struct st_h2o_http1client_t *client, h2o_iovec_t method, const h2o_url_t *url,
778
                          const h2o_header_t *headers, size_t num_headers, h2o_iovec_t body,
779
                          const h2o_httpclient_properties_t *props)
780
618
{
781
618
    h2o_iovec_t reqbufs[6]; /* 6 should be the maximum possible elements used */
782
618
    size_t reqbufcnt = 0;
783
618
    if (props->proxy_protocol->base != NULL)
784
0
        reqbufs[reqbufcnt++] = *props->proxy_protocol;
785
618
    h2o_iovec_t header = build_request(client, method, url, *props->connection_header, headers, num_headers);
786
618
    reqbufs[reqbufcnt++] = header;
787
618
    client->super.bytes_written.header = header.len;
788
789
618
    client->_is_chunked = *props->chunked;
790
618
    client->_method_is_head = h2o_memis(method.base, method.len, H2O_STRLIT("HEAD"));
791
792
618
    assert(PTLS_ELEMENTSOF(reqbufs) - reqbufcnt >= 4); /* req_body_send_prepare could write to 4 additional elements */
793
618
    if (client->proceed_req != NULL) {
794
0
        h2o_buffer_init(&client->body_buf.buf, &h2o_socket_buffer_prototype);
795
0
        if (body.len != 0 && !h2o_buffer_try_append(&client->body_buf.buf, body.base, body.len)) {
796
0
            on_whole_request_sent(client->sock, h2o_httpclient_error_internal);
797
0
            return;
798
0
        }
799
0
        size_t bytes_written;
800
0
        reqbufcnt += req_body_send_prepare(client, reqbufs + reqbufcnt, &bytes_written);
801
0
        client->super.bytes_written.body = bytes_written;
802
0
        h2o_socket_write(client->sock, reqbufs, reqbufcnt, req_body_send_complete);
803
618
    } else {
804
618
        assert(!client->_is_chunked);
805
618
        if (body.len != 0) {
806
43
            reqbufs[reqbufcnt++] = body;
807
43
            client->super.bytes_written.body = body.len;
808
43
        }
809
618
        h2o_socket_write(client->sock, reqbufs, reqbufcnt, on_whole_request_sent);
810
618
    }
811
618
    client->super.bytes_written.total = client->sock->bytes_written;
812
813
    /* Even all data highly likely has been written into TCP sendbuf, it is our practice to assume the socket write operation is
814
     * asynchronous and link the timer. */
815
618
    client->super._timeout.cb = on_send_timeout;
816
618
    h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
817
818
618
    client->state.req = STREAM_STATE_BODY;
819
618
    client->super.timings.request_begin_at = h2o_gettimeofday(client->super.ctx->loop);
820
821
    /* If there's possibility of using a pipe for forwarding the content, reduce maximum read size before fetching headers. The
822
     * intent here is to not do a full-sized read of 1MB. 16KB has been chosen so that all HTTP response headers would be available,
823
     * and that an almost full-sized HTTP/2 frame / TLS record can be generated for the first chunk of data that we pass through
824
     * memory. */
825
618
#if USE_PIPE_READER
826
618
    if (client->_app_prefers_pipe_reader && h2o_evloop_socket_max_read_size > 16384)
827
0
        h2o_evloop_socket_set_max_read_size(client->sock, 16384);
828
618
#endif
829
830
618
    h2o_socket_read_start(client->sock, on_head);
831
618
}
832
833
static void on_connection_ready(struct st_h2o_http1client_t *client)
834
618
{
835
618
    h2o_iovec_t proxy_protocol = h2o_iovec_init(NULL, 0);
836
618
    int chunked = 0;
837
618
    h2o_iovec_t connection_header = h2o_iovec_init(NULL, 0);
838
618
    h2o_httpclient_properties_t props = {
839
618
        &proxy_protocol,
840
618
        &chunked,
841
618
        &connection_header,
842
618
    };
843
618
    h2o_iovec_t method;
844
618
    h2o_url_t url;
845
618
    h2o_header_t *headers;
846
618
    size_t num_headers;
847
618
    h2o_iovec_t body;
848
849
618
    client->super._cb.on_head = client->super._cb.on_connect(&client->super, NULL, &method, &url, (const h2o_header_t **)&headers,
850
618
                                                             &num_headers, &body, &client->proceed_req, &props, client->_origin);
851
618
    client->_app_prefers_pipe_reader = props.prefer_pipe_reader;
852
853
618
    if (client->super._cb.on_head == NULL) {
854
0
        close_client(client);
855
0
        return;
856
0
    }
857
858
618
    start_request(client, method, &url, headers, num_headers, body, &props);
859
618
}
860
861
static void do_cancel(h2o_httpclient_t *_client)
862
0
{
863
0
    struct st_h2o_http1client_t *client = (struct st_h2o_http1client_t *)_client;
864
0
    client->_do_keepalive = 0;
865
0
    close_client(client);
866
0
}
867
868
void update_read_state(struct st_h2o_http1client_t *client)
869
515
{
870
    /* If pipe used, `client->reader` would have switched to `on_body_pipe` by the time this function is called for the first time.
871
     */
872
515
    assert((client->pipe_reader.on_body_piped != NULL) == (client->reader == on_body_to_pipe));
873
874
515
    if (client->reader == on_body_to_pipe) {
875
        /* When pipe is being used, resume read when consumption is notified from user. `h2o_socket_read_start` is invoked without
876
         * checking if we are already reading; this is because we want to make sure that the read callback replaced to the current
877
         * one. */
878
0
        h2o_socket_read_start(client->sock, client->reader);
879
515
    } else {
880
        /* When buffer is used, start / stop reading based on the amount of data being buffered. */
881
515
        if ((*client->super.buf)->size >= client->super.ctx->max_buffer_size) {
882
0
            if (h2o_socket_is_reading(client->sock)) {
883
0
                client->reader = client->sock->_cb.read;
884
0
                h2o_socket_read_stop(client->sock);
885
0
            }
886
515
        } else {
887
515
            if (!h2o_socket_is_reading(client->sock))
888
0
                h2o_socket_read_start(client->sock, client->reader);
889
515
        }
890
515
    }
891
892
    /* arm or unarm i/o timeout depending on if we are reading */
893
515
    if (h2o_socket_is_reading(client->sock)) {
894
515
        if (h2o_timer_is_linked(&client->super._timeout))
895
0
            h2o_timer_unlink(&client->super._timeout);
896
515
        h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
897
515
    } else {
898
0
        if (h2o_timer_is_linked(&client->super._timeout))
899
0
            h2o_timer_unlink(&client->super._timeout);
900
0
    }
901
515
}
902
903
static void do_update_window(struct st_h2o_httpclient_t *_client)
904
0
{
905
0
    struct st_h2o_http1client_t *client = (void *)_client;
906
907
    /* When we are splicing to pipe, read synchronously. For prioritization logic to work correctly, it is important to provide
908
     * additional data synchronously in response to the invocation of `h2o_proceed_response`. When memory buffers are used,
909
     * lib/core/proxy.c uses a double buffering to prepare next chunk of data while a chunk of data is being fed to the HTTP
910
     * handlers via `h2o_sendvec`. But when using splice, the pipe is the only one buffer available. */
911
0
    if (client->reader == on_body_to_pipe) {
912
0
        on_body_to_pipe(client->sock, NULL);
913
0
        return;
914
0
    }
915
916
0
    update_read_state(client);
917
0
}
918
919
static void do_get_conn_properties(h2o_httpclient_t *_client, h2o_httpclient_conn_properties_t *properties)
920
618
{
921
618
    struct st_h2o_http1client_t *client = (void *)_client;
922
618
    h2o_httpclient_set_conn_properties_of_socket(client->sock, properties);
923
618
}
924
925
static void setup_client(struct st_h2o_http1client_t *client, h2o_socket_t *sock, h2o_url_t *origin)
926
618
{
927
618
    memset(&client->sock, 0, sizeof(*client) - offsetof(struct st_h2o_http1client_t, sock));
928
618
    client->super.cancel = do_cancel;
929
618
    client->super.get_conn_properties = do_get_conn_properties;
930
618
    client->super.update_window = do_update_window;
931
618
    client->super.write_req = do_write_req;
932
618
    client->super.buf = &sock->input;
933
618
    client->sock = sock;
934
618
    sock->data = client;
935
618
    client->_origin = origin;
936
618
}
937
938
void h2o_httpclient__h1_on_connect(h2o_httpclient_t *_client, h2o_socket_t *sock, h2o_url_t *origin)
939
618
{
940
618
    struct st_h2o_http1client_t *client = (void *)_client;
941
942
618
    assert(!h2o_timer_is_linked(&client->super._timeout));
943
944
0
    setup_client(client, sock, origin);
945
618
    on_connection_ready(client);
946
618
}
947
948
const size_t h2o_httpclient__h1_size = sizeof(struct st_h2o_http1client_t);