Coverage Report

Created: 2025-07-12 06:05

/src/libzmq/src/raw_engine.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "macros.hpp"
5
6
#include <limits.h>
7
#include <string.h>
8
9
#ifndef ZMQ_HAVE_WINDOWS
10
#include <unistd.h>
11
#endif
12
13
#include <new>
14
#include <sstream>
15
16
#include "raw_engine.hpp"
17
#include "io_thread.hpp"
18
#include "session_base.hpp"
19
#include "v1_encoder.hpp"
20
#include "v1_decoder.hpp"
21
#include "v2_encoder.hpp"
22
#include "v2_decoder.hpp"
23
#include "null_mechanism.hpp"
24
#include "plain_client.hpp"
25
#include "plain_server.hpp"
26
#include "gssapi_client.hpp"
27
#include "gssapi_server.hpp"
28
#include "curve_client.hpp"
29
#include "curve_server.hpp"
30
#include "raw_decoder.hpp"
31
#include "raw_encoder.hpp"
32
#include "config.hpp"
33
#include "err.hpp"
34
#include "ip.hpp"
35
#include "tcp.hpp"
36
#include "likely.hpp"
37
#include "wire.hpp"
38
39
zmq::raw_engine_t::raw_engine_t (
40
  fd_t fd_,
41
  const options_t &options_,
42
  const endpoint_uri_pair_t &endpoint_uri_pair_) :
43
0
    stream_engine_base_t (fd_, options_, endpoint_uri_pair_, false)
44
0
{
45
0
}
46
47
zmq::raw_engine_t::~raw_engine_t ()
48
0
{
49
0
}
50
51
void zmq::raw_engine_t::plug_internal ()
52
0
{
53
    // no handshaking for raw sock, instantiate raw encoder and decoders
54
0
    _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size);
55
0
    alloc_assert (_encoder);
56
57
0
    _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size);
58
0
    alloc_assert (_decoder);
59
60
0
    _next_msg = &raw_engine_t::pull_msg_from_session;
61
0
    _process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
62
0
      &raw_engine_t::push_raw_msg_to_session);
63
64
0
    properties_t properties;
65
0
    if (init_properties (properties)) {
66
        //  Compile metadata.
67
0
        zmq_assert (_metadata == NULL);
68
0
        _metadata = new (std::nothrow) metadata_t (properties);
69
0
        alloc_assert (_metadata);
70
0
    }
71
72
0
    if (_options.raw_notify) {
73
        //  For raw sockets, send an initial 0-length message to the
74
        // application so that it knows a peer has connected.
75
0
        msg_t connector;
76
0
        connector.init ();
77
0
        push_raw_msg_to_session (&connector);
78
0
        connector.close ();
79
0
        session ()->flush ();
80
0
    }
81
82
0
    set_pollin ();
83
0
    set_pollout ();
84
    //  Flush all the data that may have been already received downstream.
85
0
    in_event ();
86
0
}
87
88
bool zmq::raw_engine_t::handshake ()
89
0
{
90
0
    return true;
91
0
}
92
93
void zmq::raw_engine_t::error (error_reason_t reason_)
94
0
{
95
0
    if (_options.raw_socket && _options.raw_notify) {
96
        //  For raw sockets, send a final 0-length message to the application
97
        //  so that it knows the peer has been disconnected.
98
0
        msg_t terminator;
99
0
        terminator.init ();
100
0
        push_raw_msg_to_session (&terminator);
101
0
        terminator.close ();
102
0
    }
103
0
    stream_engine_base_t::error (reason_);
104
0
}
105
106
int zmq::raw_engine_t::push_raw_msg_to_session (msg_t *msg_)
107
0
{
108
0
    if (_metadata && _metadata != msg_->metadata ())
109
0
        msg_->set_metadata (_metadata);
110
0
    return push_msg_to_session (msg_);
111
0
}