/src/libzmq/src/encoder.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #ifndef __ZMQ_ENCODER_HPP_INCLUDED__ |
4 | | #define __ZMQ_ENCODER_HPP_INCLUDED__ |
5 | | |
6 | | #if defined(_MSC_VER) |
7 | | #ifndef NOMINMAX |
8 | | #define NOMINMAX |
9 | | #endif |
10 | | #endif |
11 | | |
12 | | #include <stddef.h> |
13 | | #include <string.h> |
14 | | #include <stdlib.h> |
15 | | #include <algorithm> |
16 | | |
17 | | #include "err.hpp" |
18 | | #include "i_encoder.hpp" |
19 | | #include "msg.hpp" |
20 | | |
21 | | namespace zmq |
22 | | { |
23 | | // Helper base class for encoders. It implements the state machine that |
24 | | // fills the outgoing buffer. Derived classes should implement individual |
25 | | // state machine actions. |
26 | | |
27 | | template <typename T> class encoder_base_t : public i_encoder |
28 | | { |
29 | | public: |
30 | | explicit encoder_base_t (size_t bufsize_) : |
31 | 0 | _write_pos (0), |
32 | 0 | _to_write (0), |
33 | | _next (NULL), |
34 | 0 | _new_msg_flag (false), |
35 | 0 | _buf_size (bufsize_), |
36 | 0 | _buf (static_cast<unsigned char *> (malloc (bufsize_))), |
37 | | _in_progress (NULL) |
38 | 0 | { |
39 | 0 | alloc_assert (_buf); |
40 | 0 | } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::encoder_base_t(unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::encoder_base_t(unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::encoder_base_t(unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::encoder_base_t(unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::encoder_base_t(unsigned long) |
41 | | |
42 | 0 | ~encoder_base_t () ZMQ_OVERRIDE { free (_buf); } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::~encoder_base_t() Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::~encoder_base_t() Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::~encoder_base_t() Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::~encoder_base_t() Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::~encoder_base_t() |
43 | | |
44 | | // The function returns a batch of binary data. The data |
45 | | // are filled to a supplied buffer. If no buffer is supplied (data_ |
46 | | // points to NULL) decoder object will provide buffer of its own. |
47 | | size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL |
48 | 0 | { |
49 | 0 | unsigned char *buffer = !*data_ ? _buf : *data_; |
50 | 0 | const size_t buffersize = !*data_ ? _buf_size : size_; |
51 | |
|
52 | 0 | if (in_progress () == NULL) |
53 | 0 | return 0; |
54 | | |
55 | 0 | size_t pos = 0; |
56 | 0 | while (pos < buffersize) { |
57 | | // If there are no more data to return, run the state machine. |
58 | | // If there are still no data, return what we already have |
59 | | // in the buffer. |
60 | 0 | if (!_to_write) { |
61 | 0 | if (_new_msg_flag) { |
62 | 0 | int rc = _in_progress->close (); |
63 | 0 | errno_assert (rc == 0); |
64 | 0 | rc = _in_progress->init (); |
65 | 0 | errno_assert (rc == 0); |
66 | 0 | _in_progress = NULL; |
67 | 0 | break; |
68 | 0 | } |
69 | 0 | (static_cast<T *> (this)->*_next) (); |
70 | 0 | } |
71 | | |
72 | | // If there are no data in the buffer yet and we are able to |
73 | | // fill whole buffer in a single go, let's use zero-copy. |
74 | | // There's no disadvantage to it as we cannot stuck multiple |
75 | | // messages into the buffer anyway. Note that subsequent |
76 | | // write(s) are non-blocking, thus each single write writes |
77 | | // at most SO_SNDBUF bytes at once not depending on how large |
78 | | // is the chunk returned from here. |
79 | | // As a consequence, large messages being sent won't block |
80 | | // other engines running in the same I/O thread for excessive |
81 | | // amounts of time. |
82 | 0 | if (!pos && !*data_ && _to_write >= buffersize) { |
83 | 0 | *data_ = _write_pos; |
84 | 0 | pos = _to_write; |
85 | 0 | _write_pos = NULL; |
86 | 0 | _to_write = 0; |
87 | 0 | return pos; |
88 | 0 | } |
89 | | |
90 | | // Copy data to the buffer. If the buffer is full, return. |
91 | 0 | const size_t to_copy = std::min (_to_write, buffersize - pos); |
92 | 0 | memcpy (buffer + pos, _write_pos, to_copy); |
93 | 0 | pos += to_copy; |
94 | 0 | _write_pos += to_copy; |
95 | 0 | _to_write -= to_copy; |
96 | 0 | } |
97 | | |
98 | 0 | *data_ = buffer; |
99 | 0 | return pos; |
100 | 0 | } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::encode(unsigned char**, unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::encode(unsigned char**, unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::encode(unsigned char**, unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::encode(unsigned char**, unsigned long) Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::encode(unsigned char**, unsigned long) |
101 | | |
102 | | void load_msg (msg_t *msg_) ZMQ_FINAL |
103 | 0 | { |
104 | 0 | zmq_assert (in_progress () == NULL); |
105 | 0 | _in_progress = msg_; |
106 | 0 | (static_cast<T *> (this)->*_next) (); |
107 | 0 | } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::load_msg(zmq::msg_t*) Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::load_msg(zmq::msg_t*) Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::load_msg(zmq::msg_t*) Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::load_msg(zmq::msg_t*) Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::load_msg(zmq::msg_t*) |
108 | | |
109 | | protected: |
110 | | // Prototype of state machine action. |
111 | | typedef void (T::*step_t) (); |
112 | | |
113 | | // This function should be called from derived class to write the data |
114 | | // to the buffer and schedule next state machine action. |
115 | | void next_step (void *write_pos_, |
116 | | size_t to_write_, |
117 | | step_t next_, |
118 | | bool new_msg_flag_) |
119 | 0 | { |
120 | 0 | _write_pos = static_cast<unsigned char *> (write_pos_); |
121 | 0 | _to_write = to_write_; |
122 | 0 | _next = next_; |
123 | 0 | _new_msg_flag = new_msg_flag_; |
124 | 0 | } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::next_step(void*, unsigned long, void (zmq::v1_encoder_t::*)(), bool) Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::next_step(void*, unsigned long, void (zmq::v2_encoder_t::*)(), bool) Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::next_step(void*, unsigned long, void (zmq::v3_1_encoder_t::*)(), bool) Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::next_step(void*, unsigned long, void (zmq::raw_encoder_t::*)(), bool) Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::next_step(void*, unsigned long, void (zmq::ws_encoder_t::*)(), bool) |
125 | | |
126 | 0 | msg_t *in_progress () { return _in_progress; } Unexecuted instantiation: zmq::encoder_base_t<zmq::v1_encoder_t>::in_progress() Unexecuted instantiation: zmq::encoder_base_t<zmq::v2_encoder_t>::in_progress() Unexecuted instantiation: zmq::encoder_base_t<zmq::v3_1_encoder_t>::in_progress() Unexecuted instantiation: zmq::encoder_base_t<zmq::raw_encoder_t>::in_progress() Unexecuted instantiation: zmq::encoder_base_t<zmq::ws_encoder_t>::in_progress() |
127 | | |
128 | | private: |
129 | | // Where to get the data to write from. |
130 | | unsigned char *_write_pos; |
131 | | |
132 | | // How much data to write before next step should be executed. |
133 | | size_t _to_write; |
134 | | |
135 | | // Next step. If set to NULL, it means that associated data stream |
136 | | // is dead. |
137 | | step_t _next; |
138 | | |
139 | | bool _new_msg_flag; |
140 | | |
141 | | // The buffer for encoded data. |
142 | | const size_t _buf_size; |
143 | | unsigned char *const _buf; |
144 | | |
145 | | msg_t *_in_progress; |
146 | | |
147 | | ZMQ_NON_COPYABLE_NOR_MOVABLE (encoder_base_t) |
148 | | }; |
149 | | } |
150 | | |
151 | | #endif |