Coverage Report

Created: 2025-10-24 06:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/mosquitto/lib/send_publish.c
Line
Count
Source
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <string.h>
23
24
#ifdef WITH_BROKER
25
#  include "mosquitto_broker_internal.h"
26
#  include "sys_tree.h"
27
#else
28
#  define metrics__int_inc(stat, val)
29
#endif
30
31
#include "alias_mosq.h"
32
#include "mosquitto.h"
33
#include "mosquitto_internal.h"
34
#include "logging_mosq.h"
35
#include "mosquitto/mqtt_protocol.h"
36
#include "net_mosq.h"
37
#include "packet_mosq.h"
38
#include "property_mosq.h"
39
#include "property_common.h"
40
#include "send_mosq.h"
41
#include "utlist.h"
42
43
44
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval)
45
0
{
46
0
#ifdef WITH_BROKER
47
0
  size_t len;
48
0
  int rc;
49
0
#ifdef WITH_BRIDGE
50
0
  struct mosquitto__bridge_topic *cur_topic;
51
0
  bool match;
52
0
  char *mapped_topic = NULL;
53
0
  char *topic_temp = NULL;
54
0
#endif
55
0
#endif
56
0
  assert(mosq);
57
58
0
  if(!net__is_connected(mosq)){
59
0
    return MOSQ_ERR_NO_CONN;
60
0
  }
61
62
0
#ifdef WITH_BROKER
63
0
  bool payload_changed = false;
64
0
  bool topic_changed = false;
65
0
  bool properties_changed = false;
66
67
0
  {
68
0
    struct mosquitto_base_msg tmp_msg;
69
0
    tmp_msg.topic = (char *)topic;
70
0
    tmp_msg.payloadlen = payloadlen;
71
0
    tmp_msg.payload = (void *)payload;
72
0
    tmp_msg.qos = qos;
73
0
    tmp_msg.retain = retain;
74
0
    tmp_msg.properties = (mosquitto_property *)store_props;
75
76
0
    rc = plugin__handle_message_out(mosq, &tmp_msg);
77
78
0
    if(tmp_msg.payload != payload){
79
0
      payload_changed = true;
80
0
    }
81
0
    if(tmp_msg.topic != topic){
82
0
      topic_changed = true;
83
0
    }
84
0
    if(tmp_msg.properties != store_props){
85
0
      properties_changed = true;
86
0
    }
87
88
0
    topic = tmp_msg.topic;
89
0
    payloadlen = tmp_msg.payloadlen;
90
0
    payload = tmp_msg.payload;
91
0
    qos = tmp_msg.qos;
92
0
    retain = tmp_msg.retain;
93
0
    store_props = tmp_msg.properties;
94
95
0
    if(rc != MOSQ_ERR_SUCCESS){
96
0
      if(rc == MOSQ_ERR_ACL_DENIED){
97
0
        log__printf(NULL, MOSQ_LOG_DEBUG,
98
0
            "Denied PUBLISH to %s (q%d, r%d, '%s', ... (%ld bytes))",
99
0
            mosq->id, qos, retain, topic, (long)payloadlen);
100
101
0
      }else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
102
0
        log__printf(NULL, MOSQ_LOG_DEBUG,
103
0
            "Rejected PUBLISH to %s, quota exceeded.", mosq->id);
104
0
      }
105
106
0
      if(payload_changed){
107
0
        mosquitto_free((void *)payload);
108
0
      }
109
0
      if(topic_changed){
110
0
        mosquitto_free((void *)topic);
111
0
      }
112
0
      if(properties_changed){
113
0
        mosquitto_property_free_all((mosquitto_property **)&store_props);
114
0
      }
115
116
0
      return MOSQ_ERR_SUCCESS;
117
0
    }
118
0
  }
119
0
#endif
120
121
0
  if(!mosq->retain_available){
122
0
    retain = false;
123
0
  }
124
125
0
#ifdef WITH_BROKER
126
0
  if(mosq->listener && mosq->listener->mount_point){
127
0
    len = strlen(mosq->listener->mount_point);
128
0
    if(len < strlen(topic)){
129
0
      topic += len;
130
0
    }else{
131
      /* Invalid topic string. Should never happen, but silently swallow the message anyway. */
132
0
      return MOSQ_ERR_SUCCESS;
133
0
    }
134
0
  }
135
0
#ifdef WITH_BRIDGE
136
0
  if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
137
0
    LL_FOREACH(mosq->bridge->topics, cur_topic){
138
0
      if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
139
0
          && (cur_topic->remote_prefix || cur_topic->local_prefix)){
140
        /* Topic mapping required on this topic if the message matches */
141
142
0
        rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
143
0
        if(rc){
144
0
          return rc;
145
0
        }
146
0
        if(match){
147
0
          mapped_topic = mosquitto_strdup(topic);
148
0
          if(!mapped_topic){
149
0
            return MOSQ_ERR_NOMEM;
150
0
          }
151
0
          if(cur_topic->local_prefix){
152
            /* This prefix needs removing. */
153
0
            if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
154
0
              topic_temp = mosquitto_strdup(mapped_topic+strlen(cur_topic->local_prefix));
155
0
              mosquitto_FREE(mapped_topic);
156
0
              if(!topic_temp){
157
0
                return MOSQ_ERR_NOMEM;
158
0
              }
159
0
              mapped_topic = topic_temp;
160
0
            }
161
0
          }
162
163
0
          if(cur_topic->remote_prefix){
164
            /* This prefix needs adding. */
165
0
            len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
166
0
            topic_temp = mosquitto_malloc(len+1);
167
0
            if(!topic_temp){
168
0
              mosquitto_FREE(mapped_topic);
169
0
              return MOSQ_ERR_NOMEM;
170
0
            }
171
0
            snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
172
0
            topic_temp[len] = '\0';
173
0
            mosquitto_FREE(mapped_topic);
174
0
            mapped_topic = topic_temp;
175
0
          }
176
0
          log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, mapped_topic, (long)payloadlen);
177
0
          metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
178
0
          rc =  send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
179
0
          mosquitto_FREE(mapped_topic);
180
0
          return rc;
181
0
        }
182
0
      }
183
0
    }
184
0
  }
185
0
#endif
186
0
  log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
187
0
  metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
188
#else
189
  log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
190
#endif
191
192
0
#ifdef WITH_BROKER
193
0
  rc = send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
194
0
  if(payload_changed){
195
0
    mosquitto_free((void *)payload);
196
0
  }
197
0
  if(topic_changed){
198
0
    mosquitto_free((void *)topic);
199
0
  }
200
0
  if(properties_changed){
201
0
    mosquitto_property_free_all((mosquitto_property **)&store_props);
202
0
  }
203
0
  return rc;
204
#else
205
  return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
206
#endif
207
0
}
208
209
210
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval)
211
0
{
212
0
  struct mosquitto__packet *packet = NULL;
213
0
  unsigned int packetlen;
214
0
  unsigned int proplen = 0, varbytes;
215
0
  int rc;
216
0
  mosquitto_property expiry_prop;
217
0
#ifdef WITH_BROKER
218
0
  mosquitto_property topic_alias_prop;
219
0
  uint16_t topic_alias = 0;
220
0
  mosquitto_property subscription_id_prop;
221
0
#endif
222
223
#ifndef WITH_BROKER
224
  UNUSED(subscription_identifier);
225
#endif
226
227
0
  assert(mosq);
228
229
0
#ifdef WITH_BROKER
230
0
  if(mosq->protocol == mosq_p_mqtt5){
231
0
    if(alias__find_by_topic(mosq, ALIAS_DIR_L2R, topic, &topic_alias) == MOSQ_ERR_SUCCESS){
232
      /* If we have an existing alias, no need to send the topic */
233
0
      topic = NULL;
234
0
    }else{
235
      /* Try to add a new alias - if this succeeds, topic_alias will be
236
       * set to the new alias but we still need to send the topic. If it
237
       * fails, topic_alias will be set to 0. */
238
0
      alias__add_l2r(mosq, topic, &topic_alias);
239
0
    }
240
0
  }
241
0
#endif
242
243
0
  if(topic){
244
0
    packetlen = 2+(unsigned int)strlen(topic) + payloadlen;
245
0
  }else{
246
0
    packetlen = 2 + payloadlen;
247
0
  }
248
0
  if(qos > 0){
249
0
    packetlen += 2;         /* For message id */
250
0
  }
251
0
  if(mosq->protocol == mosq_p_mqtt5){
252
0
    proplen = 0;
253
0
    proplen += mosquitto_property_get_length_all(store_props);
254
0
    if(expiry_interval > 0 && expiry_interval != MSG_EXPIRY_INFINITE){
255
0
      expiry_prop.next = NULL;
256
0
      expiry_prop.value.i32 = expiry_interval;
257
0
      expiry_prop.identifier = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL;
258
0
      expiry_prop.property_type = MQTT_PROP_TYPE_INT32;
259
0
      expiry_prop.client_generated = false;
260
261
0
      proplen += mosquitto_property_get_length_all(&expiry_prop);
262
0
    }
263
0
#ifdef WITH_BROKER
264
0
    if(topic_alias != 0){
265
0
      topic_alias_prop.next = NULL;
266
0
      topic_alias_prop.value.i16 = topic_alias;
267
0
      topic_alias_prop.identifier = MQTT_PROP_TOPIC_ALIAS;
268
0
      topic_alias_prop.property_type = MQTT_PROP_TYPE_INT16;
269
0
      topic_alias_prop.client_generated = false;
270
271
0
      proplen += mosquitto_property_get_length_all(&topic_alias_prop);
272
0
    }
273
0
    if(subscription_identifier){
274
0
      subscription_id_prop.next = NULL;
275
0
      subscription_id_prop.value.varint = subscription_identifier;
276
0
      subscription_id_prop.identifier = MQTT_PROP_SUBSCRIPTION_IDENTIFIER;
277
0
      subscription_id_prop.property_type = MQTT_PROP_TYPE_VARINT;
278
0
      subscription_id_prop.client_generated = false;
279
280
0
      proplen += mosquitto_property_get_length_all(&subscription_id_prop);
281
0
    }
282
0
#endif
283
284
0
    varbytes = mosquitto_varint_bytes(proplen);
285
0
    if(varbytes > 4){
286
      /* FIXME - Properties too big, don't publish any - should remove some first really */
287
0
      store_props = NULL;
288
0
      expiry_interval = 0;
289
0
    }else{
290
0
      packetlen += proplen + varbytes;
291
0
    }
292
0
  }
293
0
  if(packet__check_oversize(mosq, packetlen)){
294
0
#ifdef WITH_BROKER
295
0
    log__printf(mosq, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH for %s (%d bytes)", SAFE_PRINT(mosq->id), packetlen);
296
#else
297
    log__printf(mosq, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH (%d bytes)", packetlen);
298
#endif
299
0
    return MOSQ_ERR_OVERSIZE_PACKET;
300
0
  }
301
302
0
  rc = packet__alloc(&packet, (uint8_t)(CMD_PUBLISH | (uint8_t)((dup&0x1)<<3) | (uint8_t)(qos<<1) | retain), packetlen);
303
0
  if(rc){
304
0
    return rc;
305
0
  }
306
0
  packet->mid = mid;
307
  /* Variable header (topic string) */
308
0
  if(topic){
309
0
    packet__write_string(packet, topic, (uint16_t)strlen(topic));
310
0
  }else{
311
0
    packet__write_uint16(packet, 0);
312
0
  }
313
0
  if(qos > 0){
314
0
    packet__write_uint16(packet, mid);
315
0
  }
316
317
0
  if(mosq->protocol == mosq_p_mqtt5){
318
0
    packet__write_varint(packet, proplen);
319
0
    property__write_all(packet, store_props, false);
320
0
    if(expiry_interval > 0 && expiry_interval != MSG_EXPIRY_INFINITE){
321
0
      property__write_all(packet, &expiry_prop, false);
322
0
    }
323
0
#ifdef WITH_BROKER
324
0
    if(topic_alias != 0){
325
0
      property__write_all(packet, &topic_alias_prop, false);
326
0
    }
327
0
    if(subscription_identifier != 0){
328
0
      property__write_all(packet, &subscription_id_prop, false);
329
0
    }
330
0
#endif
331
0
  }
332
333
0
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
334
0
  metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
335
0
#endif
336
337
  /* Payload */
338
0
  if(payloadlen && payload){
339
0
    packet__write_bytes(packet, payload, payloadlen);
340
0
  }
341
342
0
  return packet__queue(mosq, packet);
343
0
}