Coverage Report

Created: 2023-09-19 06:58

/src/mosquitto/lib/send_publish.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright (c) 2009-2021 Roger Light <roger@atchoo.org>
3
4
All rights reserved. This program and the accompanying materials
5
are made available under the terms of the Eclipse Public License 2.0
6
and Eclipse Distribution License v1.0 which accompany this distribution.
7
8
The Eclipse Public License is available at
9
   https://www.eclipse.org/legal/epl-2.0/
10
and the Eclipse Distribution License is available at
11
  http://www.eclipse.org/org/documents/edl-v10.php.
12
13
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14
15
Contributors:
16
   Roger Light - initial implementation and documentation.
17
*/
18
19
#include "config.h"
20
21
#include <assert.h>
22
#include <string.h>
23
24
#ifdef WITH_BROKER
25
#  include "mosquitto_broker_internal.h"
26
#  include "sys_tree.h"
27
#else
28
#  define metrics__int_inc(stat, val)
29
#endif
30
31
#include "alias_mosq.h"
32
#include "mosquitto.h"
33
#include "mosquitto_internal.h"
34
#include "logging_mosq.h"
35
#include "mqtt_protocol.h"
36
#include "memory_mosq.h"
37
#include "net_mosq.h"
38
#include "packet_mosq.h"
39
#include "property_mosq.h"
40
#include "send_mosq.h"
41
#include "utlist.h"
42
43
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval)
44
0
{
45
0
#ifdef WITH_BROKER
46
0
  size_t len;
47
0
  int rc;
48
0
#ifdef WITH_BRIDGE
49
0
  struct mosquitto__bridge_topic *cur_topic;
50
0
  bool match;
51
0
  char *mapped_topic = NULL;
52
0
  char *topic_temp = NULL;
53
0
#endif
54
0
#endif
55
0
  assert(mosq);
56
57
0
  if(!net__is_connected(mosq)) return MOSQ_ERR_NO_CONN;
58
59
0
#ifdef WITH_BROKER
60
0
  bool payload_changed = false;
61
0
  bool topic_changed = false;
62
0
  bool properties_changed = false;
63
64
0
  {
65
0
    struct mosquitto_base_msg tmp_msg;
66
0
    tmp_msg.topic = (char *) topic;
67
0
    tmp_msg.payloadlen = payloadlen;
68
0
    tmp_msg.payload = (void *) payload;
69
0
    tmp_msg.qos = qos;
70
0
    tmp_msg.retain = retain;
71
0
    tmp_msg.properties = (mosquitto_property *) store_props;
72
73
0
    rc = plugin__handle_message_out(mosq, &tmp_msg);
74
75
0
    if(tmp_msg.payload != payload) payload_changed = true;
76
0
    if(tmp_msg.topic != topic) topic_changed = true;
77
0
    if(tmp_msg.properties != store_props) properties_changed = true;
78
79
0
    topic = tmp_msg.topic;
80
0
    payloadlen = tmp_msg.payloadlen;
81
0
    payload = tmp_msg.payload;
82
0
    qos = tmp_msg.qos;
83
0
    retain = tmp_msg.retain;
84
0
    store_props = tmp_msg.properties;
85
86
0
    if(rc != MOSQ_ERR_SUCCESS){
87
0
      if(rc == MOSQ_ERR_ACL_DENIED){
88
0
        log__printf(NULL, MOSQ_LOG_DEBUG,
89
0
            "Denied PUBLISH to %s (q%d, r%d, '%s', ... (%ld bytes))",
90
0
            mosq->id, qos, retain, topic, (long)payloadlen);
91
92
0
      }else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
93
0
        log__printf(NULL, MOSQ_LOG_DEBUG,
94
0
            "Rejected PUBLISH to %s, quota exceeded.", mosq->id);
95
0
      }
96
97
0
      if(payload_changed) mosquitto__free((void *) payload);
98
0
      if(topic_changed) mosquitto__free((char *) topic);
99
0
      if(properties_changed) mosquitto_property_free_all((mosquitto_property **) &store_props);
100
101
0
      return MOSQ_ERR_SUCCESS;
102
0
    }
103
0
  }
104
0
#endif
105
106
0
  if(!mosq->retain_available){
107
0
    retain = false;
108
0
  }
109
110
0
#ifdef WITH_BROKER
111
0
  if(mosq->listener && mosq->listener->mount_point){
112
0
    len = strlen(mosq->listener->mount_point);
113
0
    if(len < strlen(topic)){
114
0
      topic += len;
115
0
    }else{
116
      /* Invalid topic string. Should never happen, but silently swallow the message anyway. */
117
0
      return MOSQ_ERR_SUCCESS;
118
0
    }
119
0
  }
120
0
#ifdef WITH_BRIDGE
121
0
  if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
122
0
    LL_FOREACH(mosq->bridge->topics, cur_topic){
123
0
      if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
124
0
          && (cur_topic->remote_prefix || cur_topic->local_prefix)){
125
        /* Topic mapping required on this topic if the message matches */
126
127
0
        rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
128
0
        if(rc){
129
0
          return rc;
130
0
        }
131
0
        if(match){
132
0
          mapped_topic = mosquitto__strdup(topic);
133
0
          if(!mapped_topic) return MOSQ_ERR_NOMEM;
134
0
          if(cur_topic->local_prefix){
135
            /* This prefix needs removing. */
136
0
            if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
137
0
              topic_temp = mosquitto__strdup(mapped_topic+strlen(cur_topic->local_prefix));
138
0
              mosquitto__FREE(mapped_topic);
139
0
              if(!topic_temp){
140
0
                return MOSQ_ERR_NOMEM;
141
0
              }
142
0
              mapped_topic = topic_temp;
143
0
            }
144
0
          }
145
146
0
          if(cur_topic->remote_prefix){
147
            /* This prefix needs adding. */
148
0
            len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
149
0
            topic_temp = mosquitto__malloc(len+1);
150
0
            if(!topic_temp){
151
0
              mosquitto__FREE(mapped_topic);
152
0
              return MOSQ_ERR_NOMEM;
153
0
            }
154
0
            snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
155
0
            topic_temp[len] = '\0';
156
0
            mosquitto__FREE(mapped_topic);
157
0
            mapped_topic = topic_temp;
158
0
          }
159
0
          log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, mapped_topic, (long)payloadlen);
160
0
          metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
161
0
          rc =  send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
162
0
          mosquitto__FREE(mapped_topic);
163
0
          return rc;
164
0
        }
165
0
      }
166
0
    }
167
0
  }
168
0
#endif
169
0
  log__printf(mosq, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
170
0
  metrics__int_inc(mosq_counter_pub_bytes_sent, payloadlen);
171
#else
172
  log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", SAFE_PRINT(mosq->id), dup, qos, retain, mid, topic, (long)payloadlen);
173
#endif
174
175
0
#ifdef WITH_BROKER
176
0
  rc = send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
177
0
  if(payload_changed) mosquitto__free((void *) payload);
178
0
  if(topic_changed) mosquitto__free((char *) topic);
179
0
  if(properties_changed) mosquitto_property_free_all((mosquitto_property **) &store_props);
180
0
  return rc;
181
#else
182
  return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, subscription_identifier, store_props, expiry_interval);
183
#endif
184
0
}
185
186
187
int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval)
188
0
{
189
0
  struct mosquitto__packet *packet = NULL;
190
0
  unsigned int packetlen;
191
0
  unsigned int proplen = 0, varbytes;
192
0
  int rc;
193
0
  mosquitto_property expiry_prop;
194
0
#ifdef WITH_BROKER
195
0
  mosquitto_property topic_alias_prop;
196
0
  uint16_t topic_alias = 0;
197
0
  mosquitto_property subscription_id_prop;
198
0
#endif
199
200
#ifndef WITH_BROKER
201
  UNUSED(subscription_identifier);
202
#endif
203
204
0
  assert(mosq);
205
206
0
#ifdef WITH_BROKER
207
0
  if(mosq->protocol == mosq_p_mqtt5){
208
0
    if(alias__find_by_topic(mosq, ALIAS_DIR_L2R, topic, &topic_alias) == MOSQ_ERR_SUCCESS){
209
      /* If we have an existing alias, no need to send the topic */
210
0
      topic = NULL;
211
0
    }else{
212
      /* Try to add a new alias - if this succeeds, topic_alias will be
213
       * set to the new alias but we still need to send the topic. If it
214
       * fails, topic_alias will be set to 0. */
215
0
      alias__add_l2r(mosq, topic, &topic_alias);
216
0
    }
217
0
  }
218
0
#endif
219
220
0
  if(topic){
221
0
    packetlen = 2+(unsigned int)strlen(topic) + payloadlen;
222
0
  }else{
223
0
    packetlen = 2 + payloadlen;
224
0
  }
225
0
  if(qos > 0) packetlen += 2; /* For message id */
226
0
  if(mosq->protocol == mosq_p_mqtt5){
227
0
    proplen = 0;
228
0
    proplen += property__get_length_all(store_props);
229
0
    if(expiry_interval > 0){
230
0
      expiry_prop.next = NULL;
231
0
      expiry_prop.value.i32 = expiry_interval;
232
0
      expiry_prop.identifier = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL;
233
0
      expiry_prop.property_type = MQTT_PROP_TYPE_INT32;
234
0
      expiry_prop.client_generated = false;
235
236
0
      proplen += property__get_length_all(&expiry_prop);
237
0
    }
238
0
#ifdef WITH_BROKER
239
0
    if(topic_alias != 0){
240
0
      topic_alias_prop.next = NULL;
241
0
      topic_alias_prop.value.i16 = topic_alias;
242
0
      topic_alias_prop.identifier = MQTT_PROP_TOPIC_ALIAS;
243
0
      topic_alias_prop.property_type = MQTT_PROP_TYPE_INT16;
244
0
      topic_alias_prop.client_generated = false;
245
246
0
      proplen += property__get_length_all(&topic_alias_prop);
247
0
    }
248
0
    if(subscription_identifier){
249
0
      subscription_id_prop.next = NULL;
250
0
      subscription_id_prop.value.varint = subscription_identifier;
251
0
      subscription_id_prop.identifier = MQTT_PROP_SUBSCRIPTION_IDENTIFIER;
252
0
      subscription_id_prop.property_type = MQTT_PROP_TYPE_VARINT;
253
0
      subscription_id_prop.client_generated = false;
254
255
0
      proplen += property__get_length_all(&subscription_id_prop);
256
0
    }
257
0
#endif
258
259
0
    varbytes = packet__varint_bytes(proplen);
260
0
    if(varbytes > 4){
261
      /* FIXME - Properties too big, don't publish any - should remove some first really */
262
0
      store_props = NULL;
263
0
      expiry_interval = 0;
264
0
    }else{
265
0
      packetlen += proplen + varbytes;
266
0
    }
267
0
  }
268
0
  if(packet__check_oversize(mosq, packetlen)){
269
0
#ifdef WITH_BROKER
270
0
    log__printf(mosq, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH for %s (%d bytes)", SAFE_PRINT(mosq->id), packetlen);
271
#else
272
    log__printf(mosq, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH (%d bytes)", packetlen);
273
#endif
274
0
    return MOSQ_ERR_OVERSIZE_PACKET;
275
0
  }
276
277
0
  rc = packet__alloc(&packet, (uint8_t)(CMD_PUBLISH | (uint8_t)((dup&0x1)<<3) | (uint8_t)(qos<<1) | retain), packetlen);
278
0
  if(rc){
279
0
    return rc;
280
0
  }
281
0
  packet->mid = mid;
282
  /* Variable header (topic string) */
283
0
  if(topic){
284
0
    packet__write_string(packet, topic, (uint16_t)strlen(topic));
285
0
  }else{
286
0
    packet__write_uint16(packet, 0);
287
0
  }
288
0
  if(qos > 0){
289
0
    packet__write_uint16(packet, mid);
290
0
  }
291
292
0
  if(mosq->protocol == mosq_p_mqtt5){
293
0
    packet__write_varint(packet, proplen);
294
0
    property__write_all(packet, store_props, false);
295
0
    if(expiry_interval > 0){
296
0
      property__write_all(packet, &expiry_prop, false);
297
0
    }
298
0
#ifdef WITH_BROKER
299
0
    if(topic_alias != 0){
300
0
      property__write_all(packet, &topic_alias_prop, false);
301
0
    }
302
0
    if(subscription_identifier != 0){
303
0
      property__write_all(packet, &subscription_id_prop, false);
304
0
    }
305
0
#endif
306
0
  }
307
308
0
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
309
0
  metrics__int_inc(mosq_counter_mqtt_publish_sent, 1);
310
0
#endif
311
312
  /* Payload */
313
0
  if(payloadlen && payload){
314
0
    packet__write_bytes(packet, payload, payloadlen);
315
0
  }
316
317
0
  return packet__queue(mosq, packet);
318
0
}