/src/qpid-proton/c/src/core/engine.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * |
3 | | * Licensed to the Apache Software Foundation (ASF) under one |
4 | | * or more contributor license agreements. See the NOTICE file |
5 | | * distributed with this work for additional information |
6 | | * regarding copyright ownership. The ASF licenses this file |
7 | | * to you under the Apache License, Version 2.0 (the |
8 | | * "License"); you may not use this file except in compliance |
9 | | * with the License. You may obtain a copy of the License at |
10 | | * |
11 | | * http://www.apache.org/licenses/LICENSE-2.0 |
12 | | * |
13 | | * Unless required by applicable law or agreed to in writing, |
14 | | * software distributed under the License is distributed on an |
15 | | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
16 | | * KIND, either express or implied. See the License for the |
17 | | * specific language governing permissions and limitations |
18 | | * under the License. |
19 | | * |
20 | | */ |
21 | | |
22 | | /* for pn_work_head and related deprecations */ |
23 | | #define PN_USE_DEPRECATED_API 1 |
24 | | |
25 | | #include "engine-internal.h" |
26 | | |
27 | | #include "consumers.h" |
28 | | #include "core/frame_consumers.h" |
29 | | #include "fixed_string.h" |
30 | | #include "framing.h" |
31 | | #include "memory.h" |
32 | | #include "platform/platform.h" |
33 | | #include "platform/platform_fmt.h" |
34 | | #include "protocol.h" |
35 | | #include "transport.h" |
36 | | |
37 | | #include <assert.h> |
38 | | #include <stdarg.h> |
39 | | #include <stddef.h> |
40 | | #include <stdio.h> |
41 | | #include <string.h> |
42 | | |
43 | | |
44 | | static void pni_session_bound(pn_session_t *ssn); |
45 | | static void pni_link_bound(pn_link_t *link); |
46 | | |
47 | | static void pn_delivery_incref(void *object); |
48 | | static void pn_delivery_finalize(void *object); |
49 | | #define pn_delivery_new NULL |
50 | | #define pn_delivery_refcount NULL |
51 | | #define pn_delivery_decref NULL |
52 | | #define pn_delivery_free NULL |
53 | | #define pn_delivery_initialize NULL |
54 | | #define pn_delivery_hashcode NULL |
55 | | #define pn_delivery_compare NULL |
56 | | static void pn_delivery_inspect(void *obj, pn_fixed_string_t *dst); |
57 | | static const pn_class_t PN_CLASSCLASS(pn_delivery) = PN_METACLASS(pn_delivery); |
58 | | |
59 | | // endpoints |
60 | | |
61 | | static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint) |
62 | 1.99M | { |
63 | 1.99M | switch (endpoint->type) { |
64 | 48.1k | case CONNECTION: |
65 | 48.1k | return (pn_connection_t *) endpoint; |
66 | 1.12M | case SESSION: |
67 | 1.12M | return ((pn_session_t *) endpoint)->connection; |
68 | 148k | case SENDER: |
69 | 824k | case RECEIVER: |
70 | 824k | return ((pn_link_t *) endpoint)->session->connection; |
71 | 1.99M | } |
72 | | |
73 | 0 | return NULL; |
74 | 1.99M | } |
75 | | |
76 | 66.4k | static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) { |
77 | 66.4k | switch (type) { |
78 | 29.9k | case CONNECTION: |
79 | 29.9k | return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE; |
80 | 18.2k | case SESSION: |
81 | 18.2k | return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE; |
82 | 0 | case SENDER: |
83 | 18.2k | case RECEIVER: |
84 | 18.2k | return open ? PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE; |
85 | 0 | default: |
86 | 0 | assert(false); |
87 | 0 | return PN_EVENT_NONE; |
88 | 66.4k | } |
89 | 66.4k | } |
90 | | |
91 | | static void pn_endpoint_open(pn_endpoint_t *endpoint) |
92 | 54.7k | { |
93 | 54.7k | if (!(endpoint->state & PN_LOCAL_ACTIVE)) { |
94 | 54.7k | PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); |
95 | 54.7k | pn_connection_t *conn = pni_ep_get_connection(endpoint); |
96 | 54.7k | pn_collector_put_object(conn->collector, endpoint, |
97 | 54.7k | endpoint_event((pn_endpoint_type_t) endpoint->type, true)); |
98 | 54.7k | pn_modified(conn, endpoint, true); |
99 | 54.7k | } |
100 | 54.7k | } |
101 | | |
102 | | static void pn_endpoint_close(pn_endpoint_t *endpoint) |
103 | 17.7k | { |
104 | 17.7k | if (!(endpoint->state & PN_LOCAL_CLOSED)) { |
105 | 11.6k | PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); |
106 | 11.6k | pn_connection_t *conn = pni_ep_get_connection(endpoint); |
107 | 11.6k | pn_collector_put_object(conn->collector, endpoint, |
108 | 11.6k | endpoint_event((pn_endpoint_type_t) endpoint->type, false)); |
109 | 11.6k | pn_modified(conn, endpoint, true); |
110 | 11.6k | } |
111 | 17.7k | } |
112 | | |
113 | | void pn_connection_reset(pn_connection_t *connection) |
114 | 18.2k | { |
115 | 18.2k | assert(connection); |
116 | 18.2k | pn_endpoint_t *endpoint = &connection->endpoint; |
117 | 18.2k | endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; |
118 | 18.2k | } |
119 | | |
120 | | void pn_connection_open(pn_connection_t *connection) |
121 | 18.2k | { |
122 | 18.2k | assert(connection); |
123 | 18.2k | pn_endpoint_open(&connection->endpoint); |
124 | 18.2k | } |
125 | | |
126 | | void pn_connection_close(pn_connection_t *connection) |
127 | 17.7k | { |
128 | 17.7k | assert(connection); |
129 | 17.7k | pn_endpoint_close(&connection->endpoint); |
130 | 17.7k | } |
131 | | |
132 | | static void pni_endpoint_tini(pn_endpoint_t *endpoint); |
133 | | |
134 | | void pn_connection_release(pn_connection_t *connection) |
135 | 18.2k | { |
136 | 18.2k | assert(!connection->endpoint.freed); |
137 | | // free those endpoints that haven't been freed by the application |
138 | 18.2k | LL_REMOVE(connection, endpoint, &connection->endpoint); |
139 | 286k | while (connection->endpoint_head) { |
140 | 268k | pn_endpoint_t *ep = connection->endpoint_head; |
141 | 268k | switch (ep->type) { |
142 | 268k | case SESSION: |
143 | | // note: this will free all child links: |
144 | 268k | pn_session_free((pn_session_t *)ep); |
145 | 268k | break; |
146 | 0 | case SENDER: |
147 | 0 | case RECEIVER: |
148 | 0 | pn_link_free((pn_link_t *)ep); |
149 | 0 | break; |
150 | 0 | default: |
151 | 0 | assert(false); |
152 | 268k | } |
153 | 268k | } |
154 | 18.2k | connection->endpoint.freed = true; |
155 | 18.2k | if (!connection->transport) { |
156 | | // no transport available to consume transport work items, |
157 | | // so manually clear them: |
158 | 18.2k | pn_ep_incref(&connection->endpoint); |
159 | 18.2k | pn_connection_unbound(connection); |
160 | 18.2k | } |
161 | 18.2k | pn_ep_decref(&connection->endpoint); |
162 | 18.2k | } |
163 | | |
164 | 18.2k | void pn_connection_free(pn_connection_t *connection) { |
165 | 18.2k | pn_connection_release(connection); |
166 | 18.2k | pn_decref(connection); |
167 | 18.2k | } |
168 | | |
169 | | void pn_connection_bound(pn_connection_t *connection) |
170 | 18.2k | { |
171 | 18.2k | pn_collector_put_object(connection->collector, connection, PN_CONNECTION_BOUND); |
172 | 18.2k | pn_ep_incref(&connection->endpoint); |
173 | | |
174 | 18.2k | size_t nsessions = pn_list_size(connection->sessions); |
175 | 36.5k | for (size_t i = 0; i < nsessions; i++) { |
176 | 18.2k | pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i)); |
177 | 18.2k | } |
178 | 18.2k | } |
179 | | |
180 | | // invoked when transport has been removed: |
181 | | void pn_connection_unbound(pn_connection_t *connection) |
182 | 36.5k | { |
183 | 36.5k | connection->transport = NULL; |
184 | 36.5k | if (connection->endpoint.freed) { |
185 | | // connection has been freed prior to unbinding, thus it |
186 | | // cannot be re-assigned to a new transport. Clear the |
187 | | // transport work lists to allow the connection to be freed. |
188 | 36.5k | while (connection->transport_head) { |
189 | 18.2k | pn_clear_modified(connection, connection->transport_head); |
190 | 18.2k | } |
191 | 18.2k | while (connection->tpwork_head) { |
192 | 0 | pn_clear_tpwork(connection->tpwork_head); |
193 | 0 | } |
194 | 18.2k | } |
195 | 36.5k | pn_ep_decref(&connection->endpoint); |
196 | 36.5k | } |
197 | | |
198 | | pn_record_t *pn_connection_attachments(pn_connection_t *connection) |
199 | 0 | { |
200 | 0 | assert(connection); |
201 | 0 | return connection->context; |
202 | 0 | } |
203 | | |
204 | | void *pn_connection_get_context(pn_connection_t *conn) |
205 | 0 | { |
206 | | // XXX: we should really assert on conn here, but this causes |
207 | | // messenger tests to fail |
208 | 0 | return conn ? pn_record_get(conn->context, PN_LEGCTX) : NULL; |
209 | 0 | } |
210 | | |
211 | | void pn_connection_set_context(pn_connection_t *conn, void *context) |
212 | 0 | { |
213 | 0 | assert(conn); |
214 | 0 | pn_record_set(conn->context, PN_LEGCTX, context); |
215 | 0 | } |
216 | | |
217 | | pn_transport_t *pn_connection_transport(pn_connection_t *connection) |
218 | 0 | { |
219 | 0 | assert(connection); |
220 | 0 | return connection->transport; |
221 | 0 | } |
222 | | |
223 | | void pn_condition_init(pn_condition_t *condition) |
224 | 1.12M | { |
225 | 1.12M | condition->info_raw = (pn_bytes_t){0, NULL}; |
226 | 1.12M | condition->name = NULL; |
227 | 1.12M | condition->description = NULL; |
228 | 1.12M | condition->info = NULL; |
229 | 1.12M | } |
230 | | |
231 | 0 | pn_condition_t *pn_condition(void) { |
232 | 0 | pn_condition_t *c = (pn_condition_t*)pni_mem_allocate(PN_VOID, sizeof(pn_condition_t)); |
233 | 0 | pn_condition_init(c); |
234 | 0 | return c; |
235 | 0 | } |
236 | | |
237 | | void pn_condition_tini(pn_condition_t *condition) |
238 | 1.12M | { |
239 | 1.12M | pn_bytes_free(condition->info_raw); |
240 | 1.12M | pn_data_free(condition->info); |
241 | 1.12M | pn_free(condition->description); |
242 | 1.12M | pn_free(condition->name); |
243 | 1.12M | } |
244 | | |
245 | 0 | void pn_condition_free(pn_condition_t *c) { |
246 | 0 | if (c) { |
247 | 0 | pn_condition_clear(c); |
248 | 0 | pn_condition_tini(c); |
249 | 0 | pni_mem_deallocate(PN_VOID, c); |
250 | 0 | } |
251 | 0 | } |
252 | | |
253 | | static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn) |
254 | 268k | { |
255 | 268k | pn_list_add(conn->sessions, ssn); |
256 | 268k | ssn->connection = conn; |
257 | 268k | pn_incref(conn); // keep around until finalized |
258 | 268k | pn_ep_incref(&conn->endpoint); |
259 | 268k | } |
260 | | |
261 | | static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn) |
262 | 537k | { |
263 | 537k | if (pn_list_remove(conn->sessions, ssn)) { |
264 | 268k | pn_ep_decref(&conn->endpoint); |
265 | 268k | LL_REMOVE(conn, endpoint, &ssn->endpoint); |
266 | 268k | } |
267 | 537k | } |
268 | | |
269 | | pn_connection_t *pn_session_connection(pn_session_t *session) |
270 | 6.32k | { |
271 | 6.32k | if (!session) return NULL; |
272 | 6.32k | return session->connection; |
273 | 6.32k | } |
274 | | |
275 | | void pn_session_open(pn_session_t *session) |
276 | 18.2k | { |
277 | 18.2k | assert(session); |
278 | 18.2k | pn_endpoint_open(&session->endpoint); |
279 | 18.2k | } |
280 | | |
281 | | void pn_session_close(pn_session_t *session) |
282 | 0 | { |
283 | 0 | assert(session); |
284 | 0 | pn_endpoint_close(&session->endpoint); |
285 | 0 | } |
286 | | |
287 | | void pn_session_free(pn_session_t *session) |
288 | 268k | { |
289 | 268k | assert(!session->endpoint.freed); |
290 | 523k | while(pn_list_size(session->links)) { |
291 | 254k | pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); |
292 | 254k | pn_link_free(link); |
293 | 254k | } |
294 | 268k | pni_remove_session(session->connection, session); |
295 | 268k | pn_list_add(session->connection->freed, session); |
296 | 268k | session->endpoint.freed = true; |
297 | 268k | pn_ep_decref(&session->endpoint); |
298 | | |
299 | | // the finalize logic depends on endpoint.freed, so we incref/decref |
300 | | // to give it a chance to rerun |
301 | 268k | pn_incref(session); |
302 | 268k | pn_decref(session); |
303 | 268k | } |
304 | | |
305 | | pn_record_t *pn_session_attachments(pn_session_t *session) |
306 | 0 | { |
307 | 0 | assert(session); |
308 | 0 | return session->context; |
309 | 0 | } |
310 | | |
311 | | void *pn_session_get_context(pn_session_t *session) |
312 | 0 | { |
313 | 0 | return session ? pn_record_get(session->context, PN_LEGCTX) : 0; |
314 | 0 | } |
315 | | |
316 | | void pn_session_set_context(pn_session_t *session, void *context) |
317 | 0 | { |
318 | 0 | assert(session); |
319 | 0 | pn_record_set(session->context, PN_LEGCTX, context); |
320 | 0 | } |
321 | | |
322 | | |
323 | | static void pni_add_link(pn_session_t *ssn, pn_link_t *link) |
324 | 254k | { |
325 | 254k | pn_list_add(ssn->links, link); |
326 | 254k | link->session = ssn; |
327 | 254k | pn_ep_incref(&ssn->endpoint); |
328 | 254k | } |
329 | | |
330 | | static void pni_remove_link(pn_session_t *ssn, pn_link_t *link) |
331 | 509k | { |
332 | 509k | if (pn_list_remove(ssn->links, link)) { |
333 | 254k | pn_ep_decref(&ssn->endpoint); |
334 | 254k | LL_REMOVE(ssn->connection, endpoint, &link->endpoint); |
335 | 254k | } |
336 | 509k | } |
337 | | |
338 | | void pn_link_open(pn_link_t *link) |
339 | 18.2k | { |
340 | 18.2k | assert(link); |
341 | 18.2k | pn_endpoint_open(&link->endpoint); |
342 | 18.2k | } |
343 | | |
344 | | void pn_link_close(pn_link_t *link) |
345 | 0 | { |
346 | 0 | assert(link); |
347 | 0 | pn_endpoint_close(&link->endpoint); |
348 | 0 | } |
349 | | |
350 | | void pn_link_detach(pn_link_t *link) |
351 | 0 | { |
352 | 0 | assert(link); |
353 | 0 | if (link->detached) return; |
354 | | |
355 | 0 | link->detached = true; |
356 | 0 | pn_collector_put_object(link->session->connection->collector, link, PN_LINK_LOCAL_DETACH); |
357 | 0 | pn_modified(link->session->connection, &link->endpoint, true); |
358 | |
|
359 | 0 | } |
360 | | |
361 | | static void pni_terminus_free(pn_terminus_t *terminus) |
362 | 1.01M | { |
363 | 1.01M | pn_free(terminus->address); |
364 | 1.01M | pn_bytes_free(terminus->properties_raw); |
365 | 1.01M | pn_bytes_free(terminus->capabilities_raw); |
366 | 1.01M | pn_bytes_free(terminus->outcomes_raw); |
367 | 1.01M | pn_bytes_free(terminus->filter_raw); |
368 | 1.01M | pn_free(terminus->properties); |
369 | 1.01M | pn_free(terminus->capabilities); |
370 | 1.01M | pn_free(terminus->outcomes); |
371 | 1.01M | pn_free(terminus->filter); |
372 | 1.01M | } |
373 | | |
374 | | void pn_link_free(pn_link_t *link) |
375 | 254k | { |
376 | 254k | assert(!link->endpoint.freed); |
377 | 254k | pni_remove_link(link->session, link); |
378 | 254k | pn_list_add(link->session->freed, link); |
379 | 254k | pn_delivery_t *delivery = link->unsettled_head; |
380 | 257k | while (delivery) { |
381 | 2.14k | pn_delivery_t *next = delivery->unsettled_next; |
382 | 2.14k | pn_delivery_settle(delivery); |
383 | 2.14k | delivery = next; |
384 | 2.14k | } |
385 | 254k | link->endpoint.freed = true; |
386 | 254k | pn_ep_decref(&link->endpoint); |
387 | | |
388 | | // the finalize logic depends on endpoint.freed (modified above), so |
389 | | // we incref/decref to give it a chance to rerun |
390 | 254k | pn_incref(link); |
391 | 254k | pn_decref(link); |
392 | 254k | } |
393 | | |
394 | | void *pn_link_get_context(pn_link_t *link) |
395 | 0 | { |
396 | 0 | assert(link); |
397 | 0 | return pn_record_get(link->context, PN_LEGCTX); |
398 | 0 | } |
399 | | |
400 | | void pn_link_set_context(pn_link_t *link, void *context) |
401 | 0 | { |
402 | 0 | assert(link); |
403 | 0 | pn_record_set(link->context, PN_LEGCTX, context); |
404 | 0 | } |
405 | | |
406 | | pn_record_t *pn_link_attachments(pn_link_t *link) |
407 | 0 | { |
408 | 0 | assert(link); |
409 | 0 | return link->context; |
410 | 0 | } |
411 | | |
412 | | void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn) |
413 | 541k | { |
414 | 541k | endpoint->type = (pn_endpoint_type_t) type; |
415 | 541k | endpoint->referenced = true; |
416 | 541k | endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; |
417 | 541k | pn_condition_init(&endpoint->condition); |
418 | 541k | pn_condition_init(&endpoint->remote_condition); |
419 | 541k | endpoint->endpoint_next = NULL; |
420 | 541k | endpoint->endpoint_prev = NULL; |
421 | 541k | endpoint->transport_next = NULL; |
422 | 541k | endpoint->transport_prev = NULL; |
423 | 541k | endpoint->modified = false; |
424 | 541k | endpoint->freed = false; |
425 | 541k | endpoint->refcount = 1; |
426 | | //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint); |
427 | | |
428 | 541k | LL_ADD(conn, endpoint, endpoint); |
429 | 541k | } |
430 | | |
431 | | void pn_ep_incref(pn_endpoint_t *endpoint) |
432 | 1.08M | { |
433 | 1.08M | endpoint->refcount++; |
434 | 1.08M | } |
435 | | |
436 | 88.7k | static pn_event_type_t pn_final_type(pn_endpoint_type_t type) { |
437 | 88.7k | switch (type) { |
438 | 18.2k | case CONNECTION: |
439 | 18.2k | return PN_CONNECTION_FINAL; |
440 | 32.3k | case SESSION: |
441 | 32.3k | return PN_SESSION_FINAL; |
442 | 2.56k | case SENDER: |
443 | 38.1k | case RECEIVER: |
444 | 38.1k | return PN_LINK_FINAL; |
445 | 0 | default: |
446 | 0 | assert(false); |
447 | 0 | return PN_EVENT_NONE; |
448 | 88.7k | } |
449 | 88.7k | } |
450 | | |
451 | 1.31M | static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) { |
452 | 1.31M | switch (endpoint->type) { |
453 | 0 | case CONNECTION: |
454 | 0 | return NULL; |
455 | 802k | case SESSION: |
456 | 802k | return &((pn_session_t *) endpoint)->connection->endpoint; |
457 | 97.6k | case SENDER: |
458 | 513k | case RECEIVER: |
459 | 513k | return &((pn_link_t *) endpoint)->session->endpoint; |
460 | 0 | default: |
461 | 0 | assert(false); |
462 | 0 | return NULL; |
463 | 1.31M | } |
464 | 1.31M | } |
465 | | |
466 | | void pn_ep_decref(pn_endpoint_t *endpoint) |
467 | 1.17M | { |
468 | 1.17M | assert(endpoint->refcount > 0); |
469 | 1.17M | endpoint->refcount--; |
470 | 1.17M | if (endpoint->refcount == 0) { |
471 | 88.7k | pn_connection_t *conn = pni_ep_get_connection(endpoint); |
472 | 88.7k | pn_collector_put_object(conn->collector, endpoint, pn_final_type((pn_endpoint_type_t) endpoint->type)); |
473 | 88.7k | } |
474 | 1.17M | } |
475 | | |
476 | | static void pni_endpoint_tini(pn_endpoint_t *endpoint) |
477 | 541k | { |
478 | 541k | pn_condition_tini(&endpoint->remote_condition); |
479 | 541k | pn_condition_tini(&endpoint->condition); |
480 | 541k | } |
481 | | |
482 | | static void pni_free_children(pn_list_t *children, pn_list_t *freed) |
483 | 286k | { |
484 | 286k | while (pn_list_size(children) > 0) { |
485 | 0 | pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0); |
486 | 0 | assert(!endpoint->referenced); |
487 | 0 | pn_free(endpoint); |
488 | 0 | } |
489 | | |
490 | 286k | while (pn_list_size(freed) > 0) { |
491 | 0 | pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0); |
492 | 0 | assert(!endpoint->referenced); |
493 | 0 | pn_free(endpoint); |
494 | 0 | } |
495 | | |
496 | 286k | pn_free(children); |
497 | 286k | pn_free(freed); |
498 | 286k | } |
499 | | |
500 | | static void pn_connection_finalize(void *object) |
501 | 18.2k | { |
502 | 18.2k | pn_connection_t *conn = (pn_connection_t *) object; |
503 | 18.2k | pn_endpoint_t *endpoint = &conn->endpoint; |
504 | | |
505 | 18.2k | if (conn->transport) { |
506 | 0 | assert(!conn->transport->referenced); |
507 | 0 | pn_free(conn->transport); |
508 | 0 | } |
509 | | |
510 | | // freeing the transport could post events |
511 | 18.2k | if (pn_refcount(conn) > 0) { |
512 | 0 | return; |
513 | 0 | } |
514 | | |
515 | 18.2k | pni_free_children(conn->sessions, conn->freed); |
516 | 18.2k | pn_free(conn->context); |
517 | 18.2k | pn_decref(conn->collector); |
518 | | |
519 | 18.2k | pn_free(conn->container); |
520 | 18.2k | pn_free(conn->hostname); |
521 | 18.2k | pn_free(conn->auth_user); |
522 | 18.2k | pn_free(conn->authzid); |
523 | 18.2k | pn_free(conn->auth_password); |
524 | 18.2k | pn_bytes_free(conn->offered_capabilities_raw); |
525 | 18.2k | pn_bytes_free(conn->desired_capabilities_raw); |
526 | 18.2k | pn_bytes_free(conn->properties_raw); |
527 | 18.2k | pn_free(conn->offered_capabilities); |
528 | 18.2k | pn_free(conn->desired_capabilities); |
529 | 18.2k | pn_free(conn->properties); |
530 | 18.2k | pn_free(conn->remote_offered_capabilities); |
531 | 18.2k | pn_free(conn->remote_desired_capabilities); |
532 | 18.2k | pn_free(conn->remote_properties); |
533 | 18.2k | pni_endpoint_tini(endpoint); |
534 | 18.2k | pn_free(conn->delivery_pool); |
535 | 18.2k | } |
536 | | |
537 | 18.2k | #define pn_connection_initialize NULL |
538 | 18.2k | #define pn_connection_hashcode NULL |
539 | 18.2k | #define pn_connection_compare NULL |
540 | 18.2k | #define pn_connection_inspect NULL |
541 | | |
542 | | pn_connection_t *pn_connection(void) |
543 | 18.2k | { |
544 | 18.2k | static const pn_class_t clazz = PN_CLASS(pn_connection); |
545 | 18.2k | pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); |
546 | 18.2k | if (!conn) return NULL; |
547 | | |
548 | 18.2k | conn->endpoint_head = NULL; |
549 | 18.2k | conn->endpoint_tail = NULL; |
550 | 18.2k | pn_endpoint_init(&conn->endpoint, CONNECTION, conn); |
551 | 18.2k | conn->transport_head = NULL; |
552 | 18.2k | conn->transport_tail = NULL; |
553 | 18.2k | conn->sessions = pn_list(PN_WEAKREF, 0); |
554 | 18.2k | conn->freed = pn_list(PN_WEAKREF, 0); |
555 | 18.2k | conn->transport = NULL; |
556 | 18.2k | conn->work_head = NULL; |
557 | 18.2k | conn->work_tail = NULL; |
558 | 18.2k | conn->tpwork_head = NULL; |
559 | 18.2k | conn->tpwork_tail = NULL; |
560 | 18.2k | conn->container = pn_string(NULL); |
561 | 18.2k | conn->hostname = pn_string(NULL); |
562 | 18.2k | conn->auth_user = pn_string(NULL); |
563 | 18.2k | conn->authzid = pn_string(NULL); |
564 | 18.2k | conn->auth_password = pn_string(NULL); |
565 | 18.2k | conn->offered_capabilities_raw = (pn_bytes_t){0, NULL}; |
566 | 18.2k | conn->desired_capabilities_raw = (pn_bytes_t){0, NULL}; |
567 | 18.2k | conn->properties_raw = (pn_bytes_t){0, NULL}; |
568 | 18.2k | conn->offered_capabilities = NULL; |
569 | 18.2k | conn->desired_capabilities = NULL; |
570 | 18.2k | conn->properties = NULL; |
571 | 18.2k | conn->remote_offered_capabilities = NULL; |
572 | 18.2k | conn->remote_desired_capabilities = NULL; |
573 | 18.2k | conn->remote_properties = NULL; |
574 | 18.2k | conn->collector = NULL; |
575 | 18.2k | conn->context = pn_record(); |
576 | 18.2k | conn->delivery_pool = pn_list(&PN_CLASSCLASS(pn_delivery), 0); |
577 | 18.2k | conn->driver = NULL; |
578 | | |
579 | 18.2k | return conn; |
580 | 18.2k | } |
581 | | |
582 | | static const pn_event_type_t endpoint_init_event_map[] = { |
583 | | PN_CONNECTION_INIT, /* CONNECTION */ |
584 | | PN_SESSION_INIT, /* SESSION */ |
585 | | PN_LINK_INIT, /* SENDER */ |
586 | | PN_LINK_INIT}; /* RECEIVER */ |
587 | | |
588 | | void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) |
589 | 36.5k | { |
590 | 36.5k | pn_decref(connection->collector); |
591 | 36.5k | connection->collector = collector; |
592 | 36.5k | pn_incref(connection->collector); |
593 | 36.5k | pn_endpoint_t *endpoint = connection->endpoint_head; |
594 | 596k | while (endpoint) { |
595 | 559k | pn_collector_put_object(connection->collector, endpoint, endpoint_init_event_map[endpoint->type]); |
596 | 559k | endpoint = endpoint->endpoint_next; |
597 | 559k | } |
598 | 36.5k | } |
599 | | |
600 | 0 | pn_collector_t* pn_connection_collector(pn_connection_t *connection) { |
601 | 0 | return connection->collector; |
602 | 0 | } |
603 | | |
604 | | pn_state_t pn_connection_state(pn_connection_t *connection) |
605 | 0 | { |
606 | 0 | return connection ? connection->endpoint.state : 0; |
607 | 0 | } |
608 | | |
609 | | const char *pn_connection_get_container(pn_connection_t *connection) |
610 | 0 | { |
611 | 0 | assert(connection); |
612 | 0 | return pn_string_get(connection->container); |
613 | 0 | } |
614 | | |
615 | | void pn_connection_set_container(pn_connection_t *connection, const char *container) |
616 | 18.2k | { |
617 | 18.2k | assert(connection); |
618 | 18.2k | pn_string_set(connection->container, container); |
619 | 18.2k | } |
620 | | |
621 | | const char *pn_connection_get_hostname(pn_connection_t *connection) |
622 | 0 | { |
623 | 0 | assert(connection); |
624 | 0 | return pn_string_get(connection->hostname); |
625 | 0 | } |
626 | | |
627 | | void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname) |
628 | 0 | { |
629 | 0 | assert(connection); |
630 | 0 | pn_string_set(connection->hostname, hostname); |
631 | 0 | } |
632 | | |
633 | | const char *pn_connection_get_user(pn_connection_t *connection) |
634 | 0 | { |
635 | 0 | assert(connection); |
636 | 0 | return pn_string_get(connection->auth_user); |
637 | 0 | } |
638 | | |
639 | | void pn_connection_set_user(pn_connection_t *connection, const char *user) |
640 | 0 | { |
641 | 0 | assert(connection); |
642 | 0 | pn_string_set(connection->auth_user, user); |
643 | 0 | } |
644 | | |
645 | | const char *pn_connection_get_authorization(pn_connection_t *connection) |
646 | 0 | { |
647 | 0 | assert(connection); |
648 | 0 | return pn_string_get(connection->authzid); |
649 | 0 | } |
650 | | |
651 | | void pn_connection_set_authorization(pn_connection_t *connection, const char *authzid) |
652 | 0 | { |
653 | 0 | assert(connection); |
654 | 0 | pn_string_set(connection->authzid, authzid); |
655 | 0 | } |
656 | | |
657 | | void pn_connection_set_password(pn_connection_t *connection, const char *password) |
658 | 0 | { |
659 | 0 | assert(connection); |
660 | | // Make sure the previous password is erased, if there was one. |
661 | 0 | size_t n = pn_string_size(connection->auth_password); |
662 | 0 | const char* s = pn_string_get(connection->auth_password); |
663 | 0 | if (n > 0 && s) memset((void*)s, 0, n); |
664 | 0 | pn_string_set(connection->auth_password, password); |
665 | 0 | } |
666 | | |
667 | | pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection) |
668 | 0 | { |
669 | 0 | assert(connection); |
670 | 0 | pni_switch_to_data(&connection->offered_capabilities_raw, &connection->offered_capabilities); |
671 | 0 | return connection->offered_capabilities; |
672 | 0 | } |
673 | | |
674 | | pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection) |
675 | 0 | { |
676 | 0 | assert(connection); |
677 | 0 | pni_switch_to_data(&connection->desired_capabilities_raw, &connection->desired_capabilities); |
678 | 0 | return connection->desired_capabilities; |
679 | 0 | } |
680 | | |
681 | | pn_data_t *pn_connection_properties(pn_connection_t *connection) |
682 | 0 | { |
683 | 0 | assert(connection); |
684 | 0 | pni_switch_to_data(&connection->properties_raw, &connection->properties); |
685 | 0 | return connection->properties; |
686 | 0 | } |
687 | | |
688 | | pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection) |
689 | 0 | { |
690 | 0 | assert(connection); |
691 | 0 | if (!connection->transport) |
692 | 0 | return NULL; |
693 | 0 | pni_switch_to_data(&connection->transport->remote_offered_capabilities_raw, &connection->remote_offered_capabilities); |
694 | 0 | return connection->remote_offered_capabilities; |
695 | 0 | } |
696 | | |
697 | | pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection) |
698 | 0 | { |
699 | 0 | assert(connection); |
700 | 0 | if (!connection->transport) |
701 | 0 | return NULL; |
702 | 0 | pni_switch_to_data(&connection->transport->remote_desired_capabilities_raw, &connection->remote_desired_capabilities); |
703 | 0 | return connection->remote_desired_capabilities; |
704 | 0 | } |
705 | | |
706 | | pn_data_t *pn_connection_remote_properties(pn_connection_t *connection) |
707 | 0 | { |
708 | 0 | assert(connection); |
709 | 0 | if (!connection->transport) |
710 | 0 | return NULL; |
711 | 0 | pni_switch_to_data(&connection->transport->remote_properties_raw, &connection->remote_properties); |
712 | 0 | return connection->remote_properties; |
713 | 0 | } |
714 | | |
715 | | const char *pn_connection_remote_container(pn_connection_t *connection) |
716 | 0 | { |
717 | 0 | assert(connection); |
718 | 0 | return connection->transport ? connection->transport->remote_container : NULL; |
719 | 0 | } |
720 | | |
721 | | const char *pn_connection_remote_hostname(pn_connection_t *connection) |
722 | 0 | { |
723 | 0 | assert(connection); |
724 | 0 | return connection->transport ? connection->transport->remote_hostname : NULL; |
725 | 0 | } |
726 | | |
727 | | pn_delivery_t *pn_work_head(pn_connection_t *connection) |
728 | 0 | { |
729 | 0 | assert(connection); |
730 | 0 | return connection->work_head; |
731 | 0 | } |
732 | | |
733 | | pn_delivery_t *pn_work_next(pn_delivery_t *delivery) |
734 | 0 | { |
735 | 0 | assert(delivery); |
736 | |
|
737 | 0 | if (delivery->work) |
738 | 0 | return delivery->work_next; |
739 | 0 | else |
740 | 0 | return pn_work_head(delivery->link->session->connection); |
741 | 0 | } |
742 | | |
743 | | static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery) |
744 | 6.38k | { |
745 | 6.38k | if (!delivery->work) |
746 | 3.83k | { |
747 | 3.83k | LL_ADD(connection, work, delivery); |
748 | 3.83k | delivery->work = true; |
749 | 3.83k | } |
750 | 6.38k | } |
751 | | |
752 | | static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery) |
753 | 10.3k | { |
754 | 10.3k | if (delivery->work) |
755 | 3.83k | { |
756 | 3.83k | LL_REMOVE(connection, work, delivery); |
757 | 3.83k | delivery->work = false; |
758 | 3.83k | } |
759 | 10.3k | } |
760 | | |
761 | | void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery) |
762 | 16.7k | { |
763 | 16.7k | pn_link_t *link = pn_delivery_link(delivery); |
764 | 16.7k | pn_delivery_t *current = pn_link_current(link); |
765 | 16.7k | if (delivery->updated && !delivery->local.settled) { |
766 | 3.39k | pni_add_work(connection, delivery); |
767 | 13.3k | } else if (delivery == current) { |
768 | 3.02k | if (link->endpoint.type == SENDER) { |
769 | 32 | if (pn_link_credit(link) > 0) { |
770 | 0 | pni_add_work(connection, delivery); |
771 | 32 | } else { |
772 | 32 | pni_clear_work(connection, delivery); |
773 | 32 | } |
774 | 2.99k | } else { |
775 | 2.99k | pni_add_work(connection, delivery); |
776 | 2.99k | } |
777 | 10.3k | } else { |
778 | 10.3k | pni_clear_work(connection, delivery); |
779 | 10.3k | } |
780 | 16.7k | } |
781 | | |
782 | | static void pni_add_tpwork(pn_delivery_t *delivery) |
783 | 7.64k | { |
784 | 7.64k | pn_connection_t *connection = delivery->link->session->connection; |
785 | 7.64k | if (!delivery->tpwork) |
786 | 3.86k | { |
787 | 3.86k | LL_ADD(connection, tpwork, delivery); |
788 | 3.86k | delivery->tpwork = true; |
789 | 3.86k | } |
790 | 7.64k | pn_modified(connection, &connection->endpoint, true); |
791 | 7.64k | } |
792 | | |
793 | | void pn_clear_tpwork(pn_delivery_t *delivery) |
794 | 5.57k | { |
795 | 5.57k | pn_connection_t *connection = delivery->link->session->connection; |
796 | 5.57k | if (delivery->tpwork) |
797 | 3.86k | { |
798 | 3.86k | LL_REMOVE(connection, tpwork, delivery); |
799 | 3.86k | delivery->tpwork = false; |
800 | 3.86k | if (pn_refcount(delivery) > 0) { |
801 | 1.71k | pn_incref(delivery); |
802 | 1.71k | pn_decref(delivery); |
803 | 1.71k | } |
804 | 3.86k | } |
805 | 5.57k | } |
806 | | |
807 | | void pn_dump(pn_connection_t *conn) |
808 | 0 | { |
809 | 0 | pn_endpoint_t *endpoint = conn->transport_head; |
810 | 0 | while (endpoint) |
811 | 0 | { |
812 | 0 | printf("%p", (void *) endpoint); |
813 | 0 | endpoint = endpoint->transport_next; |
814 | 0 | if (endpoint) |
815 | 0 | printf(" -> "); |
816 | 0 | } |
817 | 0 | printf("\n"); |
818 | 0 | } |
819 | | |
820 | | void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit) |
821 | 635k | { |
822 | 635k | if (!endpoint->modified) { |
823 | 597k | LL_ADD(connection, transport, endpoint); |
824 | 597k | endpoint->modified = true; |
825 | 597k | } |
826 | | |
827 | 635k | if (emit && connection->transport) { |
828 | 560k | pn_collector_put_object(connection->collector, connection->transport, |
829 | 560k | PN_TRANSPORT); |
830 | 560k | } |
831 | 635k | } |
832 | | |
833 | | void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) |
834 | 73.5k | { |
835 | 73.5k | if (endpoint->modified) { |
836 | 73.5k | LL_REMOVE(connection, transport, endpoint); |
837 | 73.5k | endpoint->transport_next = NULL; |
838 | 73.5k | endpoint->transport_prev = NULL; |
839 | 73.5k | endpoint->modified = false; |
840 | 73.5k | } |
841 | 73.5k | } |
842 | | |
843 | | static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) |
844 | 581k | { |
845 | 581k | if (endpoint->type != type) return false; |
846 | | |
847 | 278k | if (!state) return true; |
848 | | |
849 | 0 | int st = endpoint->state; |
850 | 0 | if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0) |
851 | 0 | return st & state; |
852 | 0 | else |
853 | 0 | return st == state; |
854 | 0 | } |
855 | | |
856 | | pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) |
857 | 286k | { |
858 | 559k | while (endpoint) |
859 | 541k | { |
860 | 541k | if (pni_matches(endpoint, type, state)) |
861 | 268k | return endpoint; |
862 | 273k | endpoint = endpoint->endpoint_next; |
863 | 273k | } |
864 | 18.2k | return NULL; |
865 | 286k | } |
866 | | |
867 | | pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state) |
868 | 18.2k | { |
869 | 18.2k | if (conn) |
870 | 18.2k | return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state); |
871 | 0 | else |
872 | 0 | return NULL; |
873 | 18.2k | } |
874 | | |
875 | | pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state) |
876 | 268k | { |
877 | 268k | if (ssn) |
878 | 268k | return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state); |
879 | 0 | else |
880 | 0 | return NULL; |
881 | 268k | } |
882 | | |
883 | | pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state) |
884 | 57 | { |
885 | 57 | if (!conn) return NULL; |
886 | | |
887 | 57 | pn_endpoint_t *endpoint = conn->endpoint_head; |
888 | | |
889 | 171 | while (endpoint) |
890 | 171 | { |
891 | 171 | if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) |
892 | 57 | return (pn_link_t *) endpoint; |
893 | 114 | endpoint = endpoint->endpoint_next; |
894 | 114 | } |
895 | | |
896 | 0 | return NULL; |
897 | 57 | } |
898 | | |
899 | | pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state) |
900 | 9.75k | { |
901 | 9.75k | if (!link) return NULL; |
902 | | |
903 | 9.75k | pn_endpoint_t *endpoint = link->endpoint.endpoint_next; |
904 | | |
905 | 20.2k | while (endpoint) |
906 | 20.1k | { |
907 | 20.1k | if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) |
908 | 9.69k | return (pn_link_t *) endpoint; |
909 | 10.4k | endpoint = endpoint->endpoint_next; |
910 | 10.4k | } |
911 | | |
912 | 57 | return NULL; |
913 | 9.75k | } |
914 | | |
915 | | static void pn_session_incref(void *object) |
916 | 1.32M | { |
917 | 1.32M | pn_session_t *session = (pn_session_t *) object; |
918 | 1.32M | if (!session->endpoint.referenced) { |
919 | 533k | session->endpoint.referenced = true; |
920 | 533k | pn_incref(session->connection); |
921 | 792k | } else { |
922 | 792k | pn_object_incref(object); |
923 | 792k | } |
924 | 1.32M | } |
925 | | |
926 | | static bool pn_ep_bound(pn_endpoint_t *endpoint) |
927 | 523k | { |
928 | 523k | pn_connection_t *conn = pni_ep_get_connection(endpoint); |
929 | 523k | pn_session_t *ssn; |
930 | 523k | pn_link_t *lnk; |
931 | | |
932 | 523k | if (!conn->transport) return false; |
933 | 0 | if (endpoint->modified) return true; |
934 | | |
935 | 0 | switch (endpoint->type) { |
936 | 0 | case CONNECTION: |
937 | 0 | return ((pn_connection_t *)endpoint)->transport; |
938 | 0 | case SESSION: |
939 | 0 | ssn = (pn_session_t *) endpoint; |
940 | 0 | return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0); |
941 | 0 | case SENDER: |
942 | 0 | case RECEIVER: |
943 | 0 | lnk = (pn_link_t *) endpoint; |
944 | 0 | return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0; |
945 | 0 | default: |
946 | 0 | assert(false); |
947 | 0 | return false; |
948 | 0 | } |
949 | 0 | } |
950 | | |
951 | 1.33M | static bool pni_connection_live(pn_connection_t *conn) { |
952 | 1.33M | return pn_refcount(conn) > 1; |
953 | 1.33M | } |
954 | | |
955 | 526k | static bool pni_session_live(pn_session_t *ssn) { |
956 | 526k | return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1; |
957 | 526k | } |
958 | | |
959 | 13.2k | static bool pni_link_live(pn_link_t *link) { |
960 | 13.2k | return pni_session_live(link->session) || pn_refcount(link) > 1; |
961 | 13.2k | } |
962 | | |
963 | 1.31M | static bool pni_endpoint_live(pn_endpoint_t *endpoint) { |
964 | 1.31M | switch (endpoint->type) { |
965 | 802k | case CONNECTION: |
966 | 802k | return pni_connection_live((pn_connection_t *)endpoint); |
967 | 513k | case SESSION: |
968 | 513k | return pni_session_live((pn_session_t *) endpoint); |
969 | 0 | case SENDER: |
970 | 0 | case RECEIVER: |
971 | 0 | return pni_link_live((pn_link_t *) endpoint); |
972 | 0 | default: |
973 | 0 | assert(false); |
974 | 0 | return false; |
975 | 1.31M | } |
976 | 1.31M | } |
977 | | |
978 | | static bool pni_preserve_child(pn_endpoint_t *endpoint) |
979 | 1.31M | { |
980 | 1.31M | pn_connection_t *conn = pni_ep_get_connection(endpoint); |
981 | 1.31M | pn_endpoint_t *parent = pn_ep_parent(endpoint); |
982 | 1.31M | if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint))) |
983 | 1.31M | && endpoint->referenced) { |
984 | 792k | pn_object_incref(endpoint); |
985 | 792k | endpoint->referenced = false; |
986 | 792k | pn_decref(parent); |
987 | 792k | return true; |
988 | 792k | } else { |
989 | 523k | LL_REMOVE(conn, transport, endpoint); |
990 | 523k | return false; |
991 | 523k | } |
992 | 1.31M | } |
993 | | |
994 | | static void pn_session_finalize(void *object) |
995 | 802k | { |
996 | 802k | pn_session_t *session = (pn_session_t *) object; |
997 | 802k | pn_endpoint_t *endpoint = &session->endpoint; |
998 | | |
999 | 802k | if (pni_preserve_child(endpoint)) { |
1000 | 533k | return; |
1001 | 533k | } |
1002 | | |
1003 | 268k | pn_free(session->context); |
1004 | 268k | pni_free_children(session->links, session->freed); |
1005 | 268k | pni_endpoint_tini(endpoint); |
1006 | 268k | pn_delivery_map_free(&session->state.incoming); |
1007 | 268k | pn_delivery_map_free(&session->state.outgoing); |
1008 | 268k | pn_free(session->state.local_handles); |
1009 | 268k | pn_free(session->state.remote_handles); |
1010 | 268k | pni_remove_session(session->connection, session); |
1011 | 268k | pn_list_remove(session->connection->freed, session); |
1012 | | |
1013 | 268k | if (session->connection->transport) { |
1014 | 0 | pn_transport_t *transport = session->connection->transport; |
1015 | 0 | pn_hash_del(transport->local_channels, session->state.local_channel); |
1016 | 0 | pn_hash_del(transport->remote_channels, session->state.remote_channel); |
1017 | 0 | } |
1018 | | |
1019 | 268k | if (endpoint->referenced) { |
1020 | 268k | pn_decref(session->connection); |
1021 | 268k | } |
1022 | 268k | } |
1023 | | |
1024 | 268k | #define pn_session_new NULL |
1025 | 268k | #define pn_session_refcount NULL |
1026 | 268k | #define pn_session_decref NULL |
1027 | 268k | #define pn_session_initialize NULL |
1028 | 268k | #define pn_session_hashcode NULL |
1029 | 268k | #define pn_session_compare NULL |
1030 | 268k | #define pn_session_inspect NULL |
1031 | | |
1032 | | pn_session_t *pn_session(pn_connection_t *conn) |
1033 | 268k | { |
1034 | 268k | assert(conn); |
1035 | 268k | #define pn_session_free NULL |
1036 | 268k | static const pn_class_t clazz = PN_METACLASS(pn_session); |
1037 | 268k | #undef pn_session_free |
1038 | 268k | pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); |
1039 | 268k | if (!ssn) return NULL; |
1040 | 268k | pn_endpoint_init(&ssn->endpoint, SESSION, conn); |
1041 | 268k | pni_add_session(conn, ssn); |
1042 | 268k | ssn->links = pn_list(PN_WEAKREF, 0); |
1043 | 268k | ssn->freed = pn_list(PN_WEAKREF, 0); |
1044 | 268k | ssn->context = pn_record(); |
1045 | 268k | ssn->incoming_capacity = 0; |
1046 | 268k | ssn->incoming_bytes = 0; |
1047 | 268k | ssn->outgoing_bytes = 0; |
1048 | 268k | ssn->incoming_deliveries = 0; |
1049 | 268k | ssn->outgoing_deliveries = 0; |
1050 | 268k | ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE; |
1051 | 268k | ssn->local_handle_max = PN_IMPL_HANDLE_MAX; |
1052 | 268k | ssn->incoming_window_lwm = 1; |
1053 | 268k | ssn->check_flow = false; |
1054 | 268k | ssn->need_flow = false; |
1055 | 268k | ssn->lwm_default = true; |
1056 | | |
1057 | | // begin transport state |
1058 | 268k | memset(&ssn->state, 0, sizeof(ssn->state)); |
1059 | 268k | ssn->state.remote_handle_max = UINT32_MAX; |
1060 | 268k | ssn->state.local_channel = (uint16_t)-1; |
1061 | 268k | ssn->state.remote_channel = (uint16_t)-1; |
1062 | 268k | pn_delivery_map_init(&ssn->state.incoming, 0); |
1063 | 268k | pn_delivery_map_init(&ssn->state.outgoing, 0); |
1064 | 268k | ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75); |
1065 | 268k | ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75); |
1066 | | // end transport state |
1067 | | |
1068 | 268k | pn_collector_put_object(conn->collector, ssn, PN_SESSION_INIT); |
1069 | 268k | if (conn->transport) { |
1070 | 250k | pni_session_bound(ssn); |
1071 | 250k | } |
1072 | 268k | pn_decref(ssn); |
1073 | 268k | return ssn; |
1074 | 268k | } |
1075 | | |
1076 | | static void pni_session_bound(pn_session_t *ssn) |
1077 | 268k | { |
1078 | 268k | assert(ssn); |
1079 | 268k | size_t nlinks = pn_list_size(ssn->links); |
1080 | 286k | for (size_t i = 0; i < nlinks; i++) { |
1081 | 18.2k | pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i)); |
1082 | 18.2k | } |
1083 | 268k | } |
1084 | | |
1085 | | void pn_session_unbound(pn_session_t* ssn) |
1086 | 36.7k | { |
1087 | 36.7k | assert(ssn); |
1088 | 36.7k | ssn->state.local_channel = (uint16_t)-1; |
1089 | 36.7k | ssn->state.remote_channel = (uint16_t)-1; |
1090 | 36.7k | ssn->incoming_bytes = 0; |
1091 | 36.7k | ssn->outgoing_bytes = 0; |
1092 | 36.7k | ssn->incoming_deliveries = 0; |
1093 | 36.7k | ssn->outgoing_deliveries = 0; |
1094 | 36.7k | } |
1095 | | |
1096 | | size_t pn_session_get_incoming_capacity(pn_session_t *ssn) |
1097 | 0 | { |
1098 | 0 | assert(ssn); |
1099 | 0 | return ssn->incoming_capacity; |
1100 | 0 | } |
1101 | | |
1102 | | // Update required when (re)set by user or when session started (proxy: BEGIN frame). No |
1103 | | // session flow control actually means flow control with huge window, so set lwm to 1. There is |
1104 | | // low probability of a stall. Any link credit flow frame will update session credit too. |
1105 | 0 | void pni_session_update_incoming_lwm(pn_session_t *ssn) { |
1106 | 0 | if (ssn->incoming_capacity) { |
1107 | | // Old API. |
1108 | 0 | if (!ssn->connection->transport) |
1109 | 0 | return; // Defer until called again from BEGIN frame setup with max frame known. |
1110 | 0 | if (ssn->connection->transport->local_max_frame) { |
1111 | 0 | ssn->incoming_window_lwm = (ssn->incoming_capacity / ssn->connection->transport->local_max_frame) / 2; |
1112 | 0 | if (!ssn->incoming_window_lwm) |
1113 | 0 | ssn->incoming_window_lwm = 1; // Zero may hang. |
1114 | 0 | } else { |
1115 | 0 | ssn->incoming_window_lwm = 1; |
1116 | 0 | } |
1117 | 0 | } else if (ssn->max_incoming_window) { |
1118 | | // New API. |
1119 | | // Only need to deal with default. Called whensending BEGIN frame. |
1120 | 0 | if (ssn->connection->transport && ssn->connection->transport->local_max_frame && ssn->lwm_default) { |
1121 | 0 | ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2; |
1122 | 0 | } |
1123 | 0 | } else { |
1124 | 0 | ssn->incoming_window_lwm = 1; |
1125 | 0 | } |
1126 | 0 | assert(ssn->incoming_window_lwm != 0); // 0 allows session flow to hang |
1127 | 0 | } |
1128 | | |
1129 | | void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity) |
1130 | 0 | { |
1131 | 0 | assert(ssn); |
1132 | 0 | ssn->incoming_capacity = capacity; |
1133 | 0 | ssn->max_incoming_window = 0; |
1134 | 0 | ssn->incoming_window_lwm = 1; |
1135 | 0 | ssn->lwm_default = true; |
1136 | 0 | if (ssn->connection->transport) { |
1137 | 0 | ssn->check_flow = true; |
1138 | 0 | ssn->need_flow = true; |
1139 | 0 | pn_modified(ssn->connection, &ssn->endpoint, false); |
1140 | 0 | } |
1141 | 0 | pni_session_update_incoming_lwm(ssn); |
1142 | | // If capacity invalid, failure occurs when transport calculates value of incoming window. |
1143 | 0 | } |
1144 | | |
1145 | 0 | int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t window, pn_frame_count_t lwm) { |
1146 | 0 | assert(ssn); |
1147 | 0 | if (!window || (lwm && lwm > window)) |
1148 | 0 | return PN_ARG_ERR; |
1149 | | // Settings fixed after session open for simplicity. AMPQ actually allows dynamic change with risk |
1150 | | // of overflow if window reduced while transfers in flight. |
1151 | 0 | if (ssn->endpoint.state & PN_LOCAL_ACTIVE) |
1152 | 0 | return PN_STATE_ERR; |
1153 | 0 | ssn->incoming_capacity = 0; |
1154 | 0 | ssn->max_incoming_window = window; |
1155 | 0 | ssn->lwm_default = (lwm == 0); |
1156 | 0 | ssn->incoming_window_lwm = lwm; |
1157 | 0 | return 0; |
1158 | 0 | } |
1159 | | |
1160 | 0 | pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) { |
1161 | 0 | return ssn->max_incoming_window; |
1162 | 0 | } |
1163 | | |
1164 | 0 | pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) { |
1165 | 0 | return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : ssn->incoming_window_lwm; |
1166 | 0 | } |
1167 | | |
1168 | 0 | pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) { |
1169 | 0 | return ssn->state.remote_incoming_window; |
1170 | 0 | } |
1171 | | |
1172 | | size_t pn_session_get_outgoing_window(pn_session_t *ssn) |
1173 | 0 | { |
1174 | 0 | assert(ssn); |
1175 | 0 | return ssn->outgoing_window; |
1176 | 0 | } |
1177 | | |
1178 | | void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window) |
1179 | 0 | { |
1180 | 0 | assert(ssn); |
1181 | 0 | ssn->outgoing_window = window; |
1182 | 0 | } |
1183 | | |
1184 | | size_t pn_session_outgoing_bytes(pn_session_t *ssn) |
1185 | 0 | { |
1186 | 0 | assert(ssn); |
1187 | 0 | return ssn->outgoing_bytes; |
1188 | 0 | } |
1189 | | |
1190 | | size_t pn_session_incoming_bytes(pn_session_t *ssn) |
1191 | 0 | { |
1192 | 0 | assert(ssn); |
1193 | 0 | return ssn->incoming_bytes; |
1194 | 0 | } |
1195 | | |
1196 | | pn_state_t pn_session_state(pn_session_t *session) |
1197 | 0 | { |
1198 | 0 | return session->endpoint.state; |
1199 | 0 | } |
1200 | | |
1201 | | static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type) |
1202 | 1.01M | { |
1203 | 1.01M | terminus->type = type; |
1204 | 1.01M | terminus->address = pn_string(NULL); |
1205 | 1.01M | terminus->durability = PN_NONDURABLE; |
1206 | 1.01M | terminus->has_expiry_policy = false; |
1207 | 1.01M | terminus->expiry_policy = PN_EXPIRE_WITH_SESSION; |
1208 | 1.01M | terminus->timeout = 0; |
1209 | 1.01M | terminus->dynamic = false; |
1210 | 1.01M | terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED; |
1211 | 1.01M | terminus->properties_raw = (pn_bytes_t){0, NULL}; |
1212 | 1.01M | terminus->capabilities_raw = (pn_bytes_t){0, NULL}; |
1213 | 1.01M | terminus->outcomes_raw = (pn_bytes_t){0, NULL}; |
1214 | 1.01M | terminus->filter_raw = (pn_bytes_t){0, NULL}; |
1215 | 1.01M | terminus->properties = NULL; |
1216 | 1.01M | terminus->capabilities = NULL; |
1217 | 1.01M | terminus->outcomes = NULL; |
1218 | 1.01M | terminus->filter = NULL; |
1219 | 1.01M | } |
1220 | | |
1221 | | static void pn_link_incref(void *object) |
1222 | 781k | { |
1223 | 781k | pn_link_t *link = (pn_link_t *) object; |
1224 | 781k | if (!link->endpoint.referenced) { |
1225 | 258k | link->endpoint.referenced = true; |
1226 | 258k | pn_incref(link->session); |
1227 | 522k | } else { |
1228 | 522k | pn_object_incref(object); |
1229 | 522k | } |
1230 | 781k | } |
1231 | | |
1232 | | static void pn_link_finalize(void *object) |
1233 | 513k | { |
1234 | 513k | pn_link_t *link = (pn_link_t *) object; |
1235 | 513k | pn_endpoint_t *endpoint = &link->endpoint; |
1236 | | |
1237 | 513k | if (pni_preserve_child(endpoint)) { |
1238 | 258k | return; |
1239 | 258k | } |
1240 | | |
1241 | 256k | while (link->unsettled_head) { |
1242 | 1.91k | assert(!link->unsettled_head->referenced); |
1243 | 1.91k | pn_free(link->unsettled_head); |
1244 | 1.91k | } |
1245 | | |
1246 | 254k | pn_free(link->context); |
1247 | 254k | pni_terminus_free(&link->source); |
1248 | 254k | pni_terminus_free(&link->target); |
1249 | 254k | pni_terminus_free(&link->remote_source); |
1250 | 254k | pni_terminus_free(&link->remote_target); |
1251 | 254k | pn_free(link->name); |
1252 | 254k | pni_endpoint_tini(endpoint); |
1253 | 254k | pni_remove_link(link->session, link); |
1254 | 254k | pn_hash_del(link->session->state.local_handles, link->state.local_handle); |
1255 | 254k | pn_hash_del(link->session->state.remote_handles, link->state.remote_handle); |
1256 | 254k | pn_list_remove(link->session->freed, link); |
1257 | 254k | if (endpoint->referenced) { |
1258 | 254k | pn_decref(link->session); |
1259 | 254k | } |
1260 | 254k | pn_free(link->properties); |
1261 | 254k | pn_bytes_free(link->properties_raw); |
1262 | 254k | pn_free(link->remote_properties); |
1263 | 254k | pn_bytes_free(link->remote_properties_raw); |
1264 | 254k | } |
1265 | | |
1266 | 254k | #define pn_link_refcount NULL |
1267 | 254k | #define pn_link_decref NULL |
1268 | 254k | #define pn_link_initialize NULL |
1269 | 254k | #define pn_link_hashcode NULL |
1270 | 254k | #define pn_link_compare NULL |
1271 | 254k | #define pn_link_inspect NULL |
1272 | | |
1273 | | pn_link_t *pn_link_new(int type, pn_session_t *session, pn_string_t *name) |
1274 | 254k | { |
1275 | 254k | #define pn_link_new NULL |
1276 | 254k | #define pn_link_free NULL |
1277 | 254k | static const pn_class_t clazz = PN_METACLASS(pn_link); |
1278 | 254k | #undef pn_link_new |
1279 | 254k | #undef pn_link_free |
1280 | 254k | pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t)); |
1281 | | |
1282 | 254k | pn_endpoint_init(&link->endpoint, type, session->connection); |
1283 | 254k | pni_add_link(session, link); |
1284 | 254k | pn_incref(session); // keep session until link finalized |
1285 | 254k | link->name = name; |
1286 | 254k | pni_terminus_init(&link->source, PN_SOURCE); |
1287 | 254k | pni_terminus_init(&link->target, PN_TARGET); |
1288 | 254k | pni_terminus_init(&link->remote_source, PN_UNSPECIFIED); |
1289 | 254k | pni_terminus_init(&link->remote_target, PN_UNSPECIFIED); |
1290 | 254k | link->unsettled_head = link->unsettled_tail = link->current = NULL; |
1291 | 254k | link->unsettled_count = 0; |
1292 | 254k | link->max_message_size = 0; |
1293 | 254k | link->remote_max_message_size = 0; |
1294 | 254k | link->available = 0; |
1295 | 254k | link->credit = 0; |
1296 | 254k | link->queued = 0; |
1297 | 254k | link->more_id = 0; |
1298 | 254k | link->drain = false; |
1299 | 254k | link->drain_flag_mode = true; |
1300 | 254k | link->drained = 0; |
1301 | 254k | link->context = pn_record(); |
1302 | 254k | link->snd_settle_mode = PN_SND_MIXED; |
1303 | 254k | link->rcv_settle_mode = PN_RCV_FIRST; |
1304 | 254k | link->remote_snd_settle_mode = PN_SND_MIXED; |
1305 | 254k | link->remote_rcv_settle_mode = PN_RCV_FIRST; |
1306 | 254k | link->detached = false; |
1307 | 254k | link->more_pending = false; |
1308 | 254k | link->properties = 0; |
1309 | 254k | link->properties_raw = (pn_bytes_t){0, NULL}; |
1310 | 254k | link->remote_properties = 0; |
1311 | 254k | link->remote_properties_raw = (pn_bytes_t){0, NULL}; |
1312 | | |
1313 | | // begin transport state |
1314 | 254k | link->state.local_handle = -1; |
1315 | 254k | link->state.remote_handle = -1; |
1316 | 254k | link->state.delivery_count = 0; |
1317 | 254k | link->state.link_credit = 0; |
1318 | | // end transport state |
1319 | | |
1320 | 254k | pn_collector_put_object(session->connection->collector, link, PN_LINK_INIT); |
1321 | 254k | if (session->connection->transport) { |
1322 | 236k | pni_link_bound(link); |
1323 | 236k | } |
1324 | 254k | pn_decref(link); |
1325 | 254k | return link; |
1326 | 254k | } |
1327 | | |
1328 | | static void pni_link_bound(pn_link_t *link) |
1329 | 254k | { |
1330 | 254k | } |
1331 | | |
1332 | | void pn_link_unbound(pn_link_t* link) |
1333 | 32.2k | { |
1334 | 32.2k | assert(link); |
1335 | 32.2k | link->state.local_handle = -1; |
1336 | 32.2k | link->state.remote_handle = -1; |
1337 | 32.2k | link->state.delivery_count = 0; |
1338 | 32.2k | link->state.link_credit = 0; |
1339 | 32.2k | } |
1340 | | |
1341 | | pn_terminus_t *pn_link_source(pn_link_t *link) |
1342 | 18.2k | { |
1343 | 18.2k | return link ? &link->source : NULL; |
1344 | 18.2k | } |
1345 | | |
1346 | | pn_terminus_t *pn_link_target(pn_link_t *link) |
1347 | 0 | { |
1348 | 0 | return link ? &link->target : NULL; |
1349 | 0 | } |
1350 | | |
1351 | | pn_terminus_t *pn_link_remote_source(pn_link_t *link) |
1352 | 0 | { |
1353 | 0 | return link ? &link->remote_source : NULL; |
1354 | 0 | } |
1355 | | |
1356 | | pn_terminus_t *pn_link_remote_target(pn_link_t *link) |
1357 | 0 | { |
1358 | 0 | return link ? &link->remote_target : NULL; |
1359 | 0 | } |
1360 | | |
1361 | | int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type) |
1362 | 473k | { |
1363 | 473k | if (!terminus) return PN_ARG_ERR; |
1364 | 473k | terminus->type = type; |
1365 | 473k | return 0; |
1366 | 473k | } |
1367 | | |
1368 | | pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus) |
1369 | 236k | { |
1370 | 236k | return (pn_terminus_type_t) (terminus ? terminus->type : 0); |
1371 | 236k | } |
1372 | | |
1373 | | const char *pn_terminus_get_address(pn_terminus_t *terminus) |
1374 | 0 | { |
1375 | 0 | assert(terminus); |
1376 | 0 | return pn_string_get(terminus->address); |
1377 | 0 | } |
1378 | | |
1379 | | int pn_terminus_set_address(pn_terminus_t *terminus, const char *address) |
1380 | 18.2k | { |
1381 | 18.2k | assert(terminus); |
1382 | 18.2k | return pn_string_set(terminus->address, address); |
1383 | 18.2k | } |
1384 | | |
1385 | | pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus) |
1386 | 0 | { |
1387 | 0 | return (pn_durability_t) (terminus ? terminus->durability : 0); |
1388 | 0 | } |
1389 | | |
1390 | | int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability) |
1391 | 17.7k | { |
1392 | 17.7k | if (!terminus) return PN_ARG_ERR; |
1393 | 17.7k | terminus->durability = durability; |
1394 | 17.7k | return 0; |
1395 | 17.7k | } |
1396 | | |
1397 | | pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus) |
1398 | 0 | { |
1399 | 0 | return (pn_expiry_policy_t) (terminus ? terminus->expiry_policy : 0); |
1400 | 0 | } |
1401 | | |
1402 | | bool pn_terminus_has_expiry_policy(const pn_terminus_t *terminus) |
1403 | 0 | { |
1404 | 0 | return terminus && terminus->has_expiry_policy; |
1405 | 0 | } |
1406 | | |
1407 | | int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy) |
1408 | 286 | { |
1409 | 286 | if (!terminus) return PN_ARG_ERR; |
1410 | 286 | terminus->expiry_policy = expiry_policy; |
1411 | 286 | terminus->has_expiry_policy = true; |
1412 | 286 | return 0; |
1413 | 286 | } |
1414 | | |
1415 | | pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus) |
1416 | 0 | { |
1417 | 0 | return terminus ? terminus->timeout : 0; |
1418 | 0 | } |
1419 | | |
1420 | | int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout) |
1421 | 17.7k | { |
1422 | 17.7k | if (!terminus) return PN_ARG_ERR; |
1423 | 17.7k | terminus->timeout = timeout; |
1424 | 17.7k | return 0; |
1425 | 17.7k | } |
1426 | | |
1427 | | bool pn_terminus_is_dynamic(pn_terminus_t *terminus) |
1428 | 0 | { |
1429 | 0 | return terminus ? terminus->dynamic : false; |
1430 | 0 | } |
1431 | | |
1432 | | int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic) |
1433 | 17.7k | { |
1434 | 17.7k | if (!terminus) return PN_ARG_ERR; |
1435 | 17.7k | terminus->dynamic = dynamic; |
1436 | 17.7k | return 0; |
1437 | 17.7k | } |
1438 | | |
1439 | | pn_data_t *pn_terminus_properties(pn_terminus_t *terminus) |
1440 | 0 | { |
1441 | 0 | if (!terminus) |
1442 | 0 | return NULL; |
1443 | 0 | pni_switch_to_data(&terminus->properties_raw, &terminus->properties); |
1444 | 0 | return terminus->properties; |
1445 | 0 | } |
1446 | | |
1447 | | pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus) |
1448 | 0 | { |
1449 | 0 | if (!terminus) |
1450 | 0 | return NULL; |
1451 | 0 | pni_switch_to_data(&terminus->capabilities_raw, &terminus->capabilities); |
1452 | 0 | return terminus->capabilities; |
1453 | 0 | } |
1454 | | |
1455 | | pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus) |
1456 | 0 | { |
1457 | 0 | if (!terminus) |
1458 | 0 | return NULL; |
1459 | 0 | pni_switch_to_data(&terminus->outcomes_raw, &terminus->outcomes); |
1460 | 0 | return terminus->outcomes; |
1461 | 0 | } |
1462 | | |
1463 | | pn_data_t *pn_terminus_filter(pn_terminus_t *terminus) |
1464 | 0 | { |
1465 | 0 | if (!terminus) |
1466 | 0 | return NULL; |
1467 | 0 | pni_switch_to_data(&terminus->filter_raw, &terminus->filter); |
1468 | 0 | return terminus->filter; |
1469 | 0 | } |
1470 | | |
1471 | | pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus) |
1472 | 0 | { |
1473 | 0 | return terminus ? (pn_distribution_mode_t) terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED; |
1474 | 0 | } |
1475 | | |
1476 | | int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m) |
1477 | 13.9k | { |
1478 | 13.9k | if (!terminus) return PN_ARG_ERR; |
1479 | 13.9k | terminus->distribution_mode = m; |
1480 | 13.9k | return 0; |
1481 | 13.9k | } |
1482 | | |
1483 | | int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src) |
1484 | 0 | { |
1485 | 0 | if (!terminus || !src) { |
1486 | 0 | return PN_ARG_ERR; |
1487 | 0 | } |
1488 | | |
1489 | 0 | terminus->type = src->type; |
1490 | 0 | int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src)); |
1491 | 0 | if (err) return err; |
1492 | 0 | terminus->durability = src->durability; |
1493 | 0 | terminus->has_expiry_policy = src->has_expiry_policy; |
1494 | 0 | terminus->expiry_policy = src->expiry_policy; |
1495 | 0 | terminus->timeout = src->timeout; |
1496 | 0 | terminus->dynamic = src->dynamic; |
1497 | 0 | terminus->distribution_mode = src->distribution_mode; |
1498 | 0 | pn_bytes_free(terminus->properties_raw); |
1499 | 0 | terminus->properties_raw = pn_bytes_dup(src->properties_raw); |
1500 | 0 | pn_bytes_free(terminus->capabilities_raw); |
1501 | 0 | terminus->capabilities_raw = pn_bytes_dup(src->capabilities_raw); |
1502 | 0 | pn_bytes_free(terminus->outcomes_raw); |
1503 | 0 | terminus->outcomes_raw = pn_bytes_dup(src->outcomes_raw); |
1504 | 0 | pn_bytes_free(terminus->filter_raw); |
1505 | 0 | terminus->filter_raw = pn_bytes_dup(src->filter_raw); |
1506 | 0 | if (!src->properties) { |
1507 | 0 | pn_free(terminus->properties); |
1508 | 0 | terminus->properties = NULL; |
1509 | 0 | } else { |
1510 | 0 | if (!terminus->properties) terminus->properties = pn_data(0); |
1511 | 0 | err = pn_data_copy(terminus->properties, src->properties); |
1512 | 0 | if (err) return err; |
1513 | 0 | } |
1514 | 0 | if (!src->capabilities) { |
1515 | 0 | pn_free(terminus->capabilities); |
1516 | 0 | terminus->capabilities = NULL; |
1517 | 0 | } else { |
1518 | 0 | if (!terminus->capabilities) terminus->capabilities = pn_data(0); |
1519 | 0 | err = pn_data_copy(terminus->capabilities, src->capabilities); |
1520 | 0 | if (err) return err; |
1521 | 0 | } |
1522 | 0 | if (!src->outcomes) { |
1523 | 0 | pn_free(terminus->outcomes); |
1524 | 0 | terminus->outcomes = NULL; |
1525 | 0 | } else { |
1526 | 0 | if (!terminus->outcomes) terminus->outcomes = pn_data(0); |
1527 | 0 | err = pn_data_copy(terminus->outcomes, src->outcomes); |
1528 | 0 | if (err) return err; |
1529 | 0 | } |
1530 | 0 | if (!src->filter) { |
1531 | 0 | pn_free(terminus->filter); |
1532 | 0 | terminus->filter = NULL; |
1533 | 0 | } else { |
1534 | 0 | if (!terminus->filter) terminus->filter = pn_data(0); |
1535 | 0 | err = pn_data_copy(terminus->filter, src->filter); |
1536 | 0 | if (err) return err; |
1537 | 0 | } |
1538 | 0 | return 0; |
1539 | 0 | } |
1540 | | |
1541 | | pn_link_t *pn_sender(pn_session_t *session, const char *name) |
1542 | 0 | { |
1543 | 0 | return pn_link_new(SENDER, session, pn_string(name)); |
1544 | 0 | } |
1545 | | |
1546 | | pn_link_t *pn_receiver(pn_session_t *session, const char *name) |
1547 | 18.2k | { |
1548 | 18.2k | return pn_link_new(RECEIVER, session, pn_string(name)); |
1549 | 18.2k | } |
1550 | | |
1551 | | pn_state_t pn_link_state(pn_link_t *link) |
1552 | 0 | { |
1553 | 0 | return link->endpoint.state; |
1554 | 0 | } |
1555 | | |
1556 | | const char *pn_link_name(pn_link_t *link) |
1557 | 0 | { |
1558 | 0 | assert(link); |
1559 | 0 | return pn_string_get(link->name); |
1560 | 0 | } |
1561 | | |
1562 | | bool pn_link_is_sender(pn_link_t *link) |
1563 | 15.3k | { |
1564 | 15.3k | return link->endpoint.type == SENDER; |
1565 | 15.3k | } |
1566 | | |
1567 | | bool pn_link_is_receiver(pn_link_t *link) |
1568 | 4.77k | { |
1569 | 4.77k | return link->endpoint.type == RECEIVER; |
1570 | 4.77k | } |
1571 | | |
1572 | | pn_session_t *pn_link_session(pn_link_t *link) |
1573 | 5.34k | { |
1574 | 5.34k | assert(link); |
1575 | 5.34k | return link->session; |
1576 | 5.34k | } |
1577 | | |
1578 | | static void pn_disposition_finalize(pn_disposition_t *ds) |
1579 | 7.55k | { |
1580 | 7.55k | pn_free(ds->data); |
1581 | 7.55k | pn_bytes_free(ds->data_raw); |
1582 | 7.55k | pn_free(ds->annotations); |
1583 | 7.55k | pn_bytes_free(ds->annotations_raw); |
1584 | 7.55k | pn_condition_tini(&ds->condition); |
1585 | 7.55k | } |
1586 | | |
1587 | | static void pn_delivery_incref(void *object) |
1588 | 15.9k | { |
1589 | 15.9k | pn_delivery_t *delivery = (pn_delivery_t *) object; |
1590 | 15.9k | if (delivery->link && !delivery->referenced) { |
1591 | 7.49k | delivery->referenced = true; |
1592 | 7.49k | pn_incref(delivery->link); |
1593 | 8.42k | } else { |
1594 | 8.42k | pn_object_incref(object); |
1595 | 8.42k | } |
1596 | 15.9k | } |
1597 | | |
1598 | | static bool pni_preserve_delivery(pn_delivery_t *delivery) |
1599 | 13.2k | { |
1600 | 13.2k | pn_connection_t *conn = delivery->link->session->connection; |
1601 | 13.2k | return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork)); |
1602 | 13.2k | } |
1603 | | |
1604 | | static void pn_delivery_finalize(void *object) |
1605 | 17.0k | { |
1606 | 17.0k | pn_delivery_t *delivery = (pn_delivery_t *) object; |
1607 | 17.0k | pn_link_t *link = delivery->link; |
1608 | | // assert(!delivery->state.init); |
1609 | | |
1610 | 17.0k | bool pooled = false; |
1611 | 17.0k | bool referenced = true; |
1612 | 17.0k | if (link) { |
1613 | 13.2k | if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) { |
1614 | 9.40k | delivery->referenced = false; |
1615 | 9.40k | pn_object_incref(delivery); |
1616 | 9.40k | pn_decref(link); |
1617 | 9.40k | return; |
1618 | 9.40k | } |
1619 | 3.86k | referenced = delivery->referenced; |
1620 | | |
1621 | 3.86k | pn_clear_tpwork(delivery); |
1622 | 3.86k | LL_REMOVE(link, unsettled, delivery); |
1623 | 3.86k | pn_delivery_map_del(pn_link_is_sender(link) |
1624 | 3.86k | ? &link->session->state.outgoing |
1625 | 3.86k | : &link->session->state.incoming, |
1626 | 3.86k | delivery); |
1627 | 3.86k | pn_bytes_free(delivery->tag); |
1628 | 3.86k | delivery->tag = (pn_delivery_tag_t){0, NULL}; |
1629 | 3.86k | pn_buffer_clear(delivery->bytes); |
1630 | 3.86k | pn_record_clear(delivery->context); |
1631 | 3.86k | delivery->settled = true; |
1632 | 3.86k | pn_connection_t *conn = link->session->connection; |
1633 | 3.86k | assert(pn_refcount(delivery) == 0); |
1634 | 3.86k | if (pni_connection_live(conn)) { |
1635 | 3.86k | pn_list_t *pool = link->session->connection->delivery_pool; |
1636 | 3.86k | delivery->link = NULL; |
1637 | 3.86k | pn_list_add(pool, delivery); |
1638 | 3.86k | pooled = true; |
1639 | 3.86k | assert(pn_refcount(delivery) == 1); |
1640 | 3.86k | } |
1641 | 3.86k | } |
1642 | | |
1643 | 7.64k | if (!pooled) { |
1644 | 3.77k | pn_free(delivery->context); |
1645 | 3.77k | pn_bytes_free(delivery->tag); |
1646 | 3.77k | delivery->tag = (pn_delivery_tag_t){0, NULL}; |
1647 | 3.77k | pn_buffer_free(delivery->bytes); |
1648 | 3.77k | pn_disposition_finalize(&delivery->local); |
1649 | 3.77k | pn_disposition_finalize(&delivery->remote); |
1650 | 3.77k | } |
1651 | | |
1652 | 7.64k | if (referenced) { |
1653 | 5.72k | pn_decref(link); |
1654 | 5.72k | } |
1655 | 7.64k | } |
1656 | | |
1657 | | static void pn_disposition_init(pn_disposition_t *ds) |
1658 | 7.55k | { |
1659 | 7.55k | ds->data = NULL; |
1660 | 7.55k | ds->data_raw = (pn_bytes_t){0, NULL}; |
1661 | 7.55k | ds->annotations = NULL; |
1662 | 7.55k | ds->annotations_raw = (pn_bytes_t){0, NULL}; |
1663 | 7.55k | pn_condition_init(&ds->condition); |
1664 | 7.55k | } |
1665 | | |
1666 | | static void pn_disposition_clear(pn_disposition_t *ds) |
1667 | 7.72k | { |
1668 | 7.72k | ds->type = 0; |
1669 | 7.72k | ds->section_number = 0; |
1670 | 7.72k | ds->section_offset = 0; |
1671 | 7.72k | ds->failed = false; |
1672 | 7.72k | ds->undeliverable = false; |
1673 | 7.72k | ds->settled = false; |
1674 | 7.72k | pn_data_clear(ds->data); |
1675 | 7.72k | pn_bytes_free(ds->data_raw); |
1676 | 7.72k | ds->data_raw = (pn_bytes_t){0, NULL}; |
1677 | 7.72k | pn_data_clear(ds->annotations); |
1678 | 7.72k | pn_bytes_free(ds->annotations_raw); |
1679 | 7.72k | ds->annotations_raw = (pn_bytes_t){0, NULL}; |
1680 | 7.72k | pn_condition_clear(&ds->condition); |
1681 | 7.72k | } |
1682 | | |
1683 | 0 | void pn_delivery_inspect(void *obj, pn_fixed_string_t *dst) { |
1684 | 0 | pn_delivery_t *d = (pn_delivery_t*)obj; |
1685 | 0 | const char* dir = pn_link_is_sender(d->link) ? "sending" : "receiving"; |
1686 | 0 | pn_bytes_t bytes = d->tag; |
1687 | 0 | pn_fixed_string_addf(dst, "pn_delivery<%p>{%s, tag=b\"", obj, dir); |
1688 | 0 | pn_fixed_string_quote(dst, bytes.start, bytes.size); |
1689 | 0 | pn_fixed_string_addf(dst, "\", local=%s, remote=%s}", |
1690 | 0 | pn_disposition_type_name(d->local.type), |
1691 | 0 | pn_disposition_type_name(d->remote.type)); |
1692 | 0 | return; |
1693 | 0 | } |
1694 | | |
1695 | 3.86k | pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) { |
1696 | 3.86k | pn_delivery_tag_t dtag = {size, bytes}; |
1697 | 3.86k | return dtag; |
1698 | 3.86k | } |
1699 | | |
1700 | | pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) |
1701 | 3.86k | { |
1702 | 3.86k | assert(link); |
1703 | 3.86k | pn_list_t *pool = link->session->connection->delivery_pool; |
1704 | 3.86k | pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool); |
1705 | 3.86k | if (!delivery) { |
1706 | 3.77k | delivery = (pn_delivery_t *) pn_class_new(&PN_CLASSCLASS(pn_delivery), sizeof(pn_delivery_t)); |
1707 | 3.77k | if (!delivery) return NULL; |
1708 | 3.77k | delivery->bytes = pn_buffer(64); |
1709 | 3.77k | pn_disposition_init(&delivery->local); |
1710 | 3.77k | pn_disposition_init(&delivery->remote); |
1711 | 3.77k | delivery->context = pn_record(); |
1712 | 3.77k | } else { |
1713 | 84 | assert(!delivery->state.init); |
1714 | 84 | } |
1715 | 3.86k | delivery->link = link; |
1716 | 3.86k | pn_incref(delivery->link); // keep link until finalized |
1717 | 3.86k | delivery->tag = pn_bytes_dup(tag); |
1718 | 3.86k | pn_disposition_clear(&delivery->local); |
1719 | 3.86k | pn_disposition_clear(&delivery->remote); |
1720 | 3.86k | delivery->updated = false; |
1721 | 3.86k | delivery->settled = false; |
1722 | 3.86k | LL_ADD(link, unsettled, delivery); |
1723 | 3.86k | delivery->referenced = true; |
1724 | 3.86k | delivery->work_next = NULL; |
1725 | 3.86k | delivery->work_prev = NULL; |
1726 | 3.86k | delivery->work = false; |
1727 | 3.86k | delivery->tpwork_next = NULL; |
1728 | 3.86k | delivery->tpwork_prev = NULL; |
1729 | 3.86k | delivery->tpwork = false; |
1730 | 3.86k | pn_buffer_clear(delivery->bytes); |
1731 | 3.86k | delivery->done = false; |
1732 | 3.86k | delivery->aborted = false; |
1733 | 3.86k | pn_record_clear(delivery->context); |
1734 | | |
1735 | | // begin delivery state |
1736 | 3.86k | delivery->state.init = false; |
1737 | 3.86k | delivery->state.sending = false; /* True if we have sent at least 1 frame */ |
1738 | 3.86k | delivery->state.sent = false; /* True if we have sent the entire delivery */ |
1739 | | // end delivery state |
1740 | | |
1741 | 3.86k | if (!link->current) |
1742 | 321 | link->current = delivery; |
1743 | | |
1744 | 3.86k | link->unsettled_count++; |
1745 | | |
1746 | 3.86k | pn_work_update(link->session->connection, delivery); |
1747 | | |
1748 | | // XXX: could just remove incref above |
1749 | 3.86k | pn_decref(delivery); |
1750 | | |
1751 | 3.86k | return delivery; |
1752 | 3.86k | } |
1753 | | |
1754 | | bool pn_delivery_buffered(pn_delivery_t *delivery) |
1755 | 0 | { |
1756 | 0 | assert(delivery); |
1757 | 0 | if (delivery->settled) return false; |
1758 | 0 | if (pn_link_is_sender(delivery->link)) { |
1759 | 0 | pn_delivery_state_t *state = &delivery->state; |
1760 | 0 | if (state->sent) { |
1761 | 0 | return false; |
1762 | 0 | } else { |
1763 | 0 | return delivery->done || (pn_buffer_size(delivery->bytes) > 0); |
1764 | 0 | } |
1765 | 0 | } else { |
1766 | 0 | return false; |
1767 | 0 | } |
1768 | 0 | } |
1769 | | |
1770 | | int pn_link_unsettled(pn_link_t *link) |
1771 | 0 | { |
1772 | 0 | return link->unsettled_count; |
1773 | 0 | } |
1774 | | |
1775 | | pn_delivery_t *pn_unsettled_head(pn_link_t *link) |
1776 | 0 | { |
1777 | 0 | pn_delivery_t *d = link->unsettled_head; |
1778 | 0 | while (d && d->local.settled) { |
1779 | 0 | d = d->unsettled_next; |
1780 | 0 | } |
1781 | 0 | return d; |
1782 | 0 | } |
1783 | | |
1784 | | pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery) |
1785 | 0 | { |
1786 | 0 | pn_delivery_t *d = delivery->unsettled_next; |
1787 | 0 | while (d && d->local.settled) { |
1788 | 0 | d = d->unsettled_next; |
1789 | 0 | } |
1790 | 0 | return d; |
1791 | 0 | } |
1792 | | |
1793 | | bool pn_delivery_current(pn_delivery_t *delivery) |
1794 | 7.88k | { |
1795 | 7.88k | pn_link_t *link = delivery->link; |
1796 | 7.88k | return pn_link_current(link) == delivery; |
1797 | 7.88k | } |
1798 | | |
1799 | | void pn_delivery_dump(pn_delivery_t *d) |
1800 | 0 | { |
1801 | 0 | char tag[1024]; |
1802 | 0 | pn_bytes_t bytes = d->tag; |
1803 | 0 | pn_quote_data(tag, 1024, bytes.start, bytes.size); |
1804 | 0 | printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%d, " |
1805 | 0 | "remote.settled=%d, updated=%d, current=%d, writable=%d, readable=%d, " |
1806 | 0 | "work=%d}", |
1807 | 0 | tag, d->local.type, d->remote.type, d->local.settled, |
1808 | 0 | d->remote.settled, d->updated, pn_delivery_current(d), |
1809 | 0 | pn_delivery_writable(d), pn_delivery_readable(d), d->work); |
1810 | 0 | } |
1811 | | |
1812 | | void *pn_delivery_get_context(pn_delivery_t *delivery) |
1813 | 0 | { |
1814 | 0 | assert(delivery); |
1815 | 0 | return pn_record_get(delivery->context, PN_LEGCTX); |
1816 | 0 | } |
1817 | | |
1818 | | void pn_delivery_set_context(pn_delivery_t *delivery, void *context) |
1819 | 0 | { |
1820 | 0 | assert(delivery); |
1821 | 0 | pn_record_set(delivery->context, PN_LEGCTX, context); |
1822 | 0 | } |
1823 | | |
1824 | | pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery) |
1825 | 0 | { |
1826 | 0 | assert(delivery); |
1827 | 0 | return delivery->context; |
1828 | 0 | } |
1829 | | |
1830 | | uint64_t pn_disposition_type(pn_disposition_t *disposition) |
1831 | 0 | { |
1832 | 0 | assert(disposition); |
1833 | 0 | return disposition->type; |
1834 | 0 | } |
1835 | | |
1836 | | pn_data_t *pn_disposition_data(pn_disposition_t *disposition) |
1837 | 0 | { |
1838 | 0 | assert(disposition); |
1839 | 0 | pni_switch_to_data(&disposition->data_raw, &disposition->data); |
1840 | 0 | return disposition->data; |
1841 | 0 | } |
1842 | | |
1843 | | uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition) |
1844 | 0 | { |
1845 | 0 | assert(disposition); |
1846 | 0 | return disposition->section_number; |
1847 | 0 | } |
1848 | | |
1849 | | void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number) |
1850 | 0 | { |
1851 | 0 | assert(disposition); |
1852 | 0 | disposition->section_number = section_number; |
1853 | 0 | } |
1854 | | |
1855 | | uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition) |
1856 | 0 | { |
1857 | 0 | assert(disposition); |
1858 | 0 | return disposition->section_offset; |
1859 | 0 | } |
1860 | | |
1861 | | void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset) |
1862 | 0 | { |
1863 | 0 | assert(disposition); |
1864 | 0 | disposition->section_offset = section_offset; |
1865 | 0 | } |
1866 | | |
1867 | | bool pn_disposition_is_failed(pn_disposition_t *disposition) |
1868 | 0 | { |
1869 | 0 | assert(disposition); |
1870 | 0 | return disposition->failed; |
1871 | 0 | } |
1872 | | |
1873 | | void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed) |
1874 | 0 | { |
1875 | 0 | assert(disposition); |
1876 | 0 | disposition->failed = failed; |
1877 | 0 | } |
1878 | | |
1879 | | bool pn_disposition_is_undeliverable(pn_disposition_t *disposition) |
1880 | 0 | { |
1881 | 0 | assert(disposition); |
1882 | 0 | return disposition->undeliverable; |
1883 | 0 | } |
1884 | | |
1885 | | void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable) |
1886 | 0 | { |
1887 | 0 | assert(disposition); |
1888 | 0 | disposition->undeliverable = undeliverable; |
1889 | 0 | } |
1890 | | |
1891 | | pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition) |
1892 | 0 | { |
1893 | 0 | assert(disposition); |
1894 | 0 | pni_switch_to_data(&disposition->annotations_raw, &disposition->annotations); |
1895 | 0 | return disposition->annotations; |
1896 | 0 | } |
1897 | | |
1898 | | pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition) |
1899 | 0 | { |
1900 | 0 | assert(disposition); |
1901 | 0 | return &disposition->condition; |
1902 | 0 | } |
1903 | | |
1904 | | pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) |
1905 | 0 | { |
1906 | 0 | if (delivery) { |
1907 | 0 | return delivery->tag; |
1908 | 0 | } else { |
1909 | 0 | return (pn_delivery_tag_t){0, NULL}; |
1910 | 0 | } |
1911 | 0 | } |
1912 | | |
1913 | | pn_delivery_t *pn_link_current(pn_link_t *link) |
1914 | 24.6k | { |
1915 | 24.6k | if (!link) return NULL; |
1916 | 24.6k | return link->current; |
1917 | 24.6k | } |
1918 | | |
1919 | | static void pni_advance_sender(pn_link_t *link) |
1920 | 156 | { |
1921 | 156 | link->current->done = true; |
1922 | | /* Skip accounting if the link is aborted and has not sent any frames. |
1923 | | A delivery that was aborted before sending the first frame was not accounted |
1924 | | for in pni_process_tpwork_sender() so we don't need to account for it being sent here. |
1925 | | */ |
1926 | 156 | bool skip = link->current->aborted && !link->current->state.sending; |
1927 | 156 | if (!skip) { |
1928 | 156 | link->queued++; |
1929 | 156 | link->credit--; |
1930 | 156 | link->session->outgoing_deliveries++; |
1931 | 156 | } |
1932 | 156 | pni_add_tpwork(link->current); |
1933 | 156 | link->current = link->current->unsettled_next; |
1934 | 156 | } |
1935 | | |
1936 | | static void pni_advance_receiver(pn_link_t *link) |
1937 | 3.70k | { |
1938 | 3.70k | link->credit--; |
1939 | 3.70k | link->queued--; |
1940 | 3.70k | link->session->incoming_deliveries--; |
1941 | | |
1942 | 3.70k | pn_delivery_t *current = link->current; |
1943 | 3.70k | size_t drop_count = pn_buffer_size(current->bytes); |
1944 | 3.70k | pn_buffer_clear(current->bytes); |
1945 | | |
1946 | 3.70k | if (drop_count) { |
1947 | 57 | pn_session_t *ssn = link->session; |
1948 | 57 | ssn->incoming_bytes -= drop_count; |
1949 | 57 | if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) { |
1950 | 0 | ssn->check_flow = true; |
1951 | 0 | pni_add_tpwork(current); |
1952 | 0 | } |
1953 | 57 | } |
1954 | | |
1955 | 3.70k | link->current = link->current->unsettled_next; |
1956 | 3.70k | } |
1957 | | |
1958 | | bool pn_link_advance(pn_link_t *link) |
1959 | 3.86k | { |
1960 | 3.86k | if (link && link->current) { |
1961 | 3.86k | pn_delivery_t *prev = link->current; |
1962 | 3.86k | if (link->endpoint.type == SENDER) { |
1963 | 156 | pni_advance_sender(link); |
1964 | 3.70k | } else { |
1965 | 3.70k | pni_advance_receiver(link); |
1966 | 3.70k | } |
1967 | 3.86k | pn_delivery_t *next = link->current; |
1968 | 3.86k | pn_work_update(link->session->connection, prev); |
1969 | 3.86k | if (next) pn_work_update(link->session->connection, next); |
1970 | 3.86k | return prev != next; |
1971 | 3.86k | } else { |
1972 | 0 | return false; |
1973 | 0 | } |
1974 | 3.86k | } |
1975 | | |
1976 | | int pn_link_credit(pn_link_t *link) |
1977 | 32 | { |
1978 | 32 | return link ? link->credit : 0; |
1979 | 32 | } |
1980 | | |
1981 | | int pn_link_available(pn_link_t *link) |
1982 | 0 | { |
1983 | 0 | return link ? link->available : 0; |
1984 | 0 | } |
1985 | | |
1986 | | int pn_link_queued(pn_link_t *link) |
1987 | 539 | { |
1988 | 539 | return link ? link->queued : 0; |
1989 | 539 | } |
1990 | | |
1991 | | int pn_link_remote_credit(pn_link_t *link) |
1992 | 0 | { |
1993 | 0 | assert(link); |
1994 | 0 | return link->credit - link->queued; |
1995 | 0 | } |
1996 | | |
1997 | | bool pn_link_get_drain(pn_link_t *link) |
1998 | 0 | { |
1999 | 0 | assert(link); |
2000 | 0 | return link->drain; |
2001 | 0 | } |
2002 | | |
2003 | | pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link) |
2004 | 0 | { |
2005 | 0 | return link ? (pn_snd_settle_mode_t)link->snd_settle_mode |
2006 | 0 | : PN_SND_MIXED; |
2007 | 0 | } |
2008 | | |
2009 | | pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link) |
2010 | 0 | { |
2011 | 0 | return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode |
2012 | 0 | : PN_RCV_FIRST; |
2013 | 0 | } |
2014 | | |
2015 | | pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link) |
2016 | 0 | { |
2017 | 0 | return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode |
2018 | 0 | : PN_SND_MIXED; |
2019 | 0 | } |
2020 | | |
2021 | | pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link) |
2022 | 0 | { |
2023 | 0 | return link ? (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode |
2024 | 0 | : PN_RCV_FIRST; |
2025 | 0 | } |
2026 | | void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode) |
2027 | 0 | { |
2028 | 0 | if (link) |
2029 | 0 | link->snd_settle_mode = (uint8_t)mode; |
2030 | 0 | } |
2031 | | void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode) |
2032 | 0 | { |
2033 | 0 | if (link) |
2034 | 0 | link->rcv_settle_mode = (uint8_t)mode; |
2035 | 0 | } |
2036 | | |
2037 | | void pn_delivery_settle(pn_delivery_t *delivery) |
2038 | 5.77k | { |
2039 | 5.77k | assert(delivery); |
2040 | 5.77k | if (!delivery->local.settled) { |
2041 | 3.86k | pn_link_t *link = delivery->link; |
2042 | 3.86k | if (pn_delivery_current(delivery)) { |
2043 | 234 | pn_link_advance(link); |
2044 | 234 | } |
2045 | | |
2046 | 3.86k | link->unsettled_count--; |
2047 | 3.86k | delivery->local.settled = true; |
2048 | 3.86k | pni_add_tpwork(delivery); |
2049 | 3.86k | pn_work_update(delivery->link->session->connection, delivery); |
2050 | 3.86k | pn_incref(delivery); |
2051 | 3.86k | pn_decref(delivery); |
2052 | 3.86k | } |
2053 | 5.77k | } |
2054 | | |
2055 | | void pn_link_offered(pn_link_t *sender, int credit) |
2056 | 0 | { |
2057 | 0 | sender->available = credit; |
2058 | 0 | } |
2059 | | |
2060 | | ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) |
2061 | 0 | { |
2062 | 0 | pn_delivery_t *current = pn_link_current(sender); |
2063 | 0 | if (!current) return PN_EOS; |
2064 | 0 | if (!bytes || !n) return 0; |
2065 | 0 | pn_buffer_append(current->bytes, bytes, n); |
2066 | 0 | sender->session->outgoing_bytes += n; |
2067 | 0 | pni_add_tpwork(current); |
2068 | 0 | return n; |
2069 | 0 | } |
2070 | | |
2071 | | int pn_link_drained(pn_link_t *link) |
2072 | 0 | { |
2073 | 0 | assert(link); |
2074 | 0 | int drained = 0; |
2075 | |
|
2076 | 0 | if (pn_link_is_sender(link)) { |
2077 | 0 | if (link->drain && link->credit > 0) { |
2078 | 0 | link->drained = link->credit; |
2079 | 0 | link->credit = 0; |
2080 | 0 | pn_modified(link->session->connection, &link->endpoint, true); |
2081 | 0 | drained = link->drained; |
2082 | 0 | } |
2083 | 0 | } else { |
2084 | 0 | drained = link->drained; |
2085 | 0 | link->drained = 0; |
2086 | 0 | } |
2087 | |
|
2088 | 0 | return drained; |
2089 | 0 | } |
2090 | | |
2091 | | ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n) |
2092 | 3.62k | { |
2093 | 3.62k | if (!receiver) return PN_ARG_ERR; |
2094 | 3.62k | pn_delivery_t *delivery = receiver->current; |
2095 | 3.62k | if (!delivery) return PN_STATE_ERR; |
2096 | 3.62k | if (delivery->aborted) return PN_ABORTED; |
2097 | 3.62k | size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes); |
2098 | 3.62k | pn_buffer_trim(delivery->bytes, size, 0); |
2099 | 3.62k | if (size) { |
2100 | 1.04k | pn_session_t *ssn = receiver->session; |
2101 | 1.04k | ssn->incoming_bytes -= size; |
2102 | 1.04k | if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) { |
2103 | 0 | ssn->check_flow = true; |
2104 | 0 | pni_add_tpwork(delivery); |
2105 | 0 | } |
2106 | 1.04k | return size; |
2107 | 2.58k | } else { |
2108 | 2.58k | return delivery->done ? PN_EOS : 0; |
2109 | 2.58k | } |
2110 | 3.62k | } |
2111 | | |
2112 | | |
2113 | | void pn_link_flow(pn_link_t *receiver, int credit) |
2114 | 18.2k | { |
2115 | 18.2k | assert(receiver); |
2116 | 18.2k | assert(pn_link_is_receiver(receiver)); |
2117 | 18.2k | receiver->credit += credit; |
2118 | 18.2k | pn_modified(receiver->session->connection, &receiver->endpoint, true); |
2119 | 18.2k | if (!receiver->drain_flag_mode) { |
2120 | 0 | pn_link_set_drain(receiver, false); |
2121 | 0 | receiver->drain_flag_mode = false; |
2122 | 0 | } |
2123 | 18.2k | } |
2124 | | |
2125 | | void pn_link_drain(pn_link_t *receiver, int credit) |
2126 | 0 | { |
2127 | 0 | assert(receiver); |
2128 | 0 | assert(pn_link_is_receiver(receiver)); |
2129 | 0 | pn_link_set_drain(receiver, true); |
2130 | 0 | pn_link_flow(receiver, credit); |
2131 | 0 | receiver->drain_flag_mode = false; |
2132 | 0 | } |
2133 | | |
2134 | | void pn_link_set_drain(pn_link_t *receiver, bool drain) |
2135 | 0 | { |
2136 | 0 | assert(receiver); |
2137 | 0 | assert(pn_link_is_receiver(receiver)); |
2138 | 0 | receiver->drain = drain; |
2139 | 0 | pn_modified(receiver->session->connection, &receiver->endpoint, true); |
2140 | 0 | receiver->drain_flag_mode = true; |
2141 | 0 | } |
2142 | | |
2143 | | bool pn_link_draining(pn_link_t *receiver) |
2144 | 0 | { |
2145 | 0 | assert(receiver); |
2146 | 0 | assert(pn_link_is_receiver(receiver)); |
2147 | 0 | return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver)); |
2148 | 0 | } |
2149 | | |
2150 | | uint64_t pn_link_max_message_size(pn_link_t *link) |
2151 | 0 | { |
2152 | 0 | return link->max_message_size; |
2153 | 0 | } |
2154 | | |
2155 | | void pn_link_set_max_message_size(pn_link_t *link, uint64_t size) |
2156 | 0 | { |
2157 | 0 | link->max_message_size = size; |
2158 | 0 | } |
2159 | | |
2160 | | uint64_t pn_link_remote_max_message_size(pn_link_t *link) |
2161 | 0 | { |
2162 | 0 | return link->remote_max_message_size; |
2163 | 0 | } |
2164 | | |
2165 | | pn_data_t *pn_link_properties(pn_link_t *link) |
2166 | 0 | { |
2167 | 0 | assert(link); |
2168 | 0 | if (!link->properties) |
2169 | 0 | link->properties = pn_data(0); |
2170 | 0 | return link->properties; |
2171 | 0 | } |
2172 | | |
2173 | | pn_data_t *pn_link_remote_properties(pn_link_t *link) |
2174 | 0 | { |
2175 | 0 | assert(link); |
2176 | | // Annoying inconsistency: nearly everywhere else you *HAVE* to return an empty pn_data_t not NULL |
2177 | 0 | if (link->remote_properties_raw.size) { |
2178 | 0 | pni_switch_to_data(&link->remote_properties_raw, &link->remote_properties); |
2179 | 0 | } |
2180 | 0 | return link->remote_properties; |
2181 | 0 | } |
2182 | | |
2183 | | |
2184 | | pn_link_t *pn_delivery_link(pn_delivery_t *delivery) |
2185 | 24.0k | { |
2186 | 24.0k | assert(delivery); |
2187 | 24.0k | return delivery->link; |
2188 | 24.0k | } |
2189 | | |
2190 | | pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery) |
2191 | 0 | { |
2192 | 0 | assert(delivery); |
2193 | 0 | return &delivery->local; |
2194 | 0 | } |
2195 | | |
2196 | | uint64_t pn_delivery_local_state(pn_delivery_t *delivery) |
2197 | 0 | { |
2198 | 0 | assert(delivery); |
2199 | 0 | return delivery->local.type; |
2200 | 0 | } |
2201 | | |
2202 | | pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery) |
2203 | 0 | { |
2204 | 0 | assert(delivery); |
2205 | 0 | return &delivery->remote; |
2206 | 0 | } |
2207 | | |
2208 | | uint64_t pn_delivery_remote_state(pn_delivery_t *delivery) |
2209 | 0 | { |
2210 | 0 | assert(delivery); |
2211 | 0 | return delivery->remote.type; |
2212 | 0 | } |
2213 | | |
2214 | | bool pn_delivery_settled(pn_delivery_t *delivery) |
2215 | 0 | { |
2216 | 0 | return delivery ? delivery->remote.settled : false; |
2217 | 0 | } |
2218 | | |
2219 | | bool pn_delivery_updated(pn_delivery_t *delivery) |
2220 | 0 | { |
2221 | 0 | return delivery ? delivery->updated : false; |
2222 | 0 | } |
2223 | | |
2224 | | void pn_delivery_clear(pn_delivery_t *delivery) |
2225 | 0 | { |
2226 | 0 | delivery->updated = false; |
2227 | 0 | pn_work_update(delivery->link->session->connection, delivery); |
2228 | 0 | } |
2229 | | |
2230 | | void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) |
2231 | 3.62k | { |
2232 | 3.62k | if (!delivery) return; |
2233 | 3.62k | delivery->local.type = state; |
2234 | 3.62k | pni_add_tpwork(delivery); |
2235 | 3.62k | } |
2236 | | |
2237 | | bool pn_delivery_writable(pn_delivery_t *delivery) |
2238 | 0 | { |
2239 | 0 | if (!delivery) return false; |
2240 | | |
2241 | 0 | pn_link_t *link = delivery->link; |
2242 | 0 | return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0; |
2243 | 0 | } |
2244 | | |
2245 | | bool pn_delivery_readable(pn_delivery_t *delivery) |
2246 | 4.77k | { |
2247 | 4.77k | if (delivery) { |
2248 | 4.77k | pn_link_t *link = delivery->link; |
2249 | 4.77k | return pn_link_is_receiver(link) && pn_delivery_current(delivery); |
2250 | 4.77k | } else { |
2251 | 0 | return false; |
2252 | 0 | } |
2253 | 4.77k | } |
2254 | | |
2255 | | size_t pn_delivery_pending(pn_delivery_t *delivery) |
2256 | 3.62k | { |
2257 | | /* Aborted deliveries: for clients that don't check pn_delivery_aborted(), |
2258 | | return 1 rather than 0. This will force them to call pn_link_recv() and get |
2259 | | the PN_ABORTED error return code. |
2260 | | */ |
2261 | 3.62k | if (delivery->aborted) return 1; |
2262 | 3.62k | return pn_buffer_size(delivery->bytes); |
2263 | 3.62k | } |
2264 | | |
2265 | | bool pn_delivery_partial(pn_delivery_t *delivery) |
2266 | 3.67k | { |
2267 | 3.67k | return !delivery->done; |
2268 | 3.67k | } |
2269 | | |
2270 | 0 | void pn_delivery_abort(pn_delivery_t *delivery) { |
2271 | 0 | if (!delivery->local.settled) { /* Can't abort a settled delivery */ |
2272 | 0 | delivery->aborted = true; |
2273 | 0 | pn_delivery_settle(delivery); |
2274 | 0 | delivery->link->session->outgoing_bytes -= pn_buffer_size(delivery->bytes); |
2275 | 0 | pn_buffer_clear(delivery->bytes); |
2276 | 0 | } |
2277 | 0 | } |
2278 | | |
2279 | 0 | bool pn_delivery_aborted(pn_delivery_t *delivery) { |
2280 | 0 | return delivery->aborted; |
2281 | 0 | } |
2282 | | |
2283 | | pn_condition_t *pn_connection_condition(pn_connection_t *connection) |
2284 | 269 | { |
2285 | 269 | assert(connection); |
2286 | 269 | return &connection->endpoint.condition; |
2287 | 269 | } |
2288 | | |
2289 | | pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection) |
2290 | 427 | { |
2291 | 427 | assert(connection); |
2292 | 427 | pn_transport_t *transport = connection->transport; |
2293 | 427 | return transport ? &transport->remote_condition : NULL; |
2294 | 427 | } |
2295 | | |
2296 | | pn_condition_t *pn_session_condition(pn_session_t *session) |
2297 | 0 | { |
2298 | 0 | assert(session); |
2299 | 0 | return &session->endpoint.condition; |
2300 | 0 | } |
2301 | | |
2302 | | pn_condition_t *pn_session_remote_condition(pn_session_t *session) |
2303 | 980 | { |
2304 | 980 | assert(session); |
2305 | 980 | return &session->endpoint.remote_condition; |
2306 | 980 | } |
2307 | | |
2308 | | pn_condition_t *pn_link_condition(pn_link_t *link) |
2309 | 0 | { |
2310 | 0 | assert(link); |
2311 | 0 | return &link->endpoint.condition; |
2312 | 0 | } |
2313 | | |
2314 | | pn_condition_t *pn_link_remote_condition(pn_link_t *link) |
2315 | 5.34k | { |
2316 | 5.34k | assert(link); |
2317 | 5.34k | return &link->endpoint.remote_condition; |
2318 | 5.34k | } |
2319 | | |
2320 | | bool pn_condition_is_set(pn_condition_t *condition) |
2321 | 48.8k | { |
2322 | 48.8k | return condition && condition->name && pn_string_get(condition->name); |
2323 | 48.8k | } |
2324 | | |
2325 | | void pn_condition_clear(pn_condition_t *condition) |
2326 | 549k | { |
2327 | 549k | assert(condition); |
2328 | 549k | if (condition->name) pn_string_clear(condition->name); |
2329 | 549k | if (condition->description) pn_string_clear(condition->description); |
2330 | 549k | if (condition->info) pn_data_clear(condition->info); |
2331 | 549k | pn_bytes_free (condition->info_raw); |
2332 | 549k | condition->info_raw = (pn_bytes_t){0, NULL}; |
2333 | 549k | } |
2334 | | |
2335 | | const char *pn_condition_get_name(pn_condition_t *condition) |
2336 | 0 | { |
2337 | 0 | assert(condition); |
2338 | 0 | if (condition->name == NULL) { |
2339 | 0 | return NULL; |
2340 | 0 | } else { |
2341 | 0 | return pn_string_get(condition->name); |
2342 | 0 | } |
2343 | 0 | } |
2344 | | |
2345 | | int pn_condition_set_name(pn_condition_t *condition, const char *name) |
2346 | 12.7k | { |
2347 | 12.7k | assert(condition); |
2348 | 12.7k | if (condition->name == NULL) { |
2349 | 12.7k | condition->name = pn_string(name); |
2350 | 12.7k | return 0; |
2351 | 12.7k | } else { |
2352 | 0 | return pn_string_set(condition->name, name); |
2353 | 0 | } |
2354 | 12.7k | } |
2355 | | |
2356 | | const char *pn_condition_get_description(pn_condition_t *condition) |
2357 | 0 | { |
2358 | 0 | assert(condition); |
2359 | 0 | if (condition->description == NULL) { |
2360 | 0 | return NULL; |
2361 | 0 | } else { |
2362 | 0 | return pn_string_get(condition->description); |
2363 | 0 | } |
2364 | 0 | } |
2365 | | |
2366 | | int pn_condition_set_description(pn_condition_t *condition, const char *description) |
2367 | 12.7k | { |
2368 | 12.7k | assert(condition); |
2369 | 12.7k | if (condition->description == NULL) { |
2370 | 12.7k | condition->description = pn_string(description); |
2371 | 12.7k | return 0; |
2372 | 12.7k | } else { |
2373 | 0 | return pn_string_set(condition->description, description); |
2374 | 0 | } |
2375 | 12.7k | } |
2376 | | |
2377 | | int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap) |
2378 | 0 | { |
2379 | 0 | assert(condition); |
2380 | 0 | int err = pn_condition_set_name(condition, name); |
2381 | 0 | if (err) |
2382 | 0 | return err; |
2383 | | |
2384 | 0 | char text[1024]; |
2385 | 0 | size_t n = vsnprintf(text, 1024, fmt, ap); |
2386 | 0 | if (n >= sizeof(text)) |
2387 | 0 | text[sizeof(text)-1] = '\0'; |
2388 | 0 | err = pn_condition_set_description(condition, text); |
2389 | 0 | return err; |
2390 | 0 | } |
2391 | | |
2392 | | int pn_condition_format(pn_condition_t *condition, const char *name, PN_PRINTF_FORMAT const char *fmt, ...) |
2393 | 0 | { |
2394 | 0 | assert(condition); |
2395 | 0 | va_list ap; |
2396 | 0 | va_start(ap, fmt); |
2397 | 0 | int err = pn_condition_vformat(condition, name, fmt, ap); |
2398 | 0 | va_end(ap); |
2399 | 0 | return err; |
2400 | 0 | } |
2401 | | |
2402 | | pn_data_t *pn_condition_info(pn_condition_t *condition) |
2403 | 0 | { |
2404 | 0 | assert(condition); |
2405 | 0 | pni_switch_to_data(&condition->info_raw, &condition->info); |
2406 | 0 | return condition->info; |
2407 | 0 | } |
2408 | | |
2409 | | bool pn_condition_is_redirect(pn_condition_t *condition) |
2410 | 0 | { |
2411 | 0 | const char *name = pn_condition_get_name(condition); |
2412 | 0 | return name && (!strcmp(name, "amqp:connection:redirect") || |
2413 | 0 | !strcmp(name, "amqp:link:redirect")); |
2414 | 0 | } |
2415 | | |
2416 | | const char *pn_condition_redirect_host(pn_condition_t *condition) |
2417 | 0 | { |
2418 | 0 | pn_data_t *data = pn_condition_info(condition); |
2419 | 0 | pn_data_rewind(data); |
2420 | 0 | pn_data_next(data); |
2421 | 0 | pn_data_enter(data); |
2422 | 0 | pn_data_lookup(data, "network-host"); |
2423 | 0 | pn_bytes_t host = pn_data_get_bytes(data); |
2424 | 0 | pn_data_rewind(data); |
2425 | 0 | return host.start; |
2426 | 0 | } |
2427 | | |
2428 | | int pn_condition_redirect_port(pn_condition_t *condition) |
2429 | 0 | { |
2430 | 0 | pn_data_t *data = pn_condition_info(condition); |
2431 | 0 | pn_data_rewind(data); |
2432 | 0 | pn_data_next(data); |
2433 | 0 | pn_data_enter(data); |
2434 | 0 | pn_data_lookup(data, "port"); |
2435 | 0 | int port = pn_data_get_int(data); |
2436 | 0 | pn_data_rewind(data); |
2437 | 0 | return port; |
2438 | 0 | } |
2439 | | |
2440 | | pn_connection_t *pn_event_connection(pn_event_t *event) |
2441 | 36.4k | { |
2442 | 36.4k | pn_session_t *ssn; |
2443 | 36.4k | pn_transport_t *transport; |
2444 | | |
2445 | 36.4k | switch (pn_class_id(pn_event_class(event))) { |
2446 | 19.1k | case CID_pn_connection: |
2447 | 19.1k | return (pn_connection_t *) pn_event_context(event); |
2448 | 11.0k | case CID_pn_transport: |
2449 | 11.0k | transport = pn_event_transport(event); |
2450 | 11.0k | if (transport) |
2451 | 11.0k | return transport->connection; |
2452 | 0 | return NULL; |
2453 | 6.32k | default: |
2454 | 6.32k | ssn = pn_event_session(event); |
2455 | 6.32k | if (ssn) |
2456 | 6.32k | return pn_session_connection(ssn); |
2457 | 36.4k | } |
2458 | 0 | return NULL; |
2459 | 36.4k | } |
2460 | | |
2461 | | pn_session_t *pn_event_session(pn_event_t *event) |
2462 | 7.30k | { |
2463 | 7.30k | pn_link_t *link; |
2464 | 7.30k | switch (pn_class_id(pn_event_class(event))) { |
2465 | 1.96k | case CID_pn_session: |
2466 | 1.96k | return (pn_session_t *) pn_event_context(event); |
2467 | 5.34k | default: |
2468 | 5.34k | link = pn_event_link(event); |
2469 | 5.34k | if (link) |
2470 | 5.34k | return pn_link_session(link); |
2471 | 7.30k | } |
2472 | 0 | return NULL; |
2473 | 7.30k | } |
2474 | | |
2475 | | pn_link_t *pn_event_link(pn_event_t *event) |
2476 | 10.6k | { |
2477 | 10.6k | pn_delivery_t *dlv; |
2478 | 10.6k | switch (pn_class_id(pn_event_class(event))) { |
2479 | 10.6k | case CID_pn_link: |
2480 | 10.6k | return (pn_link_t *) pn_event_context(event); |
2481 | 0 | default: |
2482 | 0 | dlv = pn_event_delivery(event); |
2483 | 0 | if (dlv) |
2484 | 0 | return pn_delivery_link(dlv); |
2485 | 10.6k | } |
2486 | 0 | return NULL; |
2487 | 10.6k | } |
2488 | | |
2489 | | pn_delivery_t *pn_event_delivery(pn_event_t *event) |
2490 | 4.77k | { |
2491 | 4.77k | switch (pn_class_id(pn_event_class(event))) { |
2492 | 4.77k | case CID_pn_delivery: |
2493 | 4.77k | return (pn_delivery_t *) pn_event_context(event); |
2494 | 0 | default: |
2495 | 0 | return NULL; |
2496 | 4.77k | } |
2497 | 4.77k | } |
2498 | | |
2499 | | pn_transport_t *pn_event_transport(pn_event_t *event) |
2500 | 22.1k | { |
2501 | 22.1k | switch (pn_class_id(pn_event_class(event))) { |
2502 | 22.1k | case CID_pn_transport: |
2503 | 22.1k | return (pn_transport_t *) pn_event_context(event); |
2504 | 0 | default: |
2505 | 0 | { |
2506 | 0 | pn_connection_t *conn = pn_event_connection(event); |
2507 | 0 | if (conn) |
2508 | 0 | return pn_connection_transport(conn); |
2509 | 0 | return NULL; |
2510 | 0 | } |
2511 | 22.1k | } |
2512 | 22.1k | } |
2513 | | |
2514 | 0 | int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) { |
2515 | 0 | assert(dest); |
2516 | 0 | assert(src); |
2517 | 0 | int err = 0; |
2518 | 0 | if (src != dest) { |
2519 | 0 | if (!(src->name == NULL && dest->name == NULL)) { |
2520 | 0 | if (src->name == NULL) { |
2521 | 0 | pn_free(dest->name); |
2522 | 0 | dest->name = NULL; |
2523 | 0 | } else { |
2524 | 0 | if (dest->name == NULL) { |
2525 | 0 | dest->name = pn_string(NULL); |
2526 | 0 | } |
2527 | 0 | err = pn_string_copy(dest->name, src->name); |
2528 | 0 | } |
2529 | 0 | } |
2530 | 0 | if (!err && !(src->description == NULL && dest->description == NULL)) { |
2531 | 0 | if (src->description == NULL) { |
2532 | 0 | pn_free(dest->description); |
2533 | 0 | dest->description = NULL; |
2534 | 0 | } else { |
2535 | 0 | if (dest->description == NULL) { |
2536 | 0 | dest->description = pn_string(NULL); |
2537 | 0 | } |
2538 | 0 | err = pn_string_copy(dest->description, src->description); |
2539 | 0 | } |
2540 | 0 | } |
2541 | 0 | if (!err && !(src->info == NULL && dest->info == NULL)) { |
2542 | 0 | if (src->info == NULL) { |
2543 | 0 | pn_data_free(dest->info); |
2544 | 0 | dest->info = NULL; |
2545 | 0 | } else { |
2546 | 0 | if (dest->info == NULL) { |
2547 | 0 | dest->info = pn_data(0); |
2548 | 0 | } |
2549 | 0 | err = pn_data_copy(dest->info, src->info); |
2550 | 0 | } |
2551 | 0 | } |
2552 | 0 | } |
2553 | 0 | return err; |
2554 | 0 | } |
2555 | | |
2556 | | |
2557 | 0 | static pn_condition_t *cond_set(pn_condition_t *cond) { |
2558 | 0 | return cond && pn_condition_is_set(cond) ? cond : NULL; |
2559 | 0 | } |
2560 | | |
2561 | 0 | static pn_condition_t *cond2_set(pn_condition_t *cond1, pn_condition_t *cond2) { |
2562 | 0 | pn_condition_t *cond = cond_set(cond1); |
2563 | 0 | if (!cond) cond = cond_set(cond2); |
2564 | 0 | return cond; |
2565 | 0 | } |
2566 | | |
2567 | 0 | pn_condition_t *pn_event_condition(pn_event_t *e) { |
2568 | 0 | void *ctx = pn_event_context(e); |
2569 | 0 | switch (pn_class_id(pn_event_class(e))) { |
2570 | 0 | case CID_pn_connection: { |
2571 | 0 | pn_connection_t *c = (pn_connection_t*)ctx; |
2572 | 0 | return cond2_set(pn_connection_remote_condition(c), pn_connection_condition(c)); |
2573 | 0 | } |
2574 | 0 | case CID_pn_session: { |
2575 | 0 | pn_session_t *s = (pn_session_t*)ctx; |
2576 | 0 | return cond2_set(pn_session_remote_condition(s), pn_session_condition(s)); |
2577 | 0 | } |
2578 | 0 | case CID_pn_link: { |
2579 | 0 | pn_link_t *l = (pn_link_t*)ctx; |
2580 | 0 | return cond2_set(pn_link_remote_condition(l), pn_link_condition(l)); |
2581 | 0 | } |
2582 | 0 | case CID_pn_transport: |
2583 | 0 | return cond_set(pn_transport_condition((pn_transport_t*)ctx)); |
2584 | | |
2585 | 0 | default: |
2586 | 0 | return NULL; |
2587 | 0 | } |
2588 | 0 | } |
2589 | | |
2590 | 0 | const char *pn_disposition_type_name(uint64_t d) { |
2591 | 0 | switch(d) { |
2592 | 0 | case PN_RECEIVED: return "received"; |
2593 | 0 | case PN_ACCEPTED: return "accepted"; |
2594 | 0 | case PN_REJECTED: return "rejected"; |
2595 | 0 | case PN_RELEASED: return "released"; |
2596 | 0 | case PN_MODIFIED: return "modified"; |
2597 | 0 | default: return "unknown"; |
2598 | 0 | } |
2599 | 0 | } |
2600 | | |
2601 | | #undef PN_USE_DEPRECATED_API |