/src/libzmq/src/raw_engine.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "macros.hpp" |
5 | | |
6 | | #include <limits.h> |
7 | | #include <string.h> |
8 | | |
9 | | #ifndef ZMQ_HAVE_WINDOWS |
10 | | #include <unistd.h> |
11 | | #endif |
12 | | |
13 | | #include <new> |
14 | | #include <sstream> |
15 | | |
16 | | #include "raw_engine.hpp" |
17 | | #include "io_thread.hpp" |
18 | | #include "session_base.hpp" |
19 | | #include "v1_encoder.hpp" |
20 | | #include "v1_decoder.hpp" |
21 | | #include "v2_encoder.hpp" |
22 | | #include "v2_decoder.hpp" |
23 | | #include "null_mechanism.hpp" |
24 | | #include "plain_client.hpp" |
25 | | #include "plain_server.hpp" |
26 | | #include "gssapi_client.hpp" |
27 | | #include "gssapi_server.hpp" |
28 | | #include "curve_client.hpp" |
29 | | #include "curve_server.hpp" |
30 | | #include "raw_decoder.hpp" |
31 | | #include "raw_encoder.hpp" |
32 | | #include "config.hpp" |
33 | | #include "err.hpp" |
34 | | #include "ip.hpp" |
35 | | #include "tcp.hpp" |
36 | | #include "likely.hpp" |
37 | | #include "wire.hpp" |
38 | | |
39 | | zmq::raw_engine_t::raw_engine_t ( |
40 | | fd_t fd_, |
41 | | const options_t &options_, |
42 | | const endpoint_uri_pair_t &endpoint_uri_pair_) : |
43 | 0 | stream_engine_base_t (fd_, options_, endpoint_uri_pair_, false) |
44 | 0 | { |
45 | 0 | } |
46 | | |
47 | | zmq::raw_engine_t::~raw_engine_t () |
48 | 0 | { |
49 | 0 | } |
50 | | |
51 | | void zmq::raw_engine_t::plug_internal () |
52 | 0 | { |
53 | | // no handshaking for raw sock, instantiate raw encoder and decoders |
54 | 0 | _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size); |
55 | 0 | alloc_assert (_encoder); |
56 | |
|
57 | 0 | _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size); |
58 | 0 | alloc_assert (_decoder); |
59 | |
|
60 | 0 | _next_msg = &raw_engine_t::pull_msg_from_session; |
61 | 0 | _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( |
62 | 0 | &raw_engine_t::push_raw_msg_to_session); |
63 | |
|
64 | 0 | properties_t properties; |
65 | 0 | if (init_properties (properties)) { |
66 | | // Compile metadata. |
67 | 0 | zmq_assert (_metadata == NULL); |
68 | 0 | _metadata = new (std::nothrow) metadata_t (properties); |
69 | 0 | alloc_assert (_metadata); |
70 | 0 | } |
71 | |
|
72 | 0 | if (_options.raw_notify) { |
73 | | // For raw sockets, send an initial 0-length message to the |
74 | | // application so that it knows a peer has connected. |
75 | 0 | msg_t connector; |
76 | 0 | connector.init (); |
77 | 0 | push_raw_msg_to_session (&connector); |
78 | 0 | connector.close (); |
79 | 0 | session ()->flush (); |
80 | 0 | } |
81 | |
|
82 | 0 | set_pollin (); |
83 | 0 | set_pollout (); |
84 | | // Flush all the data that may have been already received downstream. |
85 | 0 | in_event (); |
86 | 0 | } |
87 | | |
88 | | bool zmq::raw_engine_t::handshake () |
89 | 0 | { |
90 | 0 | return true; |
91 | 0 | } |
92 | | |
93 | | void zmq::raw_engine_t::error (error_reason_t reason_) |
94 | 0 | { |
95 | 0 | if (_options.raw_socket && _options.raw_notify) { |
96 | | // For raw sockets, send a final 0-length message to the application |
97 | | // so that it knows the peer has been disconnected. |
98 | 0 | msg_t terminator; |
99 | 0 | terminator.init (); |
100 | 0 | push_raw_msg_to_session (&terminator); |
101 | 0 | terminator.close (); |
102 | 0 | } |
103 | 0 | stream_engine_base_t::error (reason_); |
104 | 0 | } |
105 | | |
106 | | int zmq::raw_engine_t::push_raw_msg_to_session (msg_t *msg_) |
107 | 0 | { |
108 | 0 | if (_metadata && _metadata != msg_->metadata ()) |
109 | 0 | msg_->set_metadata (_metadata); |
110 | 0 | return push_msg_to_session (msg_); |
111 | 0 | } |