Coverage Report

Created: 2025-06-22 06:18

/src/h2o/lib/common/http1client.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2014 DeNA Co., Ltd.
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <arpa/inet.h>
23
#include <fcntl.h>
24
#include <netdb.h>
25
#include <netinet/in.h>
26
#include <sys/socket.h>
27
#include <sys/types.h>
28
#include <sys/un.h>
29
#include "picohttpparser.h"
30
#include "h2o/httpclient.h"
31
#include "h2o/token.h"
32
33
/* USE_PIPE_READER enables the splice(2)-based response body path (see on_body_to_pipe); it is 1 only for non-libuv builds on
 * Linux and 0 everywhere else. */
#if !H2O_USE_LIBUV && defined(__linux__)
34
#define USE_PIPE_READER 1
35
#else
36
#define USE_PIPE_READER 0
37
#endif
38
39
/**
 * Progress of one direction of the exchange. The client keeps one value for the request side and one for the response side
 * (see `state.req` / `state.res` in `st_h2o_http1client_t`).
 */
enum enum_h2o_http1client_stream_state {
40
    /* for the response side: the final response header has not been parsed yet */
    STREAM_STATE_HEAD,
41
    /* for the response side: header delivered, body is being read */
    STREAM_STATE_BODY,
42
    /* this direction is finished */
    STREAM_STATE_CLOSED,
43
};
44
45
/**
 * Per-request state of the HTTP/1 client. `super` is the generic client object handed to application callbacks; the other
 * members track the socket, the progress of the request and response, and the response body decoder.
 */
struct st_h2o_http1client_t {
46
    h2o_httpclient_t super;
47
    /* socket carrying the exchange; close_client either closes it or returns it to the socket pool */
    h2o_socket_t *sock;
48
    /* progress of the request (`req`) and the response (`res`), tracked independently */
    struct {
49
        enum enum_h2o_http1client_stream_state req;
50
        enum enum_h2o_http1client_stream_state res;
51
    } state;
52
    h2o_url_t *_origin;
53
    /* when non-zero, on_head treats the response as having no body */
    int _method_is_head;
54
    /* when zero, close_client always closes the socket instead of returning it to the socket pool */
    int _do_keepalive;
55
    /* decoder state shared (as a union) by the content-length / pipe readers and the chunked reader */
    union {
56
        struct {
57
            /* number of body bytes still expected; decremented by on_body_content_length and on_body_to_pipe */
            size_t bytesleft;
58
        } content_length;
59
        struct {
60
            struct phr_chunked_decoder decoder;
61
            size_t bytes_decoded_in_buf;
62
        } chunked;
63
    } _body_decoder;
64
    /* call_on_body compares this against on_body_to_pipe to choose which body callback to invoke */
    h2o_socket_cb reader;
65
    /* when non-NULL, invoked through call_proceed_req after a body write completes or when an error is reported */
    h2o_httpclient_proceed_req_cb proceed_req;
66
    /**
67
     * buffer used to hold chunk headers of a request body; the size is SIZE_MAX in hex + CRLF + '\0'
68
     */
69
    char _chunk_len_str[(sizeof(H2O_UINT64_LONGEST_HEX_STR) - 1) + 2 + 1];
70
    /**
71
     * buffer used to retain request body that is inflight
72
     */
73
    struct {
74
        h2o_buffer_t *buf;
75
        /* non-zero when the chunk held in `buf` is the last one of the request body */
        int is_end_stream;
76
    } body_buf;
77
    /**
78
     * `on_body_piped` is non-NULL iff used
79
     */
80
    h2o_httpclient_pipe_reader_t pipe_reader;
81
    /**
82
     * the value of `sock->bytes_read` up to which received bytes have already been accounted for by the body readers
83
     */
84
    uint64_t _socket_bytes_processed;
85
    /* request body is sent with chunked framing (see req_body_send_prepare) */
    unsigned _is_chunked : 1;
86
    /* set by on_body_chunked once decoded body data has been left in the input buffer */
    unsigned _seen_at_least_one_chunk : 1;
87
    /* while set (during application callbacks), close_client releases resources but does not free the struct */
    unsigned _delay_free : 1;
88
    /* when set, on_head may offer the pipe reader to the application (only when USE_PIPE_READER is enabled) */
    unsigned _app_prefers_pipe_reader : 1;
89
    /* when set, on_head starts sending the withheld request body upon a 100 or a final response */
    unsigned _send_own_expect : 1;
90
};
91
92
/* forward declarations of functions that are referenced before their definitions */
static void on_body_to_pipe(h2o_socket_t *_sock, const char *err);
93
94
static void req_body_send(struct st_h2o_http1client_t *client);
95
static void update_read_state(struct st_h2o_http1client_t *client);
96
97
/**
 * Releases the resources held by the client. The socket is returned to the socket pool when a connection pool exists,
 * `_do_keepalive` is set and the pool timeout is positive; otherwise it is closed. The timeout timer is unlinked if linked,
 * the request body buffer is disposed if present, and the struct is freed unless `_delay_free` is set.
 * NOTE(review): when `_delay_free` is set the struct stays allocated; the code that frees it later is not in this chunk.
 */
static void close_client(struct st_h2o_http1client_t *client)
98
780
{
99
780
    if (client->sock != NULL) {
100
780
        if (client->super.connpool != NULL && client->_do_keepalive && client->super.connpool->socketpool->timeout > 0) {
101
            /* we do not send pipelined requests, and thus can trash all the received input at the end of the request */
102
0
            h2o_buffer_consume(&client->sock->input, client->sock->input->size);
103
0
            h2o_socketpool_return(client->super.connpool->socketpool, client->sock);
104
780
        } else {
105
780
            h2o_socket_close(client->sock);
106
780
        }
107
780
    }
108
780
    if (h2o_timer_is_linked(&client->super._timeout))
109
336
        h2o_timer_unlink(&client->super._timeout);
110
780
    if (client->body_buf.buf != NULL)
111
284
        h2o_buffer_dispose(&client->body_buf.buf);
112
780
    if (!client->_delay_free)
113
780
        free(client);
114
780
}
115
116
/**
 * Called once the response side has reached STREAM_STATE_CLOSED (asserted). If the request side is closed as well, the whole
 * client is closed; otherwise only reading from the socket is stopped, leaving the request side to finish.
 */
static void close_response(struct st_h2o_http1client_t *client)
117
417
{
118
417
    assert(client->state.res == STREAM_STATE_CLOSED);
119
417
    if (client->state.req == STREAM_STATE_CLOSED) {
120
277
        close_client(client);
121
277
    } else {
122
140
        h2o_socket_read_stop(client->sock);
123
140
    }
124
417
}
125
126
/**
 * Invokes the application's `on_head` callback with `errstr` and `args`, and returns the body callback it returns.
 * `_delay_free` is set for the duration of the call, so that a close_client() issued while the callback is running does not
 * free `client`. Must not be called while another guarded callback is in progress (asserted).
 */
static h2o_httpclient_body_cb call_on_head(struct st_h2o_http1client_t *client, const char *errstr, h2o_httpclient_on_head_t *args)
127
487
{
128
487
    assert(!client->_delay_free);
129
487
    client->_delay_free = 1;
130
487
    h2o_httpclient_body_cb cb = client->super._cb.on_head(&client->super, errstr, args);
131
487
    client->_delay_free = 0;
132
487
    return cb;
133
487
}
134
135
/**
 * Invokes the body callback with `errstr`: `pipe_reader.on_body_piped` when the current reader is on_body_to_pipe, otherwise
 * `super._cb.on_body`. The last two arguments are always NULL and 0. `_delay_free` is set for the duration of the call.
 * Returns the callback's return value; the callers in this file treat non-zero as a request to close the client.
 */
static int call_on_body(struct st_h2o_http1client_t *client, const char *errstr)
136
919
{
137
919
    assert(!client->_delay_free);
138
919
    client->_delay_free = 1;
139
919
    int ret = (client->reader == on_body_to_pipe ? client->pipe_reader.on_body_piped : client->super._cb.on_body)(&client->super,
140
919
                                                                                                                  errstr, NULL, 0);
141
919
    client->_delay_free = 0;
142
919
    return ret;
143
919
}
144
145
/**
 * Invokes `client->proceed_req` with `errstr`, with `_delay_free` set for the duration of the call. The callback pointer is
 * not checked here; callers check that it is non-NULL.
 */
static void call_proceed_req(struct st_h2o_http1client_t *client, const char *errstr)
146
409
{
147
409
    assert(!client->_delay_free);
148
409
    client->_delay_free = 1;
149
409
    client->proceed_req(&client->super, errstr);
150
409
    client->_delay_free = 0;
151
409
}
152
153
/**
 * Reports `errstr` to the application through the callback that matches the current response state (on_head while waiting
 * for the header, on_body while reading the body, proceed_req - if set - once the response is closed), then closes the client.
 */
static void on_error(struct st_h2o_http1client_t *client, const char *errstr)
154
68
{
155
68
    switch (client->state.res) {
156
7
    case STREAM_STATE_HEAD:
157
7
        call_on_head(client, errstr, NULL);
158
7
        break;
159
22
    case STREAM_STATE_BODY:
160
22
        call_on_body(client, errstr);
161
22
        break;
162
39
    case STREAM_STATE_CLOSED:
163
39
        if (client->proceed_req != NULL)
164
39
            call_proceed_req(client, errstr);
165
39
        break;
166
68
    }
167
68
    close_client(client);
168
68
}
169
170
/**
 * Timer callback installed by on_head for the body phase; recovers the client from its embedded timer and reports an I/O
 * timeout through on_error.
 */
static void on_body_timeout(h2o_timer_t *entry)
171
0
{
172
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
173
0
    on_error(client, h2o_httpclient_error_io_timeout);
174
0
}
175
176
/**
 * Socket read callback used when the response body is delimited by connection close (the default reader chosen in on_head).
 * Any socket error is treated as the end of the body and reported as `h2o_httpclient_error_is_eos`. Otherwise, newly received
 * bytes are accounted for and passed to the body callback; a non-zero return from the callback closes the client.
 */
static void on_body_until_close(h2o_socket_t *sock, const char *err)
177
897
{
178
897
    struct st_h2o_http1client_t *client = sock->data;
179
180
897
    h2o_timer_unlink(&client->super._timeout);
181
182
897
    if (err != NULL) {
183
417
        client->state.res = STREAM_STATE_CLOSED;
184
417
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
185
417
        call_on_body(client, h2o_httpclient_error_is_eos);
186
417
        close_response(client);
187
417
        return;
188
417
    }
    /* number of bytes that arrived since the previous invocation */
189
480
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
190
480
    client->_socket_bytes_processed = sock->bytes_read;
191
192
480
    client->super.bytes_read.body += size;
193
480
    client->super.bytes_read.total += size;
194
195
480
    if (size != 0) {
196
480
        if (call_on_body(client, NULL) != 0) {
197
0
            close_client(client);
198
0
            return;
199
0
        }
200
480
        update_read_state(client);
201
480
    }
202
480
}
203
204
/**
 * Socket read callback used when the response carries a Content-Length. Socket errors are reported as I/O errors. Newly
 * received bytes are deducted from `bytesleft`; once the whole body has arrived the response is marked closed, any excess
 * bytes are cut off from the input buffer (which also disables keep-alive), and the body callback is invoked with
 * `h2o_httpclient_error_is_eos`. If the body is not complete and the callback returns non-zero, the client is closed without
 * keep-alive. When the pipe reader is in use, subsequent reads are handed over to on_body_to_pipe.
 */
static void on_body_content_length(h2o_socket_t *sock, const char *err)
205
0
{
206
0
    struct st_h2o_http1client_t *client = sock->data;
207
208
0
    h2o_timer_unlink(&client->super._timeout);
209
210
0
    if (err != NULL) {
211
0
        on_error(client, h2o_httpclient_error_io);
212
0
        return;
213
0
    }
    /* number of bytes that arrived since the previous invocation */
214
0
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
215
0
    client->_socket_bytes_processed = sock->bytes_read;
216
217
0
    client->super.bytes_read.body += size;
218
0
    client->super.bytes_read.total += size;
219
    /* the second condition lets a zero-length body complete even though no bytes have arrived */
220
0
    if (size != 0 || client->_body_decoder.content_length.bytesleft == 0) {
221
0
        int ret;
222
0
        if (client->_body_decoder.content_length.bytesleft <= size) {
223
0
            if (client->_body_decoder.content_length.bytesleft < size) {
224
                /* remove the trailing garbage from buf, and disable keepalive */
225
0
                client->sock->input->size -= size - client->_body_decoder.content_length.bytesleft;
226
0
                client->_do_keepalive = 0;
227
0
            }
228
0
            client->_body_decoder.content_length.bytesleft = 0;
229
0
            client->state.res = STREAM_STATE_CLOSED;
230
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
231
0
        } else {
232
0
            client->_body_decoder.content_length.bytesleft -= size;
233
0
        }
234
0
        ret = call_on_body(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL);
235
0
        if (client->state.res == STREAM_STATE_CLOSED) {
236
0
            close_response(client);
237
0
            return;
238
0
        } else if (ret != 0) {
239
0
            client->_do_keepalive = 0;
240
0
            close_client(client);
241
0
            return;
242
0
        }
243
0
    }
244
245
0
#if USE_PIPE_READER
    /* the application has set up the pipe reader: stop buffering socket input and let on_body_to_pipe handle further reads */
246
0
    if (client->pipe_reader.on_body_piped != NULL) {
247
0
        h2o_socket_dont_read(client->sock, 1);
248
0
        client->reader = on_body_to_pipe;
249
0
    }
250
0
#endif
251
0
    update_read_state(client);
252
0
}
253
254
/**
 * Socket read callback that moves response body bytes from the socket to `pipe_reader.fd` using splice(2). Compiled to a
 * fatal error unless USE_PIPE_READER is enabled.
 *
 * At most `bytesleft` bytes are spliced per invocation, in non-blocking mode. EINTR is retried; EAGAIN re-arms reading via
 * update_read_state; any other failure, or a return value of zero, is reported as an I/O error. On success the byte counters
 * are updated and the body callback is invoked, with `h2o_httpclient_error_is_eos` once `bytesleft` reaches zero.
 */
void on_body_to_pipe(h2o_socket_t *_sock, const char *err)
255
0
{
256
0
#if USE_PIPE_READER
257
0
    struct st_h2o_http1client_t *client = _sock->data;
258
259
0
    h2o_timer_unlink(&client->super._timeout);
260
0
    h2o_socket_read_stop(client->sock);
261
262
0
    if (err != NULL) {
263
0
        on_error(client, h2o_httpclient_error_io);
264
0
        return;
265
0
    }
266
267
0
    ssize_t bytes_read;
    /* retry splice for as long as it is interrupted by a signal */
268
0
    while ((bytes_read = splice(h2o_socket_get_fd(client->sock), NULL, client->pipe_reader.fd, NULL,
269
0
                                client->_body_decoder.content_length.bytesleft, SPLICE_F_NONBLOCK)) == -1 &&
270
0
           errno == EINTR)
271
0
        ;
272
0
    if (bytes_read == -1 && errno == EAGAIN) {
273
0
        update_read_state(client);
274
0
        return;
275
0
    }
276
0
    if (bytes_read <= 0) {
277
0
        on_error(client, h2o_httpclient_error_io);
278
0
        return;
279
0
    }
280
    /* the bytes bypassed the socket's input buffer, so the socket-level counter is advanced here as well */
281
0
    client->_socket_bytes_processed += bytes_read;
282
0
    client->sock->bytes_read += bytes_read;
283
0
    client->super.bytes_read.body += bytes_read;
284
0
    client->super.bytes_read.total += bytes_read;
285
286
0
    client->_body_decoder.content_length.bytesleft -= bytes_read;
287
0
    if (client->_body_decoder.content_length.bytesleft == 0) {
288
0
        client->state.res = STREAM_STATE_CLOSED;
289
0
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
290
0
        h2o_socket_dont_read(client->sock, 0);
291
0
    }
292
293
0
    int ret = call_on_body(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL);
294
295
0
    if (client->state.res == STREAM_STATE_CLOSED) {
296
0
        close_response(client);
297
0
    } else if (ret != 0) {
298
0
        client->_do_keepalive = 0;
299
0
        close_client(client);
300
0
    }
301
#else
302
    h2o_fatal("%s cannot be called", __FUNCTION__);
303
#endif
304
0
}
305
306
/**
 * Socket read callback used when the response uses chunked transfer encoding. Newly received bytes are decoded in place at
 * the tail of the socket's input buffer with phr_decode_chunked, the buffer is shrunk to the decoded size, and the body
 * callback is invoked. A decode error is reported as `h2o_httpclient_error_http1_parse_failed` and closes the client;
 * completion is reported as `h2o_httpclient_error_is_eos`. A close by the peer on a chunk boundary, after at least one chunk
 * has been seen, is also treated as completion (without keep-alive); other socket errors are reported as I/O errors.
 */
static void on_body_chunked(h2o_socket_t *sock, const char *err)
307
0
{
308
0
    struct st_h2o_http1client_t *client = sock->data;
309
0
    h2o_buffer_t *inbuf;
310
311
0
    h2o_timer_unlink(&client->super._timeout);
312
313
0
    if (err != NULL) {
314
0
        if (err == h2o_socket_error_closed && !phr_decode_chunked_is_in_data(&client->_body_decoder.chunked.decoder) &&
315
0
            client->_seen_at_least_one_chunk) {
316
            /*
317
             * if the peer closed after a full chunk, treat this
318
             * as if the transfer had completed, browsers appear to ignore
319
             * a missing 0\r\n chunk
320
             */
321
0
            client->_do_keepalive = 0;
322
0
            client->state.res = STREAM_STATE_CLOSED;
323
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
324
0
            call_on_body(client, h2o_httpclient_error_is_eos);
325
0
            close_response(client);
326
0
        } else {
327
0
            on_error(client, h2o_httpclient_error_io);
328
0
        }
329
0
        return;
330
0
    }
    /* number of bytes that arrived since the previous invocation */
331
0
    uint64_t size = sock->bytes_read - client->_socket_bytes_processed;
332
0
    client->_socket_bytes_processed = sock->bytes_read;
333
334
0
    client->super.bytes_read.body += size;
335
0
    client->super.bytes_read.total += size;
336
337
0
    inbuf = client->sock->input;
338
0
    if (size != 0) {
339
0
        const char *errstr;
340
0
        int cb_ret;
        /* in: number of undecoded bytes at the tail of inbuf; out: number of decoded bytes left in their place */
341
0
        size_t newsz = size;
342
343
0
        switch (phr_decode_chunked(&client->_body_decoder.chunked.decoder, inbuf->bytes + inbuf->size - newsz, &newsz)) {
344
0
        case -1: /* error */
345
0
            newsz = size;
346
0
            client->_do_keepalive = 0;
347
0
            errstr = h2o_httpclient_error_http1_parse_failed;
348
0
            break;
349
0
        case -2: /* incomplete */
350
0
            errstr = NULL;
351
0
            break;
352
0
        default: /* complete, with garbage on tail; should disable keepalive */
353
0
            client->_do_keepalive = 0;
354
        /* fallthru */
355
0
        case 0: /* complete */
356
0
            client->state.res = STREAM_STATE_CLOSED;
357
0
            errstr = h2o_httpclient_error_is_eos;
358
0
            client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
359
0
            break;
360
0
        }
        /* drop the bytes that the decoder removed (chunk framing), keeping only decoded data in the buffer */
361
0
        inbuf->size -= size - newsz;
362
0
        if (inbuf->size > 0)
363
0
            client->_seen_at_least_one_chunk = 1;
364
0
        cb_ret = call_on_body(client, errstr);
365
0
        if (client->state.res == STREAM_STATE_CLOSED) {
366
0
            close_response(client);
367
0
            return;
368
0
        } else if (errstr != NULL) {
369
0
            close_client(client);
370
0
            return;
371
0
        } else if (cb_ret != 0) {
372
0
            client->_do_keepalive = 0;
373
0
            close_client(client);
374
0
            return;
375
0
        }
376
0
        update_read_state(client);
377
0
    }
378
0
}
379
380
/**
 * Timer callback installed by on_head while the response header is still incomplete; recovers the client from its embedded
 * timer and reports an I/O timeout through on_error.
 */
static void on_head_timeout(h2o_timer_t *entry)
381
0
{
382
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
383
0
    on_error(client, h2o_httpclient_error_io_timeout);
384
0
}
385
386
/**
 * Socket read callback for the response header phase.
 *
 * Parses responses from the socket's input buffer until a final one (status >= 200, or 101) is found. Informational (1xx)
 * responses are passed to `informational_cb` (if set) and consumed. When `_send_own_expect` is set, a 100 or a final response
 * triggers sending of the withheld request body, while a 101 is rejected. For the final response, the function chooses the
 * body reader (until-close, content-length, or chunked) and the keep-alive setting from the headers, invokes the
 * application's on_head callback, and - unless the response has no body or the application returned no body callback -
 * starts reading the body.
 *
 * Parse errors, folded header lines, unsupported transfer encodings and invalid content lengths are reported via on_error.
 */
static void on_head(h2o_socket_t *sock, const char *err)
387
480
{
388
480
    struct st_h2o_http1client_t *client = sock->data;
389
480
    int minor_version, version, http_status, rlen;
390
480
    const char *msg;
391
480
#define MAX_HEADERS 100
392
480
    h2o_header_t *headers;
393
480
    h2o_iovec_t *header_names;
394
480
    size_t msg_len, num_headers, i;
395
480
    h2o_socket_cb reader;
396
397
480
    h2o_timer_unlink(&client->super._timeout);
398
399
480
    if (err != NULL) {
400
0
        on_error(client, h2o_httpclient_error_io);
401
0
        return;
402
0
    }
403
404
    /* revert max read size to 1MB now that we have received the first chunk, presumably carrying all the response headers */
405
480
#if USE_PIPE_READER
406
480
    if (client->_app_prefers_pipe_reader)
407
0
        h2o_evloop_socket_set_max_read_size(client->sock, h2o_evloop_socket_max_read_size);
408
480
#endif
409
410
480
    client->super._timeout.cb = on_head_timeout;
411
412
480
    headers = h2o_mem_alloc_pool(client->super.pool, *headers, MAX_HEADERS);
413
480
    header_names = h2o_mem_alloc_pool(client->super.pool, *header_names, MAX_HEADERS);
414
415
    /* continue parsing the responses until we see a final one */
416
480
    while (1) {
417
        /* parse response */
418
480
        struct phr_header src_headers[MAX_HEADERS];
419
480
        num_headers = MAX_HEADERS;
420
480
        rlen = phr_parse_response(sock->input->bytes, sock->input->size, &minor_version, &http_status, &msg, &msg_len, src_headers,
421
480
                                  &num_headers, 0);
422
480
        switch (rlen) {
423
0
        case -1: /* error */
424
0
            on_error(client, h2o_httpclient_error_http1_parse_failed);
425
0
            return;
426
0
        case -2: /* incomplete */
427
0
            h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
428
0
            return;
429
480
        }
430
431
480
        client->super.bytes_read.header += rlen;
432
480
        client->super.bytes_read.total += rlen;
433
        /* 0x100 when the minor version is zero, 0x101 otherwise */
434
480
        version = 0x100 | (minor_version != 0);
435
436
        /* fill-in the headers */
437
960
        for (i = 0; i != num_headers; ++i) {
438
480
            if (src_headers[i].name_len == 0) {
439
                /* reject multiline header */
440
0
                on_error(client, h2o_httpclient_error_http1_line_folding);
441
0
                return;
442
0
            }
443
480
            const h2o_token_t *token;
            /* keep a copy of the name as received, then lowercase the parsed name in place for the token lookup */
444
480
            char *orig_name = h2o_strdup(client->super.pool, src_headers[i].name, src_headers[i].name_len).base;
445
480
            h2o_strtolower((char *)src_headers[i].name, src_headers[i].name_len);
446
480
            token = h2o_lookup_token(src_headers[i].name, src_headers[i].name_len);
447
480
            if (token != NULL) {
448
480
                headers[i].name = (h2o_iovec_t *)&token->buf;
449
480
            } else {
450
0
                header_names[i] = h2o_iovec_init(src_headers[i].name, src_headers[i].name_len);
451
0
                headers[i].name = &header_names[i];
452
0
            }
453
480
            headers[i].value = h2o_iovec_init(src_headers[i].value, src_headers[i].value_len);
454
480
            headers[i].orig_name = orig_name;
455
480
            headers[i].flags = (h2o_header_flags_t){0};
456
480
        }
457
458
480
        if (http_status == 101) {
459
0
            if (client->_send_own_expect) {
460
                /* expect: 100-continue is incompatible with CONNECT or upgrade (when trying to establish a tunnel) */
461
0
                on_error(client, h2o_httpclient_error_unexpected_101);
462
0
                return;
463
0
            }
464
0
            break;
465
480
        } else if (http_status == 100 || http_status >= 200) {
466
            /* When request body has been withheld and a 100 or a final response has been received, start sending the request body,
467
             * see: https://github.com/h2o/h2o/pull/3316#discussion_r1456859634. */
468
480
            if (client->_send_own_expect) {
469
0
                client->_send_own_expect = 0;
470
0
                req_body_send(client);
471
0
            }
472
480
            if (http_status >= 200)
473
480
                break;
474
480
        }
        /* only informational responses other than 101 reach this point */
475
0
        assert(http_status <= 199);
476
0
        if (client->super.informational_cb != NULL &&
477
0
            client->super.informational_cb(&client->super, version, http_status, h2o_iovec_init(msg, msg_len), headers,
478
0
                                           num_headers) != 0) {
479
0
            close_client(client);
480
0
            return;
481
0
        }
482
        /* discard the informational response; if nothing follows it in the buffer, wait for more input */
483
0
        h2o_buffer_consume(&client->sock->input, rlen);
484
0
        if (client->sock->input->size == 0) {
485
0
            if (!h2o_timer_is_linked(&client->super._timeout)) {
486
0
                h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
487
0
            }
488
0
            return;
489
0
        }
490
0
    }
491
492
    /* recognize hop-by-hop response headers */
493
480
    reader = on_body_until_close;
494
480
    if (!h2o_httpclient__tunnel_is_ready(&client->super, http_status, version)) {
        /* keep-alive defaults to on when the minor version is 1 or greater; a Connection header overrides the default */
495
480
        client->_do_keepalive = minor_version >= 1;
496
960
        for (i = 0; i != num_headers; ++i) {
497
480
            if (headers[i].name == &H2O_TOKEN_CONNECTION->buf) {
498
480
                if (h2o_contains_token(headers[i].value.base, headers[i].value.len, H2O_STRLIT("keep-alive"), ',')) {
499
0
                    client->_do_keepalive = 1;
500
480
                } else {
501
480
                    client->_do_keepalive = 0;
502
480
                }
503
480
            } else if (headers[i].name == &H2O_TOKEN_TRANSFER_ENCODING->buf) {
504
0
                if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("chunked"))) {
505
                    /* precond: _body_decoder.chunked is zero-filled */
506
0
                    client->_body_decoder.chunked.decoder.consume_trailer = 1;
507
0
                    reader = on_body_chunked;
508
0
                } else if (h2o_memis(headers[i].value.base, headers[i].value.len, H2O_STRLIT("identity"))) {
509
                    /* continue */
510
0
                } else {
511
0
                    on_error(client, h2o_httpclient_error_http1_unexpected_transfer_encoding);
512
0
                    return;
513
0
                }
514
0
            } else if (headers[i].name == &H2O_TOKEN_CONTENT_LENGTH->buf) {
515
0
                if ((client->_body_decoder.content_length.bytesleft = h2o_strtosize(headers[i].value.base, headers[i].value.len)) ==
516
0
                    SIZE_MAX) {
517
0
                    on_error(client, h2o_httpclient_error_invalid_content_length);
518
0
                    return;
519
0
                }
                /* the chunked reader, once selected, is not replaced by the content-length reader */
520
0
                if (reader != on_body_chunked)
521
0
                    reader = on_body_content_length;
522
0
            }
523
480
        }
524
480
    }
525
526
480
    client->state.res = STREAM_STATE_BODY;
527
480
    client->super.timings.response_start_at = h2o_gettimeofday(client->super.ctx->loop);
528
529
    /* RFC 2616 4.4 */
530
480
    if (client->_method_is_head || http_status == 204 || http_status == 304) {
531
0
        client->state.res = STREAM_STATE_CLOSED;
532
0
        client->super.timings.response_end_at = h2o_gettimeofday(client->super.ctx->loop);
533
480
    } else {
534
        /* close the connection if impossible to determine the end of the response (RFC 7230 3.3.3) */
535
480
        if (reader == on_body_until_close)
536
480
            client->_do_keepalive = 0;
537
480
    }
538
539
480
    h2o_httpclient_on_head_t on_head = {
540
480
        .version = version,
541
480
        .status = http_status,
542
480
        .msg = h2o_iovec_init(msg, msg_len),
543
480
        .headers = headers,
544
480
        .num_headers = num_headers,
545
480
        .header_requires_dup = 1,
546
480
    };
547
480
#if USE_PIPE_READER
548
    /* If there is no less than 64KB of data to be read from the socket, offer the application the opportunity to use pipe for
549
     * transferring the content zero-copy. As switching to pipe involves the cost of creating a pipe (and disposing it when the
550
     * request is complete), we adopt this margin of 64KB, which offers clear improvement (5%) on 9th-gen Intel Core. */
551
480
    if (client->_app_prefers_pipe_reader && reader == on_body_content_length &&
552
480
        client->sock->input->size + 65536 <= client->_body_decoder.content_length.bytesleft)
553
0
        on_head.pipe_reader = &client->pipe_reader;
554
480
#endif
555
556
    /* call the callback */
557
480
    client->super._cb.on_body =
558
480
        call_on_head(client, client->state.res == STREAM_STATE_CLOSED ? h2o_httpclient_error_is_eos : NULL, &on_head);
559
560
480
    if (client->state.res == STREAM_STATE_CLOSED) {
561
0
        close_response(client);
562
0
        return;
563
480
    } else if (client->super._cb.on_body == NULL) {
        /* the application returned no body callback: abandon the response without reusing the connection */
564
0
        client->_do_keepalive = 0;
565
0
        close_client(client);
566
0
        return;
567
0
    }
568
    /* drop the header from the input buffer; bytes still left in the buffer are counted as not yet processed body bytes */
569
480
    h2o_buffer_consume(&sock->input, rlen);
570
480
    client->_socket_bytes_processed = client->sock->bytes_read - client->sock->input->size;
571
572
480
    client->super._timeout.cb = on_body_timeout;
573
480
    h2o_socket_read_start(sock, reader);
    /* invoke the reader right away so that body bytes received together with the header are handled */
574
480
    reader(client->sock, 0);
575
576
480
#undef MAX_HEADERS
577
480
}
578
579
/**
 * Timer callback armed (with `first_byte_timeout`) after the request has been sent while the response header is still
 * awaited; reports `h2o_httpclient_error_first_byte_timeout` through on_error.
 */
static void on_head_first_byte_timeout(h2o_timer_t *entry)
580
0
{
581
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
582
0
    on_error(client, h2o_httpclient_error_first_byte_timeout);
583
0
}
584
585
/**
 * Called when the entire request has been written, or when a write failed (`err` non-NULL, reported as an I/O error).
 * Marks the request side closed and records the time. For requests with `upgrade_to` set, the exchange is terminated through
 * on_error. Otherwise the next step depends on the response state: arm the first-byte timeout while the header is awaited,
 * do nothing while the body is being read, or close the client when the response has already finished.
 */
static void on_whole_request_sent(h2o_socket_t *sock, const char *err)
586
595
{
587
595
    struct st_h2o_http1client_t *client = sock->data;
588
589
595
    h2o_timer_unlink(&client->super._timeout);
590
591
595
    if (err != NULL) {
592
68
        on_error(client, h2o_httpclient_error_io);
593
68
        return;
594
68
    }
595
596
527
    client->state.req = STREAM_STATE_CLOSED;
597
527
    client->super.timings.request_end_at = h2o_gettimeofday(client->super.ctx->loop);
598
599
527
    if (client->super.upgrade_to != NULL) {
600
        /* TODO use shutdown(2) to signal the peer that our send side has been closed, but continue reading on the receive side. */
601
0
        on_error(client, client->state.res < STREAM_STATE_BODY ? h2o_httpclient_error_io : h2o_httpclient_error_is_eos);
602
527
    } else {
603
527
        switch (client->state.res) {
604
527
        case STREAM_STATE_HEAD:
605
527
            client->super._timeout.cb = on_head_first_byte_timeout;
606
527
            h2o_timer_link(client->super.ctx->loop, client->super.ctx->first_byte_timeout, &client->super._timeout);
607
527
            break;
608
0
        case STREAM_STATE_BODY:
609
0
            break;
610
0
        case STREAM_STATE_CLOSED:
611
0
            close_client(client);
612
0
            break;
613
527
        }
614
527
    }
615
527
}
616
617
/**
 * Write-completion callback; a write error is reported as an I/O error. On success, if the response header is still awaited,
 * the first-byte timeout is armed. Unlike on_whole_request_sent, the request side is not marked closed.
 * NOTE(review): judging by the name, this is used after sending only the request header while waiting for a 100 response;
 * the place that registers it is not in this chunk.
 */
static void on_header_sent_wait_100(h2o_socket_t *sock, const char *err)
618
0
{
619
0
    struct st_h2o_http1client_t *client = sock->data;
620
621
0
    h2o_timer_unlink(&client->super._timeout);
622
623
0
    if (err != NULL) {
624
0
        on_error(client, h2o_httpclient_error_io);
625
0
        return;
626
0
    }
627
628
0
    if (client->state.res == STREAM_STATE_HEAD) {
629
0
        client->super._timeout.cb = on_head_first_byte_timeout;
630
0
        h2o_timer_link(client->super.ctx->loop, client->super.ctx->first_byte_timeout, &client->super._timeout);
631
0
    }
632
0
}
633
634
/**
 * Write-completion callback for a request body chunk (registered by req_body_send). Empties the in-flight body buffer, then
 * either forwards the error to on_whole_request_sent, or notifies the application through `proceed_req` (if set) and, when the
 * chunk was the last one, finishes the request through on_whole_request_sent.
 */
static void req_body_send_complete(h2o_socket_t *sock, const char *err)
635
438
{
636
438
    struct st_h2o_http1client_t *client = sock->data;
637
638
438
    h2o_buffer_consume(&client->body_buf.buf, client->body_buf.buf->size);
639
640
438
    if (err != NULL) {
641
68
        on_whole_request_sent(client->sock, err);
642
68
        return;
643
68
    }
644
    /* read the flag before invoking the callback; presumably the callback may submit the next chunk and overwrite it */
645
370
    int is_end_stream = client->body_buf.is_end_stream;
646
647
370
    if (client->proceed_req != NULL) {
648
370
        call_proceed_req(client, NULL);
649
370
    }
650
651
370
    if (is_end_stream)
652
31
        on_whole_request_sent(client->sock, NULL);
653
370
}
654
655
/**
656
 * Encodes data. `bufs` must have at least 4 elements of space.
 *
 * Fills `bufs` with the vectors to be written for the content of `client->body_buf`, stores their total length in `*bytes`,
 * and returns the number of vectors used. With chunked framing, a non-empty buffer yields three vectors (chunk header built
 * in `_chunk_len_str`, the data, CRLF), and the end of the stream adds the terminating "0\r\n\r\n". Without chunked framing,
 * a non-empty buffer yields a single vector. The vectors refer to memory owned by `client`; nothing is copied.
657
 */
658
static size_t req_body_send_prepare(struct st_h2o_http1client_t *client, h2o_iovec_t *bufs, size_t *bytes)
659
447
{
660
447
    size_t bufcnt = 0;
661
447
    *bytes = 0;
662
663
447
    if (client->_is_chunked) {
664
420
        if (client->body_buf.buf->size != 0) {
665
            /* build chunk header */
666
416
            bufs[bufcnt].base = client->_chunk_len_str;
667
416
            bufs[bufcnt].len =
668
416
                snprintf(client->_chunk_len_str, sizeof(client->_chunk_len_str), "%zx\r\n", client->body_buf.buf->size);
669
416
            *bytes += bufs[bufcnt].len;
670
416
            ++bufcnt;
671
            /* append chunk body */
672
416
            bufs[bufcnt++] = h2o_iovec_init(client->body_buf.buf->bytes, client->body_buf.buf->size);
673
416
            *bytes += client->body_buf.buf->size;
674
            /* append CRLF */
675
416
            bufs[bufcnt++] = h2o_iovec_init("\r\n", 2);
676
416
            *bytes += 2;
677
416
        }
678
420
        if (client->body_buf.is_end_stream) {
679
63
            static const h2o_iovec_t terminator = {H2O_STRLIT("0\r\n\r\n")};
680
63
            bufs[bufcnt++] = terminator;
681
63
            *bytes += terminator.len;
682
63
        }
683
420
    } else if (client->body_buf.buf->size != 0) {
684
27
        bufs[bufcnt++] = h2o_iovec_init(client->body_buf.buf->bytes, client->body_buf.buf->size);
685
27
        *bytes += client->body_buf.buf->size;
686
27
    }
687
688
447
    return bufcnt;
689
447
}
690
691
/**
 * Writes the content of `client->body_buf` to the socket, framed by req_body_send_prepare, with req_body_send_complete as the
 * completion callback. Updates the bytes-written counters and restarts the timeout timer with `io_timeout`.
 */
static void req_body_send(struct st_h2o_http1client_t *client)
692
163
{
    /* four elements is the maximum that req_body_send_prepare may fill */
693
163
    h2o_iovec_t bufs[4];
694
163
    size_t bytes, bufcnt = req_body_send_prepare(client, bufs, &bytes);
695
696
163
    h2o_timer_unlink(&client->super._timeout);
697
698
163
    h2o_socket_write(client->sock, bufs, bufcnt, req_body_send_complete);
699
163
    client->super.bytes_written.body += bytes;
700
163
    client->super.bytes_written.total += bytes;
701
702
163
    h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
703
163
}
704
705
/**
 * Accepts the next piece of the request body and starts sending it.
 *
 * Preconditions (asserted): `chunk` is non-empty or `is_end_stream` is set, no write is in progress on the socket, and the
 * in-flight body buffer is empty.
 *
 * @param _client        the client; must be the `super` member of a `struct st_h2o_http1client_t`
 * @param chunk          body data, copied into `client->body_buf.buf`
 * @param is_end_stream  non-zero if this is the last piece of the request body
 * @return 0 on success, -1 if the data could not be appended to the buffer (nothing is sent in that case)
 */
static int do_write_req(h2o_httpclient_t *_client, h2o_iovec_t chunk, int is_end_stream)
706
163
{
707
163
    struct st_h2o_http1client_t *client = (struct st_h2o_http1client_t *)_client;
708
709
163
    assert(chunk.len != 0 || is_end_stream);
710
163
    assert(!h2o_socket_is_writing(client->sock));
711
163
    assert(client->body_buf.buf->size == 0);
712
713
    /* store given content to buffer */
714
163
    if (chunk.len != 0) {
715
159
        if (!h2o_buffer_try_append(&client->body_buf.buf, chunk.base, chunk.len))
716
0
            return -1;
717
159
    }
718
163
    client->body_buf.is_end_stream = is_end_stream;
719
720
    /* check if the connection has to be closed for correct framing */
721
163
    if (client->state.res == STREAM_STATE_CLOSED)
722
41
        client->_do_keepalive = 0;
723
724
163
    req_body_send(client);
725
726
163
    return 0;
727
163
}
728
729
/**
 * Timer callback that recovers the client from its embedded timer and reports an I/O timeout through on_error.
 * NOTE(review): judging by the name, it guards the sending of the request; the place that installs it is not in this chunk.
 */
static void on_send_timeout(h2o_timer_t *entry)
730
0
{
731
0
    struct st_h2o_http1client_t *client = H2O_STRUCT_FROM_MEMBER(struct st_h2o_http1client_t, super._timeout, entry);
732
0
    on_error(client, h2o_httpclient_error_io_timeout);
733
0
}
734
735
/**
 * Serializes the HTTP/1.1 request head into memory allocated from the client's pool.
 *
 * The output consists of the request line, a `host` header carrying `url->authority`, either the upgrade-related headers or the
 * supplied `connection` header, an `expect: 100-continue` header when `client->_send_own_expect` is set, the caller-supplied
 * `headers`, and the terminating empty line.
 *
 * The request target is `url->path`, except when `client->super.upgrade_to` is `h2o_httpclient_upgrade_to_connect`; in that case
 * it is `url->authority` (or `masque://<authority>/` when the method is `CONNECT-UDP`).
 *
 * @param client      client issuing the request; provides the memory pool, `upgrade_to` and `_send_own_expect`
 * @param method      request method
 * @param url         target URL (`path` and `authority` are used)
 * @param connection  value of the `connection` header; ignored if `base` is NULL or when an upgrade other than CONNECT is requested
 * @param headers     additional headers to emit, in order
 * @param num_headers number of entries in `headers`
 * @return vector pointing to the serialized head; `len` is the number of bytes actually written
 */
static h2o_iovec_t build_request(struct st_h2o_http1client_t *client, h2o_iovec_t method, const h2o_url_t *url,
                                 h2o_iovec_t connection, const h2o_header_t *headers, size_t num_headers)
{
    h2o_iovec_t buf;
    size_t offset = 0;

    /* The request line and the host header are written without calling RESERVE, so the initial capacity has to cover them.
     * The authority is counted twice, because CONNECT-style requests emit it both as the request target and as the value of
     * the host header; counting it only once would let a long authority overrun the buffer. */
    buf.len = method.len + url->path.len + url->authority.len * 2 + 512;
    buf.base = h2o_mem_alloc_pool(client->super.pool, char, buf.len);

/* Makes sure that `sz` more bytes plus the final "\r\n\r\n" fit; if not, doubles the capacity until they do and moves the bytes
 * written so far to a newly allocated chunk of the pool. */
#define RESERVE(sz) \
    do { \
        size_t required = offset + (sz) + 4 /* for "\r\n\r\n" */; \
        if (required > buf.len) { \
            do { \
                buf.len *= 2; \
            } while (required > buf.len); \
            char *newp = h2o_mem_alloc_pool(client->super.pool, char, buf.len); \
            memcpy(newp, buf.base, offset); \
            buf.base = newp; \
        } \
    } while (0)
/* Copies `l` bytes starting at `s` to the current write position and advances it; does NOT check capacity. */
#define APPEND(s, l) \
    do { \
        h2o_memcpy(buf.base + offset, (s), (l)); \
        offset += (l); \
    } while (0)
#define APPEND_STRLIT(lit) APPEND((lit), sizeof(lit) - 1)
/* Emits "name: value\r\n" for the given header (preferring `orig_name` when available), growing the buffer as necessary. */
#define APPEND_HEADER(h) \
    do { \
        RESERVE((h)->name->len + (h)->value.len + 4); \
        APPEND((h)->orig_name ? (h)->orig_name : (h)->name->base, (h)->name->len); \
        buf.base[offset++] = ':'; \
        buf.base[offset++] = ' '; \
        APPEND((h)->value.base, (h)->value.len); \
        buf.base[offset++] = '\r'; \
        buf.base[offset++] = '\n'; \
    } while (0)

    /* request line */
    APPEND(method.base, method.len);
    buf.base[offset++] = ' ';
    if (client->super.upgrade_to == h2o_httpclient_upgrade_to_connect) {
        if (h2o_memis(method.base, method.len, H2O_STRLIT("CONNECT-UDP"))) {
            APPEND_STRLIT("masque://");
            APPEND(url->authority.base, url->authority.len);
            APPEND_STRLIT("/");
        } else {
            APPEND(url->authority.base, url->authority.len);
        }
    } else {
        APPEND(url->path.base, url->path.len);
    }

    /* host header */
    APPEND_STRLIT(" HTTP/1.1\r\nhost: ");
    APPEND(url->authority.base, url->authority.len);
    buf.base[offset++] = '\r';
    buf.base[offset++] = '\n';
    assert(offset <= buf.len);

    /* append supplied connection header, or "connection: upgrade" and upgrade header when requesting an upgrade */
    if (client->super.upgrade_to != NULL && client->super.upgrade_to != h2o_httpclient_upgrade_to_connect) {
        h2o_header_t c = {&H2O_TOKEN_CONNECTION->buf, NULL, h2o_iovec_init(H2O_STRLIT("upgrade"))},
                     u = {&H2O_TOKEN_UPGRADE->buf, NULL,
                          h2o_iovec_init(client->super.upgrade_to, strlen(client->super.upgrade_to))};
        APPEND_HEADER(&c);
        APPEND_HEADER(&u);
    } else if (connection.base != NULL) {
        h2o_header_t h = {&H2O_TOKEN_CONNECTION->buf, NULL, connection};
        APPEND_HEADER(&h);
    }

    if (client->_send_own_expect) {
        h2o_header_t h = {&H2O_TOKEN_EXPECT->buf, NULL, h2o_iovec_init(H2O_STRLIT("100-continue"))};
        APPEND_HEADER(&h);
    }

    /* caller-supplied headers; the guard avoids doing pointer arithmetic on `headers` when there is nothing to emit */
    if (num_headers != 0) {
        for (const h2o_header_t *h = headers, *h_end = h + num_headers; h != h_end; ++h)
            APPEND_HEADER(h);
    }

    /* empty line terminating the head; RESERVE always leaves room for it */
    APPEND_STRLIT("\r\n");

    /* set the length */
    assert(offset <= buf.len);
    buf.len = offset;

    return buf;

#undef RESERVE
#undef APPEND
#undef APPEND_STRLIT
#undef APPEND_HEADER
}
826
827
static void start_request(struct st_h2o_http1client_t *client, h2o_iovec_t method, const h2o_url_t *url,
828
                          const h2o_header_t *headers, size_t num_headers, h2o_iovec_t body,
829
                          const h2o_httpclient_properties_t *props)
830
780
{
831
780
    h2o_iovec_t reqbufs[6]; /* 6 should be the maximum possible elements used */
832
780
    size_t reqbufcnt = 0;
833
780
    if (props->proxy_protocol->base != NULL)
834
0
        reqbufs[reqbufcnt++] = *props->proxy_protocol;
835
836
780
    if (props->send_own_expect && (client->proceed_req != NULL || body.len != 0) && client->super.upgrade_to == NULL)
837
0
        client->_send_own_expect = 1; /* this must be set before calling build_request */
838
839
780
    h2o_iovec_t header = build_request(client, method, url, *props->connection_header, headers, num_headers);
840
780
    reqbufs[reqbufcnt++] = header;
841
780
    client->super.bytes_written.header = header.len;
842
843
780
    client->_is_chunked = *props->chunked;
844
780
    client->_method_is_head = h2o_memis(method.base, method.len, H2O_STRLIT("HEAD"));
845
846
780
    assert(PTLS_ELEMENTSOF(reqbufs) - reqbufcnt >= 4); /* req_body_send_prepare could write to 4 additional elements */
847
780
    if (client->proceed_req != NULL) {
848
284
        h2o_buffer_init(&client->body_buf.buf, &h2o_socket_buffer_prototype);
849
284
        if (body.len != 0 && !h2o_buffer_try_append(&client->body_buf.buf, body.base, body.len)) {
850
0
            on_whole_request_sent(client->sock, h2o_httpclient_error_internal);
851
0
            return;
852
0
        }
853
284
        if (client->_send_own_expect) {
854
0
            h2o_socket_write(client->sock, reqbufs, reqbufcnt, on_header_sent_wait_100);
855
284
        } else {
856
284
            size_t bytes_written;
857
284
            reqbufcnt += req_body_send_prepare(client, reqbufs + reqbufcnt, &bytes_written);
858
284
            client->super.bytes_written.body = bytes_written;
859
284
            h2o_socket_write(client->sock, reqbufs, reqbufcnt, req_body_send_complete);
860
284
        }
861
496
    } else if (body.len != 0) {
862
40
        assert(!client->_is_chunked);
863
40
        if (client->_send_own_expect) {
864
0
            h2o_buffer_init(&client->body_buf.buf, &h2o_socket_buffer_prototype);
865
0
            client->body_buf.is_end_stream = 1;
866
0
            if (!h2o_buffer_try_append(&client->body_buf.buf, body.base, body.len)) {
867
0
                on_whole_request_sent(client->sock, h2o_httpclient_error_internal);
868
0
                return;
869
0
            }
870
0
            h2o_socket_write(client->sock, reqbufs, reqbufcnt, on_header_sent_wait_100);
871
40
        } else {
872
40
            reqbufs[reqbufcnt++] = body;
873
40
            client->super.bytes_written.body = body.len;
874
40
            h2o_socket_write(client->sock, reqbufs, reqbufcnt, on_whole_request_sent);
875
40
        }
876
456
    } else {
877
456
        assert(!client->_is_chunked);
878
456
        h2o_socket_write(client->sock, reqbufs, reqbufcnt, on_whole_request_sent);
879
456
    }
880
780
    client->super.bytes_written.total = client->sock->bytes_written;
881
882
    /* Even all data highly likely has been written into TCP sendbuf, it is our practice to assume the socket write operation is
883
     * asynchronous and link the timer. */
884
780
    client->super._timeout.cb = on_send_timeout;
885
780
    h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
886
887
780
    client->state.req = STREAM_STATE_BODY;
888
780
    client->super.timings.request_begin_at = h2o_gettimeofday(client->super.ctx->loop);
889
890
    /* If there's possibility of using a pipe for forwarding the content, reduce maximum read size before fetching headers. The
891
     * intent here is to not do a full-sized read of 1MB. 16KB has been chosen so that all HTTP response headers would be available,
892
     * and that an almost full-sized HTTP/2 frame / TLS record can be generated for the first chunk of data that we pass through
893
     * memory. */
894
780
#if USE_PIPE_READER
895
780
    if (client->_app_prefers_pipe_reader && h2o_evloop_socket_max_read_size > 16384)
896
0
        h2o_evloop_socket_set_max_read_size(client->sock, 16384);
897
780
#endif
898
899
780
    h2o_socket_read_start(client->sock, on_head);
900
780
}
901
902
static void on_connection_ready(struct st_h2o_http1client_t *client)
903
780
{
904
780
    h2o_iovec_t proxy_protocol = h2o_iovec_init(NULL, 0);
905
780
    int chunked = 0;
906
780
    h2o_iovec_t connection_header = h2o_iovec_init(NULL, 0);
907
780
    h2o_httpclient_properties_t props = {
908
780
        &proxy_protocol,
909
780
        &chunked,
910
780
        &connection_header,
911
780
    };
912
780
    h2o_iovec_t method;
913
780
    h2o_url_t url;
914
780
    h2o_header_t *headers;
915
780
    size_t num_headers;
916
780
    h2o_iovec_t body;
917
918
780
    client->super._cb.on_head = client->super._cb.on_connect(&client->super, NULL, &method, &url, (const h2o_header_t **)&headers,
919
780
                                                             &num_headers, &body, &client->proceed_req, &props, client->_origin);
920
780
    client->_app_prefers_pipe_reader = props.prefer_pipe_reader;
921
922
780
    if (client->super._cb.on_head == NULL) {
923
0
        close_client(client);
924
0
        return;
925
0
    }
926
927
780
    start_request(client, method, &url, headers, num_headers, body, &props);
928
780
}
929
930
/**
 * HTTP/1 implementation of the `cancel` callback: clears the keep-alive flag and closes the client.
 */
static void do_cancel(h2o_httpclient_t *_client)
{
    struct st_h2o_http1client_t *h1 = (void *)_client;

    h1->_do_keepalive = 0;
    close_client(h1);
}
936
937
void update_read_state(struct st_h2o_http1client_t *client)
938
901
{
939
    /* If pipe used, `client->reader` would have switched to `on_body_pipe` by the time this function is called for the first time.
940
     */
941
901
    assert((client->pipe_reader.on_body_piped != NULL) == (client->reader == on_body_to_pipe));
942
943
901
    if (client->reader == on_body_to_pipe) {
944
        /* When pipe is being used, resume read when consumption is notified from user. `h2o_socket_read_start` is invoked without
945
         * checking if we are already reading; this is because we want to make sure that the read callback replaced to the current
946
         * one. */
947
0
        h2o_socket_read_start(client->sock, client->reader);
948
901
    } else {
949
        /* When buffer is used, start / stop reading based on the amount of data being buffered. */
950
901
        if ((*client->super.buf)->size >= client->super.ctx->max_buffer_size) {
951
0
            if (h2o_socket_is_reading(client->sock)) {
952
0
                client->reader = client->sock->_cb.read;
953
0
                h2o_socket_read_stop(client->sock);
954
0
            }
955
901
        } else {
956
901
            if (!h2o_socket_is_reading(client->sock))
957
0
                h2o_socket_read_start(client->sock, client->reader);
958
901
        }
959
901
    }
960
961
    /* arm or unarm i/o timeout depending on if we are reading */
962
901
    if (h2o_socket_is_reading(client->sock)) {
963
901
        if (h2o_timer_is_linked(&client->super._timeout))
964
421
            h2o_timer_unlink(&client->super._timeout);
965
901
        h2o_timer_link(client->super.ctx->loop, client->super.ctx->io_timeout, &client->super._timeout);
966
901
    } else {
967
0
        if (h2o_timer_is_linked(&client->super._timeout))
968
0
            h2o_timer_unlink(&client->super._timeout);
969
0
    }
970
901
}
971
972
static void do_update_window(struct st_h2o_httpclient_t *_client)
973
421
{
974
421
    struct st_h2o_http1client_t *client = (void *)_client;
975
976
    /* When we are splicing to pipe, read synchronously. For prioritization logic to work correctly, it is important to provide
977
     * additional data synchronously in response to the invocation of `h2o_proceed_response`. When memory buffers are used,
978
     * lib/core/proxy.c uses a double buffering to prepare next chunk of data while a chunk of data is being fed to the HTTP
979
     * handlers via `h2o_sendvec`. But when using splice, the pipe is the only one buffer available. */
980
421
    if (client->reader == on_body_to_pipe) {
981
0
        on_body_to_pipe(client->sock, NULL);
982
0
        return;
983
0
    }
984
985
421
    update_read_state(client);
986
421
}
987
988
/**
 * HTTP/1 implementation of the `get_conn_properties` callback: fills `properties` from the socket used by the client.
 */
static void do_get_conn_properties(h2o_httpclient_t *_client, h2o_httpclient_conn_properties_t *properties)
{
    h2o_socket_t *sock = ((struct st_h2o_http1client_t *)_client)->sock;

    h2o_httpclient_set_conn_properties_of_socket(sock, properties);
}
993
994
/**
 * Initializes the HTTP/1-specific part of the client and binds it to the given socket and origin.
 */
static void setup_client(struct st_h2o_http1client_t *client, h2o_socket_t *sock, h2o_url_t *origin)
{
    /* zero-clear everything from `sock` through the end of the struct; members placed before `sock` are left untouched */
    const size_t preserved = offsetof(struct st_h2o_http1client_t, sock);
    memset((char *)client + preserved, 0, sizeof(*client) - preserved);

    /* associate the client and the socket with each other, and remember the origin */
    client->sock = sock;
    sock->data = client;
    client->super.buf = &sock->input;
    client->_origin = origin;

    /* install the HTTP/1 implementations of the client callbacks */
    client->super.cancel = do_cancel;
    client->super.get_conn_properties = do_get_conn_properties;
    client->super.update_window = do_update_window;
    client->super.write_req = do_write_req;
}
1006
1007
/**
 * Entry point of the HTTP/1 client: takes a connected socket, sets up the HTTP/1-specific state of the client on top of it, and
 * proceeds to sending the request. The timeout timer of the client must not be linked when this function is called (asserted).
 */
void h2o_httpclient__h1_on_connect(h2o_httpclient_t *_client, h2o_socket_t *sock, h2o_url_t *origin)
{
    struct st_h2o_http1client_t *client = (struct st_h2o_http1client_t *)_client;

    assert(!h2o_timer_is_linked(&client->super._timeout));

    /* bind the client to the socket, then obtain the request from the application and start sending it */
    setup_client(client, sock, origin);
    on_connection_ready(client);
}
1016
1017
/* Size in bytes of the HTTP/1 client structure, exported as a constant; presumably this lets code outside this file allocate a
 * client without seeing the struct definition (TODO confirm against the callers). */
const size_t h2o_httpclient__h1_size = sizeof(struct st_h2o_http1client_t);