Coverage Report

Created: 2024-09-08 06:42

/src/ntopng/src/ZMQParserInterface.cpp
Line
Count
Source (jump to first uncovered line)
1
/*
2
 *
3
 * (C) 2013-24 - ntop.org
4
 *
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software Foundation,
18
 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19
 *
20
 */
21
22
#include "ntop_includes.h"
23
24
#ifndef HAVE_NEDGE
25
26
0
#define VLAN_HASH_KEY  "ntopng.vlan.%d.cache"
27
28
/* **************************************************** */
29
30
/* IMPORTANT: keep it in sync with flow_fields_description part of
31
 * flow_utils.lua */
32
ZMQParserInterface::ZMQParserInterface(const char *endpoint,
33
                                       const char *custom_interface_type)
34
0
    : ParserInterface(endpoint, custom_interface_type) {
35
0
  if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__);
36
37
0
  zmq_initial_bytes = 0, zmq_initial_pkts = 0, zmq_initial_drops = 0;
38
0
  zmq_remote_stats = zmq_remote_stats_shadow = NULL;
39
0
  memset(&last_zmq_remote_stats_update, 0,
40
0
         sizeof(last_zmq_remote_stats_update));
41
0
  zmq_remote_initial_exported_flows = 0;
42
0
  remote_lifetime_timeout = remote_idle_timeout = 0;
43
0
  once = false, is_sampled_traffic = false;
44
0
  flow_max_idle = ntop->getPrefs()->get_pkt_ifaces_flow_max_idle();
45
#ifdef NTOPNG_PRO
46
  custom_app_maps = NULL;
47
#endif
48
0
  polling_start_time = 0;
49
0
  updateFlowMaxIdle();
50
0
  memset(&recvStats, 0, sizeof(recvStats));
51
0
  memset(&recvStatsCheckpoint, 0, sizeof(recvStatsCheckpoint));
52
53
  /*
54
    Populate defaults for @NTOPNG@ nProbe templates. No need to populate
55
    all the fields as nProbe will sent them periodically.
56
57
    This minimum set is required for backward compatibility.
58
  */
59
0
  addMapping("IN_SRC_MAC", IN_SRC_MAC);
60
0
  addMapping("OUT_SRC_MAC", OUT_SRC_MAC);
61
0
  addMapping("IN_DST_MAC", IN_DST_MAC);
62
0
  addMapping("OUT_DST_MAC", OUT_DST_MAC);
63
0
  addMapping("SRC_VLAN", SRC_VLAN);
64
0
  addMapping("DST_VLAN", DST_VLAN);
65
0
  addMapping("DOT1Q_SRC_VLAN", DOT1Q_SRC_VLAN);
66
0
  addMapping("DOT1Q_DST_VLAN", DOT1Q_DST_VLAN);
67
0
  addMapping("INPUT_SNMP", INPUT_SNMP);
68
0
  addMapping("OUTPUT_SNMP", OUTPUT_SNMP);
69
0
  addMapping("IPV4_SRC_ADDR", IPV4_SRC_ADDR);
70
0
  addMapping("IPV4_DST_ADDR", IPV4_DST_ADDR);
71
0
  addMapping("SRC_TOS", SRC_TOS);
72
0
  addMapping("DST_TOS", DST_TOS);
73
0
  addMapping("L4_SRC_PORT", L4_SRC_PORT);
74
0
  addMapping("L4_DST_PORT", L4_DST_PORT);
75
0
  addMapping("IPV6_SRC_ADDR", IPV6_SRC_ADDR);
76
0
  addMapping("IPV6_DST_ADDR", IPV6_DST_ADDR);
77
0
  addMapping("IP_PROTOCOL_VERSION", IP_PROTOCOL_VERSION);
78
0
  addMapping("PROTOCOL", PROTOCOL);
79
0
  addMapping("L7_PROTO", L7_PROTO, NTOP_PEN);
80
0
  addMapping("L7_PROTO_NAME", L7_PROTO_NAME, NTOP_PEN);
81
0
  addMapping("L7_INFO", L7_INFO, NTOP_PEN);
82
0
  addMapping("L7_CONFIDENCE", L7_CONFIDENCE, NTOP_PEN);
83
0
  addMapping("L7_ERROR_CODE", L7_ERROR_CODE, NTOP_PEN);
84
0
  addMapping("IN_BYTES", IN_BYTES);
85
0
  addMapping("IN_PKTS", IN_PKTS);
86
0
  addMapping("OUT_BYTES", OUT_BYTES);
87
0
  addMapping("OUT_PKTS", OUT_PKTS);
88
0
  addMapping("FIRST_SWITCHED", FIRST_SWITCHED);
89
0
  addMapping("LAST_SWITCHED", LAST_SWITCHED);
90
0
  addMapping("EXPORTER_IPV4_ADDRESS", EXPORTER_IPV4_ADDRESS);
91
0
  addMapping("EXPORTER_IPV6_ADDRESS", EXPORTER_IPV6_ADDRESS);
92
0
  addMapping("TOTAL_FLOWS_EXP", TOTAL_FLOWS_EXP);
93
0
  addMapping("NPROBE_IPV4_ADDRESS", NPROBE_IPV4_ADDRESS, NTOP_PEN);
94
0
  addMapping("NPROBE_INSTANCE_NAME", NPROBE_INSTANCE_NAME, NTOP_PEN);
95
0
  addMapping("TCP_FLAGS", TCP_FLAGS);
96
0
  addMapping("INITIATOR_PKTS", INITIATOR_PKTS);
97
0
  addMapping("INITIATOR_OCTETS", INITIATOR_OCTETS);
98
0
  addMapping("RESPONDER_PKTS", RESPONDER_PKTS);
99
0
  addMapping("RESPONDER_OCTETS", RESPONDER_OCTETS);
100
0
  addMapping("SAMPLING_INTERVAL", SAMPLING_INTERVAL);
101
0
  addMapping("DIRECTION", DIRECTION);
102
0
  addMapping("POST_NAT_SRC_IPV4_ADDR", POST_NAT_SRC_IPV4_ADDR);
103
0
  addMapping("POST_NAT_DST_IPV4_ADDR", POST_NAT_DST_IPV4_ADDR);
104
0
  addMapping("POST_NAT_SRC_TRANSPORT_PORT", POST_NAT_SRC_TRANSPORT_PORT);
105
0
  addMapping("POST_NAT_DST_TRANSPORT_PORT", POST_NAT_DST_TRANSPORT_PORT);
106
0
  addMapping("OBSERVATION_POINT_ID", OBSERVATION_POINT_ID);
107
0
  addMapping("INGRESS_VRFID", INGRESS_VRFID);
108
0
  addMapping("IPV4_SRC_MASK", IPV4_SRC_MASK);
109
0
  addMapping("IPV4_DST_MASK", IPV4_DST_MASK);
110
0
  addMapping("IPV4_NEXT_HOP", IPV4_NEXT_HOP);
111
0
  addMapping("SRC_AS", SRC_AS);
112
0
  addMapping("DST_AS", DST_AS);
113
0
  addMapping("BGP_NEXT_ADJACENT_ASN", BGP_NEXT_ADJACENT_ASN);
114
0
  addMapping("BGP_PREV_ADJACENT_ASN", BGP_PREV_ADJACENT_ASN);
115
0
  addMapping("FLOW_END_REASON", FLOW_END_REASON);
116
0
  addMapping("OOORDER_IN_PKTS", OOORDER_IN_PKTS, NTOP_PEN);
117
0
  addMapping("OOORDER_OUT_PKTS", OOORDER_OUT_PKTS, NTOP_PEN);
118
0
  addMapping("RETRANSMITTED_IN_PKTS", RETRANSMITTED_IN_PKTS, NTOP_PEN);
119
0
  addMapping("RETRANSMITTED_OUT_PKTS", RETRANSMITTED_OUT_PKTS, NTOP_PEN);
120
0
  addMapping("DNS_QUERY", DNS_QUERY, NTOP_PEN);
121
0
  addMapping("DNS_QUERY_TYPE", DNS_QUERY_TYPE, NTOP_PEN);
122
0
  addMapping("DNS_RET_CODE", DNS_RET_CODE, NTOP_PEN);
123
0
  addMapping("HTTP_URL", HTTP_URL, NTOP_PEN);
124
0
  addMapping("HTTP_SITE", HTTP_SITE, NTOP_PEN);
125
0
  addMapping("HTTP_RET_CODE", HTTP_RET_CODE, NTOP_PEN);
126
0
  addMapping("HTTP_METHOD", HTTP_METHOD, NTOP_PEN);
127
0
  addMapping("HTTP_USER_AGENT", HTTP_USER_AGENT, NTOP_PEN);
128
0
  addMapping("TLS_SERVER_NAME", TLS_SERVER_NAME, NTOP_PEN);
129
0
  addMapping("TLS_CIPHER", TLS_CIPHER, NTOP_PEN);
130
0
  addMapping("SSL_UNSAFE_CIPHER", SSL_UNSAFE_CIPHER, NTOP_PEN);
131
0
  addMapping("JA4C_HASH", JA4C_HASH, NTOP_PEN);
132
0
  addMapping("BITTORRENT_HASH", BITTORRENT_HASH, NTOP_PEN);
133
0
  addMapping("SRC_FRAGMENTS", SRC_FRAGMENTS, NTOP_PEN);
134
0
  addMapping("DST_FRAGMENTS", DST_FRAGMENTS, NTOP_PEN);
135
0
  addMapping("CLIENT_NW_LATENCY_MS", CLIENT_NW_LATENCY_MS, NTOP_PEN);
136
0
  addMapping("SERVER_NW_LATENCY_MS", SERVER_NW_LATENCY_MS, NTOP_PEN);
137
0
  addMapping("L7_PROTO_RISK", L7_PROTO_RISK, NTOP_PEN);
138
0
  addMapping("L7_PROTO_RISK_NAME", L7_PROTO_RISK_NAME, NTOP_PEN );
139
0
  addMapping("FLOW_VERDICT", FLOW_VERDICT, NTOP_PEN);
140
0
  addMapping("L7_RISK_INFO", L7_RISK_INFO, NTOP_PEN);
141
0
  addMapping("FLOW_SOURCE", FLOW_SOURCE, NTOP_PEN);
142
0
  addMapping("SMTP_MAIL_FROM", SMTP_MAIL_FROM, NTOP_PEN);
143
0
  addMapping("SMTP_RCPT_TO", SMTP_RCPT_TO, NTOP_PEN);
144
0
  addMapping("UNIQUE_SOURCE_ID", UNIQUE_SOURCE_ID, NTOP_PEN);
145
146
  /* eBPF / Process */
147
0
  addMapping("SRC_PROC_PID", SRC_PROC_PID, NTOP_PEN);
148
0
  addMapping("SRC_PROC_NAME", SRC_PROC_NAME, NTOP_PEN);
149
0
  addMapping("SRC_PROC_UID", SRC_PROC_UID, NTOP_PEN);
150
0
  addMapping("SRC_PROC_USER_NAME", SRC_PROC_USER_NAME, NTOP_PEN);
151
0
  addMapping("SRC_FATHER_PROC_PID", SRC_FATHER_PROC_PID, NTOP_PEN);
152
0
  addMapping("SRC_FATHER_PROC_NAME", SRC_FATHER_PROC_NAME, NTOP_PEN);
153
0
  addMapping("SRC_FATHER_PROC_PKG_NAME", SRC_FATHER_PROC_PKG_NAME, NTOP_PEN);
154
0
  addMapping("SRC_FATHER_PROC_UID", SRC_FATHER_PROC_UID, NTOP_PEN);
155
0
  addMapping("SRC_FATHER_PROC_USER_NAME", SRC_FATHER_PROC_USER_NAME, NTOP_PEN);
156
0
  addMapping("SRC_PROC_ACTUAL_MEMORY", SRC_PROC_ACTUAL_MEMORY, NTOP_PEN);
157
0
  addMapping("SRC_PROC_PEAK_MEMORY", SRC_PROC_PEAK_MEMORY, NTOP_PEN);
158
0
  addMapping("SRC_PROC_AVERAGE_CPU_LOAD", SRC_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
159
0
  addMapping("SRC_PROC_NUM_PAGE_FAULTS", SRC_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
160
0
  addMapping("SRC_PROC_PCTG_IOWAIT", SRC_PROC_PCTG_IOWAIT, NTOP_PEN);
161
0
  addMapping("SRC_PROC_PKG_NAME", SRC_PROC_PKG_NAME, NTOP_PEN);
162
0
  addMapping("SRC_PROC_CMDLINE", SRC_PROC_CMDLINE, NTOP_PEN);
163
0
  addMapping("SRC_PROC_CONTAINER_ID", SRC_PROC_CONTAINER_ID, NTOP_PEN);
164
165
0
  addMapping("DST_PROC_PID", DST_PROC_PID, NTOP_PEN);
166
0
  addMapping("DST_PROC_NAME", DST_PROC_NAME, NTOP_PEN);
167
0
  addMapping("DST_PROC_UID", DST_PROC_UID, NTOP_PEN);
168
0
  addMapping("DST_PROC_USER_NAME", DST_PROC_USER_NAME, NTOP_PEN);
169
0
  addMapping("DST_FATHER_PROC_PID", DST_FATHER_PROC_PID, NTOP_PEN);
170
0
  addMapping("DST_FATHER_PROC_NAME", DST_FATHER_PROC_NAME, NTOP_PEN);
171
0
  addMapping("DST_FATHER_PROC_PKG_NAME", DST_FATHER_PROC_PKG_NAME, NTOP_PEN);
172
0
  addMapping("DST_FATHER_PROC_UID", DST_FATHER_PROC_UID, NTOP_PEN);
173
0
  addMapping("DST_FATHER_PROC_USER_NAME", DST_FATHER_PROC_USER_NAME, NTOP_PEN);
174
0
  addMapping("DST_PROC_ACTUAL_MEMORY", DST_PROC_ACTUAL_MEMORY, NTOP_PEN);
175
0
  addMapping("DST_PROC_PEAK_MEMORY", DST_PROC_PEAK_MEMORY, NTOP_PEN);
176
0
  addMapping("DST_PROC_AVERAGE_CPU_LOAD", DST_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
177
0
  addMapping("DST_PROC_NUM_PAGE_FAULTS", DST_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
178
0
  addMapping("DST_PROC_PCTG_IOWAIT", DST_PROC_PCTG_IOWAIT, NTOP_PEN);
179
0
  addMapping("DST_PROC_PKG_NAME", DST_PROC_PKG_NAME, NTOP_PEN);
180
0
  addMapping("DST_PROC_CMDLINE", DST_PROC_CMDLINE, NTOP_PEN);
181
0
  addMapping("DST_PROC_CONTAINER_ID", DST_PROC_CONTAINER_ID, NTOP_PEN);
182
183
  /* sFlow Counter Fields */
184
0
  addCounterMapping("deviceIP", SFLOW_DEVICE_IP);
185
0
  addCounterMapping("samplesGenerated", SFLOW_SAMPLES_GENERATED);
186
0
  addCounterMapping("ifIndex", SFLOW_IF_INDEX);
187
0
  addCounterMapping("ifName", SFLOW_IF_NAME);
188
0
  addCounterMapping("ifType", SFLOW_IF_TYPE);
189
0
  addCounterMapping("ifSpeed", SFLOW_IF_SPEED);
190
0
  addCounterMapping("ifDirection", SFLOW_IF_DIRECTION);
191
0
  addCounterMapping("ifAdminStatus", SFLOW_IF_ADMIN_STATUS);
192
0
  addCounterMapping("ifOperStatus", SFLOW_IF_OPER_STATUS);
193
0
  addCounterMapping("ifInOctets", SFLOW_IF_IN_OCTETS);
194
0
  addCounterMapping("ifInPackets", SFLOW_IF_IN_PACKETS);
195
0
  addCounterMapping("ifInErrors", SFLOW_IF_IN_ERRORS);
196
0
  addCounterMapping("ifOutOctets", SFLOW_IF_OUT_OCTETS);
197
0
  addCounterMapping("ifOutPackets", SFLOW_IF_OUT_PACKETS);
198
0
  addCounterMapping("ifOutErrors", SFLOW_IF_OUT_ERRORS);
199
0
  addCounterMapping("ifPromiscuousMode", SFLOW_IF_PROMISCUOUS_MODE);
200
201
0
  if(ntop->getPrefs()->is_edr_mode())
202
0
    loadVLANMappings();
203
0
}
204
205
/* **************************************************** */
206
207
0
ZMQParserInterface::~ZMQParserInterface() {
208
0
  map<u_int32_t, nProbeStats *>::iterator it;
209
210
0
  if (zmq_remote_stats) free(zmq_remote_stats);
211
0
  if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
212
#ifdef NTOPNG_PRO
213
  if (custom_app_maps) delete (custom_app_maps);
214
#endif
215
216
0
  for (it = source_id_last_zmq_remote_stats.begin();
217
0
       it != source_id_last_zmq_remote_stats.end(); ++it)
218
0
    delete (it->second);
219
220
0
  source_id_last_zmq_remote_stats.clear();
221
0
}
222
223
/* **************************************************** */
224
225
/*
226
  these mappings are sent by nprobe via zmq at startup and collected via
227
  zmqparserinterface::parsetemplate()
228
*/
229
void ZMQParserInterface::addMapping(const char *sym, u_int32_t num,
230
0
                                    u_int32_t pen, const char *descr) {
231
0
  string label(sym);
232
0
  labels_map_t::iterator it;
233
0
  pen_value_t cur_pair = make_pair(pen, num);
234
235
0
  if ((it = labels_map.find(label)) == labels_map.end())
236
0
    labels_map.insert(make_pair(label, cur_pair));
237
0
  else
238
0
    it->second.first = pen, it->second.second = num;
239
240
0
  if (descr) {
241
0
    descriptions_map_t::iterator dit;
242
243
0
    if ((dit = descriptions_map.find(cur_pair)) == descriptions_map.end())
244
0
      descriptions_map.insert(make_pair(cur_pair, descr));
245
0
  }
246
0
}
247
248
/* **************************************************** */
249
250
bool ZMQParserInterface::getKeyId(char *sym, u_int32_t sym_len,
251
                                  u_int32_t *const pen,
252
0
                                  u_int32_t *const field) const {
253
0
  u_int32_t cur_pen, cur_field;
254
0
  string label(sym);
255
0
  labels_map_t::const_iterator it;
256
0
  bool is_num, is_dotted;
257
258
0
  *pen = UNKNOWN_PEN, *field = UNKNOWN_FLOW_ELEMENT;
259
260
0
  is_num = Utils::isNumber(sym, sym_len, &is_dotted);
261
262
0
  if (is_num && is_dotted) {
263
0
    if (sscanf(sym, "%u.%u", &cur_pen, &cur_field) != 2) return false;
264
0
    *pen = cur_pen, *field = cur_field;
265
0
  } else if (is_num) {
266
0
    cur_field = atoi(sym);
267
0
    *pen = 0, *field = cur_field;
268
0
  } else if ((it = labels_map.find(label)) != labels_map.end()) {
269
0
    *pen = it->second.first, *field = it->second.second;
270
0
  } else {
271
0
    return false;
272
0
  }
273
274
0
  return true;
275
0
}
276
277
/* **************************************************** */
278
279
0
void ZMQParserInterface::addCounterMapping(const char *sym, u_int32_t id) {
280
0
  string label(sym);
281
0
  counters_map_t::iterator it;
282
283
0
  if ((it = counters_map.find(label)) == counters_map.end())
284
0
    counters_map.insert(make_pair(label, id));
285
0
}
286
287
/* **************************************************** */
288
289
bool ZMQParserInterface::getCounterId(char *sym, u_int32_t sym_len,
290
0
                                      u_int32_t *id) const {
291
0
  string label(sym);
292
0
  counters_map_t::const_iterator it;
293
0
  bool is_num, is_dotted;
294
295
0
  is_num = Utils::isNumber(sym, sym_len, &is_dotted);
296
297
0
  if (is_num) {
298
0
    *id = atoi(sym);
299
0
  } else if ((it = counters_map.find(label)) != counters_map.end()) {
300
0
    *id = it->second;
301
0
  } else {
302
0
    return false;
303
0
  }
304
305
0
  return true;
306
0
}
307
308
/* **************************************************** */
309
310
const char *ZMQParserInterface::getKeyDescription(u_int32_t pen,
311
0
                                                  u_int32_t field) const {
312
0
  descriptions_map_t::const_iterator it;
313
314
0
  if ((it = descriptions_map.find(make_pair(pen, field))) !=
315
0
      descriptions_map.end())
316
0
    return it->second.c_str();
317
318
0
  return NULL;
319
0
}
320
321
/* **************************************************** */
322
323
u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
324
                                        u_int32_t source_id, u_int32_t msg_id,
325
0
                                        void *data) {
326
0
  json_object *o;
327
0
  enum json_tokener_error jerr = json_tokener_success;
328
0
  nProbeStats zrs; /* Do not instantiate, automatically cleaned outside of the scope */
329
0
  const u_int32_t max_timeout = 600, min_timeout = 60;
330
331
0
  if (polling_start_time == 0) polling_start_time = (u_int32_t)time(NULL);
332
333
  // #define TRACE_EXPORTERS
334
335
#ifdef TRACE_EXPORTERS
336
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msg_id: %u] %s", msg_id, payload);
337
#endif
338
339
0
  o = json_tokener_parse_verbose(payload, &jerr);
340
341
0
  if (o) {
342
0
    json_object *w, *z;
343
344
0
    zrs.source_id = source_id, zrs.last_update = time(NULL);
345
346
0
    if (json_object_object_get_ex(o, "bytes", &w))
347
0
      zrs.remote_bytes = (u_int64_t)json_object_get_int64(w);
348
0
    if (json_object_object_get_ex(o, "packets", &w))
349
0
      zrs.remote_pkts = (u_int64_t)json_object_get_int64(w);
350
0
    if (json_object_object_get_ex(o, "packet_drops", &w))
351
0
      zrs.remote_pkt_drops = (u_int64_t)json_object_get_int64(w);
352
353
0
    if (json_object_object_get_ex(o, "iface", &w)) {
354
0
      if (json_object_object_get_ex(w, "name", &z))
355
0
        snprintf(zrs.remote_ifname, sizeof(zrs.remote_ifname), "%s",
356
0
                 json_object_get_string(z));
357
0
      if (json_object_object_get_ex(w, "speed", &z))
358
0
        zrs.remote_ifspeed = (u_int32_t)json_object_get_int64(z);
359
0
      if (json_object_object_get_ex(w, "ip", &z))
360
0
        snprintf(zrs.remote_ifaddress, sizeof(zrs.remote_ifaddress), "%s",
361
0
                 json_object_get_string(z));
362
0
    }
363
364
0
    if (json_object_object_get_ex(o, "mode", &w))
365
0
      snprintf(zrs.mode, sizeof(zrs.mode),
366
0
                "%s", json_object_get_string(w));
367
368
0
    if (json_object_object_get_ex(o, "probe", &w)) {
369
0
      if (json_object_object_get_ex(w, "public_ip", &z))
370
0
        snprintf(zrs.remote_probe_public_address,
371
0
                 sizeof(zrs.remote_probe_public_address), "%s",
372
0
                 json_object_get_string(z));
373
0
      if (json_object_object_get_ex(w, "uuid", &z))
374
0
        snprintf(zrs.uuid, sizeof(zrs.uuid),
375
0
                 "%s", json_object_get_string(z));
376
377
0
      if (json_object_object_get_ex(w, "uuid_num", &z) /* uuid_num (old) == unique_source_id (new) */
378
0
    || json_object_object_get_ex(w, "unique_source_id", &z)) {
379
0
        zrs.uuid_num = (u_int32_t)json_object_get_int64(z);
380
0
      } if (json_object_object_get_ex(w, "ip", &z))
381
0
        snprintf(zrs.remote_probe_address, sizeof(zrs.remote_probe_address),
382
0
                 "%s", json_object_get_string(z));
383
0
      if (json_object_object_get_ex(w, "version", &z))
384
0
        snprintf(zrs.remote_probe_version, sizeof(zrs.remote_probe_version),
385
0
                 "%s", json_object_get_string(z));
386
0
      if (json_object_object_get_ex(w, "osname", &z))
387
0
        snprintf(zrs.remote_probe_os, sizeof(zrs.remote_probe_os), "%s",
388
0
                 json_object_get_string(z));
389
0
      if (json_object_object_get_ex(w, "license", &z))
390
0
        snprintf(zrs.remote_probe_license, sizeof(zrs.remote_probe_license),
391
0
                 "%s", json_object_get_string(z));
392
0
      if (json_object_object_get_ex(w, "edition", &z))
393
0
        snprintf(zrs.remote_probe_edition, sizeof(zrs.remote_probe_edition),
394
0
                 "%s", json_object_get_string(z));
395
0
      if (json_object_object_get_ex(w, "maintenance", &z))
396
0
        snprintf(zrs.remote_probe_maintenance,
397
0
                 sizeof(zrs.remote_probe_maintenance), "%s",
398
0
                 json_object_get_string(z));
399
0
    }
400
401
0
    if (json_object_object_get_ex(o, "time", &w)) {
402
0
      int32_t time_delta;
403
404
0
      zrs.local_time = (u_int32_t)time(NULL);
405
0
      zrs.remote_time =
406
0
          (u_int32_t)json_object_get_int64(w); /* nProbe remote time */
407
408
0
      time_delta = (int32_t)zrs.local_time - zrs.remote_time;
409
410
      /*
411
         Skip the check for the first few seconds
412
         as we might receive old messages from the probe
413
      */
414
0
      if ((msg_id != 0) /* Skip message with drops to avoid miscalculations */
415
0
          && ((zrs.local_time - polling_start_time) > 5)) {
416
0
        if (abs(time_delta) >= 10) {
417
0
          ntop->getTrace()->traceEvent(TRACE_NORMAL,
418
0
                                       "Remote probe clock drift %u sec "
419
0
                                       "detected (local: %u remote: %u [%s])",
420
0
                                       abs(time_delta), zrs.local_time,
421
0
                                       zrs.remote_time,
422
0
                                       zrs.remote_probe_address);
423
0
        }
424
0
      } else
425
0
        zrs.remote_time = zrs.local_time; /* Avoid clock drift messages during
426
                                             the grace period */
427
0
    }
428
429
0
    if (json_object_object_get_ex(o, "avg", &w)) {
430
0
      if (json_object_object_get_ex(w, "bps", &z))
431
0
        zrs.avg_bps = (u_int32_t)json_object_get_int64(z);
432
0
      if (json_object_object_get_ex(w, "pps", &z))
433
0
        zrs.avg_pps = (u_int32_t)json_object_get_int64(z);
434
0
    }
435
436
0
    if (json_object_object_get_ex(o, "timeout", &w)) {
437
0
      if (json_object_object_get_ex(w, "lifetime", &z)) {
438
0
        zrs.remote_lifetime_timeout = (u_int32_t)json_object_get_int64(z);
439
440
0
        if (zrs.remote_lifetime_timeout > max_timeout)
441
0
          zrs.remote_lifetime_timeout = max_timeout;
442
0
      }
443
444
0
      if (json_object_object_get_ex(w, "idle", &z)) {
445
0
        zrs.remote_idle_timeout = (u_int32_t)json_object_get_int64(z);
446
0
        zrs.remote_idle_timeout *= 2; /* Double the idle timeout for NetFlow */
447
0
        if (zrs.remote_idle_timeout > max_timeout)
448
0
          zrs.remote_idle_timeout = max_timeout;
449
0
        if (zrs.remote_idle_timeout < min_timeout)
450
0
          zrs.remote_idle_timeout = min_timeout;
451
0
      }
452
453
0
      if (json_object_object_get_ex(w, "collected_lifetime", &z))
454
0
        zrs.remote_collected_lifetime_timeout =
455
0
            (u_int32_t)json_object_get_int64(z);
456
0
    }
457
458
0
    if (json_object_object_get_ex(o, "drops", &w)) {
459
0
      if (json_object_object_get_ex(w, "export_queue_full", &z))
460
0
        zrs.export_queue_full = (u_int32_t)json_object_get_int64(z);
461
462
0
      if (json_object_object_get_ex(w, "too_many_flows", &z))
463
0
        zrs.too_many_flows = (u_int32_t)json_object_get_int64(z);
464
465
0
      if (json_object_object_get_ex(w, "elk_flow_drops", &z))
466
0
        zrs.elk_flow_drops = (u_int32_t)json_object_get_int64(z);
467
468
0
      if (json_object_object_get_ex(w, "sflow_pkt_sample_drops", &z))
469
0
        zrs.sflow_pkt_sample_drops = (u_int32_t)json_object_get_int64(z);
470
471
0
      if (json_object_object_get_ex(w, "flow_collection_drops", &z))
472
0
        zrs.flow_collection_drops = (u_int32_t)json_object_get_int64(z);
473
474
0
      if (json_object_object_get_ex(w, "flow_collection_udp_socket_drops", &z))
475
0
        zrs.flow_collection_udp_socket_drops =
476
0
            (u_int32_t)json_object_get_int64(z);
477
0
    }
478
479
0
    if (json_object_object_get_ex(o, "flow_collection", &w)) {
480
0
      if (json_object_object_get_ex(w, "nf_ipfix_flows", &z))
481
0
        zrs.flow_collection.nf_ipfix_flows =
482
0
            (u_int64_t)json_object_get_int64(z);
483
484
0
      if (json_object_object_get_ex(w, "collection_port", &z))
485
0
        zrs.flow_collection.collection_port =
486
0
            (u_int64_t)json_object_get_int64(z);
487
488
0
      if (json_object_object_get_ex(w, "sflow_samples", &z))
489
0
        zrs.flow_collection.sflow_samples = (u_int64_t)json_object_get_int64(z);
490
491
0
      if (json_object_object_get_ex(w, "exporters", &z)) {
492
0
        json_object_object_foreach(z, key, val) {
493
          //ntop->getTrace()->traceEvent(TRACE_NORMAL, "Exporter: %s", key);
494
0
          u_int32_t ip = ntohl(inet_addr(key));
495
0
          ExporterStats exp_stats;
496
0
          json_object *x;
497
498
0
    memset(&exp_stats, 0, sizeof(exp_stats));
499
500
0
          if (json_object_object_get_ex(val, "time_last_flow", &x))
501
0
            exp_stats.time_last_used = (u_int32_t)json_object_get_int64(x);
502
503
0
    if (json_object_object_get_ex(val, "num_sflow_flows", &x))
504
0
            exp_stats.num_sflow_flows = (u_int32_t)json_object_get_int64(x);
505
506
0
    if (json_object_object_get_ex(val, "num_netflow_ipfix_flows", &x))
507
0
            exp_stats.num_netflow_flows = (u_int32_t)json_object_get_int64(x);
508
509
0
    if (json_object_object_get_ex(val, "num_drops", &x))
510
0
            exp_stats.num_drops = (u_int32_t)json_object_get_int64(x);
511
512
0
    if (json_object_object_get_ex(val, "unique_source_id", &x))
513
0
            exp_stats.unique_source_id = (u_int32_t)json_object_get_int64(x);
514
515
0
          zrs.exportersStats[ip] = exp_stats;
516
0
        }
517
0
      }
518
0
    }
519
520
0
    if (json_object_object_get_ex(o, "zmq", &w)) {
521
0
      if (json_object_object_get_ex(w, "num_flow_exports", &z))
522
0
        zrs.num_flow_exports = (u_int64_t)json_object_get_int64(z);
523
524
0
      if (json_object_object_get_ex(w, "num_zmq_exporters", &z))
525
0
        zrs.num_exporters = (u_int8_t)json_object_get_int(z);
526
0
    }
527
528
#ifdef ZMQ_EVENT_DEBUG
529
    ntop->getTrace()->traceEvent(TRACE_NORMAL,
530
         "Event parsed "
531
         "[iface: {name: %s, speed: %u, ip: %s}]"
532
         "[probe: {public_ip: %s, ip: %s, version: %s, os: %s, license: %s, "
533
         "edition: %s, maintenance: %s}]"
534
         "[avg: {bps: %u, pps: %u}]"
535
         "[remote: {time: %u, bytes: %u, packets: %u, drops: %u, idle_timeout: %u, "
536
         "lifetime_timeout: %u,"
537
         " collected_lifetime_timeout: %u }]"
538
         "[zmq: {num_exporters: %u, num_flow_exports: %u}]",
539
         zrs.remote_ifname, zrs.remote_ifspeed, zrs.remote_ifaddress,
540
         zrs.remote_probe_version, zrs.remote_probe_os, zrs.remote_probe_license,
541
         zrs.remote_probe_edition, zrs.remote_probe_maintenance,
542
         zrs.remote_probe_public_address, zrs.remote_probe_address, zrs.avg_bps,
543
         zrs.avg_pps, zrs.remote_time, (u_int32_t)zrs.remote_bytes,
544
         (u_int32_t)zrs.remote_pkts, (u_int32_t)zrs.remote_pkt_drops, zrs.remote_idle_timeout,
545
         zrs.remote_lifetime_timeout, zrs.remote_collected_lifetime_timeout,
546
         zrs.num_exporters, zrs.num_flow_exports);
547
#endif
548
549
0
    remote_lifetime_timeout = zrs.remote_lifetime_timeout,
550
0
    remote_idle_timeout = zrs.remote_idle_timeout;
551
552
    /* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps);
553
     */
554
555
    /* Process Flow */
556
0
    setRemoteStats(&zrs);
557
558
0
    for (std::map<u_int64_t, NetworkInterface *>::iterator it = flowHashing.begin();
559
0
         it != flowHashing.end(); ++it) {
560
0
      ZMQParserInterface *z = (ZMQParserInterface *)it->second;
561
562
0
      z->setRemoteStats(&zrs);
563
0
    }
564
565
    /* Dispose memory */
566
0
    json_object_put(o);
567
0
  } else {
568
    // if o != NULL
569
0
    if (!once) {
570
0
      ntop->getTrace()->traceEvent(TRACE_WARNING,
571
0
                                   "Invalid message received: "
572
0
                                   "your nProbe sender is outdated, data "
573
0
                                   "encrypted, invalid JSON, or oom?");
574
0
      ntop->getTrace()->traceEvent(
575
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
576
0
          json_tokener_error_desc(jerr), payload_size, payload);
577
0
    }
578
579
0
    once = true;
580
0
    if (o) json_object_put(o);
581
582
0
    return -1;
583
0
  }
584
585
0
  return 0;
586
0
}
587
588
/* **************************************************** */
589
590
bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
591
                                           u_int32_t field,
592
0
                                           ParsedValue *value) {
593
0
  IpAddress ip_aux; /* used to check empty IPs */
594
595
0
  switch (field) {
596
0
    case IN_SRC_MAC:
597
0
    case OUT_SRC_MAC:
598
      /* Format 00:00:00:00:00:00 */
599
0
      Utils::parseMac(flow->src_mac, value->string);
600
0
      break;
601
0
    case IN_DST_MAC:
602
0
    case OUT_DST_MAC:
603
0
      Utils::parseMac(flow->dst_mac, value->string);
604
0
      break;
605
0
    case SRC_TOS:
606
0
      flow->src_tos = value->int_num;
607
0
      break;
608
0
    case DST_TOS:
609
0
      flow->dst_tos = value->int_num;
610
0
      break;
611
0
    case IPV4_SRC_ADDR:
612
0
    case IPV6_SRC_ADDR:
613
      /*
614
        The following check prevents an empty ip address (e.g., ::) to
615
        to overwrite another valid ip address already set.
616
        This can happen for example when nProbe is configured (-T) to export
617
        both %IPV4_SRC_ADDR and the %IPV6_SRC_ADDR. In that cases nProbe can
618
        export a valid ipv4 and an empty ipv6. Without the check, the empty
619
        v6 address may overwrite the non empty v4.
620
      */
621
0
      if (flow->src_ip.isEmpty()) {
622
0
        if (value->string)
623
0
          flow->src_ip.set((char *)value->string);
624
0
        else
625
0
          flow->src_ip.set(ntohl(value->int_num));
626
0
      } else {
627
0
        ip_aux.set((char *)value->string);
628
629
0
        if (!ip_aux.isEmpty() &&
630
0
            !ntop->getPrefs()->do_override_src_with_post_nat_src())
631
          /* tried to overwrite a non-empty IP with another non-empty IP */
632
0
          ntop->getTrace()->traceEvent(
633
0
              TRACE_WARNING,
634
0
              "Attempt to set source ip multiple times. "
635
0
              "Check exported fields");
636
0
      }
637
      /* Pre-Post nat IPs are only supported for IPv4 */
638
0
      if (flow->src_ip.isIPv4()) {
639
0
        flow->setPreNATSrcIp(flow->src_ip.get_ipv4());
640
0
      }
641
0
      break;
642
0
    case IP_PROTOCOL_VERSION:
643
0
      flow->version = value->int_num;
644
0
      break;
645
646
0
    case IPV4_DST_ADDR:
647
0
    case IPV6_DST_ADDR:
648
0
      if (flow->dst_ip.isEmpty()) {
649
0
        if (value->string)
650
0
          flow->dst_ip.set((char *)value->string);
651
0
        else
652
0
          flow->dst_ip.set(ntohl(value->int_num));
653
0
      } else {
654
0
        ip_aux.set((char *)value->string);
655
656
0
        if (!ip_aux.isEmpty() &&
657
0
            !ntop->getPrefs()->do_override_dst_with_post_nat_dst())
658
0
          ntop->getTrace()->traceEvent(TRACE_WARNING,
659
0
               "Attempt to set destination ip multiple times. "
660
0
               "Check exported fields");
661
0
      }
662
      /* Pre-Post nat IPs are only supported for IPv4 */
663
0
      if (flow->dst_ip.isIPv4()) {
664
0
        flow->setPreNATDstIp(flow->dst_ip.get_ipv4());
665
0
      }
666
0
      break;
667
0
    case L4_SRC_PORT:
668
0
      if (!flow->src_port) {
669
0
        if (value->string)
670
0
          flow->src_port = atoi(value->string);
671
0
        else
672
0
          flow->src_port = ntohs((u_int32_t)value->int_num);
673
674
0
        flow->setPreNATSrcPort(flow->src_port);
675
0
      }
676
0
      break;
677
0
    case L4_DST_PORT:
678
0
      if (!flow->dst_port) {
679
0
        if (value->string)
680
0
          flow->dst_port = atoi(value->string);
681
0
        else
682
0
          flow->dst_port = ntohs((u_int32_t)value->int_num);
683
684
0
        flow->setPreNATDstPort(flow->dst_port);
685
0
      }
686
0
      break;
687
0
    case SRC_VLAN:
688
0
    case DST_VLAN:
689
0
      if((flow->vlan_id = value->int_num) > 4095) /* Sanity check */
690
0
  flow->vlan_id = 0;
691
0
      break;
692
0
    case DOT1Q_SRC_VLAN:
693
0
    case DOT1Q_DST_VLAN:
694
0
      if (flow->vlan_id == 0) {
695
        /* as those fields are the outer vlans in q-in-q
696
           we set the vlan_id only if there is no inner vlan
697
           value set
698
        */
699
0
        flow->vlan_id = value->int_num;
700
0
      }
701
0
      break;
702
0
    case PROTOCOL:
703
0
      if (value->string)
704
0
        flow->l4_proto = atoi(value->string);
705
0
      else
706
0
        flow->l4_proto = value->int_num;
707
0
      break;
708
0
    case TCP_FLAGS:
709
0
      flow->tcp.tcp_flags = value->int_num;
710
0
      break;
711
0
    case INITIATOR_PKTS:
712
0
      flow->absolute_packet_octet_counters = true;
713
      /* Don't break */
714
0
    case IN_PKTS:
715
0
      if (value->string != NULL)
716
0
        flow->in_pkts = atol(value->string);
717
0
      else
718
0
        flow->in_pkts = value->int_num;
719
0
      break;
720
0
    case INITIATOR_OCTETS:
721
0
      flow->absolute_packet_octet_counters = true;
722
      /* Don't break */
723
0
    case IN_BYTES:
724
0
      if (value->string != NULL)
725
0
        flow->in_bytes = atol(value->string);
726
0
      else
727
0
        flow->in_bytes = value->int_num;
728
0
      break;
729
0
    case RESPONDER_PKTS:
730
0
      flow->absolute_packet_octet_counters = true;
731
      /* Don't break */
732
0
    case OUT_PKTS:
733
0
      if (value->string != NULL)
734
0
        flow->out_pkts = atol(value->string);
735
0
      else
736
0
        flow->out_pkts = value->int_num;
737
0
      break;
738
0
    case RESPONDER_OCTETS:
739
0
      flow->absolute_packet_octet_counters = true;
740
      /* Don't break */
741
0
    case OUT_BYTES:
742
0
      if (value->string != NULL)
743
0
        flow->out_bytes = atol(value->string);
744
0
      else
745
0
        flow->out_bytes = value->int_num;
746
0
      break;
747
0
    case FIRST_SWITCHED:
748
0
      if (value->string != NULL)
749
0
        flow->first_switched = atoi(value->string);
750
0
      else
751
0
        flow->first_switched = value->int_num;
752
0
      break;
753
0
    case LAST_SWITCHED:
754
0
      if (value->string != NULL)
755
0
        flow->last_switched = atoi(value->string);
756
0
      else
757
0
        flow->last_switched = value->int_num;
758
0
      break;
759
0
    case SAMPLING_INTERVAL:
760
#if 0
761
      /* Ignore it as nProbe as already implemented upscale */
762
      flow->pkt_sampling_rate = value->int_num;
763
#endif
764
0
      break;
765
0
    case DIRECTION:
766
0
      if (value->string != NULL)
767
0
        flow->direction = atoi(value->string);
768
0
      else
769
0
        flow->direction = value->int_num;
770
0
      break;
771
0
    case EXPORTER_IPV4_ADDRESS:
772
0
      if (value->string != NULL) {
773
        /* Format: a.b.c.d, possibly overrides NPROBE_IPV4_ADDRESS */
774
0
        u_int32_t ip = ntohl(inet_addr(value->string));
775
776
0
        if (ip) {
777
0
    flow->exporter_device_ip = ip;
778
779
0
    if(ntop->getPrefs()->is_edr_mode()) {
780
0
      char buf[32], ipb[24];
781
0
      std::unordered_map<u_int32_t, bool>::iterator it = cloud_flow_exporters.find(ip);
782
783
0
      if(it == cloud_flow_exporters.end()) {
784
0
        cloud_flow_exporters[ip] = true;
785
0
        snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(ip, ipb, sizeof(ipb)));
786
0
        ntop->addLocalCloudAddress(buf);
787
788
        /* Re-evaluate IPVx_SRC_ADDR/IPVx_DST_ADDR */
789
0
        flow->src_ip.checkIP();
790
0
        flow->dst_ip.checkIP();
791
0
      }
792
0
    }
793
0
  }
794
0
      }
795
0
      break;
796
0
    case EXPORTER_IPV6_ADDRESS:
797
0
      if (value->string != NULL && strlen(value->string) > 0)
798
0
        inet_pton(AF_INET6, value->string, &flow->exporter_device_ipv6);
799
0
      break;
800
0
    case FLOW_END_REASON:
801
0
      if (value->string)
802
0
        flow->setEndReason(value->string);
803
0
      break;
804
0
    case TOTAL_FLOWS_EXP:
805
/*
806
      if(value->string != NULL)
807
        total_flows_exp = atol(value->string);
808
      else
809
        total_flows_exp = value->int_num;
810
      ntop->getTrace()->traceEvent(TRACE_INFO,
811
                                   "Total Exported Flows %u", total_flows_exp);
812
*/
813
0
      break;
814
0
    case INPUT_SNMP:
815
0
      flow->inIndex = value->int_num;
816
0
      break;
817
0
    case OUTPUT_SNMP:
818
0
      flow->outIndex = value->int_num;
819
0
      break;
820
0
    case OBSERVATION_POINT_ID:
821
0
      flow->observationPointId = value->int_num;
822
0
      break;
823
0
    case POST_NAT_SRC_IPV4_ADDR:
824
      /* Alwais set src_ip_addr_post_nat, however switch the src_ip only if preference is set*/
825
0
      if (value->string) {
826
0
        IpAddress tmp;
827
0
        tmp.set(value->string);
828
0
        if (!tmp.isEmpty()) {
829
0
          flow->setPostNATSrcIp(tmp.get_ipv4());
830
0
        }
831
0
        if (ntop->getPrefs()->do_override_src_with_post_nat_src()) {
832
0
          if (!tmp.isEmpty()) {
833
0
            flow->src_ip.set((char *)value->string);
834
0
          }
835
0
        }
836
0
      } else if (value->int_num) {
837
0
        if (ntop->getPrefs()->do_override_src_with_post_nat_src()) {
838
0
          flow->src_ip.set(ntohl(value->int_num));
839
0
        }
840
0
        flow->setPostNATSrcIp(ntohl(value->int_num));
841
0
      }
842
0
      break;
843
0
    case POST_NAT_DST_IPV4_ADDR:
844
      /* Alwais set dst_ip_addr_post_nat, however switch the dst_ip only if preference is set*/
845
0
      if (value->string) {
846
0
        IpAddress tmp;
847
0
        tmp.set(value->string);
848
0
        if (!tmp.isEmpty()) {
849
0
          flow->setPostNATDstIp(tmp.get_ipv4());
850
0
        }
851
0
        if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
852
0
          if (!tmp.isEmpty()) {
853
0
            flow->dst_ip.set((char *)value->string);
854
0
          }
855
0
        }
856
0
      } else if (value->int_num) {
857
0
        if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
858
0
          flow->dst_ip.set(ntohl(value->int_num));
859
0
        }
860
0
        flow->setPostNATDstIp(ntohl(value->int_num));
861
0
      }
862
0
      break;
863
0
    case POST_NAT_SRC_TRANSPORT_PORT:
864
0
      if (ntop->getPrefs()->do_override_src_with_post_nat_src() &&
865
0
          (value->int_num != 0))
866
0
        flow->src_port = htons((u_int16_t)value->int_num);
867
0
      if (value->int_num != 0)
868
0
        flow->setPostNATSrcPort(htons((u_int16_t)value->int_num));
869
0
      break;
870
0
    case POST_NAT_DST_TRANSPORT_PORT:
871
0
      if (ntop->getPrefs()->do_override_dst_with_post_nat_dst() &&
872
0
          (value->int_num != 0))
873
0
        flow->dst_port = htons((u_int16_t)value->int_num);
874
0
      if (value->int_num != 0)
875
0
        flow->setPostNATDstPort(htons((u_int16_t)value->int_num));
876
0
      break;
877
0
    case INGRESS_VRFID:
878
0
      flow->vrfId = value->int_num;
879
0
      break;
880
0
    case IPV4_SRC_MASK:
881
0
    case IPV4_DST_MASK:
882
0
      if (value->int_num != 0) return false;
883
0
      break;
884
0
    case IPV4_NEXT_HOP:
885
0
      if (value->string && strcmp(value->string, "0.0.0.0")) return false;
886
0
      break;
887
0
    case SRC_AS:
888
0
      flow->src_as = value->int_num;
889
0
      break;
890
0
    case DST_AS:
891
0
      flow->dst_as = value->int_num;
892
0
      break;
893
0
    case BGP_NEXT_ADJACENT_ASN:
894
0
      flow->next_adjacent_as = value->int_num;
895
0
      break;
896
0
    case BGP_PREV_ADJACENT_ASN:
897
0
      flow->prev_adjacent_as = value->int_num;
898
0
      break;
899
0
    default:
900
0
      ntop->getTrace()->traceEvent(TRACE_INFO,
901
0
                                   "Skipping no-PEN flow fieldId %u", field);
902
0
      return false;
903
0
  }
904
905
0
  return true;
906
0
}
907
908
/* **************************************************** */
909
910
bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
911
                                           u_int32_t field,
912
0
                                           ParsedValue *value) {
913
  /* Check for backward compatibility to handle cases like field = 123
914
   * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
915
0
  if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
916
917
  /* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[field %d][%s]", field, value->string ? value->string : ""); */
918
919
0
  switch (field) {
920
0
  case L7_PROTO:
921
0
    if (value->string) {
922
0
      if (!strchr(value->string, '.')) {
923
  /* Old behaviour, only the app protocol */
924
0
  flow->l7_proto.proto.app_protocol = atoi(value->string);
925
0
      } else {
926
0
  char *proto_dot;
927
928
0
  flow->l7_proto.proto.master_protocol = (u_int16_t)strtoll(value->string, &proto_dot, 10);
929
0
  flow->l7_proto.proto.app_protocol    = (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
930
0
      }
931
0
    } else {
932
0
      flow->l7_proto.proto.app_protocol = value->int_num;
933
0
    }
934
935
#if 0
936
    ntop->getTrace()->traceEvent(TRACE_NORMAL, "[value: %s][master: %u][app: %u]",
937
         value->string ? value->string : "(int)",
938
         flow->l7_proto.master_protocol,
939
         flow->l7_proto.app_protocol);
940
#endif
941
0
    break;
942
943
0
  case NPROBE_INSTANCE_NAME:
944
0
    if(ntop->getPrefs()->is_edr_mode()
945
0
       && ntop->getPrefs()->addVLANCloudToExporters()) {
946
0
      u_int16_t vlan_id = findVLANMapping((char*)value->string);
947
948
0
      flow->vlan_id = vlan_id;
949
0
    }
950
0
    break;
951
952
0
  case L7_PROTO_NAME:
953
0
    break;
954
955
0
  case L7_INFO:
956
0
    if (value->string && value->string[0] && value->string[0] != '\n')
957
0
      flow->setL7Info(value->string);
958
0
    break;
959
960
0
  case L7_CONFIDENCE:
961
0
    flow->setConfidence((ndpi_confidence_t)((value->int_num < NDPI_CONFIDENCE_MAX)
962
0
              ? value->int_num
963
0
              : NDPI_CONFIDENCE_UNKNOWN));
964
0
    break;
965
966
0
  case L7_ERROR_CODE:
967
0
    flow->setL7ErrorCode(value->int_num);
968
0
    break;
969
970
0
  case OOORDER_IN_PKTS:
971
0
    flow->tcp.ooo_in_pkts = value->int_num;
972
0
    break;
973
974
0
  case OOORDER_OUT_PKTS:
975
0
    flow->tcp.ooo_out_pkts = value->int_num;
976
0
    break;
977
978
0
  case RETRANSMITTED_IN_PKTS:
979
0
    flow->tcp.retr_in_pkts = value->int_num;
980
0
    break;
981
982
0
  case RETRANSMITTED_OUT_PKTS:
983
0
    flow->tcp.retr_out_pkts = value->int_num;
984
0
    break;
985
986
    /* TODO add lost in/out to nProbe and here */
987
0
  case CLIENT_NW_LATENCY_MS: {
988
0
    float client_nw_latency;
989
0
    client_nw_latency = value->double_num;
990
0
    flow->tcp.clientNwLatency.tv_sec = client_nw_latency / 1e3;
991
0
    flow->tcp.clientNwLatency.tv_usec =
992
0
      1e3 * (client_nw_latency - flow->tcp.clientNwLatency.tv_sec * 1e3);
993
0
  } break;
994
995
0
  case SERVER_NW_LATENCY_MS: {
996
0
    float server_nw_latency;
997
998
0
    server_nw_latency = value->double_num;
999
0
    flow->tcp.serverNwLatency.tv_sec = server_nw_latency / 1e3;
1000
0
    flow->tcp.serverNwLatency.tv_usec =
1001
0
      1e3 * (server_nw_latency - flow->tcp.serverNwLatency.tv_sec * 1e3);
1002
0
  } break;
1003
1004
0
  case CLIENT_TCP_FLAGS:
1005
0
    flow->tcp.client_tcp_flags = value->int_num;
1006
0
    flow->tcp.tcp_flags |= flow->tcp.client_tcp_flags;
1007
0
    break;
1008
1009
0
  case SERVER_TCP_FLAGS:
1010
0
    flow->tcp.server_tcp_flags = value->int_num;
1011
0
    flow->tcp.tcp_flags |= flow->tcp.server_tcp_flags;
1012
0
    break;
1013
1014
0
  case APPL_LATENCY_MS:
1015
0
    flow->tcp.applLatencyMsec = value->double_num;
1016
0
    break;
1017
1018
0
  case TCP_WIN_MAX_IN:
1019
0
    flow->tcp.in_window = value->int_num;
1020
0
    break;
1021
1022
0
  case TCP_WIN_MAX_OUT:
1023
0
    flow->tcp.out_window = value->int_num;
1024
0
    break;
1025
1026
0
  case DNS_QUERY:
1027
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1028
0
      flow->setDNSQuery(value->string);
1029
0
    break;
1030
1031
0
  case DNS_QUERY_TYPE:
1032
0
    flow->setDNSQueryType(value->string ? atoi(value->string) : value->int_num);
1033
0
    break;
1034
1035
0
  case DNS_RET_CODE:
1036
0
    flow->setDNSRetCode(value->string ? atoi(value->string) : value->int_num);
1037
0
    break;
1038
1039
0
  case HTTP_URL:
1040
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1041
0
      flow->setHTTPurl(value->string);
1042
0
    break;
1043
1044
0
  case HTTP_USER_AGENT:
1045
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1046
0
      flow->setHTTPuserAgent(value->string);
1047
0
    break;
1048
1049
0
  case HTTP_SITE:
1050
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1051
0
      flow->setHTTPsite(value->string);
1052
0
    break;
1053
1054
0
  case HTTP_RET_CODE:
1055
0
    flow->setHTTPRetCode(value->string ? atoi(value->string) : value->int_num);
1056
0
    break;
1057
1058
0
  case HTTP_METHOD:
1059
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1060
0
      flow->setHTTPMethod(ndpi_http_str2method(value->string, strlen(value->string)));
1061
0
    break;
1062
1063
0
  case TLS_SERVER_NAME:
1064
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1065
0
      flow->setTLSserverName(value->string);
1066
0
    break;
1067
1068
0
  case JA4C_HASH:
1069
0
    if (value->string && value->string[0])
1070
0
      flow->setJA4cHash(value->string);
1071
0
    break;
1072
1073
0
  case TLS_CIPHER:
1074
0
    flow->setTLSCipher(value->int_num);
1075
0
    break;
1076
1077
0
  case SSL_UNSAFE_CIPHER:
1078
0
    flow->setTLSUnsafeCipher(value->int_num);
1079
0
    break;
1080
1081
0
  case L7_PROTO_RISK:
1082
0
    flow->setRisk((ndpi_risk)value->int_num);
1083
0
    break;
1084
1085
0
  case L7_PROTO_RISK_NAME:
1086
0
    flow->setRiskName(value->string);
1087
0
    break;
1088
1089
0
  case FLOW_VERDICT:
1090
0
    flow->setFlowVerdict(value->int_num);
1091
0
    break;
1092
1093
0
  case L7_RISK_INFO:
1094
0
    if (value->string && value->string[0])
1095
0
      flow->setRiskInfo(value->string);
1096
0
    break;
1097
1098
0
  case FLOW_SOURCE:
1099
0
    {
1100
0
      FlowSource s;
1101
1102
0
      if((value->int_num < packet_to_flow) || (value->int_num > collected_sflow))
1103
0
  s = packet_to_flow; /* Default value */
1104
0
      else
1105
0
  s = static_cast<FlowSource>(value->int_num);
1106
1107
0
      flow->setFlowSource(s);
1108
0
    }
1109
0
    break;
1110
1111
0
  case BITTORRENT_HASH:
1112
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1113
0
      flow->setBittorrentHash(value->string);
1114
0
    break;
1115
1116
0
  case NPROBE_IPV4_ADDRESS:
1117
0
    if (value->string) {
1118
0
      flow->nprobe_ip = ntohl(inet_addr(value->string));
1119
0
      if(flow->exporter_device_ip == 0 && (flow->exporter_device_ip = ntohl(inet_addr(value->string))))
1120
0
        return false;
1121
0
    }
1122
0
    break;
1123
1124
0
  case UNIQUE_SOURCE_ID:
1125
0
    flow->unique_source_id = value->int_num;
1126
0
    break;
1127
1128
0
  case SRC_FRAGMENTS:
1129
0
    flow->in_fragments = value->int_num;
1130
0
    break;
1131
1132
0
  case DST_FRAGMENTS:
1133
0
    flow->out_fragments = value->int_num;
1134
0
    break;
1135
1136
0
  case SRC_PROC_PID:
1137
0
    flow->src_process_info.pid = value->int_num;
1138
0
    break;
1139
1140
0
  case SRC_FATHER_PROC_PID:
1141
0
    flow->src_process_info.father_pid = value->int_num;
1142
0
    break;
1143
1144
0
  case SRC_PROC_NAME:
1145
0
    if (value->string && value->string[0]) {
1146
0
      flow->setParsedProcessInfo();
1147
0
      flow->src_process_info.process_name = strdup(value->string);
1148
1149
#if 0
1150
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SRC] %s (%u)",
1151
           flow->src_process_info.process_name,
1152
           ntohs(flow->src_port));
1153
#endif
1154
0
    }
1155
0
    break;
1156
1157
0
  case SRC_FATHER_PROC_NAME:
1158
0
    if (value->string && value->string[0]) {
1159
0
      flow->setParsedProcessInfo();
1160
0
      flow->src_process_info.father_process_name = strdup(value->string);
1161
0
    }
1162
0
    break;
1163
1164
0
  case SRC_PROC_PKG_NAME:
1165
0
    if (value->string && value->string[0])
1166
0
      flow->src_process_info.pkg_name = strdup(value->string);
1167
0
    break;
1168
1169
0
  case SRC_FATHER_PROC_PKG_NAME:
1170
0
    if (value->string && value->string[0])
1171
0
      flow->src_process_info.father_pkg_name = strdup(value->string);
1172
0
    break;
1173
1174
0
  case SRC_PROC_CMDLINE:
1175
0
    if (value->string && value->string[0])
1176
0
      flow->src_process_info.cmd_line = strdup(value->string);
1177
0
    break;
1178
1179
0
  case SRC_PROC_UID:
1180
0
    flow->src_process_info.uid = value->int_num;
1181
0
    break;
1182
1183
0
  case SRC_FATHER_PROC_UID:
1184
0
    flow->src_process_info.father_uid = value->int_num;
1185
0
    break;
1186
1187
0
  case SRC_PROC_USER_NAME:
1188
0
    if (value->string && value->string[0])
1189
0
      flow->src_process_info.uid_name = strdup(value->string);
1190
0
    break;
1191
1192
0
  case SRC_FATHER_PROC_USER_NAME:
1193
0
    if (value->string && value->string[0])
1194
0
      flow->src_process_info.father_uid_name = strdup(value->string);
1195
0
    break;
1196
1197
0
  case SRC_PROC_CONTAINER_ID:
1198
0
    if (value->string && value->string[0]) {
1199
0
      flow->setParsedContainerInfo();
1200
0
      flow->src_container_info.id = strdup(value->string);
1201
0
    }
1202
0
    break;
1203
1204
0
  case DST_PROC_PID:
1205
0
    flow->dst_process_info.pid = value->int_num;
1206
0
    break;
1207
1208
0
  case DST_FATHER_PROC_PID:
1209
0
    flow->dst_process_info.father_pid = value->int_num;
1210
0
    break;
1211
1212
0
  case DST_PROC_NAME:
1213
0
    if (value->string && value->string[0]) {
1214
0
      flow->setParsedProcessInfo();
1215
0
      flow->dst_process_info.process_name = strdup(value->string);
1216
1217
#if 0
1218
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DST] %s (%u)",
1219
           flow->dst_process_info.process_name,
1220
           ntohs(flow->dst_port));
1221
#endif
1222
0
    }
1223
0
    break;
1224
1225
0
  case DST_FATHER_PROC_NAME:
1226
0
    if (value->string && value->string[0]) {
1227
0
      flow->setParsedProcessInfo();
1228
0
      flow->dst_process_info.father_process_name = strdup(value->string);
1229
0
    }
1230
0
    break;
1231
1232
0
  case DST_PROC_PKG_NAME:
1233
0
    if (value->string && value->string[0])
1234
0
      flow->dst_process_info.pkg_name = strdup(value->string);
1235
0
    break;
1236
1237
0
  case DST_FATHER_PROC_PKG_NAME:
1238
0
    if (value->string && value->string[0])
1239
0
      flow->dst_process_info.father_pkg_name = strdup(value->string);
1240
0
    break;
1241
1242
0
  case DST_PROC_CMDLINE:
1243
0
    if (value->string && value->string[0])
1244
0
      flow->dst_process_info.cmd_line = strdup(value->string);
1245
0
    break;
1246
1247
0
  case DST_PROC_UID:
1248
0
    flow->dst_process_info.uid = value->int_num;
1249
0
    break;
1250
1251
0
  case DST_FATHER_PROC_UID:
1252
0
    flow->dst_process_info.father_uid = value->int_num;
1253
0
    break;
1254
1255
0
  case DST_PROC_USER_NAME:
1256
0
    if (value->string && value->string[0])
1257
0
      flow->dst_process_info.uid_name = strdup(value->string);
1258
0
    break;
1259
1260
0
  case DST_FATHER_PROC_USER_NAME:
1261
0
    if (value->string && value->string[0])
1262
0
      flow->dst_process_info.father_uid_name = strdup(value->string);
1263
0
    break;
1264
1265
0
  case DST_PROC_CONTAINER_ID:
1266
0
    if (value->string && value->string[0]) {
1267
0
      flow->setParsedContainerInfo();
1268
0
      flow->dst_container_info.id = strdup(value->string);
1269
0
    }
1270
0
    break;
1271
1272
0
  case SMTP_RCPT_TO:
1273
0
    if(value->string && value->string[0])
1274
0
      flow->setSMTPRcptTo(value->string);
1275
0
    break;
1276
1277
0
  case SMTP_MAIL_FROM:
1278
0
    if(value->string && value->string[0])
1279
0
      flow->setSMTPMailFrom(value->string);
1280
0
    break;
1281
1282
0
  case DHCP_CLIENT_NAME:
1283
0
    if(value->string && value->string[0])
1284
0
      flow->setDHCPClientName(value->string);
1285
0
    break;
1286
1287
0
  case SIP_CALL_ID:
1288
0
    if(value->string && value->string[0])
1289
0
      flow->setSIPCallId(value->string);
1290
0
    break;
1291
1292
0
  default:
1293
0
    return false;
1294
0
  }
1295
1296
0
  return true;
1297
0
}
1298
1299
/* **************************************************** */
1300
1301
bool ZMQParserInterface::matchPENZeroField(ParsedFlow *const flow,
1302
                                           u_int32_t field,
1303
0
                                           ParsedValue *value) {
1304
0
  IpAddress ip_aux; /* used to check empty IPs */
1305
1306
0
  switch (field) {
1307
0
    case IN_SRC_MAC:
1308
0
    case OUT_SRC_MAC: {
1309
0
      u_int8_t mac[6];
1310
0
      Utils::parseMac(mac, value->string);
1311
0
      return (memcmp(flow->src_mac, mac, sizeof(mac)) == 0);
1312
0
    }
1313
1314
0
    case IN_DST_MAC:
1315
0
    case OUT_DST_MAC: {
1316
0
      u_int8_t mac[6];
1317
0
      Utils::parseMac(mac, value->string);
1318
0
      return (memcmp(flow->dst_mac, mac, sizeof(mac)) == 0);
1319
0
    }
1320
1321
0
    case SRC_TOS:
1322
0
      if (value->string)
1323
0
        return (flow->src_tos == atoi(value->string));
1324
0
      else
1325
0
        return (flow->src_tos == value->int_num);
1326
1327
0
    case DST_TOS:
1328
0
      if (value->string)
1329
0
        return (flow->dst_tos == atoi(value->string));
1330
0
      else
1331
0
        return (flow->dst_tos == value->int_num);
1332
1333
0
    case IPV4_SRC_ADDR:
1334
0
    case IPV6_SRC_ADDR: {
1335
0
      IpAddress ip;
1336
0
      if (value->string)
1337
0
        ip.set((char *)value->string);
1338
0
      else
1339
0
        ip.set(ntohl(value->int_num));
1340
0
      return (flow->src_ip.compare(&ip) == 0);
1341
0
    }
1342
1343
0
    case IP_PROTOCOL_VERSION:
1344
0
      if (value->string)
1345
0
        return (flow->version == atoi(value->string));
1346
0
      else
1347
0
        return (flow->version == value->int_num);
1348
1349
0
    case IPV4_DST_ADDR:
1350
0
    case IPV6_DST_ADDR: {
1351
0
      IpAddress ip;
1352
0
      if (value->string)
1353
0
        ip.set((char *)value->string);
1354
0
      else
1355
0
        ip.set(ntohl(value->int_num));
1356
0
      return (flow->dst_ip.compare(&ip) == 0);
1357
0
    }
1358
1359
0
    case L4_SRC_PORT:
1360
0
      if (value->string)
1361
0
        return (flow->src_port == htons((u_int32_t)atoi(value->string)));
1362
0
      else
1363
0
        return (flow->src_port == htons((u_int32_t)value->int_num));
1364
1365
0
    case L4_DST_PORT:
1366
0
      if (value->string)
1367
0
        return (flow->dst_port == htons((u_int32_t)atoi(value->string)));
1368
0
      else
1369
0
        return (flow->dst_port == htons((u_int32_t)value->int_num));
1370
1371
0
    case SRC_VLAN:
1372
0
    case DST_VLAN:
1373
0
    case DOT1Q_SRC_VLAN:
1374
0
    case DOT1Q_DST_VLAN:
1375
0
      if (value->string)
1376
0
        return (flow->vlan_id == atoi(value->string));
1377
0
      else
1378
0
        return (flow->vlan_id == value->int_num);
1379
1380
0
    case PROTOCOL:
1381
0
      if (value->string)
1382
0
        return (flow->l4_proto == atoi(value->string));
1383
0
      else
1384
0
        return (flow->l4_proto == value->int_num);
1385
1386
0
    case DIRECTION:
1387
0
      if (value->string)
1388
0
        return (flow->direction == atoi(value->string));
1389
0
      else
1390
0
        return (flow->direction == value->int_num);
1391
1392
0
    case FLOW_END_REASON:
1393
0
      if (value->string) {
1394
0
        if (flow->getEndReason())
1395
0
          return (!strcmp(value->string, flow->getEndReason()));
1396
0
      }
1397
1398
0
    case EXPORTER_IPV4_ADDRESS:
1399
0
      return (flow->exporter_device_ip == ntohl(inet_addr(value->string)));
1400
1401
0
    case EXPORTER_IPV6_ADDRESS:
1402
0
      if (value->string != NULL && strlen(value->string) > 0) {
1403
0
        struct ndpi_in6_addr ipv6;
1404
1405
0
        if (inet_pton(AF_INET6, value->string, &ipv6) <= 0) return false;
1406
0
        return (memcmp(&flow->exporter_device_ipv6, &ipv6, sizeof(flow->exporter_device_ipv6)) == 0);
1407
0
      }
1408
1409
0
    case INPUT_SNMP:
1410
0
      if (value->string)
1411
0
        return (flow->inIndex == (u_int32_t)atoi(value->string));
1412
0
      else
1413
0
        return (flow->inIndex == value->int_num);
1414
1415
0
    case OUTPUT_SNMP:
1416
0
      if (value->string)
1417
0
        return (flow->outIndex == (u_int32_t)atoi(value->string));
1418
0
      else
1419
0
        return (flow->outIndex == value->int_num);
1420
1421
0
    case OBSERVATION_POINT_ID:
1422
0
      if (value->string)
1423
0
        return (flow->observationPointId == atoi(value->string));
1424
0
      else
1425
0
        return (flow->observationPointId == value->int_num);
1426
1427
0
    case INGRESS_VRFID:
1428
0
      if (value->string)
1429
0
        return (flow->vrfId == (u_int)atoi(value->string));
1430
0
      else
1431
0
        return (flow->vrfId == value->int_num);
1432
1433
0
    case SRC_AS:
1434
0
      if (value->string)
1435
0
        return (flow->src_as == (u_int32_t)atoi(value->string));
1436
0
      else
1437
0
        return (flow->src_as == value->int_num);
1438
1439
0
    case DST_AS:
1440
0
      if (value->string)
1441
0
        return (flow->dst_as == (u_int32_t)atoi(value->string));
1442
0
      else
1443
0
        return (flow->dst_as == value->int_num);
1444
1445
0
    case BGP_NEXT_ADJACENT_ASN:
1446
0
      if (value->string)
1447
0
        return (flow->next_adjacent_as == (u_int32_t)atoi(value->string));
1448
0
      else
1449
0
        return (flow->next_adjacent_as == value->int_num);
1450
1451
0
    case BGP_PREV_ADJACENT_ASN:
1452
0
      if (value->string)
1453
0
        return (flow->prev_adjacent_as == (u_int32_t)atoi(value->string));
1454
0
      else
1455
0
        return (flow->prev_adjacent_as == value->int_num);
1456
1457
0
    default:
1458
0
      ntop->getTrace()->traceEvent(TRACE_INFO,
1459
0
                                   "Skipping no-PEN flow fieldId %u", field);
1460
0
      break;
1461
0
  }
1462
1463
0
  return false;
1464
0
}
1465
1466
/* **************************************************** */
1467
1468
bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow,
1469
                                           u_int32_t field,
1470
0
                                           ParsedValue *value) {
1471
  /* Check for backward compatibility to handle cases like field = 123
1472
   * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
1473
0
  if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
1474
1475
0
  switch (field) {
1476
0
    case L7_PROTO: {
1477
0
      ndpi_proto l7_proto;;
1478
1479
0
      memset(&l7_proto, 0, sizeof(l7_proto));
1480
1481
0
      if (value->string) {
1482
0
        if (!strchr(value->string, '.')) {
1483
          /* Old behaviour, only the app protocol */
1484
0
          l7_proto.proto.app_protocol = atoi(value->string);
1485
0
        } else {
1486
0
          char *proto_dot;
1487
0
          l7_proto.proto.master_protocol =
1488
0
              (u_int16_t)strtoll(value->string, &proto_dot, 10);
1489
0
          l7_proto.proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
1490
0
        }
1491
0
      } else {
1492
0
        l7_proto.proto.app_protocol = value->int_num;
1493
0
      }
1494
0
      return (flow->l7_proto.proto.app_protocol == l7_proto.proto.app_protocol);
1495
0
    }
1496
1497
0
    case L7_PROTO_NAME:
1498
0
      if (value->string) {
1499
        /* This lookup should be optimized */
1500
0
        u_int16_t app_protocol =
1501
0
            ndpi_get_proto_by_name(get_ndpi_struct(), value->string);
1502
0
        return (flow->l7_proto.proto.app_protocol == app_protocol);
1503
0
      } else
1504
0
        return false;
1505
1506
0
    case L7_INFO:
1507
0
      if (value->string && flow->getL7Info())
1508
0
        return (strcmp(flow->getL7Info(), value->string) == 0);
1509
0
      else
1510
0
        return false;
1511
1512
0
    case L7_ERROR_CODE:
1513
0
      return (flow->getL7ErrorCode() == value->int_num);
1514
1515
0
    case DNS_QUERY:
1516
0
      if (value->string && flow->getDNSQuery())
1517
0
        return (strcmp(flow->getDNSQuery(), value->string) == 0);
1518
0
      else
1519
0
        return false;
1520
1521
0
    case DNS_QUERY_TYPE:
1522
0
      if (value->string)
1523
0
        return (flow->getDNSQueryType() == atoi(value->string));
1524
0
      else
1525
0
        return (flow->getDNSQueryType() == value->int_num);
1526
1527
0
    case HTTP_URL:
1528
0
      if (value->string && flow->getHTTPurl())
1529
0
        return (strcmp(flow->getHTTPurl(), value->string) == 0);
1530
0
      else
1531
0
        return false;
1532
1533
0
    case HTTP_USER_AGENT:
1534
0
      if (value->string && flow->getHTTPuserAgent())
1535
0
        return (strcmp(flow->getHTTPuserAgent(), value->string) == 0);
1536
0
      else
1537
0
        return false;
1538
1539
0
    case HTTP_SITE:
1540
0
      if (value->string && flow->getHTTPsite())
1541
0
        return (strcmp(flow->getHTTPsite(), value->string) == 0);
1542
0
      else
1543
0
        return false;
1544
1545
0
    case TLS_SERVER_NAME:
1546
0
      if (value->string && flow->getTLSserverName())
1547
0
        return (strcmp(flow->getTLSserverName(), value->string) == 0);
1548
0
      else
1549
0
        return false;
1550
1551
0
    case NPROBE_IPV4_ADDRESS:
1552
0
      return (flow->nprobe_ip == ntohl(inet_addr(value->string)));
1553
1554
0
    case UNIQUE_SOURCE_ID:
1555
0
      return (flow->unique_source_id == value->int_num);
1556
1557
0
    case SMTP_MAIL_FROM:
1558
0
      if (value->string && flow->getSMTPMailFrom())
1559
0
        return (strcmp(flow->getSMTPMailFrom(), value->string) == 0);
1560
0
      else
1561
0
        return false;
1562
1563
0
    case SMTP_RCPT_TO:
1564
0
      if (value->string && flow->getSMTPRcptTo())
1565
0
        return (strcmp(flow->getSMTPRcptTo(), value->string) == 0);
1566
0
      else
1567
0
        return false;
1568
1569
0
    default:
1570
0
      break;
1571
0
  }
1572
1573
0
  ntop->getTrace()->traceEvent(
1574
0
      TRACE_WARNING, "Field %u not supported by flow filtering", field);
1575
1576
0
  return false;
1577
0
}
1578
1579
/* **************************************************** */
1580
1581
bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key,
1582
0
                                    ParsedValue *value) {
1583
0
  u_int32_t pen, key_id;
1584
0
  bool res;
1585
1586
0
  if (!getKeyId((char *)key, strlen(key), &pen, &key_id)) {
1587
0
    ntop->getTrace()->traceEvent(
1588
0
        TRACE_WARNING, "Field %s not supported by flow filtering", key);
1589
0
    return false;
1590
0
  }
1591
1592
0
  switch (pen) {
1593
0
    case 0: /* No PEN */
1594
0
      res = matchPENZeroField(flow, key_id, value);
1595
0
      break;
1596
0
    case NTOP_PEN:
1597
0
      res = matchPENNtopField(flow, key_id, value);
1598
0
      break;
1599
0
    case UNKNOWN_PEN:
1600
0
    default:
1601
0
      ntop->getTrace()->traceEvent(
1602
0
          TRACE_WARNING, "Field %s not supported by flow filtering", key);
1603
0
      res = false;
1604
0
      break;
1605
0
  }
1606
1607
0
  return res;
1608
0
}
1609
1610
/* **************************************************** */
1611
1612
bool ZMQParserInterface::parseNProbeAgentField(
1613
    ParsedFlow *const flow, const char *key, ParsedValue *value,
1614
0
    json_object *const jvalue) {
1615
0
  bool ret = false;
1616
0
  json_object *obj;
1617
1618
0
  if (!strncmp(key, "timestamp", 9)) {
1619
0
    u_int32_t seconds, nanoseconds /* nanoseconds not currently used */;
1620
0
    if (sscanf(value->string, "%u.%u", &seconds, &nanoseconds) == 2) {
1621
0
      flow->first_switched = flow->last_switched = seconds;
1622
0
      ret = true;
1623
0
    }
1624
0
  } else if (!strncmp(key, "IPV4_LOCAL_ADDR", 15) ||
1625
0
             !strncmp(key, "IPV6_LOCAL_ADDR", 15)) {
1626
0
    flow->src_ip.set(value->string); /* FIX: do not always assume Local == Client */
1627
0
    ret = true;
1628
0
  } else if (!strncmp(key, "IPV4_REMOTE_ADDR", 16) ||
1629
0
             !strncmp(key, "IPV6_REMOTE_ADDR", 16)) {
1630
0
    flow->dst_ip.set(value->string); /* FIX: do not always assume Remote == Server */
1631
0
    ret = true;
1632
0
  } else if (!strncmp(key, "L4_LOCAL_PORT", 13)) {
1633
0
    flow->src_port = htons((u_int32_t)value->int_num);
1634
0
    ret = true;
1635
0
  } else if (!strncmp(key, "L4_REMOTE_PORT", 14)) {
1636
0
    flow->dst_port = htons((u_int32_t)value->int_num);
1637
0
    ret = true;
1638
0
  } else if (!strncmp(key, "INTERFACE_NAME", 7) && strlen(key) == 14) {
1639
0
    flow->ifname = (char *)json_object_get_string(jvalue);
1640
0
    ret = true;
1641
0
  } else if (strlen(key) >= 14 &&
1642
0
             !strncmp(&key[strlen(key) - 14], "FATHER_PROCESS", 14)) {
1643
0
    if (json_object_object_get_ex(jvalue, "PID", &obj))
1644
0
      flow->src_process_info.father_pid = (u_int32_t)json_object_get_int64(obj);
1645
0
    if (json_object_object_get_ex(jvalue, "UID", &obj))
1646
0
      flow->src_process_info.father_uid = (u_int32_t)json_object_get_int64(obj);
1647
0
    if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
1648
0
      flow->src_process_info.father_uid_name =
1649
0
          strdup((char *)json_object_get_string(obj));
1650
0
    if (json_object_object_get_ex(jvalue, "GID", &obj))
1651
0
      flow->src_process_info.father_gid = (u_int32_t)json_object_get_int64(obj);
1652
0
    if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
1653
0
      flow->src_process_info.actual_memory =
1654
0
          (u_int32_t)json_object_get_int64(obj);
1655
0
    if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
1656
0
      flow->src_process_info.peak_memory =
1657
0
          (u_int32_t)json_object_get_int64(obj);
1658
0
    if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
1659
0
      flow->src_process_info.father_process_name =
1660
0
          strdup((char *)json_object_get_string(obj));
1661
0
    if (!flow->process_info_set) flow->process_info_set = true;
1662
0
    ret = true;
1663
1664
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Father Process [pid: %u][uid:
1665
    // %u][gid: %u][path: %s]",
1666
    //           flow->src_process_info.father_pid,
1667
    // flow->src_process_info.father_uid,
1668
    // flow->src_process_info.father_gid,
1669
    //         flow->src_process_info.father_process_name);
1670
0
  } else if (strlen(key) >= 7 &&
1671
0
             !strncmp(&key[strlen(key) - 7], "PROCESS", 7)) {
1672
0
    if (json_object_object_get_ex(jvalue, "PID", &obj))
1673
0
      flow->src_process_info.pid = (u_int32_t)json_object_get_int64(obj);
1674
0
    if (json_object_object_get_ex(jvalue, "UID", &obj))
1675
0
      flow->src_process_info.uid = (u_int32_t)json_object_get_int64(obj);
1676
0
    if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
1677
0
      flow->src_process_info.uid_name =
1678
0
          strdup((char *)json_object_get_string(obj));
1679
0
    if (json_object_object_get_ex(jvalue, "GID", &obj))
1680
0
      flow->src_process_info.gid = (u_int32_t)json_object_get_int64(obj);
1681
0
    if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
1682
0
      flow->src_process_info.actual_memory =
1683
0
          (u_int32_t)json_object_get_int64(obj);
1684
0
    if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
1685
0
      flow->src_process_info.peak_memory =
1686
0
          (u_int32_t)json_object_get_int64(obj);
1687
0
    if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
1688
0
      flow->src_process_info.process_name =
1689
0
          strdup((char *)json_object_get_string(obj));
1690
0
    if (!flow->process_info_set) flow->process_info_set = true;
1691
0
    ret = true;
1692
1693
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Process [pid: %u][uid:
1694
    // %u][gid: %u][size/peak vm: %u/%u][path: %s]",
1695
    //         flow->src_process_info.pid,
1696
    // flow->src_process_info.uid, flow->src_process_info.gid,
1697
    // flow->src_process_info.actual_memory, flow->src_process_info.peak_memory,
1698
    // flow->src_process_info.process_name);
1699
0
  } else if (strlen(key) >= 9 &&
1700
0
             !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
1701
0
    if ((ret = parseContainerInfo(jvalue, &flow->src_container_info)))
1702
0
      flow->container_info_set = true;
1703
0
  } else if (!strncmp(key, "TCP", 3) && strlen(key) == 3) {
1704
0
    if (json_object_object_get_ex(jvalue, "CONN_STATE", &obj))
1705
0
      flow->src_tcp_info.conn_state =
1706
0
          Utils::tcpStateStr2State(json_object_get_string(obj));
1707
1708
0
    if (json_object_object_get_ex(jvalue, "SEGS_IN", &obj))
1709
0
      flow->src_tcp_info.in_segs = (u_int32_t)json_object_get_int64(obj);
1710
0
    if (json_object_object_get_ex(jvalue, "SEGS_OUT", &obj))
1711
0
      flow->src_tcp_info.out_segs = (u_int32_t)json_object_get_int64(obj);
1712
0
    if (json_object_object_get_ex(jvalue, "UNACK_SEGMENTS", &obj))
1713
0
      flow->src_tcp_info.unacked_segs = (u_int32_t)json_object_get_int64(obj);
1714
0
    if (json_object_object_get_ex(jvalue, "RETRAN_PKTS", &obj))
1715
0
      flow->src_tcp_info.retx_pkts = (u_int32_t)json_object_get_int64(obj);
1716
0
    if (json_object_object_get_ex(jvalue, "LOST_PKTS", &obj))
1717
0
      flow->src_tcp_info.lost_pkts = (u_int32_t)json_object_get_int64(obj);
1718
1719
0
    if (json_object_object_get_ex(jvalue, "RTT", &obj))
1720
0
      flow->src_tcp_info.rtt = json_object_get_double(obj);
1721
0
    if (json_object_object_get_ex(jvalue, "RTT_VARIANCE", &obj))
1722
0
      flow->src_tcp_info.rtt_var = json_object_get_double(obj);
1723
1724
0
    if (json_object_object_get_ex(jvalue, "BYTES_RCVD", &obj))
1725
0
      flow->out_bytes = flow->src_tcp_info.rcvd_bytes =
1726
0
          (u_int64_t)json_object_get_int64(obj);
1727
1728
0
    if (json_object_object_get_ex(jvalue, "BYTES_ACKED", &obj))
1729
0
      flow->in_bytes = flow->src_tcp_info.sent_bytes =
1730
0
          (u_int64_t)json_object_get_int64(obj);
1731
1732
0
    if (!flow->tcp_info_set) flow->tcp_info_set = true;
1733
0
    flow->absolute_packet_octet_counters = true;
1734
0
    ret = true;
1735
1736
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TCP INFO [conn state:
1737
    // %s][rcvd_bytes: %u][retx_pkts: %u][lost_pkts: %u]"
1738
    //         "[in_segs: %u][out_segs: %u][unacked_segs: %u]"
1739
    //         "[rtt: %f][rtt_var: %f]",
1740
    //         Utils::tcpState2StateStr(flow->src_tcp_info.conn_state),
1741
    //         flow->src_tcp_info.rcvd_bytes,
1742
    //         flow->src_tcp_info.retx_pkts,
1743
    //         flow->src_tcp_info.lost_pkts,
1744
    //         flow->src_tcp_info.in_segs,
1745
    //         flow->src_tcp_info.out_segs,
1746
    //         flow->src_tcp_info.unacked_segs,
1747
    //         flow->src_tcp_info.rtt,
1748
    //         flow->src_tcp_info.rtt_var);
1749
0
  } else if ((!strncmp(key, "TCP_EVENT_TYPE", 14) && strlen(key) == 14) ||
1750
0
             (!strncmp(key, "UDP_EVENT_TYPE", 14) && strlen(key) == 14)) {
1751
0
    flow->event_type = Utils::eBPFEventStr2Event(value->string);
1752
1753
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event Type [type: %s]",
1754
    // Utils::eBPFEvent2EventStr(flow->event_type));
1755
0
  }
1756
1757
0
  return ret;
1758
0
}
1759
1760
/* **************************************************** */
1761
1762
0
bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
1763
0
  bool invalid_flow = false;
1764
0
  bool rc = false;
1765
1766
0
  if (flow->vlan_id && ntop->getPrefs()->do_ignore_vlans()) flow->vlan_id = 0;
1767
1768
  /*
1769
    Some flow exporters write in host ports the ICMP type/code.
1770
    hence we need to make sure such fields are zeroed in order
1771
    to avoid odd values in the web GUI
1772
   */
1773
0
  if ((flow->l4_proto == IPPROTO_ICMP) || (flow->l4_proto == IPPROTO_ICMPV6))
1774
0
    flow->src_port = flow->dst_port = 0;
1775
1776
  /* Handle zero IPv4/IPv6 discrepancies */
1777
0
  if (!flow->hasParsedeBPF()) {
1778
0
    if (flow->src_ip.getVersion() != flow->dst_ip.getVersion()) {
1779
0
      if (flow->dst_ip.isIPv4() && flow->src_ip.isIPv6() &&
1780
0
          flow->src_ip.isEmpty())
1781
0
        flow->src_ip.setVersion(4);
1782
0
      else if (flow->src_ip.isIPv4() && flow->dst_ip.isIPv6() &&
1783
0
               flow->dst_ip.isEmpty())
1784
0
        flow->dst_ip.setVersion(4);
1785
0
      else if (flow->dst_ip.isIPv6() && flow->src_ip.isIPv4() &&
1786
0
               flow->src_ip.isEmpty())
1787
0
        flow->src_ip.setVersion(6);
1788
0
      else if (flow->src_ip.isIPv6() && flow->dst_ip.isIPv4() &&
1789
0
               flow->dst_ip.isEmpty())
1790
0
        flow->dst_ip.setVersion(6);
1791
0
      else {
1792
0
        invalid_flow = true;
1793
1794
0
        ntop->getTrace()->traceEvent(TRACE_WARNING,
1795
0
             "IP version mismatch: client:%d server:%d - flow will be ignored",
1796
0
             flow->src_ip.getVersion(), flow->dst_ip.getVersion());
1797
0
      }
1798
0
    }
1799
0
  }
1800
1801
0
  if (!invalid_flow) {
1802
0
    if (flow->hasParsedeBPF()) {
1803
      /* Direction already reliable when the event is an accept or a connect.
1804
         Heuristic is only used in the other cases. */
1805
#if 0
1806
      /* Disabled as Flow::setParsedeBPFInfo() now supports directions */
1807
      if(flow->event_type != ebpf_event_type_tcp_accept
1808
   && flow->event_type != ebpf_event_type_tcp_connect
1809
   && ntohs(flow->src_port) < ntohs(flow->dst_port))
1810
  flow->swap();
1811
#endif
1812
0
    } else {
1813
      /* NOTE: keep in sync with Flow::check_swap() */
1814
1815
0
      if(flow->src_ip.isBroadMulticastAddress() || flow->dst_ip.isBroadMulticastAddress())
1816
0
  ; /* Ignore non-unicast IP addresses */
1817
0
      else {
1818
0
  u_int16_t cli_port = ntohs(flow->src_port), srv_port =  ntohs(flow->dst_port);
1819
0
  bool do_swap = (cli_port < srv_port) ? true : false;
1820
1821
0
  if(do_swap) {
1822
0
    if(flow->l4_proto == IPPROTO_TCP) {
1823
      /*
1824
        Don't swap if the client has sent a SYN. Unfortunately as TCP flags
1825
        are in OR we cannot see if this is a initiator or a responder so
1826
        better to be conservative rather than swapping wrongly
1827
1828
        See also https://github.com/ntop/ntopng/issues/1978
1829
      */
1830
1831
0
      if((flow->tcp.client_tcp_flags & TH_SYN) == TH_SYN)
1832
0
        do_swap = false;
1833
0
    } else if(flow->l4_proto == IPPROTO_UDP) {
1834
0
#if 1
1835
      /* We disable UDP swap that might be wrong in particular for probing attempts */
1836
0
      do_swap = false;
1837
#else
1838
      if((cli_port > 32768) && (srv_port > 32768))
1839
        do_swap = false; /* Don't do anything: this might be RTP or similar */
1840
#endif
1841
0
    }
1842
1843
0
    if(do_swap)
1844
0
      flow->swap();
1845
0
  }
1846
0
      }
1847
0
    }
1848
1849
0
    if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1;
1850
1851
    /* Process Flow */
1852
0
    INTERFACE_PROFILING_SECTION_ENTER("processFlow", 30);
1853
1854
0
    rc = processFlow(flow);
1855
1856
0
    INTERFACE_PROFILING_SECTION_EXIT(30);
1857
0
  }
1858
1859
0
  if (!rc) recvStats.num_dropped_flows++;
1860
1861
0
  return rc;
1862
0
}
1863
1864
/* **************************************************** */
1865
1866
int ZMQParserInterface::parseSingleJSONFlow(json_object *o,
1867
0
                                            u_int32_t source_id) {
1868
0
  ParsedFlow flow;
1869
0
  struct json_object_iterator it = json_object_iter_begin(o);
1870
0
  struct json_object_iterator itEnd = json_object_iter_end(o);
1871
0
  int ret = 0;
1872
1873
  /* Reset data */
1874
0
  flow.source_id = source_id;
1875
0
  flow.direction = UNKNOWN_FLOW_DIRECTION;
1876
1877
0
  while (!json_object_iter_equal(&it, &itEnd)) {
1878
0
    const char *key = json_object_iter_peek_name(&it);
1879
0
    json_object *jvalue = json_object_iter_peek_value(&it);
1880
0
    json_object *additional_o = NULL;
1881
0
    enum json_type type = json_object_get_type(jvalue);
1882
0
    ParsedValue value = {0};
1883
0
    bool add_to_additional_fields = false;
1884
1885
0
    switch (type) {
1886
0
      case json_type_int:
1887
0
        value.int_num = json_object_get_int64(jvalue);
1888
0
        value.double_num = value.int_num;
1889
0
        break;
1890
0
      case json_type_double:
1891
0
        value.double_num = json_object_get_double(jvalue);
1892
0
        break;
1893
0
      case json_type_boolean:
1894
0
        value.boolean = json_object_get_boolean(jvalue);
1895
0
        break;
1896
0
      case json_type_string:
1897
0
        value.string = json_object_get_string(jvalue);
1898
0
        if (strcmp(key, "json") == 0)
1899
0
          additional_o = json_tokener_parse(value.string);
1900
0
        break;
1901
0
      case json_type_object:
1902
        /* This is handled by parseNProbeAgentField or addAdditionalField */
1903
0
        break;
1904
0
      case json_type_array:
1905
        /* This is handled by parseNProbeAgentField or addAdditionalField */
1906
0
        break;
1907
0
      default:
1908
0
        ntop->getTrace()->traceEvent(
1909
0
            TRACE_WARNING, "JSON type %u not supported [key: %s]\n", type, key);
1910
0
        break;
1911
0
    }
1912
1913
0
    if ((key != NULL) && (jvalue != NULL)) {
1914
0
      u_int32_t pen, key_id;
1915
0
      bool res;
1916
1917
0
      getKeyId((char *)key, strlen(key), &pen, &key_id);
1918
1919
0
      switch (pen) {
1920
0
        case 0: /* No PEN */
1921
0
          res = parsePENZeroField(&flow, key_id, &value);
1922
0
          if (res) break;
1923
          /* Dont'break when res == false for backward compatibility: attempt to
1924
           * parse Zero-PEN as Ntop-PEN */
1925
0
        case NTOP_PEN:
1926
0
          res = parsePENNtopField(&flow, key_id, &value);
1927
0
          break;
1928
0
        case UNKNOWN_PEN:
1929
0
        default:
1930
0
          res = false;
1931
0
          break;
1932
0
      }
1933
1934
0
      if (!res) {
1935
0
        switch (key_id) {
1936
0
          case 0:  // json additional object added by Flow::serialize()
1937
0
            if (additional_o != NULL) {
1938
0
              struct json_object_iterator additional_it =
1939
0
                  json_object_iter_begin(additional_o);
1940
0
              struct json_object_iterator additional_itEnd =
1941
0
                  json_object_iter_end(additional_o);
1942
1943
0
              while (
1944
0
                  !json_object_iter_equal(&additional_it, &additional_itEnd)) {
1945
0
                const char *additional_key =
1946
0
                    json_object_iter_peek_name(&additional_it);
1947
0
                json_object *additional_v =
1948
0
                    json_object_iter_peek_value(&additional_it);
1949
0
                const char *additional_value =
1950
0
                    json_object_get_string(additional_v);
1951
1952
0
                if ((additional_key != NULL) && (additional_value != NULL)) {
1953
                  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
1954
                  // field: %s", additional_key);
1955
0
                  flow.addAdditionalField(
1956
0
                      additional_key, json_object_new_string(additional_value));
1957
0
                }
1958
0
                json_object_iter_next(&additional_it);
1959
0
              }
1960
0
            }
1961
0
            break;
1962
0
          case UNKNOWN_FLOW_ELEMENT:
1963
            /* Attempt to parse it as an nProbe mini field */
1964
0
            if (parseNProbeAgentField(&flow, key, &value, jvalue)) {
1965
0
              if (!flow.hasParsedeBPF()) {
1966
0
                flow.setParsedeBPF();
1967
0
                flow.absolute_packet_octet_counters = true;
1968
0
              }
1969
0
              break;
1970
0
            }
1971
0
          default:
1972
#ifdef NTOPNG_PRO
1973
            if (custom_app_maps ||
1974
                (custom_app_maps = new (std::nothrow) CustomAppMaps()))
1975
              custom_app_maps->checkCustomApp(key, &value, &flow);
1976
#endif
1977
0
            ntop->getTrace()->traceEvent(
1978
0
                TRACE_DEBUG, "Not handled ZMQ field %u/%s", key_id, key);
1979
0
            add_to_additional_fields = true;
1980
0
            break;
1981
0
        } /* switch */
1982
0
      }
1983
1984
0
      if (add_to_additional_fields) {
1985
        // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s",
1986
        // key);
1987
0
        flow.addAdditionalField(key, json_object_get(jvalue));
1988
0
      }
1989
1990
0
      if (additional_o) json_object_put(additional_o);
1991
0
    } /* if */
1992
1993
    /* Move to the next element */
1994
0
    json_object_iter_next(&it);
1995
0
  }  // while json_object_iter_equal
1996
1997
0
  if (preprocessFlow(&flow)) ret = 1;
1998
1999
0
  return ret;
2000
0
}
2001
2002
/* **************************************************** */
2003
2004
int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer,
2005
0
                                           u_int32_t source_id) {
2006
0
  ndpi_serialization_type kt, et;
2007
0
  ParsedFlow flow;
2008
0
  int ret = 0, rc;
2009
0
  bool recordFound = false;
2010
2011
  /* Reset data */
2012
0
  flow.source_id = source_id;
2013
0
  flow.direction = UNKNOWN_FLOW_DIRECTION;
2014
2015
0
  INTERFACE_PROFILING_SECTION_ENTER("Decode TLV", 9);
2016
2017
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Processing TLV record");
2018
0
  while ((et = ndpi_deserialize_get_item_type(deserializer, &kt)) !=
2019
0
         ndpi_serialization_unknown) {
2020
0
    ParsedValue value = {0};
2021
0
    u_int32_t pen = 0, key_id = 0;
2022
0
    u_int32_t v32 = 0;
2023
0
    int32_t i32 = 0;
2024
0
    float f = 0;
2025
0
    u_int64_t v64 = 0;
2026
0
    int64_t i64 = 0;
2027
0
    ndpi_string key, vs;
2028
0
    char key_str[64];
2029
0
    u_int8_t vbkp = 0;
2030
0
    bool add_to_additional_fields = false;
2031
0
    bool key_is_string = false, value_is_string = false;
2032
2033
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TLV key type = %u value type = %u", kt, et);
2034
2035
0
    if (et == ndpi_serialization_end_of_record) {
2036
0
      ndpi_deserialize_next(deserializer);
2037
0
      goto end_of_record;
2038
0
    }
2039
2040
0
    recordFound = true;
2041
2042
0
    switch (kt) {
2043
0
      case ndpi_serialization_uint32:
2044
0
        ndpi_deserialize_key_uint32(deserializer, &key_id);
2045
0
        break;
2046
0
      case ndpi_serialization_string:
2047
0
        ndpi_deserialize_key_string(deserializer, &key);
2048
0
        key_is_string = true;
2049
0
        break;
2050
0
      default:
2051
0
        ntop->getTrace()->traceEvent(TRACE_WARNING,
2052
0
             "Unsupported TLV key type %u: please update both ntopng and "
2053
0
             "nprobe to the same version", kt);
2054
0
        ret = -1;
2055
0
        goto error;
2056
0
    }
2057
2058
0
    switch (et) {
2059
0
      case ndpi_serialization_uint32:
2060
0
        ndpi_deserialize_value_uint32(deserializer, &v32);
2061
0
        value.double_num = value.int_num = v32;
2062
0
        break;
2063
2064
0
      case ndpi_serialization_uint64:
2065
0
        ndpi_deserialize_value_uint64(deserializer, &v64);
2066
0
        value.double_num = value.int_num = v64;
2067
0
        break;
2068
2069
0
      case ndpi_serialization_int32:
2070
0
        ndpi_deserialize_value_int32(deserializer, &i32);
2071
0
        value.double_num = value.int_num = i32;
2072
0
        break;
2073
2074
0
      case ndpi_serialization_int64:
2075
0
        ndpi_deserialize_value_int64(deserializer, &i64);
2076
0
        value.double_num = value.int_num = i64;
2077
0
        break;
2078
2079
0
      case ndpi_serialization_float:
2080
0
        ndpi_deserialize_value_float(deserializer, &f);
2081
0
        value.double_num = f;
2082
0
        break;
2083
2084
0
      case ndpi_serialization_string:
2085
0
        ndpi_deserialize_value_string(deserializer, &vs);
2086
0
        value.string = vs.str;
2087
0
        value_is_string = true;
2088
0
        break;
2089
2090
0
      default:
2091
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n", et);
2092
0
        ret = -1;
2093
0
        goto error;
2094
0
    }
2095
2096
0
    if (key_is_string) {
2097
0
      u_int8_t kbkp = key.str[key.str_len];
2098
0
      key.str[key.str_len] = '\0';
2099
0
      snprintf(key_str, sizeof(key_str), "%s", key.str);
2100
0
      getKeyId(key.str, key.str_len, &pen, &key_id);
2101
0
      key.str[key.str_len] = kbkp;
2102
0
    }
2103
2104
0
    if (value_is_string) {
2105
      /* Adding '\0' to the end of the string, backing up the character */
2106
0
      vbkp = vs.str[vs.str_len];
2107
0
      vs.str[vs.str_len] = '\0';
2108
0
    }
2109
2110
0
    switch (pen) {
2111
0
      case 0: /* No PEN */
2112
0
        rc = parsePENZeroField(&flow, key_id, &value);
2113
0
        if (rc) break;
2114
        /* Dont'break when rc == false for backward compatibility: attempt to
2115
         * parse Zero-PEN as Ntop-PEN */
2116
0
      case NTOP_PEN:
2117
0
        rc = parsePENNtopField(&flow, key_id, &value);
2118
0
        break;
2119
0
      case UNKNOWN_PEN:
2120
0
      default:
2121
0
        rc = false;
2122
0
        break;
2123
0
    }
2124
2125
0
    if (!key_is_string) {
2126
0
      if (pen)
2127
0
        snprintf(key_str, sizeof(key_str), "%u.%u", pen, key_id);
2128
0
      else
2129
0
        snprintf(key_str, sizeof(key_str), "%u", key_id);
2130
0
    }
2131
2132
#if 0
2133
    if(ntop->getTrace()->get_trace_level() >= TRACE_LEVEL_DEBUG) {
2134
      switch(et) {
2135
      case ndpi_serialization_uint32:
2136
      case ndpi_serialization_uint64:
2137
      case ndpi_serialization_int32:
2138
      case ndpi_serialization_int64:
2139
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %lld", key_str, key_id, pen, value.int_num);
2140
        break;
2141
      case ndpi_serialization_float:
2142
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %.3f", key_str, key_id, pen, value.double_num);
2143
        break;
2144
      case ndpi_serialization_string:
2145
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %s", key_str, key_id, pen, value.string);
2146
        break;
2147
      default:
2148
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: -", key_str, key_id, pen);
2149
        break;
2150
      }
2151
    }
2152
#endif
2153
2154
0
    if (!rc) { /* Not handled */
2155
0
      switch (key_id) {
2156
0
        case 0:  // json additional object added by Flow::serialize()
2157
0
          if (strcmp(key_str, "json") == 0 && value_is_string) {
2158
0
            json_object *additional_o = json_tokener_parse(vs.str);
2159
2160
0
            if (additional_o) {
2161
0
              struct json_object_iterator additional_it =
2162
0
                  json_object_iter_begin(additional_o);
2163
0
              struct json_object_iterator additional_itEnd =
2164
0
                  json_object_iter_end(additional_o);
2165
2166
0
              while (
2167
0
                  !json_object_iter_equal(&additional_it, &additional_itEnd)) {
2168
0
                const char *additional_key   = json_object_iter_peek_name(&additional_it);
2169
0
                json_object *additional_v    = json_object_iter_peek_value(&additional_it);
2170
0
                const char *additional_value = json_object_get_string(additional_v);
2171
2172
0
                if ((additional_key != NULL) && (additional_value != NULL)) {
2173
                  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
2174
                  // field: %s", additional_key);
2175
0
                  flow.addAdditionalField(
2176
0
                      additional_key, json_object_new_string(additional_value));
2177
0
                }
2178
0
                json_object_iter_next(&additional_it);
2179
0
              }
2180
2181
0
              json_object_put(additional_o);
2182
0
            }
2183
0
          }
2184
0
          break;
2185
0
        case UNKNOWN_FLOW_ELEMENT:
2186
#if 0  // TODO
2187
  /* Attempt to parse it as an nProbe mini field */
2188
  if(parseNProbeAgentField(&flow, key_str, &value)) {
2189
    if(!flow.hasParsedeBPF()) {
2190
      flow->setParsedeBPF();
2191
      flow.absolute_packet_octet_counters = true;
2192
    }
2193
    break;
2194
  }
2195
#endif
2196
0
        default:
2197
#ifdef NTOPNG_PRO
2198
          if (custom_app_maps ||
2199
              (custom_app_maps = new (std::nothrow) CustomAppMaps()))
2200
            custom_app_maps->checkCustomApp(key_str, &value, &flow);
2201
#endif
2202
0
          ntop->getTrace()->traceEvent(
2203
0
              TRACE_DEBUG, "Not handled ZMQ field %u.%u", pen, key_id);
2204
0
          add_to_additional_fields = true;
2205
0
          break;
2206
0
      } /* switch */
2207
0
    }
2208
2209
0
    if (add_to_additional_fields) {
2210
      // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s
2211
      // (Key-ID: %u PEN: %u)", key_str, key_id, pen);
2212
0
#if 1
2213
0
      flow.addAdditionalField(deserializer);
2214
#else
2215
      flow.addAdditionalField(
2216
          key_str, value_is_string ? json_object_new_string(value.string)
2217
                                   : json_object_new_int64(value.int_num));
2218
#endif
2219
0
    }
2220
2221
    /* Restoring backed up character at the end of the string in place of '\0'
2222
     */
2223
0
    if (value_is_string) vs.str[vs.str_len] = vbkp;
2224
2225
    /* Move to the next element */
2226
0
    ndpi_deserialize_next(deserializer);
2227
2228
0
  } /* while */
2229
2230
0
end_of_record:
2231
0
  if (recordFound) {
2232
0
    INTERFACE_PROFILING_SECTION_EXIT(9); /* Closes Decode TLV */
2233
0
    INTERFACE_PROFILING_SECTION_ENTER("processFlow", 10);
2234
2235
0
    if (preprocessFlow(&flow)) ret = 1;
2236
2237
0
    INTERFACE_PROFILING_SECTION_EXIT(10);
2238
0
  }
2239
2240
0
error:
2241
0
  return ret;
2242
0
}
2243
2244
/* **************************************************** */
2245
2246
u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload,
2247
                                           int payload_size, u_int32_t source_id,
2248
0
                                           u_int32_t msg_id) {
2249
0
  json_object *f = NULL;
2250
0
  enum json_tokener_error jerr = json_tokener_success;
2251
2252
0
#ifndef NTOPNG_PRO
2253
  /*
2254
    nProbe exports flows in TLV so this code will be removed in the future
2255
    Leaving here for old nProbes that will be discontinued soon
2256
  */
2257
0
  return(0);
2258
0
#endif
2259
2260
#if 0
2261
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "JSON: '%s' [len=%lu]", payload, strlen(payload));
2262
  printf("\n\n%s\n\n", payload);
2263
#endif
2264
2265
0
  if (payload) f = json_tokener_parse_verbose(payload, &jerr);
2266
2267
0
  if (f != NULL) {
2268
0
    int n = 0, rc;
2269
2270
0
    if (json_object_get_type(f) == json_type_array) {
2271
      /* Flow array */
2272
0
      int id, num_elements = json_object_array_length(f);
2273
2274
0
      for (id = 0; id < num_elements; id++) {
2275
0
        rc = parseSingleJSONFlow(json_object_array_get_idx(f, id), source_id);
2276
2277
0
        if (rc > 0) n++;
2278
0
      }
2279
2280
0
    } else {
2281
0
      rc = parseSingleJSONFlow(f, source_id);
2282
2283
0
      if (rc > 0) n++;
2284
0
    }
2285
2286
0
    json_object_put(f);
2287
0
    return n;
2288
0
  } else {
2289
    // if o != NULL
2290
0
    if (!once) {
2291
0
      ntop->getTrace()->traceEvent(TRACE_WARNING,
2292
0
           "Invalid message received: your nProbe sender is outdated, data "
2293
0
           "encrypted or invalid JSON?");
2294
0
      ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2295
0
           json_tokener_error_desc(jerr), payload_size, payload);
2296
0
    }
2297
2298
0
    once = true;
2299
0
    return 0;
2300
0
  }
2301
2302
0
  return 0;
2303
0
}
2304
2305
/* **************************************************** */
2306
2307
u_int8_t ZMQParserInterface::parseTLVFlow(const char *payload, int payload_size,
2308
                                          u_int32_t source_id, u_int32_t msg_id,
2309
0
                                          void *data) {
2310
0
  ndpi_deserializer deserializer;
2311
0
  ndpi_serialization_type kt;
2312
0
  int n = 0, rc;
2313
2314
0
  rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
2315
0
                                  payload_size);
2316
2317
0
  if (rc == -1) return 0;
2318
2319
0
  if (ndpi_deserialize_get_format(&deserializer) != ndpi_serialization_format_tlv) {
2320
0
    if (!once) {
2321
0
      ntop->getTrace()->traceEvent(
2322
0
          TRACE_WARNING,
2323
0
          "Invalid TLV message: the TLV generated by your probe does not match "
2324
0
          "the version supported "
2325
0
          "by ntopng, please update both the probe and ntopng to the latest "
2326
0
          "version available");
2327
0
      once = true;
2328
0
    }
2329
2330
0
    return 0;
2331
0
  }
2332
2333
0
  while (ndpi_deserialize_get_item_type(&deserializer, &kt) !=
2334
0
         ndpi_serialization_unknown) {
2335
0
    rc = parseSingleTLVFlow(&deserializer, source_id);
2336
2337
0
    if (rc < 0)
2338
0
      break;
2339
0
    else if (rc > 0)
2340
0
      n++;
2341
0
  }
2342
2343
0
  return n;
2344
0
}
2345
2346
/* **************************************************** */
2347
2348
bool ZMQParserInterface::parseContainerInfo(
2349
0
    json_object *jo, ContainerInfo *const container_info) {
2350
0
  json_object *obj, *obj2;
2351
2352
  /* Keep in sync with ZMQParserInterface::freeContainerInfo and
2353
   * ParsedeBPF::~ParsedeBPF */
2354
2355
0
  if (json_object_object_get_ex(jo, "ID", &obj))
2356
0
    container_info->id = strdup((char *)json_object_get_string(obj));
2357
2358
0
  if (json_object_object_get_ex(jo, "K8S", &obj)) {
2359
0
    if (json_object_object_get_ex(obj, "POD", &obj2))
2360
0
      container_info->data.k8s.pod =
2361
0
          strdup((char *)json_object_get_string(obj2));
2362
0
    if (json_object_object_get_ex(obj, "NS", &obj2))
2363
0
      container_info->data.k8s.ns =
2364
0
          strdup((char *)json_object_get_string(obj2));
2365
0
    container_info->data_type = container_info_data_type_k8s;
2366
0
  } else if (json_object_object_get_ex(jo, "DOCKER", &obj)) {
2367
0
    container_info->data_type = container_info_data_type_k8s;
2368
0
  } else
2369
0
    container_info->data_type = container_info_data_type_unknown;
2370
2371
0
  if (obj) {
2372
0
    if (json_object_object_get_ex(obj, "NAME", &obj2))
2373
0
      container_info->name = strdup((char *)json_object_get_string(obj2));
2374
0
  }
2375
2376
0
  return true;
2377
0
}
2378
2379
/* **************************************************** */
2380
2381
void ZMQParserInterface::freeContainerInfo(
2382
0
    ContainerInfo *const container_info) {
2383
0
  if (container_info->id) free(container_info->id);
2384
0
  if (container_info->name) free(container_info->name);
2385
0
  if (container_info->data.k8s.pod) free(container_info->data.k8s.pod);
2386
0
  if (container_info->data.k8s.ns) free(container_info->data.k8s.ns);
2387
0
}
2388
2389
/* **************************************************** */
2390
2391
u_int8_t ZMQParserInterface::parseJSONCounter(const char *payload,
2392
0
                                              int payload_size) {
2393
0
  json_object *o;
2394
0
  enum json_tokener_error jerr = json_tokener_success;
2395
0
  sFlowInterfaceStats stats;
2396
2397
  //ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2398
2399
0
  memset(&stats, 0, sizeof(stats));
2400
0
  o = json_tokener_parse_verbose(payload, &jerr);
2401
2402
0
  if (o != NULL) {
2403
0
    struct json_object_iterator it = json_object_iter_begin(o);
2404
0
    struct json_object_iterator itEnd = json_object_iter_end(o);
2405
2406
    /* Reset data */
2407
0
    memset(&stats, 0, sizeof(stats));
2408
2409
0
    while (!json_object_iter_equal(&it, &itEnd)) {
2410
0
      const char *key = json_object_iter_peek_name(&it);
2411
0
      json_object *v = json_object_iter_peek_value(&it);
2412
0
      const char *value = json_object_get_string(v);
2413
2414
0
      if ((key != NULL) && (value != NULL)) {
2415
0
        if (!strcmp(key, "deviceIP"))
2416
0
          stats.deviceIP =
2417
0
              (u_int32_t)json_object_get_int64(v);  // ntohl(inet_addr(value));
2418
0
        else if (!strcmp(key, "samplesGenerated"))
2419
0
          stats.samplesGenerated = (u_int32_t)json_object_get_int64(v);
2420
0
        else if (!strcmp(key, "ifIndex"))
2421
0
          stats.ifIndex = (u_int32_t)json_object_get_int64(v);
2422
0
        else if (!strcmp(key, "ifName"))
2423
0
          stats.ifName = (char *)json_object_get_string(v);
2424
0
        else if (!strcmp(key, "ifType"))
2425
0
          stats.ifType = (u_int32_t)json_object_get_int64(v);
2426
0
        else if (!strcmp(key, "ifSpeed"))
2427
0
          stats.ifSpeed = (u_int32_t)json_object_get_int64(v);
2428
0
        else if (!strcmp(key, "ifDirection"))
2429
0
          stats.ifFullDuplex = (!strcmp(value, "Full")) ? true : false;
2430
0
        else if (!strcmp(key, "ifAdminStatus"))
2431
0
          stats.ifAdminStatus = (!strcmp(value, "Up")) ? true : false;
2432
0
        else if (!strcmp(key, "ifOperStatus"))
2433
0
          stats.ifOperStatus = (!strcmp(value, "Up")) ? true : false;
2434
0
        else if (!strcmp(key, "ifInOctets"))
2435
0
          stats.ifInOctets = json_object_get_int64(v);
2436
0
        else if (!strcmp(key, "ifInPackets"))
2437
0
          stats.ifInPackets = json_object_get_int64(v);
2438
0
        else if (!strcmp(key, "ifInErrors"))
2439
0
          stats.ifInErrors = json_object_get_int64(v);
2440
0
        else if (!strcmp(key, "ifOutOctets"))
2441
0
          stats.ifOutOctets = json_object_get_int64(v);
2442
0
        else if (!strcmp(key, "ifOutPackets"))
2443
0
          stats.ifOutPackets = json_object_get_int64(v);
2444
0
        else if (!strcmp(key, "ifOutErrors"))
2445
0
          stats.ifOutErrors = json_object_get_int64(v);
2446
0
        else if (!strcmp(key, "ifPromiscuousMode"))
2447
0
          stats.ifPromiscuousMode =
2448
0
              (((u_int32_t)json_object_get_int64(v)) == 1) ? true : false;
2449
0
        else if (strlen(key) >= 9 &&
2450
0
                 !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
2451
0
          if (parseContainerInfo(v, &stats.container_info))
2452
0
            stats.container_info_set = true;
2453
0
        }
2454
0
      } /* if */
2455
2456
      /* Move to the next element */
2457
0
      json_object_iter_next(&it);
2458
0
    }  // while json_object_iter_equal
2459
2460
    /* Process Flow */
2461
0
    processInterfaceStats(&stats);
2462
2463
0
    freeContainerInfo(&stats.container_info);
2464
0
    json_object_put(o);
2465
0
  } else {
2466
    // if o != NULL
2467
0
    if (!once) {
2468
0
      ntop->getTrace()->traceEvent(
2469
0
          TRACE_WARNING,
2470
0
          "Invalid message received: your nProbe sender is outdated, data "
2471
0
          "encrypted or invalid JSON?");
2472
0
      ntop->getTrace()->traceEvent(
2473
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2474
0
          json_tokener_error_desc(jerr), payload_size, payload);
2475
0
    }
2476
0
    once = true;
2477
0
    return -1;
2478
0
  }
2479
2480
0
  return 0;
2481
0
}
2482
2483
/* **************************************************** */
2484
2485
u_int8_t ZMQParserInterface::parseTLVCounter(const char *payload,
2486
0
                                             int payload_size) {
2487
0
  sFlowInterfaceStats stats;
2488
0
  ndpi_deserializer deserializer;
2489
0
  ndpi_serialization_type kt, et;
2490
0
  int rc, ret = -1;
2491
2492
0
  memset(&stats, 0, sizeof(stats));
2493
2494
0
  rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
2495
0
                                  payload_size);
2496
2497
0
  if (rc == -1) {
2498
0
    goto error;
2499
0
  }
2500
2501
0
  if (ndpi_deserialize_get_format(&deserializer) !=
2502
0
      ndpi_serialization_format_tlv) {
2503
0
    if (!once) {
2504
0
      ntop->getTrace()->traceEvent(
2505
0
          TRACE_WARNING,
2506
0
          "Invalid TLV message: the TLV generated by your probe does not match "
2507
0
          "the version supported "
2508
0
          "by ntopng, please update both the probe and ntopng to the latest "
2509
0
          "version available");
2510
0
      once = true;
2511
0
    }
2512
0
    goto error;
2513
0
  }
2514
2515
0
  while ((et = ndpi_deserialize_get_item_type(&deserializer, &kt)) !=
2516
0
         ndpi_serialization_unknown) {
2517
    /* Key */
2518
2519
0
    bool key_is_string = false;
2520
0
    ndpi_string key;
2521
0
    u_int32_t key_id = 0;
2522
2523
0
    switch (kt) {
2524
0
      case ndpi_serialization_uint32:
2525
0
        ndpi_deserialize_key_uint32(&deserializer, &key_id);
2526
0
        break;
2527
0
      case ndpi_serialization_string:
2528
0
        ndpi_deserialize_key_string(&deserializer, &key);
2529
0
        key_is_string = true;
2530
0
        break;
2531
0
      default:
2532
0
        ntop->getTrace()->traceEvent(TRACE_WARNING,
2533
0
             "Unsupported TLV key type %u: "
2534
0
             "please update both ntopng and the probe to the same version",
2535
0
             kt);
2536
0
        goto error;
2537
0
    }
2538
2539
0
    if (key_is_string) {
2540
0
      u_int8_t kbkp = key.str[key.str_len];
2541
0
      key.str[key.str_len] = '\0';
2542
0
      bool found = getCounterId(key.str, key.str_len, &key_id);
2543
0
      if (!found) {
2544
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %s\n",
2545
0
                                     key.str);
2546
0
        key.str[key.str_len] = kbkp;
2547
0
        goto error;
2548
0
      }
2549
0
      key.str[key.str_len] = kbkp;
2550
0
    }
2551
2552
    /* Value */
2553
2554
0
    ParsedValue value = {0};
2555
0
    bool value_is_string = false;
2556
0
    ndpi_string vs;
2557
0
    u_int32_t v32 = 0;
2558
0
    u_int64_t v64 = 0;
2559
2560
0
    switch (et) {
2561
0
      case ndpi_serialization_uint32:
2562
0
        ndpi_deserialize_value_uint32(&deserializer, &v32);
2563
0
        value.int_num = v32;
2564
0
        break;
2565
2566
0
      case ndpi_serialization_uint64:
2567
0
        ndpi_deserialize_value_uint64(&deserializer, &v64);
2568
0
        value.int_num = v64;
2569
0
        break;
2570
2571
0
      case ndpi_serialization_string:
2572
0
        ndpi_deserialize_value_string(&deserializer, &vs);
2573
0
        value.string = vs.str;
2574
0
        value_is_string = true;
2575
0
        break;
2576
2577
0
      default:
2578
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n",
2579
0
                                     et);
2580
0
        goto error;
2581
0
    }
2582
2583
0
    u_int8_t vbkp;
2584
0
    if (value_is_string) {
2585
      /* Adding '\0' to the end of the string, backing up the character */
2586
0
      vbkp = vs.str[vs.str_len];
2587
0
      vs.str[vs.str_len] = '\0';
2588
0
    }
2589
2590
0
    switch (key_id) {
2591
0
      case SFLOW_DEVICE_IP:
2592
0
        if (value_is_string)
2593
0
          stats.deviceIP = ntohl(inet_addr((char *)value.string));
2594
0
        else
2595
0
          stats.deviceIP = (u_int32_t)value.int_num;
2596
0
        break;
2597
0
      case SFLOW_SAMPLES_GENERATED:
2598
0
        stats.samplesGenerated = (u_int32_t)value.int_num;
2599
0
        break;
2600
0
      case SFLOW_IF_INDEX:
2601
0
        stats.ifIndex = (u_int32_t)value.int_num;
2602
0
        break;
2603
0
      case SFLOW_IF_NAME:
2604
0
        stats.ifName = strdup((char *)value.string);
2605
0
        break;
2606
0
      case SFLOW_IF_TYPE:
2607
0
        stats.ifType = (u_int32_t)value.int_num;
2608
0
        break;
2609
0
      case SFLOW_IF_SPEED:
2610
0
        stats.ifSpeed = (u_int32_t)value.int_num;
2611
0
        break;
2612
0
      case SFLOW_IF_DIRECTION:
2613
0
        stats.ifFullDuplex = (!strcmp(value.string, "Full")) ? true : false;
2614
0
        break;
2615
0
      case SFLOW_IF_ADMIN_STATUS:
2616
0
        stats.ifAdminStatus = (!strcmp(value.string, "Up")) ? true : false;
2617
0
        break;
2618
0
      case SFLOW_IF_OPER_STATUS:
2619
0
        stats.ifOperStatus = (!strcmp(value.string, "Up")) ? true : false;
2620
0
        break;
2621
0
      case SFLOW_IF_IN_OCTETS:
2622
0
        stats.ifInOctets = value.int_num;
2623
0
        break;
2624
0
      case SFLOW_IF_IN_PACKETS:
2625
0
        stats.ifInPackets = value.int_num;
2626
0
        break;
2627
0
      case SFLOW_IF_IN_ERRORS:
2628
0
        stats.ifInErrors = value.int_num;
2629
0
        break;
2630
0
      case SFLOW_IF_OUT_OCTETS:
2631
0
        stats.ifOutOctets = value.int_num;
2632
0
        break;
2633
0
      case SFLOW_IF_OUT_PACKETS:
2634
0
        stats.ifOutPackets = value.int_num;
2635
0
        break;
2636
0
      case SFLOW_IF_OUT_ERRORS:
2637
0
        stats.ifOutErrors = value.int_num;
2638
0
        break;
2639
0
      case SFLOW_IF_PROMISCUOUS_MODE:
2640
0
        stats.ifPromiscuousMode = (value.int_num == 1) ? true : false;
2641
0
        break;
2642
0
      default:
2643
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %u\n",
2644
0
                                     key_id);
2645
0
        break;
2646
0
    }
2647
2648
    /* Restoring backed up character at the end of the string in place of '\0'
2649
     */
2650
0
    if (value_is_string) vs.str[vs.str_len] = vbkp;
2651
2652
0
    ndpi_deserialize_next(&deserializer);
2653
0
  }
2654
2655
  /* Process Flow */
2656
0
  processInterfaceStats(&stats);
2657
0
  ret = 0;
2658
2659
0
  if (stats.ifName) free(stats.ifName);
2660
2661
0
error:
2662
0
  return ret;
2663
0
}
2664
2665
/* **************************************************** */
2666
2667
/*
2668
 * Minimum set of fields expected by ntopng
2669
 * NOTE:
2670
 * - the following fields may or may not appear depending on the traffic:
2671
 *   "IPV4_SRC_ADDR", "IPV4_DST_ADDR", "IPV6_SRC_ADDR", "IPV6_DST_ADDR"
2672
 * - some fields may not appear when nprobe runs with --collector-passthrough
2673
 *   "L7_PROTO"
2674
 */
2675
static std::string mandatory_template_fields[] = {
2676
    "FIRST_SWITCHED",      "LAST_SWITCHED", "L4_SRC_PORT", "L4_DST_PORT",
2677
    "IP_PROTOCOL_VERSION", "PROTOCOL",      "IN_BYTES",    "IN_PKTS",
2678
    "OUT_BYTES",           "OUT_PKTS"};
2679
2680
u_int8_t ZMQParserInterface::parseTemplate(const char *payload,
2681
                                           int payload_size, u_int32_t source_id,
2682
0
                                           u_int32_t msg_id, void *data) {
2683
  /* The format that is currently defined for templates is a JSON as follows:
2684
2685
     [{"12/Apr/2023 23:53:04
2686
     [util.cPEN":0,"field":1,"len":4,"format":"formatted_uint","name":"IN_BYTES","descr":"Incoming
2687
     flow bytes
2688
     (src->dst)"},{"PEN":0,"field":2,"len":4,"format":"formatted_uint","name":"IN_PKTS","descr":"Incoming
2689
     flow packets (src->dst)"},]
2690
  */
2691
2692
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2693
2694
0
  ZMQ_Template zmq_template;
2695
0
  json_object *obj, *w, *z;
2696
0
  enum json_tokener_error jerr = json_tokener_success;
2697
2698
0
  memset(&zmq_template, 0, sizeof(zmq_template));
2699
0
  obj = json_tokener_parse_verbose(payload, &jerr);
2700
2701
0
  if (obj) {
2702
0
    if (json_object_get_type(obj) == json_type_array) {
2703
0
      int i, num_elements = json_object_array_length(obj);
2704
0
      std::set<std::string> mandatory_fields(
2705
0
          mandatory_template_fields,
2706
0
          mandatory_template_fields + sizeof(mandatory_template_fields) /
2707
0
                                          sizeof(mandatory_template_fields[0]));
2708
2709
0
      for (i = 0; i < num_elements; i++) {
2710
0
        memset(&zmq_template, 0, sizeof(zmq_template));
2711
2712
0
        w = json_object_array_get_idx(obj, i);
2713
2714
0
        if (json_object_object_get_ex(w, "PEN", &z))
2715
0
          zmq_template.pen = (u_int32_t)json_object_get_int(z);
2716
2717
0
        if (json_object_object_get_ex(w, "field", &z))
2718
0
          zmq_template.field = (u_int32_t)json_object_get_int(z);
2719
2720
0
        if (json_object_object_get_ex(w, "format", &z))
2721
0
          zmq_template.format = json_object_get_string(z);
2722
2723
0
        if (json_object_object_get_ex(w, "name", &z))
2724
0
          zmq_template.name = json_object_get_string(z);
2725
2726
0
        if (json_object_object_get_ex(w, "descr", &z))
2727
0
          zmq_template.descr = json_object_get_string(z);
2728
2729
0
        if (zmq_template.name) {
2730
0
          addMapping(zmq_template.name, zmq_template.field, zmq_template.pen,
2731
0
                     zmq_template.descr);
2732
2733
0
          mandatory_fields.erase(zmq_template.name);
2734
0
        }
2735
2736
        // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Template [PEN: %u][field:
2737
        // %u][format: %s][name: %s][descr: %s]",
2738
        //           zmq_template.pen, zmq_template.field,
2739
        // zmq_template.format, zmq_template.name, zmq_template.descr)
2740
0
        ;
2741
0
      }
2742
2743
0
      if (mandatory_fields.size() > 0) {
2744
0
        static bool template_warning_sent = 0;
2745
2746
0
        if (!template_warning_sent) {
2747
0
          std::set<std::string>::iterator it;
2748
2749
0
          ntop->getTrace()->traceEvent(TRACE_WARNING,
2750
0
               "Some mandatory fields are missing in the ZMQ template:");
2751
0
          template_warning_sent = true;
2752
2753
0
          for (it = mandatory_fields.begin(); it != mandatory_fields.end(); ++it) {
2754
0
            ntop->getTrace()->traceEvent(TRACE_WARNING, "\t%s", (*it).c_str());
2755
0
          }
2756
0
        }
2757
0
      }
2758
0
    }
2759
0
    json_object_put(obj);
2760
0
  } else {
2761
    // if o != NULL
2762
0
    if (!once) {
2763
0
      ntop->getTrace()->traceEvent(
2764
0
          TRACE_WARNING,
2765
0
          "Invalid message received: your nProbe sender is outdated, data "
2766
0
          "encrypted or invalid JSON?");
2767
0
      ntop->getTrace()->traceEvent(
2768
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2769
0
          json_tokener_error_desc(jerr), payload_size, payload);
2770
0
    }
2771
0
    once = true;
2772
0
    return -1;
2773
0
  }
2774
2775
0
  return 0;
2776
0
}
2777
2778
/* **************************************************** */
2779
2780
void ZMQParserInterface::setFieldMap(
2781
0
    const ZMQ_FieldMap *const field_map) const {
2782
0
  char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
2783
0
  snprintf(hname, sizeof(hname), CONST_FIELD_MAP_CACHE_KEY, get_id(),
2784
0
           field_map->pen);
2785
0
  snprintf(key, sizeof(key), "%u", field_map->field);
2786
2787
0
  ntop->getRedis()->hashSet(hname, key, field_map->map);
2788
0
}
2789
2790
/* **************************************************** */
2791
2792
void ZMQParserInterface::setFieldValueMap(
2793
0
    const ZMQ_FieldValueMap *const field_value_map) const {
2794
0
  char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
2795
0
  snprintf(hname, sizeof(hname), CONST_FIELD_VALUE_MAP_CACHE_KEY, get_id(),
2796
0
           field_value_map->pen, field_value_map->field);
2797
0
  snprintf(key, sizeof(key), "%u", field_value_map->value);
2798
2799
0
  ntop->getRedis()->hashSet(hname, key, field_value_map->map);
2800
0
}
2801
2802
/* **************************************************** */
2803
2804
0
u_int8_t ZMQParserInterface::parseOptionFieldMap(json_object *const jo) {
2805
0
  int arraylen = json_object_array_length(jo);
2806
0
  json_object *w, *z;
2807
0
  ZMQ_FieldMap field_map;
2808
2809
0
  memset(&field_map, 0, sizeof(field_map));
2810
2811
0
  for (int i = 0; i < arraylen; i++) {
2812
0
    w = json_object_array_get_idx(jo, i);
2813
2814
0
    if (json_object_object_get_ex(w, "PEN", &z))
2815
0
      field_map.pen = (u_int32_t)json_object_get_int(z);
2816
2817
0
    if (json_object_object_get_ex(w, "field", &z)) {
2818
0
      field_map.field = (u_int32_t)json_object_get_int(z);
2819
2820
0
      if (json_object_object_get_ex(w, "map", &z)) {
2821
0
        field_map.map = json_object_to_json_string(z);
2822
2823
0
        setFieldMap(&field_map);
2824
2825
#ifdef CUSTOM_APP_DEBUG
2826
        ntop->getTrace()->traceEvent(
2827
            TRACE_NORMAL, "Option FieldMap [PEN: %u][field: %u][map: %s]",
2828
            field_map.pen, field_map.field, field_map.map);
2829
#endif
2830
0
      }
2831
0
    }
2832
0
  }
2833
2834
0
  return 0;
2835
0
}
2836
2837
/* **************************************************** */
2838
2839
u_int8_t ZMQParserInterface::parseOptionFieldValueMap(
2840
0
    json_object *const w) {
2841
0
  json_object *z;
2842
0
  ZMQ_FieldValueMap field_value_map;
2843
2844
0
  memset(&field_value_map, 0, sizeof(field_value_map));
2845
2846
0
  if (json_object_object_get_ex(w, "PEN", &z))
2847
0
    field_value_map.pen = (u_int32_t)json_object_get_int(z);
2848
2849
0
  if (json_object_object_get_ex(w, "field", &z)) {
2850
0
    field_value_map.field = (u_int32_t)json_object_get_int(z);
2851
2852
0
    if (json_object_object_get_ex(w, "value", &z)) {
2853
0
      field_value_map.value = (u_int32_t)json_object_get_int(z);
2854
2855
0
      if (json_object_object_get_ex(w, "map", &z)) {
2856
0
        field_value_map.map = json_object_to_json_string(z);
2857
2858
0
        setFieldValueMap(&field_value_map);
2859
2860
#ifdef CUSTOM_APP_DEBUG
2861
        ntop->getTrace()->traceEvent(
2862
            TRACE_NORMAL,
2863
            "Option FieldValueMap [PEN: %u][field: %u][value: %u][map: %s]",
2864
            field_value_map.pen, field_value_map.field, field_value_map.value,
2865
            field_value_map.map);
2866
#endif
2867
0
      }
2868
0
    }
2869
0
  }
2870
2871
0
  return 0;
2872
0
}
2873
2874
/* **************************************************** */
2875
2876
u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload,
2877
                                                 int payload_size,
2878
                                                 u_int32_t source_id,
2879
0
                                                 u_int32_t msg_id, void *data) {
2880
0
  enum json_tokener_error jerr = json_tokener_success;
2881
0
  json_object *o = json_tokener_parse_verbose(payload, &jerr);
2882
2883
0
  if (o != NULL) {
2884
0
    json_object *z;
2885
0
    ListeningPorts pinfo;
2886
0
    u_int16_t vlan_id = 0;
2887
2888
0
    if(ntop->getPrefs()->is_edr_mode()
2889
0
       && ntop->getPrefs()->addVLANCloudToExporters()) {
2890
0
      if(json_object_object_get_ex(o, "instance-name", &z))
2891
0
  vlan_id = findVLANMapping(json_object_get_string(z));
2892
0
    }
2893
2894
    /* Parse port information */
2895
0
    if (json_object_object_get_ex(o, "listening-ports", &z)) {
2896
0
      enum json_type o_type = json_object_get_type(z);
2897
0
      if (o_type == json_type_object) {
2898
0
        pinfo.parsePorts(z);
2899
2900
        /* Parse list of IP addresses */
2901
0
        if (json_object_object_get_ex(o, "ip-addresses", &z)) {
2902
0
          enum json_type o_type = json_object_get_type(z);
2903
2904
0
          if (o_type == json_type_array) {
2905
0
            for (u_int i = 0; i < (u_int)json_object_array_length(z); i++) {
2906
0
              Host *h = NULL;
2907
0
              json_object *host = json_object_array_get_idx(z, i);
2908
0
              const char *ip_addr = json_object_get_string(host);
2909
2910
              // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Received listening
2911
              // ports for %s", ip_addr);
2912
              // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2913
2914
              /* Assign list of ports to the host */
2915
2916
0
              h = getHost((char *)ip_addr,
2917
0
        vlan_id,
2918
0
                          0 /* observationPointId */,
2919
0
        true /* inline */);
2920
0
              if (h) {
2921
0
                h->setListeningPorts(pinfo);
2922
0
              } else {
2923
    // ntop->getTrace()->traceEvent(TRACE_INFO, "Unable to find host  %s", (char *)ip_addr);
2924
0
        }
2925
0
            }
2926
0
          }
2927
0
        }
2928
0
      }
2929
0
    }
2930
2931
0
    json_object_put(o); /* Free memory */
2932
0
  }
2933
2934
0
  return (0);
2935
0
}
2936
2937
/* **************************************************** */
2938
2939
u_int8_t ZMQParserInterface::parseSNMPIntefaces(const char *payload,
2940
                                                int payload_size,
2941
                                                u_int32_t source_id,
2942
0
                                                u_int32_t msg_id, void *data) {
2943
0
  enum json_tokener_error jerr = json_tokener_success;
2944
0
  json_object *f = json_tokener_parse_verbose(payload, &jerr);
2945
2946
0
  if (f != NULL) {
2947
0
    struct json_object_iterator it = json_object_iter_begin(f);
2948
0
    struct json_object_iterator itEnd = json_object_iter_end(f);
2949
2950
0
    while (!json_object_iter_equal(&it, &itEnd)) {
2951
0
      const char *key = json_object_iter_peek_name(&it);
2952
0
      json_object *value = json_object_iter_peek_value(&it);
2953
0
      const char *rsp = json_object_to_json_string(value);
2954
0
      char redis_key[64];
2955
2956
      /*
2957
         Saving 'short' interface names
2958
2959
         Use lua/modules/snmp_mappings.lua to access them from Lua //
2960
       */
2961
0
      snprintf(redis_key, sizeof(redis_key), "cachedexporters.%s.ifnames", key);
2962
0
      ntop->getRedis()->set(redis_key, rsp);
2963
2964
0
      ntop->getTrace()->traceEvent(TRACE_INFO, "[JSON] %s = %s", redis_key,
2965
0
                                   rsp);
2966
2967
      /* Move to the next element */
2968
0
      json_object_iter_next(&it);
2969
0
    }  // while json_object_iter_equal
2970
2971
0
    json_object_put(f); /* Free memory */
2972
2973
0
    return (0);
2974
0
  } else
2975
0
    return (-1);
2976
0
}
2977
2978
/* **************************************************** */
2979
2980
u_int8_t ZMQParserInterface::parseOption(const char *payload, int payload_size,
2981
                                         u_int32_t source_id, u_int32_t msg_id,
2982
0
                                         void *data) {
2983
  /* The format that is currently defined for options is a JSON as follows:
2984
2985
     char opt[] = "
2986
     "{\"PEN\":8741, \"field\": 22, \"value\":1, \"map\":{\"name\":\"Skype\"}},"
2987
     "{\"PEN\":8741, \"field\": 22, \"value\":3, \"map\":{\"name\":\"Winni\"}}";
2988
2989
     parseOption(opt, strlen(opt), source_id, this);
2990
  */
2991
2992
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2993
2994
0
  json_object *o;
2995
0
  enum json_tokener_error jerr = json_tokener_success;
2996
2997
0
  o = json_tokener_parse_verbose(payload, &jerr);
2998
2999
0
  if (o != NULL) {
3000
0
    parseOptionFieldValueMap(o);
3001
0
    json_object_put(o);
3002
0
  } else {
3003
    // if o != NULL
3004
0
    if (!once) {
3005
0
      ntop->getTrace()->traceEvent(
3006
0
          TRACE_WARNING,
3007
0
          "Invalid message received: your nProbe sender is outdated, "
3008
0
          "data encrypted or invalid JSON?");
3009
0
      ntop->getTrace()->traceEvent(
3010
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
3011
0
          json_tokener_error_desc(jerr), payload_size, payload);
3012
0
    }
3013
3014
0
    once = true;
3015
0
    return -1;
3016
0
  }
3017
3018
0
  return 0;
3019
0
}
3020
3021
/* **************************************** */
3022
3023
0
u_int32_t ZMQParserInterface::periodicStatsUpdateFrequency() const {
3024
0
  nProbeStats *zrs = zmq_remote_stats;
3025
0
  u_int32_t update_freq;
3026
0
  u_int32_t update_freq_min = ntop->getPrefs()->get_housekeeping_frequency();
3027
3028
0
  if (zrs)     update_freq =
3029
0
        min_val(max_val(zrs->remote_lifetime_timeout, zrs->remote_idle_timeout),
3030
0
                zrs->remote_collected_lifetime_timeout);
3031
0
  else
3032
0
    update_freq = update_freq_min;
3033
3034
0
  return max_val(update_freq, update_freq_min);
3035
0
}
3036
3037
/* **************************************** */
3038
3039
0
void ZMQParserInterface::setRemoteStats(nProbeStats *zrs) {
3040
0
  nProbeStats *last_zrs, *cumulative_zrs;
3041
0
  map<u_int32_t, nProbeStats *>::iterator it;
3042
0
  u_int32_t last_time = getTimeLastPktRcvdRemote();
3043
0
  struct timeval now;
3044
3045
0
  gettimeofday(&now, NULL);
3046
3047
  /* Store stats for the current exporter */
3048
3049
0
  lock.wrlock(__FILE__, __LINE__);
3050
3051
0
  if (source_id_last_zmq_remote_stats.find(zrs->source_id) ==
3052
0
      source_id_last_zmq_remote_stats.end()) {
3053
0
    last_zrs = new (std::nothrow) nProbeStats();
3054
3055
0
    if (!last_zrs) {
3056
0
      lock.unlock(__FILE__, __LINE__);
3057
0
      return;
3058
0
    }
3059
3060
0
    source_id_last_zmq_remote_stats[zrs->source_id] = last_zrs;
3061
0
  } else
3062
0
    last_zrs = source_id_last_zmq_remote_stats[zrs->source_id];
3063
3064
0
  *last_zrs = *zrs;
3065
3066
0
  lock.unlock(__FILE__, __LINE__);
3067
3068
0
  if (Utils::msTimevalDiff(&now, &last_zmq_remote_stats_update) < 1000) {
3069
    /* Do not update cumulative stats more frequently than once per second.
3070
     * Note: this also avoids concurrent access (use after free) of shadow */
3071
0
    return;
3072
0
  }
3073
3074
  /* Sum stats from all exporters */
3075
3076
0
  cumulative_zrs = new (std::nothrow) nProbeStats();
3077
0
  if (!cumulative_zrs) return;
3078
3079
0
  lock.wrlock(__FILE__, __LINE__); /* Need write lock due to (*) */
3080
3081
0
  for (it = source_id_last_zmq_remote_stats.begin();
3082
0
       it != source_id_last_zmq_remote_stats.end();) {
3083
0
    nProbeStats *zrs_i = it->second;
3084
3085
0
    if ((last_time > MAX_PROBE_IDLE_IDLE)
3086
0
  && (zrs_i->remote_time < last_time - MAX_PROBE_IDLE_IDLE /* sec */)) {
3087
      /* Do not account inactive exporters, release them */
3088
      // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Erased %s [local_time:
3089
      // %u][last_time: %u]", zrs_i->remote_ifname, zrs_i->local_time,
3090
      // last_time);
3091
0
      delete (zrs_i);
3092
0
      source_id_last_zmq_remote_stats.erase(it++); /* (*) */
3093
0
    } else {
3094
0
      cumulative_zrs->num_exporters += zrs_i->num_exporters;
3095
0
      cumulative_zrs->remote_bytes += zrs_i->remote_bytes;
3096
0
      cumulative_zrs->remote_pkts += zrs_i->remote_pkts;
3097
0
      cumulative_zrs->remote_pkt_drops += zrs_i->remote_pkt_drops;
3098
0
      cumulative_zrs->num_flow_exports += zrs_i->num_flow_exports;
3099
0
      cumulative_zrs->remote_ifspeed =
3100
0
          max_val(cumulative_zrs->remote_ifspeed, zrs_i->remote_ifspeed);
3101
0
      cumulative_zrs->remote_time =
3102
0
          max_val(cumulative_zrs->remote_time, zrs_i->remote_time);
3103
0
      cumulative_zrs->local_time =
3104
0
          max_val(cumulative_zrs->local_time, zrs_i->local_time);
3105
0
      cumulative_zrs->avg_bps += zrs_i->avg_bps;
3106
0
      cumulative_zrs->avg_pps += zrs_i->avg_pps;
3107
0
      cumulative_zrs->remote_lifetime_timeout =
3108
0
          max_val(cumulative_zrs->remote_lifetime_timeout,
3109
0
                  zrs_i->remote_lifetime_timeout);
3110
0
      cumulative_zrs->remote_collected_lifetime_timeout =
3111
0
          max_val(cumulative_zrs->remote_collected_lifetime_timeout,
3112
0
                  zrs_i->remote_collected_lifetime_timeout);
3113
0
      cumulative_zrs->remote_idle_timeout = max_val(
3114
0
          cumulative_zrs->remote_idle_timeout, zrs_i->remote_idle_timeout);
3115
0
      cumulative_zrs->export_queue_full += zrs_i->export_queue_full;
3116
0
      cumulative_zrs->too_many_flows += zrs_i->too_many_flows;
3117
0
      cumulative_zrs->elk_flow_drops += zrs_i->elk_flow_drops;
3118
0
      cumulative_zrs->sflow_pkt_sample_drops += zrs_i->sflow_pkt_sample_drops;
3119
0
      cumulative_zrs->flow_collection_drops += zrs_i->flow_collection_drops;
3120
0
      cumulative_zrs->flow_collection_udp_socket_drops +=
3121
0
          zrs_i->flow_collection_udp_socket_drops;
3122
0
      cumulative_zrs->flow_collection.nf_ipfix_flows +=
3123
0
          zrs_i->flow_collection.nf_ipfix_flows;
3124
0
      cumulative_zrs->flow_collection.sflow_samples +=
3125
0
          zrs_i->flow_collection.sflow_samples;
3126
3127
0
      ++it;
3128
0
    }
3129
0
  }
3130
3131
0
  lock.unlock(__FILE__, __LINE__);
3132
3133
0
  ifSpeed = cumulative_zrs->remote_ifspeed;
3134
0
  last_pkt_rcvd = 0;
3135
0
  last_pkt_rcvd_remote = cumulative_zrs->remote_time;
3136
0
  last_remote_pps = cumulative_zrs->avg_pps;
3137
0
  last_remote_bps = cumulative_zrs->avg_bps;
3138
3139
0
  if (cumulative_zrs->flow_collection.sflow_samples > 0)
3140
0
    is_sampled_traffic = true;
3141
3142
  /* Recalculate the flow max idle according to the timeouts received */
3143
0
  flow_max_idle = min_val(cumulative_zrs->remote_lifetime_timeout,
3144
0
                          cumulative_zrs->remote_collected_lifetime_timeout) +
3145
0
                  10 /* Safe margin */;
3146
0
  updateFlowMaxIdle();
3147
3148
0
  if ((zmq_initial_pkts == 0) /* ntopng has been restarted */
3149
0
      || (cumulative_zrs->remote_bytes <
3150
0
          zmq_initial_bytes) /* nProbe has been restarted */
3151
0
  ) {
3152
    /* Start over */
3153
0
    zmq_initial_bytes = cumulative_zrs->remote_bytes,
3154
0
    zmq_initial_pkts  = cumulative_zrs->remote_pkts;
3155
0
    zmq_initial_drops = cumulative_zrs->remote_pkt_drops;
3156
0
  }
3157
3158
0
  if (zmq_remote_initial_exported_flows == 0 /* ntopng has been restarted */
3159
0
      || cumulative_zrs->num_flow_exports <
3160
0
             zmq_remote_initial_exported_flows) /* nProbe has been restarted */
3161
0
    zmq_remote_initial_exported_flows = cumulative_zrs->num_flow_exports;
3162
3163
  /* Swap values */
3164
0
  if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
3165
0
  zmq_remote_stats_shadow = zmq_remote_stats;
3166
0
  zmq_remote_stats = cumulative_zrs;
3167
3168
0
  memcpy(&last_zmq_remote_stats_update, &now, sizeof(now));
3169
3170
  /*
3171
   * Don't override ethStats here, these stats are properly updated
3172
   * inside NetworkInterface::processFlow for ZMQ interfaces.
3173
   * Overriding values here may cause glitches and non-strictly-increasing counters
3174
   * yielding negative rates.
3175
   ethStats.setNumBytes(cumulative_zrs->remote_bytes),
3176
   ethStats.setNumPackets(cumulative_zrs->remote_pkts);
3177
   *
3178
   */
3179
0
}
3180
3181
/* **************************************************** */
3182
3183
#ifdef NTOPNG_PRO
3184
bool ZMQParserInterface::getCustomAppDetails(u_int32_t remapped_app_id,
3185
                                             u_int32_t *const pen,
3186
                                             u_int32_t *const app_field,
3187
                                             u_int32_t *const app_id) {
3188
  return custom_app_maps
3189
    && custom_app_maps->getCustomAppDetails(remapped_app_id, pen, app_field, app_id);
3190
}
3191
#endif
3192
3193
/* **************************************************** */
3194
3195
0
void ZMQParserInterface::probeLuaStats(lua_State *vm) {
3196
0
  std::map<u_int32_t, nProbeStats *>::iterator it;
3197
0
  lua_newtable(vm);
3198
3199
0
  lock.rdlock(__FILE__, __LINE__);
3200
3201
0
  for (it = source_id_last_zmq_remote_stats.begin();
3202
0
       it != source_id_last_zmq_remote_stats.end(); ++it) {
3203
0
    nProbeStats *zrs = it->second;
3204
3205
0
    lua_newtable(vm);
3206
3207
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s (%u)", zrs->remote_ifname,
3208
    // it->first);
3209
3210
0
    lua_push_uint32_table_entry(vm, "probe.last_update", zrs->last_update);
3211
0
    lua_push_str_table_entry(vm, "remote.name", zrs->remote_ifname);
3212
0
    lua_push_str_table_entry(vm, "remote.if_addr", zrs->remote_ifaddress);
3213
0
    lua_push_uint64_table_entry(vm, "remote.ifspeed", zrs->remote_ifspeed);
3214
0
    lua_push_str_table_entry(vm, "probe.ip", zrs->remote_probe_address);
3215
0
    lua_push_str_table_entry(vm, "probe.uuid", zrs->uuid);
3216
0
    lua_push_uint64_table_entry(vm, "probe.uuid_num", zrs->uuid_num);
3217
0
    lua_push_str_table_entry(vm, "probe.public_ip", zrs->remote_probe_public_address);
3218
0
    lua_push_str_table_entry(vm, "probe.probe_version", zrs->remote_probe_version);
3219
0
    lua_push_str_table_entry(vm, "probe.probe_os", zrs->remote_probe_os);
3220
0
    lua_push_str_table_entry(vm, "probe.probe_license", zrs->remote_probe_license);
3221
0
    lua_push_str_table_entry(vm, "probe.probe_edition", zrs->remote_probe_edition);
3222
0
    lua_push_str_table_entry(vm, "probe.mode", zrs->mode);
3223
0
    lua_push_str_table_entry(vm, "probe.probe_maintenance", zrs->remote_probe_maintenance);
3224
0
    lua_push_uint64_table_entry(vm, "drops.export_queue_full", zrs->export_queue_full);
3225
0
    lua_push_uint64_table_entry(vm, "drops.too_many_flows", zrs->too_many_flows);
3226
0
    lua_push_uint64_table_entry(vm, "drops.elk_flow_drops", zrs->elk_flow_drops);
3227
0
    lua_push_uint64_table_entry(vm, "drops.sflow_pkt_sample_drops", zrs->sflow_pkt_sample_drops);
3228
0
    lua_push_uint64_table_entry(vm, "drops.flow_collection_drops", zrs->flow_collection_drops);
3229
0
    lua_push_uint64_table_entry(vm, "drops.flow_collection_udp_socket_drops", zrs->flow_collection_udp_socket_drops);
3230
0
    lua_push_uint64_table_entry(vm, "packets.total", zrs->remote_pkts);
3231
0
    lua_push_uint64_table_entry(vm, "packets.drops", zrs->remote_pkt_drops);
3232
0
    lua_push_uint64_table_entry(vm, "flow_collection.nf_ipfix_flows", zrs->flow_collection.nf_ipfix_flows);
3233
0
    lua_push_uint64_table_entry(vm, "flow_collection.collection_port", zrs->flow_collection.collection_port);
3234
0
    lua_push_uint64_table_entry(vm, "flow_collection.sflow_samples", zrs->flow_collection.sflow_samples);
3235
0
    lua_push_uint64_table_entry(vm, "zmq.num_flow_exports", zrs->num_flow_exports);
3236
0
    lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
3237
0
    exporterLuaStats(vm, zrs);
3238
3239
    /* ************************************* */
3240
3241
0
    if (zrs) {
3242
0
      lua_push_uint64_table_entry(vm, "probe.remote_time",
3243
0
          zrs->remote_time); /* remote time when last event has been sent */
3244
0
      lua_push_uint64_table_entry(vm, "probe.local_time",
3245
0
          zrs->local_time); /* local time when last event has been received */
3246
3247
0
      if(zrs->num_flow_exports < zmq_remote_initial_exported_flows)
3248
0
  zmq_remote_initial_exported_flows = zrs->num_flow_exports; /* nProbe has been reset */
3249
3250
0
      lua_push_uint64_table_entry(vm, "zmq.num_flow_exports",
3251
0
          zrs->num_flow_exports - zmq_remote_initial_exported_flows);
3252
0
      lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
3253
3254
0
      if (zrs->export_queue_full > 0)
3255
0
        lua_push_uint64_table_entry(vm, "zmq.drops.export_queue_full",
3256
0
                                    zrs->export_queue_full);
3257
0
      if (zrs->flow_collection_drops)
3258
0
        lua_push_uint64_table_entry(vm, "zmq.drops.flow_collection_drops",
3259
0
                                    zrs->flow_collection_drops);
3260
0
      if (zrs->flow_collection_udp_socket_drops)
3261
0
        lua_push_uint64_table_entry(vm,
3262
0
                                    "zmq.drops.flow_collection_udp_socket_drops",
3263
0
                                    zrs->flow_collection_udp_socket_drops);
3264
3265
0
      lua_push_uint64_table_entry(vm, "timeout.lifetime",
3266
0
                                  zrs->remote_lifetime_timeout);
3267
0
      lua_push_uint64_table_entry(vm, "timeout.collected_lifetime",
3268
0
                                  zrs->remote_collected_lifetime_timeout);
3269
0
      lua_push_uint64_table_entry(vm, "timeout.idle", zrs->remote_idle_timeout);
3270
0
    }
3271
3272
0
    lua_pushstring(vm, std::to_string(it->first).c_str() /* The source_id as string (can't use integers or Lua will think it's an array ) */);
3273
0
    lua_insert(vm, -2);
3274
0
    lua_settable(vm, -3);
3275
0
  }
3276
0
  lua_rawseti(vm, -2, get_id());
3277
  /* Here the Interface ID is added because in case of View Interfaces
3278
   * this field could be the same for different interfaces
3279
   */
3280
3281
0
  lock.unlock(__FILE__, __LINE__);
3282
0
}
3283
3284
/* **************************************************** */
3285
3286
0
void ZMQParserInterface::lua(lua_State *vm, bool fullStats) {
3287
0
  NetworkInterface::lua(vm, fullStats);
3288
0
  lua_newtable(vm);
3289
0
  probeLuaStats(vm);
3290
0
  lua_pushstring(vm, "probes");
3291
0
  lua_insert(vm, -2);
3292
0
  lua_settable(vm, -3);
3293
0
}
3294
3295
/* **************************************************** */
3296
3297
0
void ZMQParserInterface::exporterLuaStats(lua_State *vm, nProbeStats *zrs) {
3298
0
  std::map<u_int32_t, ExporterStats>::iterator it;
3299
0
  lua_newtable(vm);
3300
3301
0
  for (it = zrs->exportersStats.begin();
3302
0
       it != zrs->exportersStats.end(); ++it) {
3303
0
    lua_newtable(vm);
3304
0
    char buf[32], ipb[24];
3305
0
    snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(it->first, ipb, sizeof(ipb)));
3306
3307
0
    lua_push_uint64_table_entry(vm, "time_last_used", it->second.time_last_used);
3308
0
    lua_push_uint64_table_entry(vm, "num_netflow_flows", it->second.num_netflow_flows);
3309
0
    lua_push_uint64_table_entry(vm, "num_sflow_flows", it->second.num_sflow_flows);
3310
0
    lua_push_uint64_table_entry(vm, "num_drops", it->second.num_drops);
3311
0
    lua_push_uint64_table_entry(vm, "unique_source_id", it->second.unique_source_id);
3312
3313
0
    lua_pushstring(vm, buf);
3314
0
    lua_insert(vm, -2);
3315
0
    lua_settable(vm, -3);
3316
0
  }
3317
3318
0
  lua_pushstring(vm, "exporters");
3319
0
  lua_insert(vm, -2);
3320
0
  lua_settable(vm, -3);
3321
0
}
3322
3323
/* **************************************************** */
3324
3325
0
void ZMQParserInterface::loadVLANMappings() {
3326
0
  char **keys, **values, buf[64];
3327
0
  int rc;
3328
0
  Redis *redis = ntop->getRedis();
3329
3330
0
  top_vlan_id = 0;
3331
3332
0
  snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
3333
3334
0
  rc = redis->hashGetAll(buf, &keys, &values);
3335
3336
0
  if(rc > 0) {
3337
0
    for (int i = 0; i < rc; i++) {
3338
0
      if(values[i] && keys[i]) {
3339
0
  u_int16_t v = atoi(values[i]);
3340
3341
0
  if(v > top_vlan_id) top_vlan_id = v;
3342
0
  name_to_vlan[keys[i]] = v;
3343
0
      }
3344
3345
0
      if (values[i]) free(values[i]);
3346
0
      if (keys[i])   free(keys[i]);
3347
0
    }
3348
3349
0
    free(keys);
3350
0
    free(values);
3351
0
  }
3352
0
}
3353
3354
/* **************************************************** */
3355
3356
0
u_int16_t ZMQParserInterface::findVLANMapping(std::string name) {
3357
0
  std::unordered_map<std::string, u_int16_t>::iterator it = name_to_vlan.find(name);
3358
3359
0
  if(it != name_to_vlan.end())
3360
0
    return(it->second);
3361
0
  else if(top_vlan_id < 4095) {
3362
0
    char value[16], buf[64];
3363
0
    u_int16_t id = ++top_vlan_id;
3364
0
    Redis *redis = ntop->getRedis();
3365
3366
0
    if(id >= 4096) return(0 /* too many vlans */);
3367
3368
0
    snprintf(value, sizeof(value), "%u", id);
3369
3370
0
    snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
3371
0
    redis->hashSet(buf, name.c_str(), value);
3372
3373
    /* Add VLAN mapping */
3374
0
    redis->hashSet(NTOPNG_VLAN_ALIASES, value, name.c_str());
3375
3376
0
    name_to_vlan[name] = id;
3377
0
    ntop->getTrace()->traceEvent(TRACE_NORMAL, "Added %s = %d", name.c_str(), id);
3378
0
    return(id);
3379
0
  } else
3380
0
    return(0);
3381
0
}
3382
3383
/* **************************************************** */
3384
3385
#endif