Line | Count | Source |
1 | | /* SPDX-License-Identifier: MPL-2.0 */ |
2 | | |
3 | | #include "precompiled.hpp" |
4 | | #include "own.hpp" |
5 | | #include "err.hpp" |
6 | | #include "io_thread.hpp" |
7 | | |
8 | | zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) : |
9 | 0 | object_t (parent_, tid_), |
10 | 0 | _terminating (false), |
11 | 0 | _sent_seqnum (0), |
12 | 0 | _processed_seqnum (0), |
13 | 0 | _owner (NULL), |
14 | 0 | _term_acks (0) |
15 | 0 | { |
16 | 0 | } |
17 | | |
18 | | zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : |
19 | 0 | object_t (io_thread_), |
20 | 0 | options (options_), |
21 | 0 | _terminating (false), |
22 | 0 | _sent_seqnum (0), |
23 | 0 | _processed_seqnum (0), |
24 | 0 | _owner (NULL), |
25 | 0 | _term_acks (0) |
26 | 0 | { |
27 | 0 | } |
28 | | |
29 | | zmq::own_t::~own_t () |
30 | 0 | { |
31 | 0 | } |
32 | | |
33 | | void zmq::own_t::set_owner (own_t *owner_) |
34 | 0 | { |
35 | 0 | zmq_assert (!_owner); |
36 | 0 | _owner = owner_; |
37 | 0 | } |
38 | | |
39 | | void zmq::own_t::inc_seqnum () |
40 | 0 | { |
41 | | // This function may be called from a different thread! |
42 | 0 | _sent_seqnum.add (1); |
43 | 0 | } |
44 | | |
45 | | void zmq::own_t::process_seqnum () |
46 | 0 | { |
47 | | // Catch up with counter of processed commands. |
48 | 0 | _processed_seqnum++; |
49 | | |
50 | | // We may have caught up and still have pending terms acks. |
51 | 0 | check_term_acks (); |
52 | 0 | } |
53 | | |
54 | | void zmq::own_t::launch_child (own_t *object_) |
55 | 0 | { |
56 | | // Specify the owner of the object. |
57 | 0 | object_->set_owner (this); |
58 | | |
59 | | // Plug the object into the I/O thread. |
60 | 0 | send_plug (object_); |
61 | | |
62 | | // Take ownership of the object. |
63 | 0 | send_own (this, object_); |
64 | 0 | } |
65 | | |
66 | | void zmq::own_t::term_child (own_t *object_) |
67 | 0 | { |
68 | 0 | process_term_req (object_); |
69 | 0 | } |
70 | | |
71 | | void zmq::own_t::process_term_req (own_t *object_) |
72 | 0 | { |
73 | | // When shutting down we can ignore termination requests from owned |
74 | | // objects. The termination request was already sent to the object. |
75 | 0 | if (_terminating) |
76 | 0 | return; |
77 | | |
78 | | // If not found, we assume that termination request was already sent to |
79 | | // the object so we can safely ignore the request. |
80 | 0 | if (0 == _owned.erase (object_)) |
81 | 0 | return; |
82 | | |
83 | | // If I/O object is well and alive let's ask it to terminate. |
84 | 0 | register_term_acks (1); |
85 | | |
86 | | // Note that this object is the root of the (partial shutdown) thus, its |
87 | | // value of linger is used, rather than the value stored by the children. |
88 | 0 | send_term (object_, options.linger.load ()); |
89 | 0 | } |
90 | | |
91 | | void zmq::own_t::process_own (own_t *object_) |
92 | 0 | { |
93 | | // If the object is already being shut down, new owned objects are |
94 | | // immediately asked to terminate. Note that linger is set to zero. |
95 | 0 | if (_terminating) { |
96 | 0 | register_term_acks (1); |
97 | 0 | send_term (object_, 0); |
98 | 0 | return; |
99 | 0 | } |
100 | | |
101 | | // Store the reference to the owned object. |
102 | 0 | _owned.insert (object_); |
103 | 0 | } |
104 | | |
105 | | void zmq::own_t::terminate () |
106 | 0 | { |
107 | | // If termination is already underway, there's no point |
108 | | // in starting it anew. |
109 | 0 | if (_terminating) |
110 | 0 | return; |
111 | | |
112 | | // As for the root of the ownership tree, there's no one to terminate it, |
113 | | // so it has to terminate itself. |
114 | 0 | if (!_owner) { |
115 | 0 | process_term (options.linger.load ()); |
116 | 0 | return; |
117 | 0 | } |
118 | | |
119 | | // If I am an owned object, I'll ask my owner to terminate me. |
120 | 0 | send_term_req (_owner, this); |
121 | 0 | } |
122 | | |
123 | | bool zmq::own_t::is_terminating () const |
124 | 0 | { |
125 | 0 | return _terminating; |
126 | 0 | } |
127 | | |
128 | | void zmq::own_t::process_term (int linger_) |
129 | 0 | { |
130 | | // Double termination should never happen. |
131 | 0 | zmq_assert (!_terminating); |
132 | | |
133 | | // Send termination request to all owned objects. |
134 | 0 | for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end; |
135 | 0 | ++it) |
136 | 0 | send_term (*it, linger_); |
137 | 0 | register_term_acks (static_cast<int> (_owned.size ())); |
138 | 0 | _owned.clear (); |
139 | | |
140 | | // Start termination process and check whether by chance we cannot |
141 | | // terminate immediately. |
142 | 0 | _terminating = true; |
143 | 0 | check_term_acks (); |
144 | 0 | } |
145 | | |
146 | | void zmq::own_t::register_term_acks (int count_) |
147 | 0 | { |
148 | 0 | _term_acks += count_; |
149 | 0 | } |
150 | | |
151 | | void zmq::own_t::unregister_term_ack () |
152 | 0 | { |
153 | 0 | zmq_assert (_term_acks > 0); |
154 | 0 | _term_acks--; |
155 | | |
156 | | // This may be a last ack we are waiting for before termination... |
157 | 0 | check_term_acks (); |
158 | 0 | } |
159 | | |
160 | | void zmq::own_t::process_term_ack () |
161 | 0 | { |
162 | 0 | unregister_term_ack (); |
163 | 0 | } |
164 | | |
165 | | void zmq::own_t::check_term_acks () |
166 | 0 | { |
167 | 0 | if (_terminating && _processed_seqnum == _sent_seqnum.get () |
168 | 0 | && _term_acks == 0) { |
169 | | // Sanity check. There should be no active children at this point. |
170 | 0 | zmq_assert (_owned.empty ()); |
171 | | |
172 | | // The root object has nobody to confirm the termination to. |
173 | | // Other nodes will confirm the termination to the owner. |
174 | 0 | if (_owner) |
175 | 0 | send_term_ack (_owner); |
176 | | |
177 | | // Deallocate the resources. |
178 | 0 | process_destroy (); |
179 | 0 | } |
180 | 0 | } |
181 | | |
182 | | void zmq::own_t::process_destroy () |
183 | 0 | { |
184 | 0 | delete this; |
185 | 0 | } |