/src/libzmq/src/decoder.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #ifndef __ZMQ_DECODER_HPP_INCLUDED__ |
4 | | #define __ZMQ_DECODER_HPP_INCLUDED__ |
5 | | |
6 | | #include <algorithm> |
7 | | #include <cstddef> |
8 | | #include <cstring> |
9 | | |
10 | | #include "decoder_allocators.hpp" |
11 | | #include "err.hpp" |
12 | | #include "i_decoder.hpp" |
13 | | #include "stdint.hpp" |
14 | | |
15 | | namespace zmq |
16 | | { |
17 | | // Helper base class for decoders that know the amount of data to read |
18 | | // in advance at any moment. Knowing the amount in advance is a property |
19 | | // of the protocol used. The 0MQ framing protocol is based on a size-prefixed |
20 | | // paradigm, which qualifies it to be parsed by this class. |
21 | | // On the other hand, XML-based transports (like XMPP or SOAP) don't allow |
22 | | // for knowing the size of data to read in advance and should use different |
23 | | // decoding algorithms. |
24 | | // |
25 | | // This class implements the state machine that parses the incoming buffer. |
26 | | // The derived class T implements the individual state machine actions and |
27 | | // schedules them through next_step (). |
28 | | // Buffer management is done by the allocator policy A (allocate, deallocate, size, resize). |
29 | | template <typename T, typename A = c_single_allocator> |
30 | | class decoder_base_t : public i_decoder |
31 | | { |
32 | | public: |
33 | | explicit decoder_base_t (const size_t buf_size_) : |
34 | 0 | _next (NULL), _read_pos (NULL), _to_read (0), _allocator (buf_size_) |
35 | 0 | { |
36 | 0 | _buf = _allocator.allocate (); |
37 | 0 | } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::decoder_base_t(unsigned long) Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::decoder_base_t(unsigned long) Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::decoder_base_t(unsigned long) |
38 | | |
39 | 0 | ~decoder_base_t () ZMQ_OVERRIDE { _allocator.deallocate (); } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::~decoder_base_t() Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::~decoder_base_t() Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::~decoder_base_t() |
40 | | |
41 | | // Returns, through data_ and size_, a buffer to be filled with binary data and its size. |
42 | | void get_buffer (unsigned char **data_, std::size_t *size_) ZMQ_FINAL |
43 | 0 | { |
44 | 0 | _buf = _allocator.allocate (); |
45 | | |
46 | | // If we are expected to read a large message, we'll opt for zero- |
47 | | // copy, i.e. we'll ask the caller to write the data directly into the |
48 | | // message. Note that subsequent read(s) are non-blocking, thus |
49 | | // each single read reads at most SO_RCVBUF bytes at once, |
50 | | // regardless of how large the chunk returned from here is. |
51 | | // As a consequence, large messages being received won't block |
52 | | // other engines running in the same I/O thread for excessive |
53 | | // amounts of time. |
54 | 0 | if (_to_read >= _allocator.size ()) { |
55 | 0 | *data_ = _read_pos; |
56 | 0 | *size_ = _to_read; |
57 | 0 | return; |
58 | 0 | } |
59 | | |
60 | 0 | *data_ = _buf; |
61 | 0 | *size_ = _allocator.size (); |
62 | 0 | } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::get_buffer(unsigned char**, unsigned long*) Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::get_buffer(unsigned char**, unsigned long*) Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::get_buffer(unsigned char**, unsigned long*) |
63 | | |
64 | | // Processes the data in the buffer previously obtained from the |
65 | | // get_buffer function. The size_ argument specifies the number of bytes |
66 | | // actually filled into the buffer. The function returns 1 when the |
67 | | // whole message was decoded or 0 when more data is required. |
68 | | // On error, -1 is returned and errno is set accordingly. |
69 | | // The number of bytes processed is returned in bytes_used_. |
70 | | int decode (const unsigned char *data_, |
71 | | std::size_t size_, |
72 | | std::size_t &bytes_used_) ZMQ_FINAL |
73 | 0 | { |
74 | 0 | bytes_used_ = 0; |
75 | | |
76 | | // Zero-copy case: the data already sits at _read_pos, so only the |
77 | | // pointer and the remaining count are adjusted and nothing is copied. |
78 | | // The state machine is run once all the expected data were received. |
79 | 0 | if (data_ == _read_pos) { |
80 | 0 | zmq_assert (size_ <= _to_read); |
81 | 0 | _read_pos += size_; |
82 | 0 | _to_read -= size_; |
83 | 0 | bytes_used_ = size_; |
84 |
|
85 | 0 | while (!_to_read) { |
86 | 0 | const int rc = |
87 | 0 | (static_cast<T *> (this)->*_next) (data_ + bytes_used_); |
88 | 0 | if (rc != 0) |
89 | 0 | return rc; |
90 | 0 | } |
91 | 0 | return 0; |
92 | 0 | } |
93 | | |
94 | 0 | while (bytes_used_ < size_) { |
95 | | // Copy the data from the buffer to the current read position. |
96 | 0 | const size_t to_copy = std::min (_to_read, size_ - bytes_used_); |
97 | | // Only copy when the destination address is different from the |
98 | | // current address in the buffer. |
99 | 0 | if (_read_pos != data_ + bytes_used_) { |
100 | 0 | memcpy (_read_pos, data_ + bytes_used_, to_copy); |
101 | 0 | } |
102 |
|
103 | 0 | _read_pos += to_copy; |
104 | 0 | _to_read -= to_copy; |
105 | 0 | bytes_used_ += to_copy; |
106 | | // The pending read is complete: run the scheduled actions until |
107 | | // _to_read becomes non-zero or an action returns a non-zero result. |
108 | 0 | while (_to_read == 0) { |
109 | | // Pass the current address in the buffer to the action. |
110 | 0 | const int rc = |
111 | 0 | (static_cast<T *> (this)->*_next) (data_ + bytes_used_); |
112 | 0 | if (rc != 0) |
113 | 0 | return rc; |
114 | 0 | } |
115 | 0 | } |
116 | | |
117 | 0 | return 0; |
118 | 0 | } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::decode(unsigned char const*, unsigned long, unsigned long&) Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::decode(unsigned char const*, unsigned long, unsigned long&) Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::decode(unsigned char const*, unsigned long, unsigned long&) |
119 | | |
120 | | void resize_buffer (std::size_t new_size_) ZMQ_FINAL |
121 | 0 | { |
122 | 0 | _allocator.resize (new_size_); |
123 | 0 | } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::resize_buffer(unsigned long) Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::resize_buffer(unsigned long) Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::resize_buffer(unsigned long) |
124 | | |
125 | | protected: |
126 | | // Prototype of state machine action. decode () keeps running while an |
127 | | // action returns 0 and hands any non-zero result straight back to its caller. |
128 | | typedef int (T::*step_t) (unsigned char const *); |
129 | | |
130 | | // This function should be called from the derived class to set where the |
131 | | // next to_read_ bytes are stored and which action runs once they arrived. |
132 | | void next_step (void *read_pos_, std::size_t to_read_, step_t next_) |
133 | 0 | { |
134 | 0 | _read_pos = static_cast<unsigned char *> (read_pos_); |
135 | 0 | _to_read = to_read_; |
136 | 0 | _next = next_; |
137 | 0 | } Unexecuted instantiation: zmq::decoder_base_t<zmq::v1_decoder_t, zmq::c_single_allocator>::next_step(void*, unsigned long, int (zmq::v1_decoder_t::*)(unsigned char const*)) Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::next_step(void*, unsigned long, int (zmq::v2_decoder_t::*)(unsigned char const*)) Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::next_step(void*, unsigned long, int (zmq::ws_decoder_t::*)(unsigned char const*)) |
138 | | |
139 | 0 | A &get_allocator () { return _allocator; } Unexecuted instantiation: zmq::decoder_base_t<zmq::v2_decoder_t, zmq::shared_message_memory_allocator>::get_allocator() Unexecuted instantiation: zmq::decoder_base_t<zmq::ws_decoder_t, zmq::shared_message_memory_allocator>::get_allocator() |
140 | | |
141 | | private: |
142 | | // Next step. If set to NULL, it means that the associated data stream |
143 | | // is dead. Note that there can still be data in the process in such a |
144 | | // case; decode () itself performs no NULL check before invoking it. |
145 | | step_t _next; |
146 | | |
147 | | // Where to store the read data. |
148 | | unsigned char *_read_pos; |
149 | | |
150 | | // How much data to read before taking the next step. |
151 | | std::size_t _to_read; |
152 | | |
153 | | // The allocator policy and the buffer it most recently handed out for data to decode. |
154 | | A _allocator; |
155 | | unsigned char *_buf; |
156 | | |
157 | | ZMQ_NON_COPYABLE_NOR_MOVABLE (decoder_base_t) |
158 | | }; |
159 | | } |
160 | | |
161 | | #endif |