Coverage Report

Created: 2024-10-29 06:22

/src/qpid-proton/c/src/core/engine.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 *
3
 * Licensed to the Apache Software Foundation (ASF) under one
4
 * or more contributor license agreements.  See the NOTICE file
5
 * distributed with this work for additional information
6
 * regarding copyright ownership.  The ASF licenses this file
7
 * to you under the Apache License, Version 2.0 (the
8
 * "License"); you may not use this file except in compliance
9
 * with the License.  You may obtain a copy of the License at
10
 *
11
 *   http://www.apache.org/licenses/LICENSE-2.0
12
 *
13
 * Unless required by applicable law or agreed to in writing,
14
 * software distributed under the License is distributed on an
15
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16
 * KIND, either express or implied.  See the License for the
17
 * specific language governing permissions and limitations
18
 * under the License.
19
 *
20
 */
21
22
/* for pn_work_head and related deprecations */
23
#define PN_USE_DEPRECATED_API 1
24
25
#include "engine-internal.h"
26
27
#include "consumers.h"
28
#include "core/frame_consumers.h"
29
#include "fixed_string.h"
30
#include "framing.h"
31
#include "memory.h"
32
#include "platform/platform.h"
33
#include "platform/platform_fmt.h"
34
#include "protocol.h"
35
#include "transport.h"
36
37
#include <assert.h>
38
#include <stdarg.h>
39
#include <stddef.h>
40
#include <stdio.h>
41
#include <string.h>
42
43
44
static void pni_session_bound(pn_session_t *ssn);
45
static void pni_link_bound(pn_link_t *link);
46
47
static void pn_delivery_incref(void *object);
48
static void pn_delivery_finalize(void *object);
49
#define pn_delivery_new NULL
50
#define pn_delivery_refcount NULL
51
#define pn_delivery_decref NULL
52
#define pn_delivery_free NULL
53
#define pn_delivery_initialize NULL
54
#define pn_delivery_hashcode NULL
55
#define pn_delivery_compare NULL
56
static void pn_delivery_inspect(void *obj, pn_fixed_string_t *dst);
57
static const pn_class_t PN_CLASSCLASS(pn_delivery) = PN_METACLASS(pn_delivery);
58
59
// endpoints
60
61
static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint)
62
1.99M
{
63
1.99M
  switch (endpoint->type) {
64
48.1k
  case CONNECTION:
65
48.1k
    return (pn_connection_t *) endpoint;
66
1.12M
  case SESSION:
67
1.12M
    return ((pn_session_t *) endpoint)->connection;
68
148k
  case SENDER:
69
824k
  case RECEIVER:
70
824k
    return ((pn_link_t *) endpoint)->session->connection;
71
1.99M
  }
72
73
0
  return NULL;
74
1.99M
}
75
76
66.4k
static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) {
77
66.4k
  switch (type) {
78
29.9k
  case CONNECTION:
79
29.9k
    return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE;
80
18.2k
  case SESSION:
81
18.2k
    return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE;
82
0
  case SENDER:
83
18.2k
  case RECEIVER:
84
18.2k
    return open ? PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE;
85
0
  default:
86
0
    assert(false);
87
0
    return PN_EVENT_NONE;
88
66.4k
  }
89
66.4k
}
90
91
static void pn_endpoint_open(pn_endpoint_t *endpoint)
92
54.7k
{
93
54.7k
  if (!(endpoint->state & PN_LOCAL_ACTIVE)) {
94
54.7k
    PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
95
54.7k
    pn_connection_t *conn = pni_ep_get_connection(endpoint);
96
54.7k
    pn_collector_put_object(conn->collector, endpoint,
97
54.7k
                            endpoint_event((pn_endpoint_type_t) endpoint->type, true));
98
54.7k
    pn_modified(conn, endpoint, true);
99
54.7k
  }
100
54.7k
}
101
102
static void pn_endpoint_close(pn_endpoint_t *endpoint)
103
17.7k
{
104
17.7k
  if (!(endpoint->state & PN_LOCAL_CLOSED)) {
105
11.6k
    PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
106
11.6k
    pn_connection_t *conn = pni_ep_get_connection(endpoint);
107
11.6k
    pn_collector_put_object(conn->collector, endpoint,
108
11.6k
                            endpoint_event((pn_endpoint_type_t) endpoint->type, false));
109
11.6k
    pn_modified(conn, endpoint, true);
110
11.6k
  }
111
17.7k
}
112
113
void pn_connection_reset(pn_connection_t *connection)
114
18.2k
{
115
18.2k
  assert(connection);
116
18.2k
  pn_endpoint_t *endpoint = &connection->endpoint;
117
18.2k
  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
118
18.2k
}
119
120
void pn_connection_open(pn_connection_t *connection)
121
18.2k
{
122
18.2k
  assert(connection);
123
18.2k
  pn_endpoint_open(&connection->endpoint);
124
18.2k
}
125
126
void pn_connection_close(pn_connection_t *connection)
127
17.7k
{
128
17.7k
  assert(connection);
129
17.7k
  pn_endpoint_close(&connection->endpoint);
130
17.7k
}
131
132
static void pni_endpoint_tini(pn_endpoint_t *endpoint);
133
134
void pn_connection_release(pn_connection_t *connection)
135
18.2k
{
136
18.2k
  assert(!connection->endpoint.freed);
137
  // free those endpoints that haven't been freed by the application
138
18.2k
  LL_REMOVE(connection, endpoint, &connection->endpoint);
139
286k
  while (connection->endpoint_head) {
140
268k
    pn_endpoint_t *ep = connection->endpoint_head;
141
268k
    switch (ep->type) {
142
268k
    case SESSION:
143
      // note: this will free all child links:
144
268k
      pn_session_free((pn_session_t *)ep);
145
268k
      break;
146
0
    case SENDER:
147
0
    case RECEIVER:
148
0
      pn_link_free((pn_link_t *)ep);
149
0
      break;
150
0
    default:
151
0
      assert(false);
152
268k
    }
153
268k
  }
154
18.2k
  connection->endpoint.freed = true;
155
18.2k
  if (!connection->transport) {
156
    // no transport available to consume transport work items,
157
    // so manually clear them:
158
18.2k
    pn_ep_incref(&connection->endpoint);
159
18.2k
    pn_connection_unbound(connection);
160
18.2k
  }
161
18.2k
  pn_ep_decref(&connection->endpoint);
162
18.2k
}
163
164
18.2k
void pn_connection_free(pn_connection_t *connection) {
165
18.2k
  pn_connection_release(connection);
166
18.2k
  pn_decref(connection);
167
18.2k
}
168
169
void pn_connection_bound(pn_connection_t *connection)
170
18.2k
{
171
18.2k
  pn_collector_put_object(connection->collector, connection, PN_CONNECTION_BOUND);
172
18.2k
  pn_ep_incref(&connection->endpoint);
173
174
18.2k
  size_t nsessions = pn_list_size(connection->sessions);
175
36.5k
  for (size_t i = 0; i < nsessions; i++) {
176
18.2k
    pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i));
177
18.2k
  }
178
18.2k
}
179
180
// invoked when transport has been removed:
181
void pn_connection_unbound(pn_connection_t *connection)
182
36.5k
{
183
36.5k
  connection->transport = NULL;
184
36.5k
  if (connection->endpoint.freed) {
185
    // connection has been freed prior to unbinding, thus it
186
    // cannot be re-assigned to a new transport.  Clear the
187
    // transport work lists to allow the connection to be freed.
188
36.5k
    while (connection->transport_head) {
189
18.2k
        pn_clear_modified(connection, connection->transport_head);
190
18.2k
    }
191
18.2k
    while (connection->tpwork_head) {
192
0
      pn_clear_tpwork(connection->tpwork_head);
193
0
    }
194
18.2k
  }
195
36.5k
  pn_ep_decref(&connection->endpoint);
196
36.5k
}
197
198
pn_record_t *pn_connection_attachments(pn_connection_t *connection)
199
0
{
200
0
  assert(connection);
201
0
  return connection->context;
202
0
}
203
204
void *pn_connection_get_context(pn_connection_t *conn)
205
0
{
206
  // XXX: we should really assert on conn here, but this causes
207
  // messenger tests to fail
208
0
  return conn ? pn_record_get(conn->context, PN_LEGCTX) : NULL;
209
0
}
210
211
void pn_connection_set_context(pn_connection_t *conn, void *context)
212
0
{
213
0
  assert(conn);
214
0
  pn_record_set(conn->context, PN_LEGCTX, context);
215
0
}
216
217
pn_transport_t *pn_connection_transport(pn_connection_t *connection)
218
0
{
219
0
  assert(connection);
220
0
  return connection->transport;
221
0
}
222
223
void pn_condition_init(pn_condition_t *condition)
224
1.12M
{
225
1.12M
  condition->info_raw = (pn_bytes_t){0, NULL};
226
1.12M
  condition->name = NULL;
227
1.12M
  condition->description = NULL;
228
1.12M
  condition->info = NULL;
229
1.12M
}
230
231
0
pn_condition_t *pn_condition(void) {
232
0
  pn_condition_t *c = (pn_condition_t*)pni_mem_allocate(PN_VOID, sizeof(pn_condition_t));
233
0
  pn_condition_init(c);
234
0
  return c;
235
0
}
236
237
void pn_condition_tini(pn_condition_t *condition)
238
1.12M
{
239
1.12M
  pn_bytes_free(condition->info_raw);
240
1.12M
  pn_data_free(condition->info);
241
1.12M
  pn_free(condition->description);
242
1.12M
  pn_free(condition->name);
243
1.12M
}
244
245
0
void pn_condition_free(pn_condition_t *c) {
246
0
  if (c) {
247
0
    pn_condition_clear(c);
248
0
    pn_condition_tini(c);
249
0
    pni_mem_deallocate(PN_VOID, c);
250
0
  }
251
0
}
252
253
static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn)
254
268k
{
255
268k
  pn_list_add(conn->sessions, ssn);
256
268k
  ssn->connection = conn;
257
268k
  pn_incref(conn);  // keep around until finalized
258
268k
  pn_ep_incref(&conn->endpoint);
259
268k
}
260
261
static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn)
262
537k
{
263
537k
  if (pn_list_remove(conn->sessions, ssn)) {
264
268k
    pn_ep_decref(&conn->endpoint);
265
268k
    LL_REMOVE(conn, endpoint, &ssn->endpoint);
266
268k
  }
267
537k
}
268
269
pn_connection_t *pn_session_connection(pn_session_t *session)
270
6.32k
{
271
6.32k
  if (!session) return NULL;
272
6.32k
  return session->connection;
273
6.32k
}
274
275
void pn_session_open(pn_session_t *session)
276
18.2k
{
277
18.2k
  assert(session);
278
18.2k
  pn_endpoint_open(&session->endpoint);
279
18.2k
}
280
281
void pn_session_close(pn_session_t *session)
282
0
{
283
0
  assert(session);
284
0
  pn_endpoint_close(&session->endpoint);
285
0
}
286
287
void pn_session_free(pn_session_t *session)
288
268k
{
289
268k
  assert(!session->endpoint.freed);
290
523k
  while(pn_list_size(session->links)) {
291
254k
    pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
292
254k
    pn_link_free(link);
293
254k
  }
294
268k
  pni_remove_session(session->connection, session);
295
268k
  pn_list_add(session->connection->freed, session);
296
268k
  session->endpoint.freed = true;
297
268k
  pn_ep_decref(&session->endpoint);
298
299
  // the finalize logic depends on endpoint.freed, so we incref/decref
300
  // to give it a chance to rerun
301
268k
  pn_incref(session);
302
268k
  pn_decref(session);
303
268k
}
304
305
pn_record_t *pn_session_attachments(pn_session_t *session)
306
0
{
307
0
  assert(session);
308
0
  return session->context;
309
0
}
310
311
void *pn_session_get_context(pn_session_t *session)
312
0
{
313
0
  return session ? pn_record_get(session->context, PN_LEGCTX) : 0;
314
0
}
315
316
void pn_session_set_context(pn_session_t *session, void *context)
317
0
{
318
0
  assert(session);
319
0
  pn_record_set(session->context, PN_LEGCTX, context);
320
0
}
321
322
323
static void pni_add_link(pn_session_t *ssn, pn_link_t *link)
324
254k
{
325
254k
  pn_list_add(ssn->links, link);
326
254k
  link->session = ssn;
327
254k
  pn_ep_incref(&ssn->endpoint);
328
254k
}
329
330
static void pni_remove_link(pn_session_t *ssn, pn_link_t *link)
331
509k
{
332
509k
  if (pn_list_remove(ssn->links, link)) {
333
254k
    pn_ep_decref(&ssn->endpoint);
334
254k
    LL_REMOVE(ssn->connection, endpoint, &link->endpoint);
335
254k
  }
336
509k
}
337
338
void pn_link_open(pn_link_t *link)
339
18.2k
{
340
18.2k
  assert(link);
341
18.2k
  pn_endpoint_open(&link->endpoint);
342
18.2k
}
343
344
void pn_link_close(pn_link_t *link)
345
0
{
346
0
  assert(link);
347
0
  pn_endpoint_close(&link->endpoint);
348
0
}
349
350
void pn_link_detach(pn_link_t *link)
351
0
{
352
0
  assert(link);
353
0
  if (link->detached) return;
354
355
0
  link->detached = true;
356
0
  pn_collector_put_object(link->session->connection->collector, link, PN_LINK_LOCAL_DETACH);
357
0
  pn_modified(link->session->connection, &link->endpoint, true);
358
359
0
}
360
361
static void pni_terminus_free(pn_terminus_t *terminus)
362
1.01M
{
363
1.01M
  pn_free(terminus->address);
364
1.01M
  pn_bytes_free(terminus->properties_raw);
365
1.01M
  pn_bytes_free(terminus->capabilities_raw);
366
1.01M
  pn_bytes_free(terminus->outcomes_raw);
367
1.01M
  pn_bytes_free(terminus->filter_raw);
368
1.01M
  pn_free(terminus->properties);
369
1.01M
  pn_free(terminus->capabilities);
370
1.01M
  pn_free(terminus->outcomes);
371
1.01M
  pn_free(terminus->filter);
372
1.01M
}
373
374
void pn_link_free(pn_link_t *link)
375
254k
{
376
254k
  assert(!link->endpoint.freed);
377
254k
  pni_remove_link(link->session, link);
378
254k
  pn_list_add(link->session->freed, link);
379
254k
  pn_delivery_t *delivery = link->unsettled_head;
380
257k
  while (delivery) {
381
2.14k
    pn_delivery_t *next = delivery->unsettled_next;
382
2.14k
    pn_delivery_settle(delivery);
383
2.14k
    delivery = next;
384
2.14k
  }
385
254k
  link->endpoint.freed = true;
386
254k
  pn_ep_decref(&link->endpoint);
387
388
  // the finalize logic depends on endpoint.freed (modified above), so
389
  // we incref/decref to give it a chance to rerun
390
254k
  pn_incref(link);
391
254k
  pn_decref(link);
392
254k
}
393
394
void *pn_link_get_context(pn_link_t *link)
395
0
{
396
0
  assert(link);
397
0
  return pn_record_get(link->context, PN_LEGCTX);
398
0
}
399
400
void pn_link_set_context(pn_link_t *link, void *context)
401
0
{
402
0
  assert(link);
403
0
  pn_record_set(link->context, PN_LEGCTX, context);
404
0
}
405
406
pn_record_t *pn_link_attachments(pn_link_t *link)
407
0
{
408
0
  assert(link);
409
0
  return link->context;
410
0
}
411
412
void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
413
541k
{
414
541k
  endpoint->type = (pn_endpoint_type_t) type;
415
541k
  endpoint->referenced = true;
416
541k
  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
417
541k
  pn_condition_init(&endpoint->condition);
418
541k
  pn_condition_init(&endpoint->remote_condition);
419
541k
  endpoint->endpoint_next = NULL;
420
541k
  endpoint->endpoint_prev = NULL;
421
541k
  endpoint->transport_next = NULL;
422
541k
  endpoint->transport_prev = NULL;
423
541k
  endpoint->modified = false;
424
541k
  endpoint->freed = false;
425
541k
  endpoint->refcount = 1;
426
  //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint);
427
428
541k
  LL_ADD(conn, endpoint, endpoint);
429
541k
}
430
431
void pn_ep_incref(pn_endpoint_t *endpoint)
432
1.08M
{
433
1.08M
  endpoint->refcount++;
434
1.08M
}
435
436
88.7k
static pn_event_type_t pn_final_type(pn_endpoint_type_t type) {
437
88.7k
  switch (type) {
438
18.2k
  case CONNECTION:
439
18.2k
    return PN_CONNECTION_FINAL;
440
32.3k
  case SESSION:
441
32.3k
    return PN_SESSION_FINAL;
442
2.56k
  case SENDER:
443
38.1k
  case RECEIVER:
444
38.1k
    return PN_LINK_FINAL;
445
0
  default:
446
0
    assert(false);
447
0
    return PN_EVENT_NONE;
448
88.7k
  }
449
88.7k
}
450
451
1.31M
static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) {
452
1.31M
  switch (endpoint->type) {
453
0
  case CONNECTION:
454
0
    return NULL;
455
802k
  case SESSION:
456
802k
    return &((pn_session_t *) endpoint)->connection->endpoint;
457
97.6k
  case SENDER:
458
513k
  case RECEIVER:
459
513k
    return &((pn_link_t *) endpoint)->session->endpoint;
460
0
  default:
461
0
    assert(false);
462
0
    return NULL;
463
1.31M
  }
464
1.31M
}
465
466
void pn_ep_decref(pn_endpoint_t *endpoint)
467
1.17M
{
468
1.17M
  assert(endpoint->refcount > 0);
469
1.17M
  endpoint->refcount--;
470
1.17M
  if (endpoint->refcount == 0) {
471
88.7k
    pn_connection_t *conn = pni_ep_get_connection(endpoint);
472
88.7k
    pn_collector_put_object(conn->collector, endpoint, pn_final_type((pn_endpoint_type_t) endpoint->type));
473
88.7k
  }
474
1.17M
}
475
476
static void pni_endpoint_tini(pn_endpoint_t *endpoint)
477
541k
{
478
541k
  pn_condition_tini(&endpoint->remote_condition);
479
541k
  pn_condition_tini(&endpoint->condition);
480
541k
}
481
482
static void pni_free_children(pn_list_t *children, pn_list_t *freed)
483
286k
{
484
286k
  while (pn_list_size(children) > 0) {
485
0
    pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0);
486
0
    assert(!endpoint->referenced);
487
0
    pn_free(endpoint);
488
0
  }
489
490
286k
  while (pn_list_size(freed) > 0) {
491
0
    pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0);
492
0
    assert(!endpoint->referenced);
493
0
    pn_free(endpoint);
494
0
  }
495
496
286k
  pn_free(children);
497
286k
  pn_free(freed);
498
286k
}
499
500
static void pn_connection_finalize(void *object)
501
18.2k
{
502
18.2k
  pn_connection_t *conn = (pn_connection_t *) object;
503
18.2k
  pn_endpoint_t *endpoint = &conn->endpoint;
504
505
18.2k
  if (conn->transport) {
506
0
    assert(!conn->transport->referenced);
507
0
    pn_free(conn->transport);
508
0
  }
509
510
  // freeing the transport could post events
511
18.2k
  if (pn_refcount(conn) > 0) {
512
0
    return;
513
0
  }
514
515
18.2k
  pni_free_children(conn->sessions, conn->freed);
516
18.2k
  pn_free(conn->context);
517
18.2k
  pn_decref(conn->collector);
518
519
18.2k
  pn_free(conn->container);
520
18.2k
  pn_free(conn->hostname);
521
18.2k
  pn_free(conn->auth_user);
522
18.2k
  pn_free(conn->authzid);
523
18.2k
  pn_free(conn->auth_password);
524
18.2k
  pn_bytes_free(conn->offered_capabilities_raw);
525
18.2k
  pn_bytes_free(conn->desired_capabilities_raw);
526
18.2k
  pn_bytes_free(conn->properties_raw);
527
18.2k
  pn_free(conn->offered_capabilities);
528
18.2k
  pn_free(conn->desired_capabilities);
529
18.2k
  pn_free(conn->properties);
530
18.2k
  pn_free(conn->remote_offered_capabilities);
531
18.2k
  pn_free(conn->remote_desired_capabilities);
532
18.2k
  pn_free(conn->remote_properties);
533
18.2k
  pni_endpoint_tini(endpoint);
534
18.2k
  pn_free(conn->delivery_pool);
535
18.2k
}
536
537
18.2k
#define pn_connection_initialize NULL
538
18.2k
#define pn_connection_hashcode NULL
539
18.2k
#define pn_connection_compare NULL
540
18.2k
#define pn_connection_inspect NULL
541
542
pn_connection_t *pn_connection(void)
543
18.2k
{
544
18.2k
  static const pn_class_t clazz = PN_CLASS(pn_connection);
545
18.2k
  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
546
18.2k
  if (!conn) return NULL;
547
548
18.2k
  conn->endpoint_head = NULL;
549
18.2k
  conn->endpoint_tail = NULL;
550
18.2k
  pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
551
18.2k
  conn->transport_head = NULL;
552
18.2k
  conn->transport_tail = NULL;
553
18.2k
  conn->sessions = pn_list(PN_WEAKREF, 0);
554
18.2k
  conn->freed = pn_list(PN_WEAKREF, 0);
555
18.2k
  conn->transport = NULL;
556
18.2k
  conn->work_head = NULL;
557
18.2k
  conn->work_tail = NULL;
558
18.2k
  conn->tpwork_head = NULL;
559
18.2k
  conn->tpwork_tail = NULL;
560
18.2k
  conn->container = pn_string(NULL);
561
18.2k
  conn->hostname = pn_string(NULL);
562
18.2k
  conn->auth_user = pn_string(NULL);
563
18.2k
  conn->authzid = pn_string(NULL);
564
18.2k
  conn->auth_password = pn_string(NULL);
565
18.2k
  conn->offered_capabilities_raw = (pn_bytes_t){0, NULL};
566
18.2k
  conn->desired_capabilities_raw = (pn_bytes_t){0, NULL};
567
18.2k
  conn->properties_raw = (pn_bytes_t){0, NULL};
568
18.2k
  conn->offered_capabilities = NULL;
569
18.2k
  conn->desired_capabilities = NULL;
570
18.2k
  conn->properties = NULL;
571
18.2k
  conn->remote_offered_capabilities = NULL;
572
18.2k
  conn->remote_desired_capabilities = NULL;
573
18.2k
  conn->remote_properties = NULL;
574
18.2k
  conn->collector = NULL;
575
18.2k
  conn->context = pn_record();
576
18.2k
  conn->delivery_pool = pn_list(&PN_CLASSCLASS(pn_delivery), 0);
577
18.2k
  conn->driver = NULL;
578
579
18.2k
  return conn;
580
18.2k
}
581
582
static const pn_event_type_t endpoint_init_event_map[] = {
583
  PN_CONNECTION_INIT,  /* CONNECTION */
584
  PN_SESSION_INIT,     /* SESSION */
585
  PN_LINK_INIT,        /* SENDER */
586
  PN_LINK_INIT};       /* RECEIVER */
587
588
void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector)
589
36.5k
{
590
36.5k
  pn_decref(connection->collector);
591
36.5k
  connection->collector = collector;
592
36.5k
  pn_incref(connection->collector);
593
36.5k
  pn_endpoint_t *endpoint = connection->endpoint_head;
594
596k
  while (endpoint) {
595
559k
    pn_collector_put_object(connection->collector, endpoint, endpoint_init_event_map[endpoint->type]);
596
559k
    endpoint = endpoint->endpoint_next;
597
559k
  }
598
36.5k
}
599
600
0
pn_collector_t* pn_connection_collector(pn_connection_t *connection) {
601
0
  return connection->collector;
602
0
}
603
604
pn_state_t pn_connection_state(pn_connection_t *connection)
605
0
{
606
0
  return connection ? connection->endpoint.state : 0;
607
0
}
608
609
const char *pn_connection_get_container(pn_connection_t *connection)
610
0
{
611
0
  assert(connection);
612
0
  return pn_string_get(connection->container);
613
0
}
614
615
void pn_connection_set_container(pn_connection_t *connection, const char *container)
616
18.2k
{
617
18.2k
  assert(connection);
618
18.2k
  pn_string_set(connection->container, container);
619
18.2k
}
620
621
const char *pn_connection_get_hostname(pn_connection_t *connection)
622
0
{
623
0
  assert(connection);
624
0
  return pn_string_get(connection->hostname);
625
0
}
626
627
void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname)
628
0
{
629
0
  assert(connection);
630
0
  pn_string_set(connection->hostname, hostname);
631
0
}
632
633
const char *pn_connection_get_user(pn_connection_t *connection)
634
0
{
635
0
    assert(connection);
636
0
    return pn_string_get(connection->auth_user);
637
0
}
638
639
void pn_connection_set_user(pn_connection_t *connection, const char *user)
640
0
{
641
0
    assert(connection);
642
0
    pn_string_set(connection->auth_user, user);
643
0
}
644
645
const char *pn_connection_get_authorization(pn_connection_t *connection)
646
0
{
647
0
  assert(connection);
648
0
  return pn_string_get(connection->authzid);
649
0
}
650
651
void pn_connection_set_authorization(pn_connection_t *connection, const char *authzid)
652
0
{
653
0
  assert(connection);
654
0
  pn_string_set(connection->authzid, authzid);
655
0
}
656
657
void pn_connection_set_password(pn_connection_t *connection, const char *password)
658
0
{
659
0
    assert(connection);
660
    // Make sure the previous password is erased, if there was one.
661
0
    size_t n = pn_string_size(connection->auth_password);
662
0
    const char* s = pn_string_get(connection->auth_password);
663
0
    if (n > 0 && s) memset((void*)s, 0, n);
664
0
    pn_string_set(connection->auth_password, password);
665
0
}
666
667
pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection)
668
0
{
669
0
  assert(connection);
670
0
  pni_switch_to_data(&connection->offered_capabilities_raw, &connection->offered_capabilities);
671
0
  return connection->offered_capabilities;
672
0
}
673
674
pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection)
675
0
{
676
0
  assert(connection);
677
0
  pni_switch_to_data(&connection->desired_capabilities_raw, &connection->desired_capabilities);
678
0
  return connection->desired_capabilities;
679
0
}
680
681
pn_data_t *pn_connection_properties(pn_connection_t *connection)
682
0
{
683
0
  assert(connection);
684
0
  pni_switch_to_data(&connection->properties_raw, &connection->properties);
685
0
  return connection->properties;
686
0
}
687
688
pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection)
689
0
{
690
0
  assert(connection);
691
0
  if (!connection->transport)
692
0
    return NULL;
693
0
  pni_switch_to_data(&connection->transport->remote_offered_capabilities_raw, &connection->remote_offered_capabilities);
694
0
  return connection->remote_offered_capabilities;
695
0
}
696
697
pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection)
698
0
{
699
0
  assert(connection);
700
0
  if (!connection->transport)
701
0
    return NULL;
702
0
  pni_switch_to_data(&connection->transport->remote_desired_capabilities_raw, &connection->remote_desired_capabilities);
703
0
  return connection->remote_desired_capabilities;
704
0
}
705
706
pn_data_t *pn_connection_remote_properties(pn_connection_t *connection)
707
0
{
708
0
  assert(connection);
709
0
  if (!connection->transport)
710
0
    return NULL;
711
0
  pni_switch_to_data(&connection->transport->remote_properties_raw, &connection->remote_properties);
712
0
  return connection->remote_properties;
713
0
}
714
715
const char *pn_connection_remote_container(pn_connection_t *connection)
716
0
{
717
0
  assert(connection);
718
0
  return connection->transport ? connection->transport->remote_container : NULL;
719
0
}
720
721
const char *pn_connection_remote_hostname(pn_connection_t *connection)
722
0
{
723
0
  assert(connection);
724
0
  return connection->transport ? connection->transport->remote_hostname : NULL;
725
0
}
726
727
pn_delivery_t *pn_work_head(pn_connection_t *connection)
728
0
{
729
0
  assert(connection);
730
0
  return connection->work_head;
731
0
}
732
733
pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
734
0
{
735
0
  assert(delivery);
736
737
0
  if (delivery->work)
738
0
    return delivery->work_next;
739
0
  else
740
0
    return pn_work_head(delivery->link->session->connection);
741
0
}
742
743
static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
744
6.38k
{
745
6.38k
  if (!delivery->work)
746
3.83k
  {
747
3.83k
    LL_ADD(connection, work, delivery);
748
3.83k
    delivery->work = true;
749
3.83k
  }
750
6.38k
}
751
752
static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery)
753
10.3k
{
754
10.3k
  if (delivery->work)
755
3.83k
  {
756
3.83k
    LL_REMOVE(connection, work, delivery);
757
3.83k
    delivery->work = false;
758
3.83k
  }
759
10.3k
}
760
761
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery)
762
16.7k
{
763
16.7k
  pn_link_t *link = pn_delivery_link(delivery);
764
16.7k
  pn_delivery_t *current = pn_link_current(link);
765
16.7k
  if (delivery->updated && !delivery->local.settled) {
766
3.39k
    pni_add_work(connection, delivery);
767
13.3k
  } else if (delivery == current) {
768
3.02k
    if (link->endpoint.type == SENDER) {
769
32
      if (pn_link_credit(link) > 0) {
770
0
        pni_add_work(connection, delivery);
771
32
      } else {
772
32
        pni_clear_work(connection, delivery);
773
32
      }
774
2.99k
    } else {
775
2.99k
      pni_add_work(connection, delivery);
776
2.99k
    }
777
10.3k
  } else {
778
10.3k
    pni_clear_work(connection, delivery);
779
10.3k
  }
780
16.7k
}
781
782
static void pni_add_tpwork(pn_delivery_t *delivery)
783
7.64k
{
784
7.64k
  pn_connection_t *connection = delivery->link->session->connection;
785
7.64k
  if (!delivery->tpwork)
786
3.86k
  {
787
3.86k
    LL_ADD(connection, tpwork, delivery);
788
3.86k
    delivery->tpwork = true;
789
3.86k
  }
790
7.64k
  pn_modified(connection, &connection->endpoint, true);
791
7.64k
}
792
793
void pn_clear_tpwork(pn_delivery_t *delivery)
794
5.57k
{
795
5.57k
  pn_connection_t *connection = delivery->link->session->connection;
796
5.57k
  if (delivery->tpwork)
797
3.86k
  {
798
3.86k
    LL_REMOVE(connection, tpwork, delivery);
799
3.86k
    delivery->tpwork = false;
800
3.86k
    if (pn_refcount(delivery) > 0) {
801
1.71k
      pn_incref(delivery);
802
1.71k
      pn_decref(delivery);
803
1.71k
    }
804
3.86k
  }
805
5.57k
}
806
807
void pn_dump(pn_connection_t *conn)
808
0
{
809
0
  pn_endpoint_t *endpoint = conn->transport_head;
810
0
  while (endpoint)
811
0
  {
812
0
    printf("%p", (void *) endpoint);
813
0
    endpoint = endpoint->transport_next;
814
0
    if (endpoint)
815
0
      printf(" -> ");
816
0
  }
817
0
  printf("\n");
818
0
}
819
820
void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit)
821
635k
{
822
635k
  if (!endpoint->modified) {
823
597k
    LL_ADD(connection, transport, endpoint);
824
597k
    endpoint->modified = true;
825
597k
  }
826
827
635k
  if (emit && connection->transport) {
828
560k
    pn_collector_put_object(connection->collector, connection->transport,
829
560k
                            PN_TRANSPORT);
830
560k
  }
831
635k
}
832
833
void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
834
73.5k
{
835
73.5k
  if (endpoint->modified) {
836
73.5k
    LL_REMOVE(connection, transport, endpoint);
837
73.5k
    endpoint->transport_next = NULL;
838
73.5k
    endpoint->transport_prev = NULL;
839
73.5k
    endpoint->modified = false;
840
73.5k
  }
841
73.5k
}
842
843
static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
844
581k
{
845
581k
  if (endpoint->type != type) return false;
846
847
278k
  if (!state) return true;
848
849
0
  int st = endpoint->state;
850
0
  if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
851
0
    return st & state;
852
0
  else
853
0
    return st == state;
854
0
}
855
856
pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
857
286k
{
858
559k
  while (endpoint)
859
541k
  {
860
541k
    if (pni_matches(endpoint, type, state))
861
268k
      return endpoint;
862
273k
    endpoint = endpoint->endpoint_next;
863
273k
  }
864
18.2k
  return NULL;
865
286k
}
866
867
pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
868
18.2k
{
869
18.2k
  if (conn)
870
18.2k
    return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
871
0
  else
872
0
    return NULL;
873
18.2k
}
874
875
pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
876
268k
{
877
268k
  if (ssn)
878
268k
    return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
879
0
  else
880
0
    return NULL;
881
268k
}
882
883
pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
884
57
{
885
57
  if (!conn) return NULL;
886
887
57
  pn_endpoint_t *endpoint = conn->endpoint_head;
888
889
171
  while (endpoint)
890
171
  {
891
171
    if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state))
892
57
      return (pn_link_t *) endpoint;
893
114
    endpoint = endpoint->endpoint_next;
894
114
  }
895
896
0
  return NULL;
897
57
}
898
899
pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state)
900
9.75k
{
901
9.75k
  if (!link) return NULL;
902
903
9.75k
  pn_endpoint_t *endpoint = link->endpoint.endpoint_next;
904
905
20.2k
  while (endpoint)
906
20.1k
  {
907
20.1k
    if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state))
908
9.69k
      return (pn_link_t *) endpoint;
909
10.4k
    endpoint = endpoint->endpoint_next;
910
10.4k
  }
911
912
57
  return NULL;
913
9.75k
}
914
915
static void pn_session_incref(void *object)
916
1.32M
{
917
1.32M
  pn_session_t *session = (pn_session_t *) object;
918
1.32M
  if (!session->endpoint.referenced) {
919
533k
    session->endpoint.referenced = true;
920
533k
    pn_incref(session->connection);
921
792k
  } else {
922
792k
    pn_object_incref(object);
923
792k
  }
924
1.32M
}
925
926
static bool pn_ep_bound(pn_endpoint_t *endpoint)
927
523k
{
928
523k
  pn_connection_t *conn = pni_ep_get_connection(endpoint);
929
523k
  pn_session_t *ssn;
930
523k
  pn_link_t *lnk;
931
932
523k
  if (!conn->transport) return false;
933
0
  if (endpoint->modified) return true;
934
935
0
  switch (endpoint->type) {
936
0
  case CONNECTION:
937
0
    return ((pn_connection_t *)endpoint)->transport;
938
0
  case SESSION:
939
0
    ssn = (pn_session_t *) endpoint;
940
0
    return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0);
941
0
  case SENDER:
942
0
  case RECEIVER:
943
0
    lnk = (pn_link_t *) endpoint;
944
0
    return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0;
945
0
  default:
946
0
    assert(false);
947
0
    return false;
948
0
  }
949
0
}
950
951
1.33M
static bool pni_connection_live(pn_connection_t *conn) {
952
1.33M
  return pn_refcount(conn) > 1;
953
1.33M
}
954
955
526k
static bool pni_session_live(pn_session_t *ssn) {
956
526k
  return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1;
957
526k
}
958
959
13.2k
static bool pni_link_live(pn_link_t *link) {
960
13.2k
  return pni_session_live(link->session) || pn_refcount(link) > 1;
961
13.2k
}
962
963
1.31M
static bool pni_endpoint_live(pn_endpoint_t *endpoint) {
964
1.31M
  switch (endpoint->type) {
965
802k
  case CONNECTION:
966
802k
    return pni_connection_live((pn_connection_t *)endpoint);
967
513k
  case SESSION:
968
513k
    return pni_session_live((pn_session_t *) endpoint);
969
0
  case SENDER:
970
0
  case RECEIVER:
971
0
    return pni_link_live((pn_link_t *) endpoint);
972
0
  default:
973
0
    assert(false);
974
0
    return false;
975
1.31M
  }
976
1.31M
}
977
978
static bool pni_preserve_child(pn_endpoint_t *endpoint)
979
1.31M
{
980
1.31M
  pn_connection_t *conn = pni_ep_get_connection(endpoint);
981
1.31M
  pn_endpoint_t *parent = pn_ep_parent(endpoint);
982
1.31M
  if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint)))
983
1.31M
      && endpoint->referenced) {
984
792k
    pn_object_incref(endpoint);
985
792k
    endpoint->referenced = false;
986
792k
    pn_decref(parent);
987
792k
    return true;
988
792k
  } else {
989
523k
    LL_REMOVE(conn, transport, endpoint);
990
523k
    return false;
991
523k
  }
992
1.31M
}
993
994
static void pn_session_finalize(void *object)
995
802k
{
996
802k
  pn_session_t *session = (pn_session_t *) object;
997
802k
  pn_endpoint_t *endpoint = &session->endpoint;
998
999
802k
  if (pni_preserve_child(endpoint)) {
1000
533k
    return;
1001
533k
  }
1002
1003
268k
  pn_free(session->context);
1004
268k
  pni_free_children(session->links, session->freed);
1005
268k
  pni_endpoint_tini(endpoint);
1006
268k
  pn_delivery_map_free(&session->state.incoming);
1007
268k
  pn_delivery_map_free(&session->state.outgoing);
1008
268k
  pn_free(session->state.local_handles);
1009
268k
  pn_free(session->state.remote_handles);
1010
268k
  pni_remove_session(session->connection, session);
1011
268k
  pn_list_remove(session->connection->freed, session);
1012
1013
268k
  if (session->connection->transport) {
1014
0
    pn_transport_t *transport = session->connection->transport;
1015
0
    pn_hash_del(transport->local_channels, session->state.local_channel);
1016
0
    pn_hash_del(transport->remote_channels, session->state.remote_channel);
1017
0
  }
1018
1019
268k
  if (endpoint->referenced) {
1020
268k
    pn_decref(session->connection);
1021
268k
  }
1022
268k
}
1023
1024
268k
#define pn_session_new NULL
1025
268k
#define pn_session_refcount NULL
1026
268k
#define pn_session_decref NULL
1027
268k
#define pn_session_initialize NULL
1028
268k
#define pn_session_hashcode NULL
1029
268k
#define pn_session_compare NULL
1030
268k
#define pn_session_inspect NULL
1031
1032
pn_session_t *pn_session(pn_connection_t *conn)
1033
268k
{
1034
268k
  assert(conn);
1035
268k
#define pn_session_free NULL
1036
268k
  static const pn_class_t clazz = PN_METACLASS(pn_session);
1037
268k
#undef pn_session_free
1038
268k
  pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t));
1039
268k
  if (!ssn) return NULL;
1040
268k
  pn_endpoint_init(&ssn->endpoint, SESSION, conn);
1041
268k
  pni_add_session(conn, ssn);
1042
268k
  ssn->links = pn_list(PN_WEAKREF, 0);
1043
268k
  ssn->freed = pn_list(PN_WEAKREF, 0);
1044
268k
  ssn->context = pn_record();
1045
268k
  ssn->incoming_capacity = 0;
1046
268k
  ssn->incoming_bytes = 0;
1047
268k
  ssn->outgoing_bytes = 0;
1048
268k
  ssn->incoming_deliveries = 0;
1049
268k
  ssn->outgoing_deliveries = 0;
1050
268k
  ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
1051
268k
  ssn->local_handle_max = PN_IMPL_HANDLE_MAX;
1052
268k
  ssn->incoming_window_lwm = 1;
1053
268k
  ssn->check_flow = false;
1054
268k
  ssn->need_flow = false;
1055
268k
  ssn->lwm_default = true;
1056
1057
  // begin transport state
1058
268k
  memset(&ssn->state, 0, sizeof(ssn->state));
1059
268k
  ssn->state.remote_handle_max = UINT32_MAX;
1060
268k
  ssn->state.local_channel = (uint16_t)-1;
1061
268k
  ssn->state.remote_channel = (uint16_t)-1;
1062
268k
  pn_delivery_map_init(&ssn->state.incoming, 0);
1063
268k
  pn_delivery_map_init(&ssn->state.outgoing, 0);
1064
268k
  ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75);
1065
268k
  ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75);
1066
  // end transport state
1067
1068
268k
  pn_collector_put_object(conn->collector, ssn, PN_SESSION_INIT);
1069
268k
  if (conn->transport) {
1070
250k
    pni_session_bound(ssn);
1071
250k
  }
1072
268k
  pn_decref(ssn);
1073
268k
  return ssn;
1074
268k
}
1075
1076
static void pni_session_bound(pn_session_t *ssn)
1077
268k
{
1078
268k
  assert(ssn);
1079
268k
  size_t nlinks = pn_list_size(ssn->links);
1080
286k
  for (size_t i = 0; i < nlinks; i++) {
1081
18.2k
    pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i));
1082
18.2k
  }
1083
268k
}
1084
1085
void pn_session_unbound(pn_session_t* ssn)
1086
36.7k
{
1087
36.7k
  assert(ssn);
1088
36.7k
  ssn->state.local_channel = (uint16_t)-1;
1089
36.7k
  ssn->state.remote_channel = (uint16_t)-1;
1090
36.7k
  ssn->incoming_bytes = 0;
1091
36.7k
  ssn->outgoing_bytes = 0;
1092
36.7k
  ssn->incoming_deliveries = 0;
1093
36.7k
  ssn->outgoing_deliveries = 0;
1094
36.7k
}
1095
1096
size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
1097
0
{
1098
0
  assert(ssn);
1099
0
  return ssn->incoming_capacity;
1100
0
}
1101
1102
// Update required when (re)set by user or when session started (proxy: BEGIN frame).  No
1103
// session flow control actually means flow control with huge window, so set lwm to 1.  There is
1104
// low probability of a stall.  Any link credit flow frame will update session credit too.
1105
0
void pni_session_update_incoming_lwm(pn_session_t *ssn) {
1106
0
  if (ssn->incoming_capacity) {
1107
    // Old API.
1108
0
    if (!ssn->connection->transport)
1109
0
      return; // Defer until called again from BEGIN frame setup with max frame known.
1110
0
    if (ssn->connection->transport->local_max_frame) {
1111
0
      ssn->incoming_window_lwm = (ssn->incoming_capacity / ssn->connection->transport->local_max_frame) / 2;
1112
0
      if (!ssn->incoming_window_lwm)
1113
0
        ssn->incoming_window_lwm = 1; // Zero may hang.
1114
0
    } else {
1115
0
      ssn->incoming_window_lwm = 1;
1116
0
    }
1117
0
  } else if (ssn->max_incoming_window) {
1118
    // New API.
1119
    // Only need to deal with default.  Called whensending BEGIN frame.
1120
0
    if (ssn->connection->transport && ssn->connection->transport->local_max_frame && ssn->lwm_default) {
1121
0
      ssn->incoming_window_lwm = (ssn->max_incoming_window + 1) / 2;
1122
0
    }
1123
0
  } else {
1124
0
    ssn->incoming_window_lwm = 1;
1125
0
  }
1126
0
  assert(ssn->incoming_window_lwm != 0);  // 0 allows session flow to hang
1127
0
}
1128
1129
void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
1130
0
{
1131
0
  assert(ssn);
1132
0
  ssn->incoming_capacity = capacity;
1133
0
  ssn->max_incoming_window = 0;
1134
0
  ssn->incoming_window_lwm = 1;
1135
0
  ssn->lwm_default = true;
1136
0
  if (ssn->connection->transport) {
1137
0
    ssn->check_flow = true;
1138
0
    ssn->need_flow = true;
1139
0
    pn_modified(ssn->connection, &ssn->endpoint, false);
1140
0
  }
1141
0
  pni_session_update_incoming_lwm(ssn);
1142
  // If capacity invalid, failure occurs when transport calculates value of incoming window.
1143
0
}
1144
1145
0
int pn_session_set_incoming_window_and_lwm(pn_session_t *ssn, pn_frame_count_t window, pn_frame_count_t lwm) {
1146
0
  assert(ssn);
1147
0
  if (!window || (lwm && lwm > window))
1148
0
    return PN_ARG_ERR;
1149
  // Settings fixed after session open for simplicity.  AMPQ actually allows dynamic change with risk
1150
  // of overflow if window reduced while transfers in flight.
1151
0
  if (ssn->endpoint.state & PN_LOCAL_ACTIVE)
1152
0
    return PN_STATE_ERR;
1153
0
  ssn->incoming_capacity = 0;
1154
0
  ssn->max_incoming_window = window;
1155
0
  ssn->lwm_default = (lwm == 0);
1156
0
  ssn->incoming_window_lwm = lwm;
1157
0
  return 0;
1158
0
}
1159
1160
0
pn_frame_count_t pn_session_incoming_window(pn_session_t *ssn) {
1161
0
  return ssn->max_incoming_window;
1162
0
}
1163
1164
0
pn_frame_count_t pn_session_incoming_window_lwm(pn_session_t *ssn) {
1165
0
  return (!ssn->max_incoming_window || ssn->lwm_default) ? 0 : ssn->incoming_window_lwm;
1166
0
}
1167
1168
0
pn_frame_count_t pn_session_remote_incoming_window(pn_session_t *ssn) {
1169
0
  return ssn->state.remote_incoming_window;
1170
0
}
1171
1172
size_t pn_session_get_outgoing_window(pn_session_t *ssn)
1173
0
{
1174
0
  assert(ssn);
1175
0
  return ssn->outgoing_window;
1176
0
}
1177
1178
void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
1179
0
{
1180
0
  assert(ssn);
1181
0
  ssn->outgoing_window = window;
1182
0
}
1183
1184
size_t pn_session_outgoing_bytes(pn_session_t *ssn)
1185
0
{
1186
0
  assert(ssn);
1187
0
  return ssn->outgoing_bytes;
1188
0
}
1189
1190
size_t pn_session_incoming_bytes(pn_session_t *ssn)
1191
0
{
1192
0
  assert(ssn);
1193
0
  return ssn->incoming_bytes;
1194
0
}
1195
1196
pn_state_t pn_session_state(pn_session_t *session)
1197
0
{
1198
0
  return session->endpoint.state;
1199
0
}
1200
1201
static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
1202
1.01M
{
1203
1.01M
  terminus->type = type;
1204
1.01M
  terminus->address = pn_string(NULL);
1205
1.01M
  terminus->durability = PN_NONDURABLE;
1206
1.01M
  terminus->has_expiry_policy = false;
1207
1.01M
  terminus->expiry_policy = PN_EXPIRE_WITH_SESSION;
1208
1.01M
  terminus->timeout = 0;
1209
1.01M
  terminus->dynamic = false;
1210
1.01M
  terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
1211
1.01M
  terminus->properties_raw = (pn_bytes_t){0, NULL};
1212
1.01M
  terminus->capabilities_raw = (pn_bytes_t){0, NULL};
1213
1.01M
  terminus->outcomes_raw = (pn_bytes_t){0, NULL};
1214
1.01M
  terminus->filter_raw = (pn_bytes_t){0, NULL};
1215
1.01M
  terminus->properties = NULL;
1216
1.01M
  terminus->capabilities = NULL;
1217
1.01M
  terminus->outcomes = NULL;
1218
1.01M
  terminus->filter = NULL;
1219
1.01M
}
1220
1221
static void pn_link_incref(void *object)
1222
781k
{
1223
781k
  pn_link_t *link = (pn_link_t *) object;
1224
781k
  if (!link->endpoint.referenced) {
1225
258k
    link->endpoint.referenced = true;
1226
258k
    pn_incref(link->session);
1227
522k
  } else {
1228
522k
    pn_object_incref(object);
1229
522k
  }
1230
781k
}
1231
1232
static void pn_link_finalize(void *object)
1233
513k
{
1234
513k
  pn_link_t *link = (pn_link_t *) object;
1235
513k
  pn_endpoint_t *endpoint = &link->endpoint;
1236
1237
513k
  if (pni_preserve_child(endpoint)) {
1238
258k
    return;
1239
258k
  }
1240
1241
256k
  while (link->unsettled_head) {
1242
1.91k
    assert(!link->unsettled_head->referenced);
1243
1.91k
    pn_free(link->unsettled_head);
1244
1.91k
  }
1245
1246
254k
  pn_free(link->context);
1247
254k
  pni_terminus_free(&link->source);
1248
254k
  pni_terminus_free(&link->target);
1249
254k
  pni_terminus_free(&link->remote_source);
1250
254k
  pni_terminus_free(&link->remote_target);
1251
254k
  pn_free(link->name);
1252
254k
  pni_endpoint_tini(endpoint);
1253
254k
  pni_remove_link(link->session, link);
1254
254k
  pn_hash_del(link->session->state.local_handles, link->state.local_handle);
1255
254k
  pn_hash_del(link->session->state.remote_handles, link->state.remote_handle);
1256
254k
  pn_list_remove(link->session->freed, link);
1257
254k
  if (endpoint->referenced) {
1258
254k
    pn_decref(link->session);
1259
254k
  }
1260
254k
  pn_free(link->properties);
1261
254k
  pn_bytes_free(link->properties_raw);
1262
254k
  pn_free(link->remote_properties);
1263
254k
  pn_bytes_free(link->remote_properties_raw);
1264
254k
}
1265
1266
254k
#define pn_link_refcount NULL
1267
254k
#define pn_link_decref NULL
1268
254k
#define pn_link_initialize NULL
1269
254k
#define pn_link_hashcode NULL
1270
254k
#define pn_link_compare NULL
1271
254k
#define pn_link_inspect NULL
1272
1273
pn_link_t *pn_link_new(int type, pn_session_t *session, pn_string_t *name)
1274
254k
{
1275
254k
#define pn_link_new NULL
1276
254k
#define pn_link_free NULL
1277
254k
  static const pn_class_t clazz = PN_METACLASS(pn_link);
1278
254k
#undef pn_link_new
1279
254k
#undef pn_link_free
1280
254k
  pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t));
1281
1282
254k
  pn_endpoint_init(&link->endpoint, type, session->connection);
1283
254k
  pni_add_link(session, link);
1284
254k
  pn_incref(session);  // keep session until link finalized
1285
254k
  link->name = name;
1286
254k
  pni_terminus_init(&link->source, PN_SOURCE);
1287
254k
  pni_terminus_init(&link->target, PN_TARGET);
1288
254k
  pni_terminus_init(&link->remote_source, PN_UNSPECIFIED);
1289
254k
  pni_terminus_init(&link->remote_target, PN_UNSPECIFIED);
1290
254k
  link->unsettled_head = link->unsettled_tail = link->current = NULL;
1291
254k
  link->unsettled_count = 0;
1292
254k
  link->max_message_size = 0;
1293
254k
  link->remote_max_message_size = 0;
1294
254k
  link->available = 0;
1295
254k
  link->credit = 0;
1296
254k
  link->queued = 0;
1297
254k
  link->more_id = 0;
1298
254k
  link->drain = false;
1299
254k
  link->drain_flag_mode = true;
1300
254k
  link->drained = 0;
1301
254k
  link->context = pn_record();
1302
254k
  link->snd_settle_mode = PN_SND_MIXED;
1303
254k
  link->rcv_settle_mode = PN_RCV_FIRST;
1304
254k
  link->remote_snd_settle_mode = PN_SND_MIXED;
1305
254k
  link->remote_rcv_settle_mode = PN_RCV_FIRST;
1306
254k
  link->detached = false;
1307
254k
  link->more_pending = false;
1308
254k
  link->properties = 0;
1309
254k
  link->properties_raw = (pn_bytes_t){0, NULL};
1310
254k
  link->remote_properties = 0;
1311
254k
  link->remote_properties_raw = (pn_bytes_t){0, NULL};
1312
1313
  // begin transport state
1314
254k
  link->state.local_handle = -1;
1315
254k
  link->state.remote_handle = -1;
1316
254k
  link->state.delivery_count = 0;
1317
254k
  link->state.link_credit = 0;
1318
  // end transport state
1319
1320
254k
  pn_collector_put_object(session->connection->collector, link, PN_LINK_INIT);
1321
254k
  if (session->connection->transport) {
1322
236k
    pni_link_bound(link);
1323
236k
  }
1324
254k
  pn_decref(link);
1325
254k
  return link;
1326
254k
}
1327
1328
static void pni_link_bound(pn_link_t *link)
1329
254k
{
1330
254k
}
1331
1332
void pn_link_unbound(pn_link_t* link)
1333
32.2k
{
1334
32.2k
  assert(link);
1335
32.2k
  link->state.local_handle = -1;
1336
32.2k
  link->state.remote_handle = -1;
1337
32.2k
  link->state.delivery_count = 0;
1338
32.2k
  link->state.link_credit = 0;
1339
32.2k
}
1340
1341
pn_terminus_t *pn_link_source(pn_link_t *link)
1342
18.2k
{
1343
18.2k
  return link ? &link->source : NULL;
1344
18.2k
}
1345
1346
pn_terminus_t *pn_link_target(pn_link_t *link)
1347
0
{
1348
0
  return link ? &link->target : NULL;
1349
0
}
1350
1351
pn_terminus_t *pn_link_remote_source(pn_link_t *link)
1352
0
{
1353
0
  return link ? &link->remote_source : NULL;
1354
0
}
1355
1356
pn_terminus_t *pn_link_remote_target(pn_link_t *link)
1357
0
{
1358
0
  return link ? &link->remote_target : NULL;
1359
0
}
1360
1361
int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type)
1362
473k
{
1363
473k
  if (!terminus) return PN_ARG_ERR;
1364
473k
  terminus->type = type;
1365
473k
  return 0;
1366
473k
}
1367
1368
pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus)
1369
236k
{
1370
236k
  return (pn_terminus_type_t) (terminus ? terminus->type : 0);
1371
236k
}
1372
1373
const char *pn_terminus_get_address(pn_terminus_t *terminus)
1374
0
{
1375
0
  assert(terminus);
1376
0
  return pn_string_get(terminus->address);
1377
0
}
1378
1379
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
1380
18.2k
{
1381
18.2k
  assert(terminus);
1382
18.2k
  return pn_string_set(terminus->address, address);
1383
18.2k
}
1384
1385
pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus)
1386
0
{
1387
0
  return (pn_durability_t) (terminus ? terminus->durability : 0);
1388
0
}
1389
1390
int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability)
1391
17.7k
{
1392
17.7k
  if (!terminus) return PN_ARG_ERR;
1393
17.7k
  terminus->durability = durability;
1394
17.7k
  return 0;
1395
17.7k
}
1396
1397
pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus)
1398
0
{
1399
0
  return (pn_expiry_policy_t) (terminus ? terminus->expiry_policy : 0);
1400
0
}
1401
1402
bool pn_terminus_has_expiry_policy(const pn_terminus_t *terminus)
1403
0
{
1404
0
    return terminus && terminus->has_expiry_policy;
1405
0
}
1406
1407
int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy)
1408
286
{
1409
286
  if (!terminus) return PN_ARG_ERR;
1410
286
  terminus->expiry_policy = expiry_policy;
1411
286
  terminus->has_expiry_policy = true;
1412
286
  return 0;
1413
286
}
1414
1415
pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus)
1416
0
{
1417
0
  return terminus ? terminus->timeout : 0;
1418
0
}
1419
1420
int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout)
1421
17.7k
{
1422
17.7k
  if (!terminus) return PN_ARG_ERR;
1423
17.7k
  terminus->timeout = timeout;
1424
17.7k
  return 0;
1425
17.7k
}
1426
1427
bool pn_terminus_is_dynamic(pn_terminus_t *terminus)
1428
0
{
1429
0
  return terminus ? terminus->dynamic : false;
1430
0
}
1431
1432
int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic)
1433
17.7k
{
1434
17.7k
  if (!terminus) return PN_ARG_ERR;
1435
17.7k
  terminus->dynamic = dynamic;
1436
17.7k
  return 0;
1437
17.7k
}
1438
1439
pn_data_t *pn_terminus_properties(pn_terminus_t *terminus)
1440
0
{
1441
0
  if (!terminus)
1442
0
    return NULL;
1443
0
  pni_switch_to_data(&terminus->properties_raw, &terminus->properties);
1444
0
  return terminus->properties;
1445
0
}
1446
1447
pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus)
1448
0
{
1449
0
  if (!terminus)
1450
0
    return NULL;
1451
0
  pni_switch_to_data(&terminus->capabilities_raw, &terminus->capabilities);
1452
0
  return terminus->capabilities;
1453
0
}
1454
1455
pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus)
1456
0
{
1457
0
  if (!terminus)
1458
0
    return NULL;
1459
0
  pni_switch_to_data(&terminus->outcomes_raw, &terminus->outcomes);
1460
0
  return terminus->outcomes;
1461
0
}
1462
1463
pn_data_t *pn_terminus_filter(pn_terminus_t *terminus)
1464
0
{
1465
0
  if (!terminus)
1466
0
    return NULL;
1467
0
  pni_switch_to_data(&terminus->filter_raw, &terminus->filter);
1468
0
  return terminus->filter;
1469
0
}
1470
1471
pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
1472
0
{
1473
0
  return terminus ? (pn_distribution_mode_t) terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED;
1474
0
}
1475
1476
int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
1477
13.9k
{
1478
13.9k
  if (!terminus) return PN_ARG_ERR;
1479
13.9k
  terminus->distribution_mode = m;
1480
13.9k
  return 0;
1481
13.9k
}
1482
1483
int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
1484
0
{
1485
0
  if (!terminus || !src) {
1486
0
    return PN_ARG_ERR;
1487
0
  }
1488
1489
0
  terminus->type = src->type;
1490
0
  int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src));
1491
0
  if (err) return err;
1492
0
  terminus->durability = src->durability;
1493
0
  terminus->has_expiry_policy = src->has_expiry_policy;
1494
0
  terminus->expiry_policy = src->expiry_policy;
1495
0
  terminus->timeout = src->timeout;
1496
0
  terminus->dynamic = src->dynamic;
1497
0
  terminus->distribution_mode = src->distribution_mode;
1498
0
  pn_bytes_free(terminus->properties_raw);
1499
0
  terminus->properties_raw = pn_bytes_dup(src->properties_raw);
1500
0
  pn_bytes_free(terminus->capabilities_raw);
1501
0
  terminus->capabilities_raw = pn_bytes_dup(src->capabilities_raw);
1502
0
  pn_bytes_free(terminus->outcomes_raw);
1503
0
  terminus->outcomes_raw = pn_bytes_dup(src->outcomes_raw);
1504
0
  pn_bytes_free(terminus->filter_raw);
1505
0
  terminus->filter_raw = pn_bytes_dup(src->filter_raw);
1506
0
  if (!src->properties) {
1507
0
    pn_free(terminus->properties);
1508
0
    terminus->properties = NULL;
1509
0
  } else {
1510
0
    if (!terminus->properties) terminus->properties = pn_data(0);
1511
0
    err = pn_data_copy(terminus->properties, src->properties);
1512
0
    if (err) return err;
1513
0
  }
1514
0
  if (!src->capabilities) {
1515
0
    pn_free(terminus->capabilities);
1516
0
    terminus->capabilities = NULL;
1517
0
  } else {
1518
0
    if (!terminus->capabilities) terminus->capabilities = pn_data(0);
1519
0
    err = pn_data_copy(terminus->capabilities, src->capabilities);
1520
0
    if (err) return err;
1521
0
  }
1522
0
  if (!src->outcomes) {
1523
0
    pn_free(terminus->outcomes);
1524
0
    terminus->outcomes = NULL;
1525
0
  } else {
1526
0
    if (!terminus->outcomes) terminus->outcomes = pn_data(0);
1527
0
    err = pn_data_copy(terminus->outcomes, src->outcomes);
1528
0
    if (err) return err;
1529
0
  }
1530
0
  if (!src->filter) {
1531
0
    pn_free(terminus->filter);
1532
0
    terminus->filter = NULL;
1533
0
  } else {
1534
0
    if (!terminus->filter) terminus->filter = pn_data(0);
1535
0
    err = pn_data_copy(terminus->filter, src->filter);
1536
0
    if (err) return err;
1537
0
  }
1538
0
  return 0;
1539
0
}
1540
1541
pn_link_t *pn_sender(pn_session_t *session, const char *name)
1542
0
{
1543
0
  return pn_link_new(SENDER, session, pn_string(name));
1544
0
}
1545
1546
pn_link_t *pn_receiver(pn_session_t *session, const char *name)
1547
18.2k
{
1548
18.2k
  return pn_link_new(RECEIVER, session, pn_string(name));
1549
18.2k
}
1550
1551
pn_state_t pn_link_state(pn_link_t *link)
1552
0
{
1553
0
  return link->endpoint.state;
1554
0
}
1555
1556
const char *pn_link_name(pn_link_t *link)
1557
0
{
1558
0
  assert(link);
1559
0
  return pn_string_get(link->name);
1560
0
}
1561
1562
bool pn_link_is_sender(pn_link_t *link)
1563
15.3k
{
1564
15.3k
  return link->endpoint.type == SENDER;
1565
15.3k
}
1566
1567
bool pn_link_is_receiver(pn_link_t *link)
1568
4.77k
{
1569
4.77k
  return link->endpoint.type == RECEIVER;
1570
4.77k
}
1571
1572
pn_session_t *pn_link_session(pn_link_t *link)
1573
5.34k
{
1574
5.34k
  assert(link);
1575
5.34k
  return link->session;
1576
5.34k
}
1577
1578
static void pn_disposition_finalize(pn_disposition_t *ds)
1579
7.55k
{
1580
7.55k
  pn_free(ds->data);
1581
7.55k
  pn_bytes_free(ds->data_raw);
1582
7.55k
  pn_free(ds->annotations);
1583
7.55k
  pn_bytes_free(ds->annotations_raw);
1584
7.55k
  pn_condition_tini(&ds->condition);
1585
7.55k
}
1586
1587
static void pn_delivery_incref(void *object)
1588
15.9k
{
1589
15.9k
  pn_delivery_t *delivery = (pn_delivery_t *) object;
1590
15.9k
  if (delivery->link && !delivery->referenced) {
1591
7.49k
    delivery->referenced = true;
1592
7.49k
    pn_incref(delivery->link);
1593
8.42k
  } else {
1594
8.42k
    pn_object_incref(object);
1595
8.42k
  }
1596
15.9k
}
1597
1598
static bool pni_preserve_delivery(pn_delivery_t *delivery)
1599
13.2k
{
1600
13.2k
  pn_connection_t *conn = delivery->link->session->connection;
1601
13.2k
  return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork));
1602
13.2k
}
1603
1604
static void pn_delivery_finalize(void *object)
1605
17.0k
{
1606
17.0k
  pn_delivery_t *delivery = (pn_delivery_t *) object;
1607
17.0k
  pn_link_t *link = delivery->link;
1608
  //  assert(!delivery->state.init);
1609
1610
17.0k
  bool pooled = false;
1611
17.0k
  bool referenced = true;
1612
17.0k
  if (link) {
1613
13.2k
    if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
1614
9.40k
      delivery->referenced = false;
1615
9.40k
      pn_object_incref(delivery);
1616
9.40k
      pn_decref(link);
1617
9.40k
      return;
1618
9.40k
    }
1619
3.86k
    referenced = delivery->referenced;
1620
1621
3.86k
    pn_clear_tpwork(delivery);
1622
3.86k
    LL_REMOVE(link, unsettled, delivery);
1623
3.86k
    pn_delivery_map_del(pn_link_is_sender(link)
1624
3.86k
                        ? &link->session->state.outgoing
1625
3.86k
                        : &link->session->state.incoming,
1626
3.86k
                        delivery);
1627
3.86k
    pn_bytes_free(delivery->tag);
1628
3.86k
    delivery->tag = (pn_delivery_tag_t){0, NULL};
1629
3.86k
    pn_buffer_clear(delivery->bytes);
1630
3.86k
    pn_record_clear(delivery->context);
1631
3.86k
    delivery->settled = true;
1632
3.86k
    pn_connection_t *conn = link->session->connection;
1633
3.86k
    assert(pn_refcount(delivery) == 0);
1634
3.86k
    if (pni_connection_live(conn)) {
1635
3.86k
      pn_list_t *pool = link->session->connection->delivery_pool;
1636
3.86k
      delivery->link = NULL;
1637
3.86k
      pn_list_add(pool, delivery);
1638
3.86k
      pooled = true;
1639
3.86k
      assert(pn_refcount(delivery) == 1);
1640
3.86k
    }
1641
3.86k
  }
1642
1643
7.64k
  if (!pooled) {
1644
3.77k
    pn_free(delivery->context);
1645
3.77k
    pn_bytes_free(delivery->tag);
1646
3.77k
    delivery->tag = (pn_delivery_tag_t){0, NULL};
1647
3.77k
    pn_buffer_free(delivery->bytes);
1648
3.77k
    pn_disposition_finalize(&delivery->local);
1649
3.77k
    pn_disposition_finalize(&delivery->remote);
1650
3.77k
  }
1651
1652
7.64k
  if (referenced) {
1653
5.72k
    pn_decref(link);
1654
5.72k
  }
1655
7.64k
}
1656
1657
static void pn_disposition_init(pn_disposition_t *ds)
1658
7.55k
{
1659
7.55k
  ds->data = NULL;
1660
7.55k
  ds->data_raw = (pn_bytes_t){0, NULL};
1661
7.55k
  ds->annotations = NULL;
1662
7.55k
  ds->annotations_raw = (pn_bytes_t){0, NULL};
1663
7.55k
  pn_condition_init(&ds->condition);
1664
7.55k
}
1665
1666
static void pn_disposition_clear(pn_disposition_t *ds)
1667
7.72k
{
1668
7.72k
  ds->type = 0;
1669
7.72k
  ds->section_number = 0;
1670
7.72k
  ds->section_offset = 0;
1671
7.72k
  ds->failed = false;
1672
7.72k
  ds->undeliverable = false;
1673
7.72k
  ds->settled = false;
1674
7.72k
  pn_data_clear(ds->data);
1675
7.72k
  pn_bytes_free(ds->data_raw);
1676
7.72k
  ds->data_raw = (pn_bytes_t){0, NULL};
1677
7.72k
  pn_data_clear(ds->annotations);
1678
7.72k
  pn_bytes_free(ds->annotations_raw);
1679
7.72k
  ds->annotations_raw = (pn_bytes_t){0, NULL};
1680
7.72k
  pn_condition_clear(&ds->condition);
1681
7.72k
}
1682
1683
0
void pn_delivery_inspect(void *obj, pn_fixed_string_t *dst) {
1684
0
  pn_delivery_t *d = (pn_delivery_t*)obj;
1685
0
  const char* dir = pn_link_is_sender(d->link) ? "sending" : "receiving";
1686
0
  pn_bytes_t bytes = d->tag;
1687
0
  pn_fixed_string_addf(dst, "pn_delivery<%p>{%s, tag=b\"", obj, dir);
1688
0
  pn_fixed_string_quote(dst, bytes.start, bytes.size);
1689
0
  pn_fixed_string_addf(dst, "\", local=%s, remote=%s}",
1690
0
                       pn_disposition_type_name(d->local.type),
1691
0
                       pn_disposition_type_name(d->remote.type));
1692
0
  return;
1693
0
}
1694
1695
3.86k
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) {
1696
3.86k
  pn_delivery_tag_t dtag = {size, bytes};
1697
3.86k
  return dtag;
1698
3.86k
}
1699
1700
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
1701
3.86k
{
1702
3.86k
  assert(link);
1703
3.86k
  pn_list_t *pool = link->session->connection->delivery_pool;
1704
3.86k
  pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
1705
3.86k
  if (!delivery) {
1706
3.77k
    delivery = (pn_delivery_t *) pn_class_new(&PN_CLASSCLASS(pn_delivery), sizeof(pn_delivery_t));
1707
3.77k
    if (!delivery) return NULL;
1708
3.77k
    delivery->bytes = pn_buffer(64);
1709
3.77k
    pn_disposition_init(&delivery->local);
1710
3.77k
    pn_disposition_init(&delivery->remote);
1711
3.77k
    delivery->context = pn_record();
1712
3.77k
  } else {
1713
84
    assert(!delivery->state.init);
1714
84
  }
1715
3.86k
  delivery->link = link;
1716
3.86k
  pn_incref(delivery->link);  // keep link until finalized
1717
3.86k
  delivery->tag = pn_bytes_dup(tag);
1718
3.86k
  pn_disposition_clear(&delivery->local);
1719
3.86k
  pn_disposition_clear(&delivery->remote);
1720
3.86k
  delivery->updated = false;
1721
3.86k
  delivery->settled = false;
1722
3.86k
  LL_ADD(link, unsettled, delivery);
1723
3.86k
  delivery->referenced = true;
1724
3.86k
  delivery->work_next = NULL;
1725
3.86k
  delivery->work_prev = NULL;
1726
3.86k
  delivery->work = false;
1727
3.86k
  delivery->tpwork_next = NULL;
1728
3.86k
  delivery->tpwork_prev = NULL;
1729
3.86k
  delivery->tpwork = false;
1730
3.86k
  pn_buffer_clear(delivery->bytes);
1731
3.86k
  delivery->done = false;
1732
3.86k
  delivery->aborted = false;
1733
3.86k
  pn_record_clear(delivery->context);
1734
1735
  // begin delivery state
1736
3.86k
  delivery->state.init = false;
1737
3.86k
  delivery->state.sending = false; /* True if we have sent at least 1 frame */
1738
3.86k
  delivery->state.sent = false;    /* True if we have sent the entire delivery */
1739
  // end delivery state
1740
1741
3.86k
  if (!link->current)
1742
321
    link->current = delivery;
1743
1744
3.86k
  link->unsettled_count++;
1745
1746
3.86k
  pn_work_update(link->session->connection, delivery);
1747
1748
  // XXX: could just remove incref above
1749
3.86k
  pn_decref(delivery);
1750
1751
3.86k
  return delivery;
1752
3.86k
}
1753
1754
bool pn_delivery_buffered(pn_delivery_t *delivery)
1755
0
{
1756
0
  assert(delivery);
1757
0
  if (delivery->settled) return false;
1758
0
  if (pn_link_is_sender(delivery->link)) {
1759
0
    pn_delivery_state_t *state = &delivery->state;
1760
0
    if (state->sent) {
1761
0
      return false;
1762
0
    } else {
1763
0
      return delivery->done || (pn_buffer_size(delivery->bytes) > 0);
1764
0
    }
1765
0
  } else {
1766
0
    return false;
1767
0
  }
1768
0
}
1769
1770
int pn_link_unsettled(pn_link_t *link)
1771
0
{
1772
0
  return link->unsettled_count;
1773
0
}
1774
1775
pn_delivery_t *pn_unsettled_head(pn_link_t *link)
1776
0
{
1777
0
  pn_delivery_t *d = link->unsettled_head;
1778
0
  while (d && d->local.settled) {
1779
0
    d = d->unsettled_next;
1780
0
  }
1781
0
  return d;
1782
0
}
1783
1784
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
1785
0
{
1786
0
  pn_delivery_t *d = delivery->unsettled_next;
1787
0
  while (d && d->local.settled) {
1788
0
    d = d->unsettled_next;
1789
0
  }
1790
0
  return d;
1791
0
}
1792
1793
bool pn_delivery_current(pn_delivery_t *delivery)
1794
7.88k
{
1795
7.88k
  pn_link_t *link = delivery->link;
1796
7.88k
  return pn_link_current(link) == delivery;
1797
7.88k
}
1798
1799
void pn_delivery_dump(pn_delivery_t *d)
1800
0
{
1801
0
  char tag[1024];
1802
0
  pn_bytes_t bytes = d->tag;
1803
0
  pn_quote_data(tag, 1024, bytes.start, bytes.size);
1804
0
  printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%d, "
1805
0
         "remote.settled=%d, updated=%d, current=%d, writable=%d, readable=%d, "
1806
0
         "work=%d}",
1807
0
         tag, d->local.type, d->remote.type, d->local.settled,
1808
0
         d->remote.settled, d->updated, pn_delivery_current(d),
1809
0
         pn_delivery_writable(d), pn_delivery_readable(d), d->work);
1810
0
}
1811
1812
void *pn_delivery_get_context(pn_delivery_t *delivery)
1813
0
{
1814
0
  assert(delivery);
1815
0
  return pn_record_get(delivery->context, PN_LEGCTX);
1816
0
}
1817
1818
void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
1819
0
{
1820
0
  assert(delivery);
1821
0
  pn_record_set(delivery->context, PN_LEGCTX, context);
1822
0
}
1823
1824
pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery)
1825
0
{
1826
0
  assert(delivery);
1827
0
  return delivery->context;
1828
0
}
1829
1830
uint64_t pn_disposition_type(pn_disposition_t *disposition)
1831
0
{
1832
0
  assert(disposition);
1833
0
  return disposition->type;
1834
0
}
1835
1836
pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
1837
0
{
1838
0
  assert(disposition);
1839
0
  pni_switch_to_data(&disposition->data_raw, &disposition->data);
1840
0
  return disposition->data;
1841
0
}
1842
1843
uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
1844
0
{
1845
0
  assert(disposition);
1846
0
  return disposition->section_number;
1847
0
}
1848
1849
void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
1850
0
{
1851
0
  assert(disposition);
1852
0
  disposition->section_number = section_number;
1853
0
}
1854
1855
uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
1856
0
{
1857
0
  assert(disposition);
1858
0
  return disposition->section_offset;
1859
0
}
1860
1861
void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
1862
0
{
1863
0
  assert(disposition);
1864
0
  disposition->section_offset = section_offset;
1865
0
}
1866
1867
bool pn_disposition_is_failed(pn_disposition_t *disposition)
1868
0
{
1869
0
  assert(disposition);
1870
0
  return disposition->failed;
1871
0
}
1872
1873
void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
1874
0
{
1875
0
  assert(disposition);
1876
0
  disposition->failed = failed;
1877
0
}
1878
1879
bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
1880
0
{
1881
0
  assert(disposition);
1882
0
  return disposition->undeliverable;
1883
0
}
1884
1885
void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
1886
0
{
1887
0
  assert(disposition);
1888
0
  disposition->undeliverable = undeliverable;
1889
0
}
1890
1891
pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
1892
0
{
1893
0
  assert(disposition);
1894
0
  pni_switch_to_data(&disposition->annotations_raw, &disposition->annotations);
1895
0
  return disposition->annotations;
1896
0
}
1897
1898
pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
1899
0
{
1900
0
  assert(disposition);
1901
0
  return &disposition->condition;
1902
0
}
1903
1904
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
1905
0
{
1906
0
  if (delivery) {
1907
0
    return delivery->tag;
1908
0
  } else {
1909
0
    return (pn_delivery_tag_t){0, NULL};
1910
0
  }
1911
0
}
1912
1913
pn_delivery_t *pn_link_current(pn_link_t *link)
1914
24.6k
{
1915
24.6k
  if (!link) return NULL;
1916
24.6k
  return link->current;
1917
24.6k
}
1918
1919
static void pni_advance_sender(pn_link_t *link)
1920
156
{
1921
156
  link->current->done = true;
1922
  /* Skip accounting if the link is aborted and has not sent any frames.
1923
     A delivery that was aborted before sending the first frame was not accounted
1924
     for in pni_process_tpwork_sender() so we don't need to account for it being sent here.
1925
  */
1926
156
  bool skip = link->current->aborted && !link->current->state.sending;
1927
156
  if (!skip) {
1928
156
    link->queued++;
1929
156
    link->credit--;
1930
156
    link->session->outgoing_deliveries++;
1931
156
  }
1932
156
  pni_add_tpwork(link->current);
1933
156
  link->current = link->current->unsettled_next;
1934
156
}
1935
1936
static void pni_advance_receiver(pn_link_t *link)
1937
3.70k
{
1938
3.70k
  link->credit--;
1939
3.70k
  link->queued--;
1940
3.70k
  link->session->incoming_deliveries--;
1941
1942
3.70k
  pn_delivery_t *current = link->current;
1943
3.70k
  size_t drop_count = pn_buffer_size(current->bytes);
1944
3.70k
  pn_buffer_clear(current->bytes);
1945
1946
3.70k
  if (drop_count) {
1947
57
    pn_session_t *ssn = link->session;
1948
57
    ssn->incoming_bytes -= drop_count;
1949
57
    if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
1950
0
      ssn->check_flow = true;
1951
0
      pni_add_tpwork(current);
1952
0
    }
1953
57
  }
1954
1955
3.70k
  link->current = link->current->unsettled_next;
1956
3.70k
}
1957
1958
bool pn_link_advance(pn_link_t *link)
1959
3.86k
{
1960
3.86k
  if (link && link->current) {
1961
3.86k
    pn_delivery_t *prev = link->current;
1962
3.86k
    if (link->endpoint.type == SENDER) {
1963
156
      pni_advance_sender(link);
1964
3.70k
    } else {
1965
3.70k
      pni_advance_receiver(link);
1966
3.70k
    }
1967
3.86k
    pn_delivery_t *next = link->current;
1968
3.86k
    pn_work_update(link->session->connection, prev);
1969
3.86k
    if (next) pn_work_update(link->session->connection, next);
1970
3.86k
    return prev != next;
1971
3.86k
  } else {
1972
0
    return false;
1973
0
  }
1974
3.86k
}
1975
1976
int pn_link_credit(pn_link_t *link)
1977
32
{
1978
32
  return link ? link->credit : 0;
1979
32
}
1980
1981
int pn_link_available(pn_link_t *link)
1982
0
{
1983
0
  return link ? link->available : 0;
1984
0
}
1985
1986
int pn_link_queued(pn_link_t *link)
1987
539
{
1988
539
  return link ? link->queued : 0;
1989
539
}
1990
1991
int pn_link_remote_credit(pn_link_t *link)
1992
0
{
1993
0
  assert(link);
1994
0
  return link->credit - link->queued;
1995
0
}
1996
1997
bool pn_link_get_drain(pn_link_t *link)
1998
0
{
1999
0
  assert(link);
2000
0
  return link->drain;
2001
0
}
2002
2003
pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link)
2004
0
{
2005
0
  return link ? (pn_snd_settle_mode_t)link->snd_settle_mode
2006
0
      : PN_SND_MIXED;
2007
0
}
2008
2009
pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link)
2010
0
{
2011
0
  return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode
2012
0
      : PN_RCV_FIRST;
2013
0
}
2014
2015
pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link)
2016
0
{
2017
0
  return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode
2018
0
      : PN_SND_MIXED;
2019
0
}
2020
2021
pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link)
2022
0
{
2023
0
  return link ? (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode
2024
0
      : PN_RCV_FIRST;
2025
0
}
2026
void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode)
2027
0
{
2028
0
  if (link)
2029
0
    link->snd_settle_mode = (uint8_t)mode;
2030
0
}
2031
void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode)
2032
0
{
2033
0
  if (link)
2034
0
    link->rcv_settle_mode = (uint8_t)mode;
2035
0
}
2036
2037
void pn_delivery_settle(pn_delivery_t *delivery)
2038
5.77k
{
2039
5.77k
  assert(delivery);
2040
5.77k
  if (!delivery->local.settled) {
2041
3.86k
    pn_link_t *link = delivery->link;
2042
3.86k
    if (pn_delivery_current(delivery)) {
2043
234
      pn_link_advance(link);
2044
234
    }
2045
2046
3.86k
    link->unsettled_count--;
2047
3.86k
    delivery->local.settled = true;
2048
3.86k
    pni_add_tpwork(delivery);
2049
3.86k
    pn_work_update(delivery->link->session->connection, delivery);
2050
3.86k
    pn_incref(delivery);
2051
3.86k
    pn_decref(delivery);
2052
3.86k
  }
2053
5.77k
}
2054
2055
void pn_link_offered(pn_link_t *sender, int credit)
2056
0
{
2057
0
  sender->available = credit;
2058
0
}
2059
2060
ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
2061
0
{
2062
0
  pn_delivery_t *current = pn_link_current(sender);
2063
0
  if (!current) return PN_EOS;
2064
0
  if (!bytes || !n) return 0;
2065
0
  pn_buffer_append(current->bytes, bytes, n);
2066
0
  sender->session->outgoing_bytes += n;
2067
0
  pni_add_tpwork(current);
2068
0
  return n;
2069
0
}
2070
2071
int pn_link_drained(pn_link_t *link)
2072
0
{
2073
0
  assert(link);
2074
0
  int drained = 0;
2075
2076
0
  if (pn_link_is_sender(link)) {
2077
0
    if (link->drain && link->credit > 0) {
2078
0
      link->drained = link->credit;
2079
0
      link->credit = 0;
2080
0
      pn_modified(link->session->connection, &link->endpoint, true);
2081
0
      drained = link->drained;
2082
0
    }
2083
0
  } else {
2084
0
    drained = link->drained;
2085
0
    link->drained = 0;
2086
0
  }
2087
2088
0
  return drained;
2089
0
}
2090
2091
ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
2092
3.62k
{
2093
3.62k
  if (!receiver) return PN_ARG_ERR;
2094
3.62k
  pn_delivery_t *delivery = receiver->current;
2095
3.62k
  if (!delivery) return PN_STATE_ERR;
2096
3.62k
  if (delivery->aborted) return PN_ABORTED;
2097
3.62k
  size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
2098
3.62k
  pn_buffer_trim(delivery->bytes, size, 0);
2099
3.62k
  if (size) {
2100
1.04k
    pn_session_t *ssn = receiver->session;
2101
1.04k
    ssn->incoming_bytes -= size;
2102
1.04k
    if (!ssn->check_flow && ssn->state.incoming_window < ssn->incoming_window_lwm) {
2103
0
      ssn->check_flow = true;
2104
0
      pni_add_tpwork(delivery);
2105
0
    }
2106
1.04k
    return size;
2107
2.58k
  } else {
2108
2.58k
    return delivery->done ? PN_EOS : 0;
2109
2.58k
  }
2110
3.62k
}
2111
2112
2113
void pn_link_flow(pn_link_t *receiver, int credit)
2114
18.2k
{
2115
18.2k
  assert(receiver);
2116
18.2k
  assert(pn_link_is_receiver(receiver));
2117
18.2k
  receiver->credit += credit;
2118
18.2k
  pn_modified(receiver->session->connection, &receiver->endpoint, true);
2119
18.2k
  if (!receiver->drain_flag_mode) {
2120
0
    pn_link_set_drain(receiver, false);
2121
0
    receiver->drain_flag_mode = false;
2122
0
  }
2123
18.2k
}
2124
2125
void pn_link_drain(pn_link_t *receiver, int credit)
2126
0
{
2127
0
  assert(receiver);
2128
0
  assert(pn_link_is_receiver(receiver));
2129
0
  pn_link_set_drain(receiver, true);
2130
0
  pn_link_flow(receiver, credit);
2131
0
  receiver->drain_flag_mode = false;
2132
0
}
2133
2134
void pn_link_set_drain(pn_link_t *receiver, bool drain)
2135
0
{
2136
0
  assert(receiver);
2137
0
  assert(pn_link_is_receiver(receiver));
2138
0
  receiver->drain = drain;
2139
0
  pn_modified(receiver->session->connection, &receiver->endpoint, true);
2140
0
  receiver->drain_flag_mode = true;
2141
0
}
2142
2143
bool pn_link_draining(pn_link_t *receiver)
2144
0
{
2145
0
  assert(receiver);
2146
0
  assert(pn_link_is_receiver(receiver));
2147
0
  return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver));
2148
0
}
2149
2150
uint64_t pn_link_max_message_size(pn_link_t *link)
2151
0
{
2152
0
  return link->max_message_size;
2153
0
}
2154
2155
void pn_link_set_max_message_size(pn_link_t *link, uint64_t size)
2156
0
{
2157
0
  link->max_message_size = size;
2158
0
}
2159
2160
uint64_t pn_link_remote_max_message_size(pn_link_t *link)
2161
0
{
2162
0
  return link->remote_max_message_size;
2163
0
}
2164
2165
pn_data_t *pn_link_properties(pn_link_t *link)
2166
0
{
2167
0
  assert(link);
2168
0
  if (!link->properties)
2169
0
      link->properties = pn_data(0);
2170
0
  return link->properties;
2171
0
}
2172
2173
pn_data_t *pn_link_remote_properties(pn_link_t *link)
2174
0
{
2175
0
  assert(link);
2176
  // Annoying inconsistency: nearly everywhere else you *HAVE* to return an empty pn_data_t not NULL
2177
0
  if (link->remote_properties_raw.size) {
2178
0
    pni_switch_to_data(&link->remote_properties_raw, &link->remote_properties);
2179
0
  }
2180
0
  return link->remote_properties;
2181
0
}
2182
2183
2184
pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
2185
24.0k
{
2186
24.0k
  assert(delivery);
2187
24.0k
  return delivery->link;
2188
24.0k
}
2189
2190
pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
2191
0
{
2192
0
  assert(delivery);
2193
0
  return &delivery->local;
2194
0
}
2195
2196
uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
2197
0
{
2198
0
  assert(delivery);
2199
0
  return delivery->local.type;
2200
0
}
2201
2202
pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
2203
0
{
2204
0
  assert(delivery);
2205
0
  return &delivery->remote;
2206
0
}
2207
2208
uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
2209
0
{
2210
0
  assert(delivery);
2211
0
  return delivery->remote.type;
2212
0
}
2213
2214
bool pn_delivery_settled(pn_delivery_t *delivery)
2215
0
{
2216
0
  return delivery ? delivery->remote.settled : false;
2217
0
}
2218
2219
bool pn_delivery_updated(pn_delivery_t *delivery)
2220
0
{
2221
0
  return delivery ? delivery->updated : false;
2222
0
}
2223
2224
void pn_delivery_clear(pn_delivery_t *delivery)
2225
0
{
2226
0
  delivery->updated = false;
2227
0
  pn_work_update(delivery->link->session->connection, delivery);
2228
0
}
2229
2230
void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
2231
3.62k
{
2232
3.62k
  if (!delivery) return;
2233
3.62k
  delivery->local.type = state;
2234
3.62k
  pni_add_tpwork(delivery);
2235
3.62k
}
2236
2237
bool pn_delivery_writable(pn_delivery_t *delivery)
2238
0
{
2239
0
  if (!delivery) return false;
2240
2241
0
  pn_link_t *link = delivery->link;
2242
0
  return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0;
2243
0
}
2244
2245
bool pn_delivery_readable(pn_delivery_t *delivery)
2246
4.77k
{
2247
4.77k
  if (delivery) {
2248
4.77k
    pn_link_t *link = delivery->link;
2249
4.77k
    return pn_link_is_receiver(link) && pn_delivery_current(delivery);
2250
4.77k
  } else {
2251
0
    return false;
2252
0
  }
2253
4.77k
}
2254
2255
size_t pn_delivery_pending(pn_delivery_t *delivery)
2256
3.62k
{
2257
  /* Aborted deliveries: for clients that don't check pn_delivery_aborted(),
2258
     return 1 rather than 0. This will force them to call pn_link_recv() and get
2259
     the PN_ABORTED error return code.
2260
  */
2261
3.62k
  if (delivery->aborted) return 1;
2262
3.62k
  return pn_buffer_size(delivery->bytes);
2263
3.62k
}
2264
2265
bool pn_delivery_partial(pn_delivery_t *delivery)
2266
3.67k
{
2267
3.67k
  return !delivery->done;
2268
3.67k
}
2269
2270
0
void pn_delivery_abort(pn_delivery_t *delivery) {
2271
0
  if (!delivery->local.settled) { /* Can't abort a settled delivery */
2272
0
    delivery->aborted = true;
2273
0
    pn_delivery_settle(delivery);
2274
0
    delivery->link->session->outgoing_bytes -= pn_buffer_size(delivery->bytes);
2275
0
    pn_buffer_clear(delivery->bytes);
2276
0
  }
2277
0
}
2278
2279
0
bool pn_delivery_aborted(pn_delivery_t *delivery) {
2280
0
  return delivery->aborted;
2281
0
}
2282
2283
pn_condition_t *pn_connection_condition(pn_connection_t *connection)
2284
269
{
2285
269
  assert(connection);
2286
269
  return &connection->endpoint.condition;
2287
269
}
2288
2289
pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection)
2290
427
{
2291
427
  assert(connection);
2292
427
  pn_transport_t *transport = connection->transport;
2293
427
  return transport ? &transport->remote_condition : NULL;
2294
427
}
2295
2296
pn_condition_t *pn_session_condition(pn_session_t *session)
2297
0
{
2298
0
  assert(session);
2299
0
  return &session->endpoint.condition;
2300
0
}
2301
2302
pn_condition_t *pn_session_remote_condition(pn_session_t *session)
2303
980
{
2304
980
  assert(session);
2305
980
  return &session->endpoint.remote_condition;
2306
980
}
2307
2308
pn_condition_t *pn_link_condition(pn_link_t *link)
2309
0
{
2310
0
  assert(link);
2311
0
  return &link->endpoint.condition;
2312
0
}
2313
2314
pn_condition_t *pn_link_remote_condition(pn_link_t *link)
2315
5.34k
{
2316
5.34k
  assert(link);
2317
5.34k
  return &link->endpoint.remote_condition;
2318
5.34k
}
2319
2320
bool pn_condition_is_set(pn_condition_t *condition)
2321
48.8k
{
2322
48.8k
  return condition && condition->name && pn_string_get(condition->name);
2323
48.8k
}
2324
2325
void pn_condition_clear(pn_condition_t *condition)
2326
549k
{
2327
549k
  assert(condition);
2328
549k
  if (condition->name) pn_string_clear(condition->name);
2329
549k
  if (condition->description) pn_string_clear(condition->description);
2330
549k
  if (condition->info) pn_data_clear(condition->info);
2331
549k
  pn_bytes_free (condition->info_raw);
2332
549k
  condition->info_raw = (pn_bytes_t){0, NULL};
2333
549k
}
2334
2335
const char *pn_condition_get_name(pn_condition_t *condition)
2336
0
{
2337
0
  assert(condition);
2338
0
  if (condition->name == NULL) {
2339
0
    return NULL;
2340
0
  } else {
2341
0
    return pn_string_get(condition->name);
2342
0
  }
2343
0
}
2344
2345
int pn_condition_set_name(pn_condition_t *condition, const char *name)
2346
12.7k
{
2347
12.7k
  assert(condition);
2348
12.7k
  if (condition->name == NULL) {
2349
12.7k
    condition->name = pn_string(name);
2350
12.7k
    return 0;
2351
12.7k
  } else {
2352
0
    return pn_string_set(condition->name, name);
2353
0
  }
2354
12.7k
}
2355
2356
const char *pn_condition_get_description(pn_condition_t *condition)
2357
0
{
2358
0
  assert(condition);
2359
0
  if (condition->description == NULL) {
2360
0
    return NULL;
2361
0
  } else {
2362
0
    return pn_string_get(condition->description);
2363
0
  }
2364
0
}
2365
2366
int pn_condition_set_description(pn_condition_t *condition, const char *description)
2367
12.7k
{
2368
12.7k
  assert(condition);
2369
12.7k
  if (condition->description == NULL) {
2370
12.7k
    condition->description = pn_string(description);
2371
12.7k
    return 0;
2372
12.7k
  } else {
2373
0
    return pn_string_set(condition->description, description);
2374
0
  }
2375
12.7k
}
2376
2377
int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap)
2378
0
{
2379
0
  assert(condition);
2380
0
  int err = pn_condition_set_name(condition, name);
2381
0
  if (err)
2382
0
      return err;
2383
2384
0
  char text[1024];
2385
0
  size_t n = vsnprintf(text, 1024, fmt, ap);
2386
0
  if (n >= sizeof(text))
2387
0
      text[sizeof(text)-1] = '\0';
2388
0
  err = pn_condition_set_description(condition, text);
2389
0
  return err;
2390
0
}
2391
2392
int pn_condition_format(pn_condition_t *condition, const char *name, PN_PRINTF_FORMAT const char *fmt, ...)
2393
0
{
2394
0
  assert(condition);
2395
0
  va_list ap;
2396
0
  va_start(ap, fmt);
2397
0
  int err = pn_condition_vformat(condition, name, fmt, ap);
2398
0
  va_end(ap);
2399
0
  return err;
2400
0
}
2401
2402
pn_data_t *pn_condition_info(pn_condition_t *condition)
2403
0
{
2404
0
  assert(condition);
2405
0
  pni_switch_to_data(&condition->info_raw, &condition->info);
2406
0
  return condition->info;
2407
0
}
2408
2409
bool pn_condition_is_redirect(pn_condition_t *condition)
2410
0
{
2411
0
  const char *name = pn_condition_get_name(condition);
2412
0
  return name && (!strcmp(name, "amqp:connection:redirect") ||
2413
0
                  !strcmp(name, "amqp:link:redirect"));
2414
0
}
2415
2416
const char *pn_condition_redirect_host(pn_condition_t *condition)
2417
0
{
2418
0
  pn_data_t *data = pn_condition_info(condition);
2419
0
  pn_data_rewind(data);
2420
0
  pn_data_next(data);
2421
0
  pn_data_enter(data);
2422
0
  pn_data_lookup(data, "network-host");
2423
0
  pn_bytes_t host = pn_data_get_bytes(data);
2424
0
  pn_data_rewind(data);
2425
0
  return host.start;
2426
0
}
2427
2428
int pn_condition_redirect_port(pn_condition_t *condition)
2429
0
{
2430
0
  pn_data_t *data = pn_condition_info(condition);
2431
0
  pn_data_rewind(data);
2432
0
  pn_data_next(data);
2433
0
  pn_data_enter(data);
2434
0
  pn_data_lookup(data, "port");
2435
0
  int port = pn_data_get_int(data);
2436
0
  pn_data_rewind(data);
2437
0
  return port;
2438
0
}
2439
2440
pn_connection_t *pn_event_connection(pn_event_t *event)
2441
36.4k
{
2442
36.4k
  pn_session_t *ssn;
2443
36.4k
  pn_transport_t *transport;
2444
2445
36.4k
  switch (pn_class_id(pn_event_class(event))) {
2446
19.1k
  case CID_pn_connection:
2447
19.1k
    return (pn_connection_t *) pn_event_context(event);
2448
11.0k
  case CID_pn_transport:
2449
11.0k
    transport = pn_event_transport(event);
2450
11.0k
    if (transport)
2451
11.0k
      return transport->connection;
2452
0
    return NULL;
2453
6.32k
  default:
2454
6.32k
    ssn = pn_event_session(event);
2455
6.32k
    if (ssn)
2456
6.32k
     return pn_session_connection(ssn);
2457
36.4k
  }
2458
0
  return NULL;
2459
36.4k
}
2460
2461
pn_session_t *pn_event_session(pn_event_t *event)
2462
7.30k
{
2463
7.30k
  pn_link_t *link;
2464
7.30k
  switch (pn_class_id(pn_event_class(event))) {
2465
1.96k
  case CID_pn_session:
2466
1.96k
    return (pn_session_t *) pn_event_context(event);
2467
5.34k
  default:
2468
5.34k
    link = pn_event_link(event);
2469
5.34k
    if (link)
2470
5.34k
      return pn_link_session(link);
2471
7.30k
  }
2472
0
  return NULL;
2473
7.30k
}
2474
2475
pn_link_t *pn_event_link(pn_event_t *event)
2476
10.6k
{
2477
10.6k
  pn_delivery_t *dlv;
2478
10.6k
  switch (pn_class_id(pn_event_class(event))) {
2479
10.6k
  case CID_pn_link:
2480
10.6k
    return (pn_link_t *) pn_event_context(event);
2481
0
  default:
2482
0
    dlv = pn_event_delivery(event);
2483
0
    if (dlv)
2484
0
      return pn_delivery_link(dlv);
2485
10.6k
  }
2486
0
  return NULL;
2487
10.6k
}
2488
2489
pn_delivery_t *pn_event_delivery(pn_event_t *event)
2490
4.77k
{
2491
4.77k
  switch (pn_class_id(pn_event_class(event))) {
2492
4.77k
  case CID_pn_delivery:
2493
4.77k
    return (pn_delivery_t *) pn_event_context(event);
2494
0
  default:
2495
0
    return NULL;
2496
4.77k
  }
2497
4.77k
}
2498
2499
pn_transport_t *pn_event_transport(pn_event_t *event)
2500
22.1k
{
2501
22.1k
  switch (pn_class_id(pn_event_class(event))) {
2502
22.1k
  case CID_pn_transport:
2503
22.1k
    return (pn_transport_t *) pn_event_context(event);
2504
0
  default:
2505
0
    {
2506
0
      pn_connection_t *conn = pn_event_connection(event);
2507
0
      if (conn)
2508
0
        return pn_connection_transport(conn);
2509
0
      return NULL;
2510
0
    }
2511
22.1k
  }
2512
22.1k
}
2513
2514
0
int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) {
2515
0
  assert(dest);
2516
0
  assert(src);
2517
0
  int err = 0;
2518
0
  if (src != dest) {
2519
0
    if (!(src->name == NULL && dest->name == NULL)) {
2520
0
      if (src->name == NULL) {
2521
0
        pn_free(dest->name);
2522
0
        dest->name = NULL;
2523
0
      } else {
2524
0
        if (dest->name == NULL) {
2525
0
          dest->name = pn_string(NULL);
2526
0
        }
2527
0
        err = pn_string_copy(dest->name, src->name);
2528
0
      }
2529
0
    }
2530
0
    if (!err && !(src->description == NULL && dest->description == NULL)) {
2531
0
      if (src->description == NULL) {
2532
0
        pn_free(dest->description);
2533
0
        dest->description = NULL;
2534
0
      } else {
2535
0
        if (dest->description == NULL) {
2536
0
          dest->description = pn_string(NULL);
2537
0
        }
2538
0
        err = pn_string_copy(dest->description, src->description);
2539
0
      }
2540
0
    }
2541
0
    if (!err && !(src->info == NULL && dest->info == NULL)) {
2542
0
      if (src->info == NULL) {
2543
0
        pn_data_free(dest->info);
2544
0
        dest->info = NULL;
2545
0
      } else {
2546
0
        if (dest->info == NULL) {
2547
0
          dest->info = pn_data(0);
2548
0
        }
2549
0
        err = pn_data_copy(dest->info, src->info);
2550
0
      }
2551
0
    }
2552
0
  }
2553
0
  return err;
2554
0
}
2555
2556
2557
0
static pn_condition_t *cond_set(pn_condition_t *cond) {
2558
0
  return cond && pn_condition_is_set(cond) ? cond : NULL;
2559
0
}
2560
2561
0
static pn_condition_t *cond2_set(pn_condition_t *cond1, pn_condition_t *cond2) {
2562
0
  pn_condition_t *cond = cond_set(cond1);
2563
0
  if (!cond) cond = cond_set(cond2);
2564
0
  return cond;
2565
0
}
2566
2567
0
pn_condition_t *pn_event_condition(pn_event_t *e) {
2568
0
  void *ctx = pn_event_context(e);
2569
0
  switch (pn_class_id(pn_event_class(e))) {
2570
0
   case CID_pn_connection: {
2571
0
     pn_connection_t *c = (pn_connection_t*)ctx;
2572
0
     return cond2_set(pn_connection_remote_condition(c), pn_connection_condition(c));
2573
0
   }
2574
0
   case CID_pn_session: {
2575
0
     pn_session_t *s = (pn_session_t*)ctx;
2576
0
     return cond2_set(pn_session_remote_condition(s), pn_session_condition(s));
2577
0
   }
2578
0
   case CID_pn_link: {
2579
0
     pn_link_t *l = (pn_link_t*)ctx;
2580
0
     return cond2_set(pn_link_remote_condition(l), pn_link_condition(l));
2581
0
   }
2582
0
   case CID_pn_transport:
2583
0
    return cond_set(pn_transport_condition((pn_transport_t*)ctx));
2584
2585
0
   default:
2586
0
    return NULL;
2587
0
  }
2588
0
}
2589
2590
0
const char *pn_disposition_type_name(uint64_t d) {
2591
0
  switch(d) {
2592
0
   case PN_RECEIVED: return "received";
2593
0
   case PN_ACCEPTED: return "accepted";
2594
0
   case PN_REJECTED: return "rejected";
2595
0
   case PN_RELEASED: return "released";
2596
0
   case PN_MODIFIED: return "modified";
2597
0
   default: return "unknown";
2598
0
  }
2599
0
}
2600
2601
#undef PN_USE_DEPRECATED_API