Coverage Report

Created: 2023-11-19 06:42

/src/ntopng/src/ZMQParserInterface.cpp
Line
Count
Source (jump to first uncovered line)
1
/*
2
 *
3
 * (C) 2013-23 - ntop.org
4
 *
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software Foundation,
18
 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19
 *
20
 */
21
22
#include "ntop_includes.h"
23
24
#ifndef HAVE_NEDGE
25
26
0
#define VLAN_HASH_KEY  "ntopng.vlan.%d.cache"
27
28
/* **************************************************** */
29
30
/* IMPORTANT: keep it in sync with flow_fields_description part of
31
 * flow_utils.lua */
32
ZMQParserInterface::ZMQParserInterface(const char *endpoint,
33
                                       const char *custom_interface_type)
34
0
    : ParserInterface(endpoint, custom_interface_type) {
35
0
  zmq_initial_bytes = 0, zmq_initial_pkts = 0;
36
0
  zmq_remote_stats = zmq_remote_stats_shadow = NULL;
37
0
  memset(&last_zmq_remote_stats_update, 0,
38
0
         sizeof(last_zmq_remote_stats_update));
39
0
  zmq_remote_initial_exported_flows = 0;
40
0
  remote_lifetime_timeout = remote_idle_timeout = 0;
41
0
  once = false, is_sampled_traffic = false;
42
0
  flow_max_idle = ntop->getPrefs()->get_pkt_ifaces_flow_max_idle();
43
#ifdef NTOPNG_PRO
44
  custom_app_maps = NULL;
45
#endif
46
0
  polling_start_time = 0;
47
0
  updateFlowMaxIdle();
48
0
  memset(&recvStats, 0, sizeof(recvStats));
49
0
  memset(&recvStatsCheckpoint, 0, sizeof(recvStatsCheckpoint));
50
51
  /* Populate defaults for @NTOPNG@ nProbe templates. No need to populate
52
     all the fields as nProbe will sent them periodically.
53
54
     This minimum set is required for backward compatibility. */
55
0
  addMapping("IN_SRC_MAC", IN_SRC_MAC);
56
0
  addMapping("OUT_SRC_MAC", OUT_SRC_MAC);
57
0
  addMapping("IN_DST_MAC", IN_DST_MAC);
58
0
  addMapping("OUT_DST_MAC", OUT_DST_MAC);
59
0
  addMapping("SRC_VLAN", SRC_VLAN);
60
0
  addMapping("DST_VLAN", DST_VLAN);
61
0
  addMapping("DOT1Q_SRC_VLAN", DOT1Q_SRC_VLAN);
62
0
  addMapping("DOT1Q_DST_VLAN", DOT1Q_DST_VLAN);
63
0
  addMapping("INPUT_SNMP", INPUT_SNMP);
64
0
  addMapping("OUTPUT_SNMP", OUTPUT_SNMP);
65
0
  addMapping("IPV4_SRC_ADDR", IPV4_SRC_ADDR);
66
0
  addMapping("IPV4_DST_ADDR", IPV4_DST_ADDR);
67
0
  addMapping("SRC_TOS", SRC_TOS);
68
0
  addMapping("DST_TOS", DST_TOS);
69
0
  addMapping("L4_SRC_PORT", L4_SRC_PORT);
70
0
  addMapping("L4_DST_PORT", L4_DST_PORT);
71
0
  addMapping("IPV6_SRC_ADDR", IPV6_SRC_ADDR);
72
0
  addMapping("IPV6_DST_ADDR", IPV6_DST_ADDR);
73
0
  addMapping("IP_PROTOCOL_VERSION", IP_PROTOCOL_VERSION);
74
0
  addMapping("PROTOCOL", PROTOCOL);
75
0
  addMapping("L7_PROTO", L7_PROTO, NTOP_PEN);
76
0
  addMapping("L7_PROTO_NAME", L7_PROTO_NAME, NTOP_PEN);
77
0
  addMapping("L7_INFO", L7_INFO, NTOP_PEN);
78
0
  addMapping("L7_CONFIDENCE", L7_CONFIDENCE, NTOP_PEN);
79
0
  addMapping("L7_ERROR_CODE", L7_ERROR_CODE, NTOP_PEN);
80
0
  addMapping("IN_BYTES", IN_BYTES);
81
0
  addMapping("IN_PKTS", IN_PKTS);
82
0
  addMapping("OUT_BYTES", OUT_BYTES);
83
0
  addMapping("OUT_PKTS", OUT_PKTS);
84
0
  addMapping("FIRST_SWITCHED", FIRST_SWITCHED);
85
0
  addMapping("LAST_SWITCHED", LAST_SWITCHED);
86
0
  addMapping("EXPORTER_IPV4_ADDRESS", EXPORTER_IPV4_ADDRESS);
87
0
  addMapping("EXPORTER_IPV6_ADDRESS", EXPORTER_IPV6_ADDRESS);
88
0
  addMapping("TOTAL_FLOWS_EXP", TOTAL_FLOWS_EXP);
89
0
  addMapping("NPROBE_IPV4_ADDRESS", NPROBE_IPV4_ADDRESS, NTOP_PEN);
90
0
  addMapping("NPROBE_INSTANCE_NAME", NPROBE_INSTANCE_NAME, NTOP_PEN);
91
0
  addMapping("TCP_FLAGS", TCP_FLAGS);
92
0
  addMapping("INITIATOR_PKTS", INITIATOR_PKTS);
93
0
  addMapping("INITIATOR_OCTETS", INITIATOR_OCTETS);
94
0
  addMapping("RESPONDER_PKTS", RESPONDER_PKTS);
95
0
  addMapping("RESPONDER_OCTETS", RESPONDER_OCTETS);
96
0
  addMapping("SAMPLING_INTERVAL", SAMPLING_INTERVAL);
97
0
  addMapping("DIRECTION", DIRECTION);
98
0
  addMapping("POST_NAT_SRC_IPV4_ADDR", POST_NAT_SRC_IPV4_ADDR);
99
0
  addMapping("POST_NAT_DST_IPV4_ADDR", POST_NAT_DST_IPV4_ADDR);
100
0
  addMapping("POST_NAPT_SRC_TRANSPORT_PORT", POST_NAPT_SRC_TRANSPORT_PORT);
101
0
  addMapping("POST_NAPT_DST_TRANSPORT_PORT", POST_NAPT_DST_TRANSPORT_PORT);
102
0
  addMapping("OBSERVATION_POINT_ID", OBSERVATION_POINT_ID);
103
0
  addMapping("INGRESS_VRFID", INGRESS_VRFID);
104
0
  addMapping("IPV4_SRC_MASK", IPV4_SRC_MASK);
105
0
  addMapping("IPV4_DST_MASK", IPV4_DST_MASK);
106
0
  addMapping("IPV4_NEXT_HOP", IPV4_NEXT_HOP);
107
0
  addMapping("SRC_AS", SRC_AS);
108
0
  addMapping("DST_AS", DST_AS);
109
0
  addMapping("BGP_NEXT_ADJACENT_ASN", BGP_NEXT_ADJACENT_ASN);
110
0
  addMapping("BGP_PREV_ADJACENT_ASN", BGP_PREV_ADJACENT_ASN);
111
0
  addMapping("OOORDER_IN_PKTS", OOORDER_IN_PKTS, NTOP_PEN);
112
0
  addMapping("OOORDER_OUT_PKTS", OOORDER_OUT_PKTS, NTOP_PEN);
113
0
  addMapping("RETRANSMITTED_IN_PKTS", RETRANSMITTED_IN_PKTS, NTOP_PEN);
114
0
  addMapping("RETRANSMITTED_OUT_PKTS", RETRANSMITTED_OUT_PKTS, NTOP_PEN);
115
0
  addMapping("DNS_QUERY", DNS_QUERY, NTOP_PEN);
116
0
  addMapping("DNS_QUERY_TYPE", DNS_QUERY_TYPE, NTOP_PEN);
117
0
  addMapping("DNS_RET_CODE", DNS_RET_CODE, NTOP_PEN);
118
0
  addMapping("HTTP_URL", HTTP_URL, NTOP_PEN);
119
0
  addMapping("HTTP_SITE", HTTP_SITE, NTOP_PEN);
120
0
  addMapping("HTTP_RET_CODE", HTTP_RET_CODE, NTOP_PEN);
121
0
  addMapping("HTTP_METHOD", HTTP_METHOD, NTOP_PEN);
122
0
  addMapping("HTTP_USER_AGENT", HTTP_USER_AGENT, NTOP_PEN);
123
0
  addMapping("TLS_SERVER_NAME", TLS_SERVER_NAME, NTOP_PEN);
124
0
  addMapping("TLS_CIPHER", TLS_CIPHER, NTOP_PEN);
125
0
  addMapping("SSL_UNSAFE_CIPHER", SSL_UNSAFE_CIPHER, NTOP_PEN);
126
0
  addMapping("JA3C_HASH", JA3C_HASH, NTOP_PEN);
127
0
  addMapping("JA3S_HASH", JA3S_HASH, NTOP_PEN);
128
0
  addMapping("BITTORRENT_HASH", BITTORRENT_HASH, NTOP_PEN);
129
0
  addMapping("SRC_FRAGMENTS", SRC_FRAGMENTS, NTOP_PEN);
130
0
  addMapping("DST_FRAGMENTS", DST_FRAGMENTS, NTOP_PEN);
131
0
  addMapping("CLIENT_NW_LATENCY_MS", CLIENT_NW_LATENCY_MS, NTOP_PEN);
132
0
  addMapping("SERVER_NW_LATENCY_MS", SERVER_NW_LATENCY_MS, NTOP_PEN);
133
0
  addMapping("L7_PROTO_RISK", L7_PROTO_RISK, NTOP_PEN);
134
0
  addMapping("FLOW_VERDICT", FLOW_VERDICT, NTOP_PEN);
135
0
  addMapping("L7_RISK_INFO", L7_RISK_INFO, NTOP_PEN);
136
137
  /* eBPF / Process */
138
0
  addMapping("SRC_PROC_PID", SRC_PROC_PID, NTOP_PEN);
139
0
  addMapping("SRC_PROC_NAME", SRC_PROC_NAME, NTOP_PEN);
140
0
  addMapping("SRC_PROC_UID", SRC_PROC_UID, NTOP_PEN);
141
0
  addMapping("SRC_PROC_USER_NAME", SRC_PROC_USER_NAME, NTOP_PEN);
142
0
  addMapping("SRC_FATHER_PROC_PID", SRC_FATHER_PROC_PID, NTOP_PEN);
143
0
  addMapping("SRC_FATHER_PROC_NAME", SRC_FATHER_PROC_NAME, NTOP_PEN);
144
0
  addMapping("SRC_FATHER_PROC_PKG_NAME", SRC_FATHER_PROC_PKG_NAME, NTOP_PEN);
145
0
  addMapping("SRC_FATHER_PROC_UID", SRC_FATHER_PROC_UID, NTOP_PEN);
146
0
  addMapping("SRC_FATHER_PROC_USER_NAME", SRC_FATHER_PROC_USER_NAME, NTOP_PEN);
147
0
  addMapping("SRC_PROC_ACTUAL_MEMORY", SRC_PROC_ACTUAL_MEMORY, NTOP_PEN);
148
0
  addMapping("SRC_PROC_PEAK_MEMORY", SRC_PROC_PEAK_MEMORY, NTOP_PEN);
149
0
  addMapping("SRC_PROC_AVERAGE_CPU_LOAD", SRC_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
150
0
  addMapping("SRC_PROC_NUM_PAGE_FAULTS", SRC_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
151
0
  addMapping("SRC_PROC_PCTG_IOWAIT", SRC_PROC_PCTG_IOWAIT, NTOP_PEN);
152
0
  addMapping("SRC_PROC_PKG_NAME", SRC_PROC_PKG_NAME, NTOP_PEN);
153
0
  addMapping("SRC_PROC_CMDLINE", SRC_PROC_CMDLINE, NTOP_PEN);
154
0
  addMapping("SRC_PROC_CONTAINER_ID", SRC_PROC_CONTAINER_ID, NTOP_PEN);
155
156
0
  addMapping("DST_PROC_PID", DST_PROC_PID, NTOP_PEN);
157
0
  addMapping("DST_PROC_NAME", DST_PROC_NAME, NTOP_PEN);
158
0
  addMapping("DST_PROC_UID", DST_PROC_UID, NTOP_PEN);
159
0
  addMapping("DST_PROC_USER_NAME", DST_PROC_USER_NAME, NTOP_PEN);
160
0
  addMapping("DST_FATHER_PROC_PID", DST_FATHER_PROC_PID, NTOP_PEN);
161
0
  addMapping("DST_FATHER_PROC_NAME", DST_FATHER_PROC_NAME, NTOP_PEN);
162
0
  addMapping("DST_FATHER_PROC_PKG_NAME", DST_FATHER_PROC_PKG_NAME, NTOP_PEN);
163
0
  addMapping("DST_FATHER_PROC_UID", DST_FATHER_PROC_UID, NTOP_PEN);
164
0
  addMapping("DST_FATHER_PROC_USER_NAME", DST_FATHER_PROC_USER_NAME, NTOP_PEN);
165
0
  addMapping("DST_PROC_ACTUAL_MEMORY", DST_PROC_ACTUAL_MEMORY, NTOP_PEN);
166
0
  addMapping("DST_PROC_PEAK_MEMORY", DST_PROC_PEAK_MEMORY, NTOP_PEN);
167
0
  addMapping("DST_PROC_AVERAGE_CPU_LOAD", DST_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
168
0
  addMapping("DST_PROC_NUM_PAGE_FAULTS", DST_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
169
0
  addMapping("DST_PROC_PCTG_IOWAIT", DST_PROC_PCTG_IOWAIT, NTOP_PEN);
170
0
  addMapping("DST_PROC_PKG_NAME", DST_PROC_PKG_NAME, NTOP_PEN);
171
0
  addMapping("DST_PROC_CMDLINE", DST_PROC_CMDLINE, NTOP_PEN);
172
0
  addMapping("DST_PROC_CONTAINER_ID", DST_PROC_CONTAINER_ID, NTOP_PEN);
173
174
  /* sFlow Counter Fields */
175
0
  addCounterMapping("deviceIP", SFLOW_DEVICE_IP);
176
0
  addCounterMapping("samplesGenerated", SFLOW_SAMPLES_GENERATED);
177
0
  addCounterMapping("ifIndex", SFLOW_IF_INDEX);
178
0
  addCounterMapping("ifName", SFLOW_IF_NAME);
179
0
  addCounterMapping("ifType", SFLOW_IF_TYPE);
180
0
  addCounterMapping("ifSpeed", SFLOW_IF_SPEED);
181
0
  addCounterMapping("ifDirection", SFLOW_IF_DIRECTION);
182
0
  addCounterMapping("ifAdminStatus", SFLOW_IF_ADMIN_STATUS);
183
0
  addCounterMapping("ifOperStatus", SFLOW_IF_OPER_STATUS);
184
0
  addCounterMapping("ifInOctets", SFLOW_IF_IN_OCTETS);
185
0
  addCounterMapping("ifInPackets", SFLOW_IF_IN_PACKETS);
186
0
  addCounterMapping("ifInErrors", SFLOW_IF_IN_ERRORS);
187
0
  addCounterMapping("ifOutOctets", SFLOW_IF_OUT_OCTETS);
188
0
  addCounterMapping("ifOutPackets", SFLOW_IF_OUT_PACKETS);
189
0
  addCounterMapping("ifOutErrors", SFLOW_IF_OUT_ERRORS);
190
0
  addCounterMapping("ifPromiscuousMode", SFLOW_IF_PROMISCUOUS_MODE);
191
192
0
  if(ntop->getPrefs()->is_cloud_edition())
193
0
    loadVLANMappings();
194
0
}
195
196
/* **************************************************** */
197
198
0
ZMQParserInterface::~ZMQParserInterface() {
199
0
  map<u_int32_t, ZMQ_RemoteStats *>::iterator it;
200
201
0
  if (zmq_remote_stats) free(zmq_remote_stats);
202
0
  if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
203
#ifdef NTOPNG_PRO
204
  if (custom_app_maps) delete (custom_app_maps);
205
#endif
206
207
0
  for (it = source_id_last_zmq_remote_stats.begin();
208
0
       it != source_id_last_zmq_remote_stats.end(); ++it)
209
0
    free(it->second);
210
211
0
  source_id_last_zmq_remote_stats.clear();
212
0
}
213
214
/* **************************************************** */
215
216
/*
217
  these mappings are sent by nprobe via zmq at startup and collected via
218
  zmqparserinterface::parsetemplate()
219
*/
220
void ZMQParserInterface::addMapping(const char *sym, u_int32_t num,
221
0
                                    u_int32_t pen, const char *descr) {
222
0
  string label(sym);
223
0
  labels_map_t::iterator it;
224
0
  pen_value_t cur_pair = make_pair(pen, num);
225
226
0
  if ((it = labels_map.find(label)) == labels_map.end())
227
0
    labels_map.insert(make_pair(label, cur_pair));
228
0
  else
229
0
    it->second.first = pen, it->second.second = num;
230
231
0
  if (descr) {
232
0
    descriptions_map_t::iterator dit;
233
234
0
    if ((dit = descriptions_map.find(cur_pair)) == descriptions_map.end())
235
0
      descriptions_map.insert(make_pair(cur_pair, descr));
236
0
  }
237
0
}
238
239
/* **************************************************** */
240
241
bool ZMQParserInterface::getKeyId(char *sym, u_int32_t sym_len,
242
                                  u_int32_t *const pen,
243
0
                                  u_int32_t *const field) const {
244
0
  u_int32_t cur_pen, cur_field;
245
0
  string label(sym);
246
0
  labels_map_t::const_iterator it;
247
0
  bool is_num, is_dotted;
248
249
0
  *pen = UNKNOWN_PEN, *field = UNKNOWN_FLOW_ELEMENT;
250
251
0
  is_num = Utils::isNumber(sym, sym_len, &is_dotted);
252
253
0
  if (is_num && is_dotted) {
254
0
    if (sscanf(sym, "%u.%u", &cur_pen, &cur_field) != 2) return false;
255
0
    *pen = cur_pen, *field = cur_field;
256
0
  } else if (is_num) {
257
0
    cur_field = atoi(sym);
258
0
    *pen = 0, *field = cur_field;
259
0
  } else if ((it = labels_map.find(label)) != labels_map.end()) {
260
0
    *pen = it->second.first, *field = it->second.second;
261
0
  } else {
262
0
    return false;
263
0
  }
264
265
0
  return true;
266
0
}
267
268
/* **************************************************** */
269
270
0
void ZMQParserInterface::addCounterMapping(const char *sym, u_int32_t id) {
271
0
  string label(sym);
272
0
  counters_map_t::iterator it;
273
274
0
  if ((it = counters_map.find(label)) == counters_map.end())
275
0
    counters_map.insert(make_pair(label, id));
276
0
}
277
278
/* **************************************************** */
279
280
bool ZMQParserInterface::getCounterId(char *sym, u_int32_t sym_len,
281
0
                                      u_int32_t *id) const {
282
0
  string label(sym);
283
0
  counters_map_t::const_iterator it;
284
0
  bool is_num, is_dotted;
285
286
0
  is_num = Utils::isNumber(sym, sym_len, &is_dotted);
287
288
0
  if (is_num) {
289
0
    *id = atoi(sym);
290
0
  } else if ((it = counters_map.find(label)) != counters_map.end()) {
291
0
    *id = it->second;
292
0
  } else {
293
0
    return false;
294
0
  }
295
296
0
  return true;
297
0
}
298
299
/* **************************************************** */
300
301
const char *ZMQParserInterface::getKeyDescription(u_int32_t pen,
302
0
                                                  u_int32_t field) const {
303
0
  descriptions_map_t::const_iterator it;
304
305
0
  if ((it = descriptions_map.find(make_pair(pen, field))) !=
306
0
      descriptions_map.end())
307
0
    return it->second.c_str();
308
309
0
  return NULL;
310
0
}
311
312
/* **************************************************** */
313
314
u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
315
                                        u_int32_t source_id, u_int32_t msg_id,
316
0
                                        void *data) {
317
0
  json_object *o;
318
0
  enum json_tokener_error jerr = json_tokener_success;
319
0
  ZMQ_RemoteStats zrs;
320
0
  const u_int32_t max_timeout = 600, min_timeout = 60;
321
322
0
  if (polling_start_time == 0) polling_start_time = (u_int32_t)time(NULL);
323
324
0
  memset(&zrs, 0, sizeof(zrs));
325
326
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msg_id: %u] %s", msg_id,
327
  // payload);
328
329
0
  o = json_tokener_parse_verbose(payload, &jerr);
330
331
0
  if (o) {
332
0
    json_object *w, *z;
333
334
0
    zrs.source_id = source_id;
335
336
0
    if (json_object_object_get_ex(o, "bytes", &w))
337
0
      zrs.remote_bytes = (u_int64_t)json_object_get_int64(w);
338
0
    if (json_object_object_get_ex(o, "packets", &w))
339
0
      zrs.remote_pkts = (u_int64_t)json_object_get_int64(w);
340
341
0
    if (json_object_object_get_ex(o, "iface", &w)) {
342
0
      if (json_object_object_get_ex(w, "name", &z))
343
0
        snprintf(zrs.remote_ifname, sizeof(zrs.remote_ifname), "%s",
344
0
                 json_object_get_string(z));
345
0
      if (json_object_object_get_ex(w, "speed", &z))
346
0
        zrs.remote_ifspeed = (u_int32_t)json_object_get_int64(z);
347
0
      if (json_object_object_get_ex(w, "ip", &z))
348
0
        snprintf(zrs.remote_ifaddress, sizeof(zrs.remote_ifaddress), "%s",
349
0
                 json_object_get_string(z));
350
0
    }
351
352
0
    if (json_object_object_get_ex(o, "probe", &w)) {
353
0
      if (json_object_object_get_ex(w, "public_ip", &z))
354
0
        snprintf(zrs.remote_probe_public_address,
355
0
                 sizeof(zrs.remote_probe_public_address), "%s",
356
0
                 json_object_get_string(z));
357
0
      if (json_object_object_get_ex(w, "ip", &z))
358
0
        snprintf(zrs.remote_probe_address, sizeof(zrs.remote_probe_address),
359
0
                 "%s", json_object_get_string(z));
360
0
      if (json_object_object_get_ex(w, "version", &z))
361
0
        snprintf(zrs.remote_probe_version, sizeof(zrs.remote_probe_version),
362
0
                 "%s", json_object_get_string(z));
363
0
      if (json_object_object_get_ex(w, "osname", &z))
364
0
        snprintf(zrs.remote_probe_os, sizeof(zrs.remote_probe_os), "%s",
365
0
                 json_object_get_string(z));
366
0
      if (json_object_object_get_ex(w, "license", &z))
367
0
        snprintf(zrs.remote_probe_license, sizeof(zrs.remote_probe_license),
368
0
                 "%s", json_object_get_string(z));
369
0
      if (json_object_object_get_ex(w, "edition", &z))
370
0
        snprintf(zrs.remote_probe_edition, sizeof(zrs.remote_probe_edition),
371
0
                 "%s", json_object_get_string(z));
372
0
      if (json_object_object_get_ex(w, "maintenance", &z))
373
0
        snprintf(zrs.remote_probe_maintenance,
374
0
                 sizeof(zrs.remote_probe_maintenance), "%s",
375
0
                 json_object_get_string(z));
376
0
    }
377
378
0
    if (json_object_object_get_ex(o, "time", &w)) {
379
0
      int32_t time_delta;
380
381
0
      zrs.local_time = (u_int32_t)time(NULL);
382
0
      zrs.remote_time =
383
0
          (u_int32_t)json_object_get_int64(w); /* nProbe remote time */
384
385
0
      time_delta = (int32_t)zrs.local_time - zrs.remote_time;
386
387
      /*
388
         Skip the check for the first few seconds
389
         as we might receive old messages from the probe
390
      */
391
0
      if ((msg_id != 0) /* Skip message with drops to avoid miscalculations */
392
0
          && ((zrs.local_time - polling_start_time) > 5)) {
393
0
        if (abs(time_delta) >= 10) {
394
0
          ntop->getTrace()->traceEvent(TRACE_NORMAL,
395
0
                                       "Remote probe clock drift %u sec "
396
0
                                       "detected (local: %u remote: %u [%s])",
397
0
                                       abs(time_delta), zrs.local_time,
398
0
                                       zrs.remote_time,
399
0
                                       zrs.remote_probe_address);
400
0
        }
401
0
      } else
402
0
        zrs.remote_time = zrs.local_time; /* Avoid clock drift messages during
403
                                             the grace period */
404
0
    }
405
406
0
    if (json_object_object_get_ex(o, "avg", &w)) {
407
0
      if (json_object_object_get_ex(w, "bps", &z))
408
0
        zrs.avg_bps = (u_int32_t)json_object_get_int64(z);
409
0
      if (json_object_object_get_ex(w, "pps", &z))
410
0
        zrs.avg_pps = (u_int32_t)json_object_get_int64(z);
411
0
    }
412
413
0
    if (json_object_object_get_ex(o, "timeout", &w)) {
414
0
      if (json_object_object_get_ex(w, "lifetime", &z)) {
415
0
        zrs.remote_lifetime_timeout = (u_int32_t)json_object_get_int64(z);
416
417
0
        if (zrs.remote_lifetime_timeout > max_timeout)
418
0
          zrs.remote_lifetime_timeout = max_timeout;
419
0
      }
420
421
0
      if (json_object_object_get_ex(w, "idle", &z)) {
422
0
        zrs.remote_idle_timeout = (u_int32_t)json_object_get_int64(z);
423
0
        zrs.remote_idle_timeout *= 2; /* Double the idle timeout for NetFlow */
424
0
        if (zrs.remote_idle_timeout > max_timeout)
425
0
          zrs.remote_idle_timeout = max_timeout;
426
0
        if (zrs.remote_idle_timeout < min_timeout)
427
0
          zrs.remote_idle_timeout = min_timeout;
428
0
      }
429
430
0
      if (json_object_object_get_ex(w, "collected_lifetime", &z))
431
0
        zrs.remote_collected_lifetime_timeout =
432
0
            (u_int32_t)json_object_get_int64(z);
433
0
    }
434
435
0
    if (json_object_object_get_ex(o, "drops", &w)) {
436
0
      if (json_object_object_get_ex(w, "export_queue_full", &z))
437
0
        zrs.export_queue_full = (u_int32_t)json_object_get_int64(z);
438
439
0
      if (json_object_object_get_ex(w, "too_many_flows", &z))
440
0
        zrs.too_many_flows = (u_int32_t)json_object_get_int64(z);
441
442
0
      if (json_object_object_get_ex(w, "elk_flow_drops", &z))
443
0
        zrs.elk_flow_drops = (u_int32_t)json_object_get_int64(z);
444
445
0
      if (json_object_object_get_ex(w, "sflow_pkt_sample_drops", &z))
446
0
        zrs.sflow_pkt_sample_drops = (u_int32_t)json_object_get_int64(z);
447
448
0
      if (json_object_object_get_ex(w, "flow_collection_drops", &z))
449
0
        zrs.flow_collection_drops = (u_int32_t)json_object_get_int64(z);
450
451
0
      if (json_object_object_get_ex(w, "flow_collection_udp_socket_drops", &z))
452
0
        zrs.flow_collection_udp_socket_drops =
453
0
            (u_int32_t)json_object_get_int64(z);
454
0
    }
455
456
0
    if (json_object_object_get_ex(o, "flow_collection", &w)) {
457
0
      if (json_object_object_get_ex(w, "nf_ipfix_flows", &z))
458
0
        zrs.flow_collection.nf_ipfix_flows =
459
0
            (u_int64_t)json_object_get_int64(z);
460
461
0
      if (json_object_object_get_ex(w, "sflow_samples", &z))
462
0
        zrs.flow_collection.sflow_samples = (u_int64_t)json_object_get_int64(z);
463
0
    }
464
465
0
    if (json_object_object_get_ex(o, "zmq", &w)) {
466
0
      if (json_object_object_get_ex(w, "num_flow_exports", &z))
467
0
        zrs.num_flow_exports = (u_int64_t)json_object_get_int64(z);
468
469
0
      if (json_object_object_get_ex(w, "num_exporters", &z))
470
0
        zrs.num_exporters = (u_int8_t)json_object_get_int(z);
471
0
    }
472
473
#ifdef ZMQ_EVENT_DEBUG
474
    ntop->getTrace()->traceEvent(
475
        TRACE_NORMAL,
476
        "Event parsed "
477
        "[iface: {name: %s, speed: %u, ip: %s}]"
478
        "[probe: {public_ip: %s, ip: %s, version: %s, os: %s, license: %s, "
479
        "edition: %s, maintenance: %s}]"
480
        "[avg: {bps: %u, pps: %u}]"
481
        "[remote: {time: %u, bytes: %u, packets: %u, idle_timeout: %u, "
482
        "lifetime_timeout: %u,"
483
        " collected_lifetime_timeout: %u }]"
484
        "[zmq: {num_exporters: %u, num_flow_exports: %u}]",
485
        zrs.remote_ifname, zrs.remote_ifspeed, zrs.remote_ifaddress,
486
        zrs.remote_probe_version, zrs.remote_probe_os, zrs.remote_probe_license,
487
        zrs.remote_probe_edition, zrs.remote_probe_maintenance,
488
        zrs.remote_probe_public_address, zrs.remote_probe_address, zrs.avg_bps,
489
        zrs.avg_pps, zrs.remote_time, (u_int32_t)zrs.remote_bytes,
490
        (u_int32_t)zrs.remote_pkts, zrs.remote_idle_timeout,
491
        zrs.remote_lifetime_timeout, zrs.remote_collected_lifetime_timeout,
492
        zrs.num_exporters, zrs.num_flow_exports);
493
#endif
494
495
0
    remote_lifetime_timeout = zrs.remote_lifetime_timeout,
496
0
    remote_idle_timeout = zrs.remote_idle_timeout;
497
498
    /* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps);
499
     */
500
501
    /* Process Flow */
502
0
    setRemoteStats(&zrs);
503
504
0
    for (std::map<u_int64_t, NetworkInterface *>::iterator it =
505
0
             flowHashing.begin();
506
0
         it != flowHashing.end(); ++it) {
507
0
      ZMQParserInterface *z = (ZMQParserInterface *)it->second;
508
509
0
      z->setRemoteStats(&zrs);
510
0
    }
511
512
    /* Dispose memory */
513
0
    json_object_put(o);
514
0
  } else {
515
    // if o != NULL
516
0
    if (!once) {
517
0
      ntop->getTrace()->traceEvent(TRACE_WARNING,
518
0
                                   "Invalid message received: "
519
0
                                   "your nProbe sender is outdated, data "
520
0
                                   "encrypted, invalid JSON, or oom?");
521
0
      ntop->getTrace()->traceEvent(
522
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
523
0
          json_tokener_error_desc(jerr), payload_size, payload);
524
0
    }
525
0
    once = true;
526
0
    if (o) json_object_put(o);
527
0
    return -1;
528
0
  }
529
530
0
  return 0;
531
0
}
532
533
/* **************************************************** */
534
535
bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
536
                                           u_int32_t field,
537
0
                                           ParsedValue *value) {
538
0
  IpAddress ip_aux; /* used to check empty IPs */
539
540
0
  switch (field) {
541
0
    case IN_SRC_MAC:
542
0
    case OUT_SRC_MAC:
543
      /* Format 00:00:00:00:00:00 */
544
0
      Utils::parseMac(flow->src_mac, value->string);
545
0
      break;
546
0
    case IN_DST_MAC:
547
0
    case OUT_DST_MAC:
548
0
      Utils::parseMac(flow->dst_mac, value->string);
549
0
      break;
550
0
    case SRC_TOS:
551
0
      flow->src_tos = value->int_num;
552
0
      break;
553
0
    case DST_TOS:
554
0
      flow->dst_tos = value->int_num;
555
0
      break;
556
0
    case IPV4_SRC_ADDR:
557
0
    case IPV6_SRC_ADDR:
558
      /*
559
        The following check prevents an empty ip address (e.g., ::) to
560
        to overwrite another valid ip address already set.
561
        This can happen for example when nProbe is configured (-T) to export
562
        both %IPV4_SRC_ADDR and the %IPV6_SRC_ADDR. In that cases nProbe can
563
        export a valid ipv4 and an empty ipv6. Without the check, the empty
564
        v6 address may overwrite the non empty v4.
565
      */
566
0
      if (flow->src_ip.isEmpty()) {
567
0
        if (value->string)
568
0
          flow->src_ip.set((char *)value->string);
569
0
        else
570
0
          flow->src_ip.set(ntohl(value->int_num));
571
0
      } else {
572
0
        ip_aux.set((char *)value->string);
573
574
0
        if (!ip_aux.isEmpty() &&
575
0
            !ntop->getPrefs()->do_override_src_with_post_nat_src())
576
          /* tried to overwrite a non-empty IP with another non-empty IP */
577
0
          ntop->getTrace()->traceEvent(
578
0
              TRACE_WARNING,
579
0
              "Attempt to set source ip multiple times. "
580
0
              "Check exported fields");
581
0
      }
582
0
      break;
583
0
    case IP_PROTOCOL_VERSION:
584
0
      flow->version = value->int_num;
585
0
      break;
586
587
0
    case IPV4_DST_ADDR:
588
0
    case IPV6_DST_ADDR:
589
0
      if (flow->dst_ip.isEmpty()) {
590
0
        if (value->string)
591
0
          flow->dst_ip.set((char *)value->string);
592
0
        else
593
0
          flow->dst_ip.set(ntohl(value->int_num));
594
0
      } else {
595
0
        ip_aux.set((char *)value->string);
596
  
597
0
        if (!ip_aux.isEmpty() &&
598
0
            !ntop->getPrefs()->do_override_dst_with_post_nat_dst())
599
0
          ntop->getTrace()->traceEvent(TRACE_WARNING,
600
0
               "Attempt to set destination ip multiple times. "
601
0
               "Check exported fields");
602
0
      }
603
0
      break;
604
0
    case L4_SRC_PORT:
605
0
      if (!flow->src_port) {
606
0
        if (value->string)
607
0
          flow->src_port = atoi(value->string);
608
0
        else
609
0
          flow->src_port = ntohs((u_int32_t)value->int_num);
610
0
      }
611
0
      break;
612
0
    case L4_DST_PORT:
613
0
      if (!flow->dst_port) {
614
0
        if (value->string)
615
0
          flow->dst_port = atoi(value->string);
616
0
        else
617
0
          flow->dst_port = ntohs((u_int32_t)value->int_num);
618
0
      }
619
0
      break;
620
0
    case SRC_VLAN:
621
0
    case DST_VLAN:
622
0
      flow->vlan_id = value->int_num;
623
0
      break;
624
0
    case DOT1Q_SRC_VLAN:
625
0
    case DOT1Q_DST_VLAN:
626
0
      if (flow->vlan_id == 0) {
627
        /* as those fields are the outer vlans in q-in-q
628
           we set the vlan_id only if there is no inner vlan
629
           value set
630
        */
631
0
        flow->vlan_id = value->int_num;
632
0
      }
633
0
      break;
634
0
    case PROTOCOL:
635
0
      if (value->string)
636
0
        flow->l4_proto = atoi(value->string);
637
0
      else
638
0
        flow->l4_proto = value->int_num;
639
0
      break;
640
0
    case TCP_FLAGS:
641
0
      flow->tcp.tcp_flags = value->int_num;
642
0
      break;
643
0
    case INITIATOR_PKTS:
644
0
      flow->absolute_packet_octet_counters = true;
645
      /* Don't break */
646
0
    case IN_PKTS:
647
0
      if (value->string != NULL)
648
0
        flow->in_pkts = atol(value->string);
649
0
      else
650
0
        flow->in_pkts = value->int_num;
651
0
      break;
652
0
    case INITIATOR_OCTETS:
653
0
      flow->absolute_packet_octet_counters = true;
654
      /* Don't break */
655
0
    case IN_BYTES:
656
0
      if (value->string != NULL)
657
0
        flow->in_bytes = atol(value->string);
658
0
      else
659
0
        flow->in_bytes = value->int_num;
660
0
      break;
661
0
    case RESPONDER_PKTS:
662
0
      flow->absolute_packet_octet_counters = true;
663
      /* Don't break */
664
0
    case OUT_PKTS:
665
0
      if (value->string != NULL)
666
0
        flow->out_pkts = atol(value->string);
667
0
      else
668
0
        flow->out_pkts = value->int_num;
669
0
      break;
670
0
    case RESPONDER_OCTETS:
671
0
      flow->absolute_packet_octet_counters = true;
672
      /* Don't break */
673
0
    case OUT_BYTES:
674
0
      if (value->string != NULL)
675
0
        flow->out_bytes = atol(value->string);
676
0
      else
677
0
        flow->out_bytes = value->int_num;
678
0
      break;
679
0
    case FIRST_SWITCHED:
680
0
      if (value->string != NULL)
681
0
        flow->first_switched = atoi(value->string);
682
0
      else
683
0
        flow->first_switched = value->int_num;
684
0
      break;
685
0
    case LAST_SWITCHED:
686
0
      if (value->string != NULL)
687
0
        flow->last_switched = atoi(value->string);
688
0
      else
689
0
        flow->last_switched = value->int_num;
690
0
      break;
691
0
    case SAMPLING_INTERVAL:
692
#if 0
693
      /* Ignore it as nProbe as already implemented upscale */
694
      flow->pkt_sampling_rate = value->int_num;
695
#endif
696
0
      break;
697
0
    case DIRECTION:
698
0
      if (value->string != NULL)
699
0
        flow->direction = atoi(value->string);
700
0
      else
701
0
        flow->direction = value->int_num;
702
0
      break;
703
0
    case EXPORTER_IPV4_ADDRESS:
704
0
      if (value->string != NULL) {
705
        /* Format: a.b.c.d, possibly overrides NPROBE_IPV4_ADDRESS */
706
0
        u_int32_t ip = ntohl(inet_addr(value->string));
707
708
0
        if (ip) {
709
0
    flow->device_ip = ip;
710
    
711
0
    if(ntop->getPrefs()->is_cloud_edition()) {
712
0
      char buf[32], ipb[24];
713
0
      std::unordered_map<u_int32_t, bool>::iterator it = cloud_flow_exporters.find(ip);
714
      
715
0
      if(it == cloud_flow_exporters.end()) {
716
0
        cloud_flow_exporters[ip] = true;
717
0
        snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(ip, ipb, sizeof(ipb)));
718
0
        ntop->addLocalCloudAddress(buf);
719
720
        /* Re-evaluate IPVx_SRC_ADDR/IPVx_DST_ADDR */
721
0
        flow->src_ip.checkIP();
722
0
        flow->dst_ip.checkIP();
723
0
      }
724
0
    }
725
0
  }
726
0
      }
727
0
      break;
728
0
    case EXPORTER_IPV6_ADDRESS:
729
0
      if (value->string != NULL && strlen(value->string) > 0)
730
0
        inet_pton(AF_INET6, value->string, &flow->device_ipv6);
731
0
      break;
732
0
    case TOTAL_FLOWS_EXP:
733
      /* Not used
734
         if(value->string != NULL)
735
         total_flows_exp = atol(value->string);
736
         else
737
         total_flows_exp = value->int_num;
738
      */
739
0
      break;
740
0
    case INPUT_SNMP:
741
0
      flow->inIndex = value->int_num;
742
0
      break;
743
0
    case OUTPUT_SNMP:
744
0
      flow->outIndex = value->int_num;
745
0
      break;
746
0
    case OBSERVATION_POINT_ID:
747
0
      flow->observationPointId = value->int_num;
748
0
      break;
749
0
    case POST_NAT_SRC_IPV4_ADDR:
750
0
      if (ntop->getPrefs()->do_override_src_with_post_nat_src()) {
751
0
        if (value->string) {
752
0
          IpAddress tmp;
753
0
          tmp.set(value->string);
754
0
          if (!tmp.isEmpty()) {
755
0
            flow->src_ip.set((char *)value->string);
756
0
          }
757
0
        } else if (value->int_num) {
758
0
          flow->src_ip.set(ntohl(value->int_num));
759
0
        }
760
0
      }
761
0
      break;
762
0
    case POST_NAT_DST_IPV4_ADDR:
763
0
      if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
764
0
        if (value->string) {
765
0
          IpAddress tmp;
766
    
767
0
          tmp.set(value->string);
768
    
769
0
          if (!tmp.isEmpty()) {
770
0
            flow->dst_ip.set((char *)value->string);
771
0
          }
772
0
        } else if (value->int_num) {
773
0
          flow->dst_ip.set(ntohl(value->int_num));
774
0
        }
775
0
      }
776
0
      break;
777
0
    case POST_NAPT_SRC_TRANSPORT_PORT:
778
0
      if (ntop->getPrefs()->do_override_src_with_post_nat_src() &&
779
0
          (value->int_num != 0))
780
0
        flow->src_port = htons((u_int16_t)value->int_num);
781
0
      break;
782
0
    case POST_NAPT_DST_TRANSPORT_PORT:
783
0
      if (ntop->getPrefs()->do_override_dst_with_post_nat_dst() &&
784
0
          (value->int_num != 0))
785
0
        flow->dst_port = htons((u_int16_t)value->int_num);
786
0
      break;
787
0
    case INGRESS_VRFID:
788
0
      flow->vrfId = value->int_num;
789
0
      break;
790
0
    case IPV4_SRC_MASK:
791
0
    case IPV4_DST_MASK:
792
0
      if (value->int_num != 0) return false;
793
0
      break;
794
0
    case IPV4_NEXT_HOP:
795
0
      if (value->string && strcmp(value->string, "0.0.0.0")) return false;
796
0
      break;
797
0
    case SRC_AS:
798
0
      flow->src_as = value->int_num;
799
0
      break;
800
0
    case DST_AS:
801
0
      flow->dst_as = value->int_num;
802
0
      break;
803
0
    case BGP_NEXT_ADJACENT_ASN:
804
0
      flow->next_adjacent_as = value->int_num;
805
0
      break;
806
0
    case BGP_PREV_ADJACENT_ASN:
807
0
      flow->prev_adjacent_as = value->int_num;
808
0
      break;
809
0
    default:
810
0
      ntop->getTrace()->traceEvent(TRACE_INFO,
811
0
                                   "Skipping no-PEN flow fieldId %u", field);
812
0
      return false;
813
0
  }
814
815
0
  return true;
816
0
}
817
818
/* **************************************************** */
819
820
bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
821
                                           u_int32_t field,
822
0
                                           ParsedValue *value) {
823
  /* Check for backward compatibility to handle cases like field = 123
824
   * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
825
0
  if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
826
827
0
  switch (field) {
828
0
  case L7_PROTO:
829
0
    if (value->string) {
830
0
      if (!strchr(value->string, '.')) {
831
  /* Old behaviour, only the app protocol */
832
0
  flow->l7_proto.app_protocol = atoi(value->string);
833
0
      } else {
834
0
  char *proto_dot;
835
836
0
  flow->l7_proto.master_protocol =
837
0
    (u_int16_t)strtoll(value->string, &proto_dot, 10);
838
0
  flow->l7_proto.app_protocol =
839
0
    (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
840
0
      }
841
0
    } else {
842
0
      flow->l7_proto.app_protocol = value->int_num;
843
0
    }
844
845
#if 0
846
    ntop->getTrace()->traceEvent(TRACE_NORMAL, "[value: %s][master: %u][app: %u]",
847
         value->string ? value->string : "(int)",
848
         flow->l7_proto.master_protocol,
849
         flow->l7_proto.app_protocol);
850
#endif
851
0
    break;
852
853
0
  case NPROBE_INSTANCE_NAME:
854
0
    if(ntop->getPrefs()->is_cloud_edition()
855
0
       && ntop->getPrefs()->addVLANCloudToExporters()) {
856
0
      u_int16_t vlan_id = findVLANMapping((char*)value->string);
857
858
0
      flow->vlan_id = vlan_id;
859
0
    }
860
0
    break;
861
862
0
  case L7_PROTO_NAME:
863
0
    break;
864
865
0
  case L7_INFO:
866
0
    if (value->string && value->string[0] && value->string[0] != '\n')
867
0
      flow->setL7Info(value->string);
868
0
    break;
869
870
0
  case L7_CONFIDENCE:
871
0
    flow->setConfidence((ndpi_confidence_t)((value->int_num < NDPI_CONFIDENCE_MAX)
872
0
              ? value->int_num
873
0
              : NDPI_CONFIDENCE_UNKNOWN));
874
0
    break;
875
876
0
  case L7_ERROR_CODE:
877
0
    flow->setL7ErrorCode(value->int_num);
878
0
    break;
879
880
0
  case OOORDER_IN_PKTS:
881
0
    flow->tcp.ooo_in_pkts = value->int_num;
882
0
    break;
883
884
0
  case OOORDER_OUT_PKTS:
885
0
    flow->tcp.ooo_out_pkts = value->int_num;
886
0
    break;
887
888
0
  case RETRANSMITTED_IN_PKTS:
889
0
    flow->tcp.retr_in_pkts = value->int_num;
890
0
    break;
891
892
0
  case RETRANSMITTED_OUT_PKTS:
893
0
    flow->tcp.retr_out_pkts = value->int_num;
894
0
    break;
895
896
    /* TODO add lost in/out to nProbe and here */
897
0
  case CLIENT_NW_LATENCY_MS: {
898
0
    float client_nw_latency;
899
0
    client_nw_latency = value->double_num;
900
0
    flow->tcp.clientNwLatency.tv_sec = client_nw_latency / 1e3;
901
0
    flow->tcp.clientNwLatency.tv_usec =
902
0
      1e3 * (client_nw_latency - flow->tcp.clientNwLatency.tv_sec * 1e3);
903
0
  } break;
904
905
0
  case SERVER_NW_LATENCY_MS: {
906
0
    float server_nw_latency;
907
908
0
    server_nw_latency = value->double_num;
909
0
    flow->tcp.serverNwLatency.tv_sec = server_nw_latency / 1e3;
910
0
    flow->tcp.serverNwLatency.tv_usec =
911
0
      1e3 * (server_nw_latency - flow->tcp.serverNwLatency.tv_sec * 1e3);
912
0
  } break;
913
914
0
  case CLIENT_TCP_FLAGS:
915
0
    flow->tcp.client_tcp_flags = value->int_num;
916
0
    flow->tcp.tcp_flags |= flow->tcp.client_tcp_flags;
917
0
    break;
918
919
0
  case SERVER_TCP_FLAGS:
920
0
    flow->tcp.server_tcp_flags = value->int_num;
921
0
    flow->tcp.tcp_flags |= flow->tcp.server_tcp_flags;
922
0
    break;
923
924
0
  case APPL_LATENCY_MS:
925
0
    flow->tcp.applLatencyMsec = value->double_num;
926
0
    break;
927
928
0
  case TCP_WIN_MAX_IN:
929
0
    flow->tcp.in_window = value->int_num;
930
0
    break;
931
932
0
  case TCP_WIN_MAX_OUT:
933
0
    flow->tcp.out_window = value->int_num;
934
0
    break;
935
936
0
  case DNS_QUERY:
937
0
    if (value->string && value->string[0] && value->string[0] != '\n')
938
0
      flow->setDNSQuery(value->string);
939
0
    break;
940
941
0
  case DNS_QUERY_TYPE:
942
0
    flow->setDNSQueryType(value->string ? atoi(value->string) : value->int_num);
943
0
    break;
944
945
0
  case DNS_RET_CODE:
946
0
    flow->setDNSRetCode(value->string ? atoi(value->string) : value->int_num);
947
0
    break;
948
949
0
  case HTTP_URL:
950
0
    if (value->string && value->string[0] && value->string[0] != '\n')
951
0
      flow->setHTTPurl(value->string);
952
0
    break;
953
954
0
  case HTTP_USER_AGENT:
955
0
    if (value->string && value->string[0] && value->string[0] != '\n')
956
0
      flow->setHTTPuserAgent(value->string);
957
0
    break;
958
959
0
  case HTTP_SITE:
960
0
    if (value->string && value->string[0] && value->string[0] != '\n')
961
0
      flow->setHTTPsite(value->string);
962
0
    break;
963
964
0
  case HTTP_RET_CODE:
965
0
    flow->setHTTPRetCode(value->string ? atoi(value->string) : value->int_num);
966
0
    break;
967
968
0
  case HTTP_METHOD:
969
0
    if (value->string && value->string[0] && value->string[0] != '\n')
970
0
      flow->setHTTPMethod(ndpi_http_str2method(value->string, strlen(value->string)));
971
0
    break;
972
973
0
  case TLS_SERVER_NAME:
974
0
    if (value->string && value->string[0] && value->string[0] != '\n')
975
0
      flow->setTLSserverName(value->string);
976
0
    break;
977
978
0
  case JA3C_HASH:
979
0
    if (value->string && value->string[0])
980
0
      flow->setJA3cHash(value->string);
981
0
    break;
982
983
0
  case JA3S_HASH:
984
0
    if (value->string && value->string[0])
985
0
      flow->setJA3sHash(value->string);
986
0
    break;
987
988
0
  case TLS_CIPHER:
989
0
    flow->setTLSCipher(value->int_num);
990
0
    break;
991
992
0
  case SSL_UNSAFE_CIPHER:
993
0
    flow->setTLSUnsafeCipher(value->int_num);
994
0
    break;
995
996
0
  case L7_PROTO_RISK:
997
0
    flow->setRisk((ndpi_risk)value->int_num);
998
0
    break;
999
1000
0
  case FLOW_VERDICT:
1001
0
    flow->setFlowVerdict(value->int_num);
1002
0
    break;
1003
1004
0
  case L7_RISK_INFO:
1005
0
    if (value->string && value->string[0])
1006
0
      flow->setRiskInfo(value->string);
1007
0
    break;
1008
1009
0
  case BITTORRENT_HASH:
1010
0
    if (value->string && value->string[0] && value->string[0] != '\n')
1011
0
      flow->setBittorrentHash(value->string);
1012
0
    break;
1013
1014
0
  case NPROBE_IPV4_ADDRESS:
1015
    /* Do not override EXPORTER_IPV4_ADDRESS */
1016
0
    if (value->string && flow->device_ip == 0 &&
1017
0
  (flow->device_ip = ntohl(inet_addr(value->string))))
1018
0
      return false;
1019
0
    break;
1020
1021
0
  case SRC_FRAGMENTS:
1022
0
    flow->in_fragments = value->int_num;
1023
0
    break;
1024
1025
0
  case DST_FRAGMENTS:
1026
0
    flow->out_fragments = value->int_num;
1027
0
    break;
1028
1029
0
  case SRC_PROC_PID:
1030
0
    flow->src_process_info.pid = value->int_num;
1031
0
    break;
1032
1033
0
  case SRC_FATHER_PROC_PID:
1034
0
    flow->src_process_info.father_pid = value->int_num;
1035
0
    break;
1036
1037
0
  case SRC_PROC_NAME:
1038
0
    if (value->string && value->string[0]) {
1039
0
      flow->setParsedProcessInfo();
1040
0
      flow->src_process_info.process_name = strdup(value->string);
1041
1042
#if 0
1043
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SRC] %s (%u)",
1044
           flow->src_process_info.process_name,
1045
           ntohs(flow->src_port));
1046
#endif
1047
0
    }
1048
0
    break;
1049
1050
0
  case SRC_FATHER_PROC_NAME:
1051
0
    if (value->string && value->string[0]) {
1052
0
      flow->setParsedProcessInfo();
1053
0
      flow->src_process_info.father_process_name = strdup(value->string);
1054
0
    }
1055
0
    break;
1056
1057
0
  case SRC_PROC_PKG_NAME:
1058
0
    if (value->string && value->string[0])
1059
0
      flow->src_process_info.pkg_name = strdup(value->string);
1060
0
    break;
1061
1062
0
  case SRC_FATHER_PROC_PKG_NAME:
1063
0
    if (value->string && value->string[0])
1064
0
      flow->src_process_info.father_pkg_name = strdup(value->string);
1065
0
    break;
1066
1067
0
  case SRC_PROC_CMDLINE:
1068
0
    if (value->string && value->string[0])
1069
0
      flow->src_process_info.cmd_line = strdup(value->string);
1070
0
    break;
1071
1072
0
  case SRC_PROC_UID:
1073
0
    flow->src_process_info.uid = value->int_num;
1074
0
    break;
1075
1076
0
  case SRC_FATHER_PROC_UID:
1077
0
    flow->src_process_info.father_uid = value->int_num;
1078
0
    break;
1079
1080
0
  case SRC_PROC_USER_NAME:
1081
0
    if (value->string && value->string[0])
1082
0
      flow->src_process_info.uid_name = strdup(value->string);
1083
0
    break;
1084
1085
0
  case SRC_FATHER_PROC_USER_NAME:
1086
0
    if (value->string && value->string[0])
1087
0
      flow->src_process_info.father_uid_name = strdup(value->string);
1088
0
    break;
1089
1090
0
  case SRC_PROC_CONTAINER_ID:
1091
0
    if (value->string && value->string[0]) {
1092
0
      flow->setParsedContainerInfo();
1093
0
      flow->src_container_info.id = strdup(value->string);
1094
0
    }
1095
0
    break;
1096
1097
0
  case DST_PROC_PID:
1098
0
    flow->dst_process_info.pid = value->int_num;
1099
0
    break;
1100
1101
0
  case DST_FATHER_PROC_PID:
1102
0
    flow->dst_process_info.father_pid = value->int_num;
1103
0
    break;
1104
1105
0
  case DST_PROC_NAME:
1106
0
    if (value->string && value->string[0]) {
1107
0
      flow->setParsedProcessInfo();
1108
0
      flow->dst_process_info.process_name = strdup(value->string);
1109
1110
#if 0
1111
      ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DST] %s (%u)",
1112
           flow->dst_process_info.process_name,
1113
           ntohs(flow->dst_port));
1114
#endif
1115
0
    }
1116
0
    break;
1117
1118
0
  case DST_FATHER_PROC_NAME:
1119
0
    if (value->string && value->string[0]) {
1120
0
      flow->setParsedProcessInfo();
1121
0
      flow->dst_process_info.father_process_name = strdup(value->string);
1122
0
    }
1123
0
    break;
1124
1125
0
  case DST_PROC_PKG_NAME:
1126
0
    if (value->string && value->string[0])
1127
0
      flow->dst_process_info.pkg_name = strdup(value->string);
1128
0
    break;
1129
1130
0
  case DST_FATHER_PROC_PKG_NAME:
1131
0
    if (value->string && value->string[0])
1132
0
      flow->dst_process_info.father_pkg_name = strdup(value->string);
1133
0
    break;
1134
1135
0
  case DST_PROC_CMDLINE:
1136
0
    if (value->string && value->string[0])
1137
0
      flow->dst_process_info.cmd_line = strdup(value->string);
1138
0
    break;
1139
1140
0
  case DST_PROC_UID:
1141
0
    flow->dst_process_info.uid = value->int_num;
1142
0
    break;
1143
1144
0
  case DST_FATHER_PROC_UID:
1145
0
    flow->dst_process_info.father_uid = value->int_num;
1146
0
    break;
1147
1148
0
  case DST_PROC_USER_NAME:
1149
0
    if (value->string && value->string[0])
1150
0
      flow->dst_process_info.uid_name = strdup(value->string);
1151
0
    break;
1152
1153
0
  case DST_FATHER_PROC_USER_NAME:
1154
0
    if (value->string && value->string[0])
1155
0
      flow->dst_process_info.father_uid_name = strdup(value->string);
1156
0
    break;
1157
1158
0
  case DST_PROC_CONTAINER_ID:
1159
0
    if (value->string && value->string[0]) {
1160
0
      flow->setParsedContainerInfo();
1161
0
      flow->dst_container_info.id = strdup(value->string);
1162
0
    }
1163
0
    break;
1164
1165
0
  default:
1166
0
    return false;
1167
0
  }
1168
1169
0
  return true;
1170
0
}
1171
1172
/* **************************************************** */
1173
1174
bool ZMQParserInterface::matchPENZeroField(ParsedFlow *const flow,
1175
                                           u_int32_t field,
1176
0
                                           ParsedValue *value) {
1177
0
  IpAddress ip_aux; /* used to check empty IPs */
1178
1179
0
  switch (field) {
1180
0
    case IN_SRC_MAC:
1181
0
    case OUT_SRC_MAC: {
1182
0
      u_int8_t mac[6];
1183
0
      Utils::parseMac(mac, value->string);
1184
0
      return (memcmp(flow->src_mac, mac, sizeof(mac)) == 0);
1185
0
    }
1186
1187
0
    case IN_DST_MAC:
1188
0
    case OUT_DST_MAC: {
1189
0
      u_int8_t mac[6];
1190
0
      Utils::parseMac(mac, value->string);
1191
0
      return (memcmp(flow->dst_mac, mac, sizeof(mac)) == 0);
1192
0
    }
1193
1194
0
    case SRC_TOS:
1195
0
      if (value->string)
1196
0
        return (flow->src_tos == atoi(value->string));
1197
0
      else
1198
0
        return (flow->src_tos == value->int_num);
1199
1200
0
    case DST_TOS:
1201
0
      if (value->string)
1202
0
        return (flow->dst_tos == atoi(value->string));
1203
0
      else
1204
0
        return (flow->dst_tos == value->int_num);
1205
1206
0
    case IPV4_SRC_ADDR:
1207
0
    case IPV6_SRC_ADDR: {
1208
0
      IpAddress ip;
1209
0
      if (value->string)
1210
0
        ip.set((char *)value->string);
1211
0
      else
1212
0
        ip.set(ntohl(value->int_num));
1213
0
      return (flow->src_ip.compare(&ip) == 0);
1214
0
    }
1215
1216
0
    case IP_PROTOCOL_VERSION:
1217
0
      if (value->string)
1218
0
        return (flow->version == atoi(value->string));
1219
0
      else
1220
0
        return (flow->version == value->int_num);
1221
1222
0
    case IPV4_DST_ADDR:
1223
0
    case IPV6_DST_ADDR: {
1224
0
      IpAddress ip;
1225
0
      if (value->string)
1226
0
        ip.set((char *)value->string);
1227
0
      else
1228
0
        ip.set(ntohl(value->int_num));
1229
0
      return (flow->dst_ip.compare(&ip) == 0);
1230
0
    }
1231
1232
0
    case L4_SRC_PORT:
1233
0
      if (value->string)
1234
0
        return (flow->src_port == htons((u_int32_t)atoi(value->string)));
1235
0
      else
1236
0
        return (flow->src_port == htons((u_int32_t)value->int_num));
1237
1238
0
    case L4_DST_PORT:
1239
0
      if (value->string)
1240
0
        return (flow->dst_port == htons((u_int32_t)atoi(value->string)));
1241
0
      else
1242
0
        return (flow->dst_port == htons((u_int32_t)value->int_num));
1243
1244
0
    case SRC_VLAN:
1245
0
    case DST_VLAN:
1246
0
    case DOT1Q_SRC_VLAN:
1247
0
    case DOT1Q_DST_VLAN:
1248
0
      if (value->string)
1249
0
        return (flow->vlan_id == atoi(value->string));
1250
0
      else
1251
0
        return (flow->vlan_id == value->int_num);
1252
1253
0
    case PROTOCOL:
1254
0
      if (value->string)
1255
0
        return (flow->l4_proto == atoi(value->string));
1256
0
      else
1257
0
        return (flow->l4_proto == value->int_num);
1258
1259
0
    case DIRECTION:
1260
0
      if (value->string)
1261
0
        return (flow->direction == atoi(value->string));
1262
0
      else
1263
0
        return (flow->direction == value->int_num);
1264
1265
0
    case EXPORTER_IPV4_ADDRESS:
1266
0
      return (flow->device_ip == ntohl(inet_addr(value->string)));
1267
1268
0
    case EXPORTER_IPV6_ADDRESS:
1269
0
      if (value->string != NULL && strlen(value->string) > 0) {
1270
0
        struct ndpi_in6_addr ipv6;
1271
1272
0
        if (inet_pton(AF_INET6, value->string, &ipv6) <= 0) return false;
1273
0
        return (memcmp(&flow->device_ipv6, &ipv6, sizeof(flow->device_ipv6)) == 0);
1274
0
      }
1275
1276
0
    case INPUT_SNMP:
1277
0
      if (value->string)
1278
0
        return (flow->inIndex == (u_int32_t)atoi(value->string));
1279
0
      else
1280
0
        return (flow->inIndex == value->int_num);
1281
1282
0
    case OUTPUT_SNMP:
1283
0
      if (value->string)
1284
0
        return (flow->outIndex == (u_int32_t)atoi(value->string));
1285
0
      else
1286
0
        return (flow->outIndex == value->int_num);
1287
1288
0
    case OBSERVATION_POINT_ID:
1289
0
      if (value->string)
1290
0
        return (flow->observationPointId == atoi(value->string));
1291
0
      else
1292
0
        return (flow->observationPointId == value->int_num);
1293
1294
0
    case INGRESS_VRFID:
1295
0
      if (value->string)
1296
0
        return (flow->vrfId == (u_int)atoi(value->string));
1297
0
      else
1298
0
        return (flow->vrfId == value->int_num);
1299
1300
0
    case SRC_AS:
1301
0
      if (value->string)
1302
0
        return (flow->src_as == (u_int32_t)atoi(value->string));
1303
0
      else
1304
0
        return (flow->src_as == value->int_num);
1305
1306
0
    case DST_AS:
1307
0
      if (value->string)
1308
0
        return (flow->dst_as == (u_int32_t)atoi(value->string));
1309
0
      else
1310
0
        return (flow->dst_as == value->int_num);
1311
1312
0
    case BGP_NEXT_ADJACENT_ASN:
1313
0
      if (value->string)
1314
0
        return (flow->next_adjacent_as == (u_int32_t)atoi(value->string));
1315
0
      else
1316
0
        return (flow->next_adjacent_as == value->int_num);
1317
1318
0
    case BGP_PREV_ADJACENT_ASN:
1319
0
      if (value->string)
1320
0
        return (flow->prev_adjacent_as == (u_int32_t)atoi(value->string));
1321
0
      else
1322
0
        return (flow->prev_adjacent_as == value->int_num);
1323
1324
0
    default:
1325
0
      ntop->getTrace()->traceEvent(TRACE_INFO,
1326
0
                                   "Skipping no-PEN flow fieldId %u", field);
1327
0
      break;
1328
0
  }
1329
1330
0
  return false;
1331
0
}
1332
1333
/* **************************************************** */
1334
1335
bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow,
1336
                                           u_int32_t field,
1337
0
                                           ParsedValue *value) {
1338
  /* Check for backward compatibility to handle cases like field = 123
1339
   * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
1340
0
  if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
1341
1342
0
  switch (field) {
1343
0
    case L7_PROTO: {
1344
0
      ndpi_proto l7_proto = {0};
1345
0
      if (value->string) {
1346
0
        if (!strchr(value->string, '.')) {
1347
          /* Old behaviour, only the app protocol */
1348
0
          l7_proto.app_protocol = atoi(value->string);
1349
0
        } else {
1350
0
          char *proto_dot;
1351
0
          l7_proto.master_protocol =
1352
0
              (u_int16_t)strtoll(value->string, &proto_dot, 10);
1353
0
          l7_proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
1354
0
        }
1355
0
      } else {
1356
0
        l7_proto.app_protocol = value->int_num;
1357
0
      }
1358
0
      return (flow->l7_proto.app_protocol == l7_proto.app_protocol);
1359
0
    }
1360
1361
0
    case L7_PROTO_NAME:
1362
0
      if (value->string) {
1363
        /* This lookup should be optimized */
1364
0
        u_int16_t app_protocol =
1365
0
            ndpi_get_proto_by_name(get_ndpi_struct(), value->string);
1366
0
        return (flow->l7_proto.app_protocol == app_protocol);
1367
0
      } else
1368
0
        return false;
1369
1370
0
    case L7_INFO:
1371
0
      if (value->string && flow->getL7Info())
1372
0
        return (strcmp(flow->getL7Info(), value->string) == 0);
1373
0
      else
1374
0
        return false;
1375
1376
0
    case L7_ERROR_CODE:
1377
0
      return (flow->getL7ErrorCode() == value->int_num);
1378
1379
0
    case DNS_QUERY:
1380
0
      if (value->string && flow->getDNSQuery())
1381
0
        return (strcmp(flow->getDNSQuery(), value->string) == 0);
1382
0
      else
1383
0
        return false;
1384
1385
0
    case DNS_QUERY_TYPE:
1386
0
      if (value->string)
1387
0
        return (flow->getDNSQueryType() == atoi(value->string));
1388
0
      else
1389
0
        return (flow->getDNSQueryType() == value->int_num);
1390
1391
0
    case HTTP_URL:
1392
0
      if (value->string && flow->getHTTPurl())
1393
0
        return (strcmp(flow->getHTTPurl(), value->string) == 0);
1394
0
      else
1395
0
        return false;
1396
1397
0
    case HTTP_USER_AGENT:
1398
0
      if (value->string && flow->getHTTPuserAgent())
1399
0
        return (strcmp(flow->getHTTPuserAgent(), value->string) == 0);
1400
0
      else
1401
0
        return false;
1402
1403
0
    case HTTP_SITE:
1404
0
      if (value->string && flow->getHTTPsite())
1405
0
        return (strcmp(flow->getHTTPsite(), value->string) == 0);
1406
0
      else
1407
0
        return false;
1408
1409
0
    case TLS_SERVER_NAME:
1410
0
      if (value->string && flow->getTLSserverName())
1411
0
        return (strcmp(flow->getTLSserverName(), value->string) == 0);
1412
0
      else
1413
0
        return false;
1414
1415
0
    case NPROBE_IPV4_ADDRESS:
1416
0
      return (flow->device_ip == ntohl(inet_addr(value->string)));
1417
1418
0
    default:
1419
0
      break;
1420
0
  }
1421
1422
0
  ntop->getTrace()->traceEvent(
1423
0
      TRACE_WARNING, "Field %u not supported by flow filtering", field);
1424
1425
0
  return false;
1426
0
}
1427
1428
/* **************************************************** */
1429
1430
bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key,
1431
0
                                    ParsedValue *value) {
1432
0
  u_int32_t pen, key_id;
1433
0
  bool res;
1434
1435
0
  if (!getKeyId((char *)key, strlen(key), &pen, &key_id)) {
1436
0
    ntop->getTrace()->traceEvent(
1437
0
        TRACE_WARNING, "Field %s not supported by flow filtering", key);
1438
0
    return false;
1439
0
  }
1440
1441
0
  switch (pen) {
1442
0
    case 0: /* No PEN */
1443
0
      res = matchPENZeroField(flow, key_id, value);
1444
0
      break;
1445
0
    case NTOP_PEN:
1446
0
      res = matchPENNtopField(flow, key_id, value);
1447
0
      break;
1448
0
    case UNKNOWN_PEN:
1449
0
    default:
1450
0
      ntop->getTrace()->traceEvent(
1451
0
          TRACE_WARNING, "Field %s not supported by flow filtering", key);
1452
0
      res = false;
1453
0
      break;
1454
0
  }
1455
1456
0
  return res;
1457
0
}
1458
1459
/* **************************************************** */
1460
1461
bool ZMQParserInterface::parseNProbeAgentField(
1462
    ParsedFlow *const flow, const char *key, ParsedValue *value,
1463
0
    json_object *const jvalue) {
1464
0
  bool ret = false;
1465
0
  json_object *obj;
1466
1467
0
  if (!strncmp(key, "timestamp", 9)) {
1468
0
    u_int32_t seconds, nanoseconds /* nanoseconds not currently used */;
1469
0
    if (sscanf(value->string, "%u.%u", &seconds, &nanoseconds) == 2) {
1470
0
      flow->first_switched = flow->last_switched = seconds;
1471
0
      ret = true;
1472
0
    }
1473
0
  } else if (!strncmp(key, "IPV4_LOCAL_ADDR", 15) ||
1474
0
             !strncmp(key, "IPV6_LOCAL_ADDR", 15)) {
1475
0
    flow->src_ip.set(value->string); /* FIX: do not always assume Local == Client */
1476
0
    ret = true;
1477
0
  } else if (!strncmp(key, "IPV4_REMOTE_ADDR", 16) ||
1478
0
             !strncmp(key, "IPV6_REMOTE_ADDR", 16)) {
1479
0
    flow->dst_ip.set(value->string); /* FIX: do not always assume Remote == Server */
1480
0
    ret = true;
1481
0
  } else if (!strncmp(key, "L4_LOCAL_PORT", 13)) {
1482
0
    flow->src_port = htons((u_int32_t)value->int_num);
1483
0
    ret = true;
1484
0
  } else if (!strncmp(key, "L4_REMOTE_PORT", 14)) {
1485
0
    flow->dst_port = htons((u_int32_t)value->int_num);
1486
0
    ret = true;
1487
0
  } else if (!strncmp(key, "INTERFACE_NAME", 7) && strlen(key) == 14) {
1488
0
    flow->ifname = (char *)json_object_get_string(jvalue);
1489
0
    ret = true;
1490
0
  } else if (strlen(key) >= 14 &&
1491
0
             !strncmp(&key[strlen(key) - 14], "FATHER_PROCESS", 14)) {
1492
0
    if (json_object_object_get_ex(jvalue, "PID", &obj))
1493
0
      flow->src_process_info.father_pid = (u_int32_t)json_object_get_int64(obj);
1494
0
    if (json_object_object_get_ex(jvalue, "UID", &obj))
1495
0
      flow->src_process_info.father_uid = (u_int32_t)json_object_get_int64(obj);
1496
0
    if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
1497
0
      flow->src_process_info.father_uid_name =
1498
0
          strdup((char *)json_object_get_string(obj));
1499
0
    if (json_object_object_get_ex(jvalue, "GID", &obj))
1500
0
      flow->src_process_info.father_gid = (u_int32_t)json_object_get_int64(obj);
1501
0
    if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
1502
0
      flow->src_process_info.actual_memory =
1503
0
          (u_int32_t)json_object_get_int64(obj);
1504
0
    if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
1505
0
      flow->src_process_info.peak_memory =
1506
0
          (u_int32_t)json_object_get_int64(obj);
1507
0
    if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
1508
0
      flow->src_process_info.father_process_name =
1509
0
          strdup((char *)json_object_get_string(obj));
1510
0
    if (!flow->process_info_set) flow->process_info_set = true;
1511
0
    ret = true;
1512
1513
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Father Process [pid: %u][uid:
1514
    // %u][gid: %u][path: %s]",
1515
    //           flow->src_process_info.father_pid,
1516
    // flow->src_process_info.father_uid,
1517
    // flow->src_process_info.father_gid,
1518
    //         flow->src_process_info.father_process_name);
1519
0
  } else if (strlen(key) >= 7 &&
1520
0
             !strncmp(&key[strlen(key) - 7], "PROCESS", 7)) {
1521
0
    if (json_object_object_get_ex(jvalue, "PID", &obj))
1522
0
      flow->src_process_info.pid = (u_int32_t)json_object_get_int64(obj);
1523
0
    if (json_object_object_get_ex(jvalue, "UID", &obj))
1524
0
      flow->src_process_info.uid = (u_int32_t)json_object_get_int64(obj);
1525
0
    if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
1526
0
      flow->src_process_info.uid_name =
1527
0
          strdup((char *)json_object_get_string(obj));
1528
0
    if (json_object_object_get_ex(jvalue, "GID", &obj))
1529
0
      flow->src_process_info.gid = (u_int32_t)json_object_get_int64(obj);
1530
0
    if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
1531
0
      flow->src_process_info.actual_memory =
1532
0
          (u_int32_t)json_object_get_int64(obj);
1533
0
    if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
1534
0
      flow->src_process_info.peak_memory =
1535
0
          (u_int32_t)json_object_get_int64(obj);
1536
0
    if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
1537
0
      flow->src_process_info.process_name =
1538
0
          strdup((char *)json_object_get_string(obj));
1539
0
    if (!flow->process_info_set) flow->process_info_set = true;
1540
0
    ret = true;
1541
1542
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Process [pid: %u][uid:
1543
    // %u][gid: %u][size/peak vm: %u/%u][path: %s]",
1544
    //         flow->src_process_info.pid,
1545
    // flow->src_process_info.uid, flow->src_process_info.gid,
1546
    // flow->src_process_info.actual_memory, flow->src_process_info.peak_memory,
1547
    // flow->src_process_info.process_name);
1548
0
  } else if (strlen(key) >= 9 &&
1549
0
             !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
1550
0
    if ((ret = parseContainerInfo(jvalue, &flow->src_container_info)))
1551
0
      flow->container_info_set = true;
1552
0
  } else if (!strncmp(key, "TCP", 3) && strlen(key) == 3) {
1553
0
    if (json_object_object_get_ex(jvalue, "CONN_STATE", &obj))
1554
0
      flow->src_tcp_info.conn_state =
1555
0
          Utils::tcpStateStr2State(json_object_get_string(obj));
1556
1557
0
    if (json_object_object_get_ex(jvalue, "SEGS_IN", &obj))
1558
0
      flow->src_tcp_info.in_segs = (u_int32_t)json_object_get_int64(obj);
1559
0
    if (json_object_object_get_ex(jvalue, "SEGS_OUT", &obj))
1560
0
      flow->src_tcp_info.out_segs = (u_int32_t)json_object_get_int64(obj);
1561
0
    if (json_object_object_get_ex(jvalue, "UNACK_SEGMENTS", &obj))
1562
0
      flow->src_tcp_info.unacked_segs = (u_int32_t)json_object_get_int64(obj);
1563
0
    if (json_object_object_get_ex(jvalue, "RETRAN_PKTS", &obj))
1564
0
      flow->src_tcp_info.retx_pkts = (u_int32_t)json_object_get_int64(obj);
1565
0
    if (json_object_object_get_ex(jvalue, "LOST_PKTS", &obj))
1566
0
      flow->src_tcp_info.lost_pkts = (u_int32_t)json_object_get_int64(obj);
1567
1568
0
    if (json_object_object_get_ex(jvalue, "RTT", &obj))
1569
0
      flow->src_tcp_info.rtt = json_object_get_double(obj);
1570
0
    if (json_object_object_get_ex(jvalue, "RTT_VARIANCE", &obj))
1571
0
      flow->src_tcp_info.rtt_var = json_object_get_double(obj);
1572
1573
0
    if (json_object_object_get_ex(jvalue, "BYTES_RCVD", &obj))
1574
0
      flow->out_bytes = flow->src_tcp_info.rcvd_bytes =
1575
0
          (u_int64_t)json_object_get_int64(obj);
1576
1577
0
    if (json_object_object_get_ex(jvalue, "BYTES_ACKED", &obj))
1578
0
      flow->in_bytes = flow->src_tcp_info.sent_bytes =
1579
0
          (u_int64_t)json_object_get_int64(obj);
1580
1581
0
    if (!flow->tcp_info_set) flow->tcp_info_set = true;
1582
0
    flow->absolute_packet_octet_counters = true;
1583
0
    ret = true;
1584
1585
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TCP INFO [conn state:
1586
    // %s][rcvd_bytes: %u][retx_pkts: %u][lost_pkts: %u]"
1587
    //         "[in_segs: %u][out_segs: %u][unacked_segs: %u]"
1588
    //         "[rtt: %f][rtt_var: %f]",
1589
    //         Utils::tcpState2StateStr(flow->src_tcp_info.conn_state),
1590
    //         flow->src_tcp_info.rcvd_bytes,
1591
    //         flow->src_tcp_info.retx_pkts,
1592
    //         flow->src_tcp_info.lost_pkts,
1593
    //         flow->src_tcp_info.in_segs,
1594
    //         flow->src_tcp_info.out_segs,
1595
    //         flow->src_tcp_info.unacked_segs,
1596
    //         flow->src_tcp_info.rtt,
1597
    //         flow->src_tcp_info.rtt_var);
1598
0
  } else if ((!strncmp(key, "TCP_EVENT_TYPE", 14) && strlen(key) == 14) ||
1599
0
             (!strncmp(key, "UDP_EVENT_TYPE", 14) && strlen(key) == 14)) {
1600
0
    flow->event_type = Utils::eBPFEventStr2Event(value->string);
1601
1602
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event Type [type: %s]",
1603
    // Utils::eBPFEvent2EventStr(flow->event_type));
1604
0
  }
1605
1606
0
  return ret;
1607
0
}
1608
1609
/* **************************************************** */
1610
1611
0
bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
1612
0
  bool invalid_flow = false;
1613
0
  bool rc = false;
1614
1615
0
  if (flow->vlan_id && ntop->getPrefs()->do_ignore_vlans()) flow->vlan_id = 0;
1616
1617
  /*
1618
    Some flow exporters write in host ports the ICMP type/code.
1619
    hence we need to make sure such fields are zeroed in order
1620
    to avoid odd values in the web GUI
1621
   */
1622
0
  if ((flow->l4_proto == IPPROTO_ICMP) || (flow->l4_proto == IPPROTO_ICMPV6))
1623
0
    flow->src_port = flow->dst_port = 0;
1624
1625
  /* Handle zero IPv4/IPv6 discrepancies */
1626
0
  if (!flow->hasParsedeBPF()) {
1627
0
    if (flow->src_ip.getVersion() != flow->dst_ip.getVersion()) {
1628
0
      if (flow->dst_ip.isIPv4() && flow->src_ip.isIPv6() &&
1629
0
          flow->src_ip.isEmpty())
1630
0
        flow->src_ip.setVersion(4);
1631
0
      else if (flow->src_ip.isIPv4() && flow->dst_ip.isIPv6() &&
1632
0
               flow->dst_ip.isEmpty())
1633
0
        flow->dst_ip.setVersion(4);
1634
0
      else if (flow->dst_ip.isIPv6() && flow->src_ip.isIPv4() &&
1635
0
               flow->src_ip.isEmpty())
1636
0
        flow->src_ip.setVersion(6);
1637
0
      else if (flow->src_ip.isIPv6() && flow->dst_ip.isIPv4() &&
1638
0
               flow->dst_ip.isEmpty())
1639
0
        flow->dst_ip.setVersion(6);
1640
0
      else {
1641
0
        invalid_flow = true;
1642
  
1643
0
        ntop->getTrace()->traceEvent(
1644
0
            TRACE_WARNING,
1645
0
            "IP version mismatch: client:%d server:%d - flow will be ignored",
1646
0
            flow->src_ip.getVersion(), flow->dst_ip.getVersion());
1647
0
      }
1648
0
    }
1649
0
  }
1650
1651
0
  if (!invalid_flow) {
1652
0
    if (flow->hasParsedeBPF()) {
1653
      /* Direction already reliable when the event is an accept or a connect.
1654
         Heuristic is only used in the other cases. */
1655
#if 0
1656
      /* Disabled as Flow::setParsedeBPFInfo() now supports directions */
1657
      if(flow->event_type != ebpf_event_type_tcp_accept
1658
   && flow->event_type != ebpf_event_type_tcp_connect
1659
   && ntohs(flow->src_port) < ntohs(flow->dst_port))
1660
  flow->swap();
1661
#endif
1662
0
    } else if (/* ntohs(flow->src_port) < 1024 && Relaxes this condition to also
1663
                  include non-well-known ports (e.g., MySQL 3306) */
1664
0
               ntohs(flow->src_port) < ntohs(flow->dst_port)
1665
               // && flow->in_pkts && flow->out_pkts /* Flows can be
1666
               // mono-directional, so can't use this condition */
1667
0
               &&
1668
               (flow->l4_proto != IPPROTO_TCP /* Not TCP or TCP but without SYN (See
1669
             https://github.com/ntop/ntopng/issues/5058)
1670
                */
1671
                /*
1672
                  No SYN (cumulative flow->tcp.tcp_flags are NOT checked as
1673
                  they can contain a SYN but the direction is unknown), do
1674
                  the swap as it is assumed the beginning of the TCP flow has
1675
                  not been seen.
1676
1677
                  Swap check is performed for TCP flows for all cases other
1678
                  than when a SYN is seen from client to server and no other
1679
                  flag is seen from server to client. Indded, in this case,
1680
                  the swap should not be performed as the direction is
1681
                  accurate according to the seen flags.
1682
                */
1683
0
                || !(flow->tcp.client_tcp_flags == TH_SYN &&
1684
0
                     flow->tcp.server_tcp_flags == 0)))
1685
      /* Attempt to determine flow client and server using port numbers
1686
         useful when exported flows are mono-directional
1687
         https://github.com/ntop/ntopng/issues/1978 */
1688
0
      flow->swap();
1689
1690
0
    if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1;
1691
1692
    /* Process Flow */
1693
0
    INTERFACE_PROFILING_SECTION_ENTER("processFlow", 30);
1694
1695
0
    rc = processFlow(flow);
1696
1697
0
    INTERFACE_PROFILING_SECTION_EXIT(30);
1698
0
  }
1699
1700
0
  if (!rc) recvStats.num_dropped_flows++;
1701
1702
0
  return rc;
1703
0
}
1704
1705
/* **************************************************** */
1706
1707
int ZMQParserInterface::parseSingleJSONFlow(json_object *o,
1708
0
                                            u_int32_t source_id) {
1709
0
  ParsedFlow flow;
1710
0
  struct json_object_iterator it = json_object_iter_begin(o);
1711
0
  struct json_object_iterator itEnd = json_object_iter_end(o);
1712
0
  int ret = 0;
1713
1714
  /* Reset data */
1715
0
  flow.source_id = source_id;
1716
0
  flow.direction = UNKNOWN_FLOW_DIRECTION;
1717
1718
0
  while (!json_object_iter_equal(&it, &itEnd)) {
1719
0
    const char *key = json_object_iter_peek_name(&it);
1720
0
    json_object *jvalue = json_object_iter_peek_value(&it);
1721
0
    json_object *additional_o = NULL;
1722
0
    enum json_type type = json_object_get_type(jvalue);
1723
0
    ParsedValue value = {0};
1724
0
    bool add_to_additional_fields = false;
1725
1726
0
    switch (type) {
1727
0
      case json_type_int:
1728
0
        value.int_num = json_object_get_int64(jvalue);
1729
0
        value.double_num = value.int_num;
1730
0
        break;
1731
0
      case json_type_double:
1732
0
        value.double_num = json_object_get_double(jvalue);
1733
0
        break;
1734
0
      case json_type_boolean:
1735
0
        value.boolean = json_object_get_boolean(jvalue);
1736
0
        break;
1737
0
      case json_type_string:
1738
0
        value.string = json_object_get_string(jvalue);
1739
0
        if (strcmp(key, "json") == 0)
1740
0
          additional_o = json_tokener_parse(value.string);
1741
0
        break;
1742
0
      case json_type_object:
1743
        /* This is handled by parseNProbeAgentField or addAdditionalField */
1744
0
        break;
1745
0
      case json_type_array:
1746
        /* This is handled by parseNProbeAgentField or addAdditionalField */
1747
0
        break;
1748
0
      default:
1749
0
        ntop->getTrace()->traceEvent(
1750
0
            TRACE_WARNING, "JSON type %u not supported [key: %s]\n", type, key);
1751
0
        break;
1752
0
    }
1753
1754
0
    if ((key != NULL) && (jvalue != NULL)) {
1755
0
      u_int32_t pen, key_id;
1756
0
      bool res;
1757
1758
0
      getKeyId((char *)key, strlen(key), &pen, &key_id);
1759
1760
0
      switch (pen) {
1761
0
        case 0: /* No PEN */
1762
0
          res = parsePENZeroField(&flow, key_id, &value);
1763
0
          if (res) break;
1764
          /* Dont'break when res == false for backward compatibility: attempt to
1765
           * parse Zero-PEN as Ntop-PEN */
1766
0
        case NTOP_PEN:
1767
0
          res = parsePENNtopField(&flow, key_id, &value);
1768
0
          break;
1769
0
        case UNKNOWN_PEN:
1770
0
        default:
1771
0
          res = false;
1772
0
          break;
1773
0
      }
1774
1775
0
      if (!res) {
1776
0
        switch (key_id) {
1777
0
          case 0:  // json additional object added by Flow::serialize()
1778
0
            if (additional_o != NULL) {
1779
0
              struct json_object_iterator additional_it =
1780
0
                  json_object_iter_begin(additional_o);
1781
0
              struct json_object_iterator additional_itEnd =
1782
0
                  json_object_iter_end(additional_o);
1783
1784
0
              while (
1785
0
                  !json_object_iter_equal(&additional_it, &additional_itEnd)) {
1786
0
                const char *additional_key =
1787
0
                    json_object_iter_peek_name(&additional_it);
1788
0
                json_object *additional_v =
1789
0
                    json_object_iter_peek_value(&additional_it);
1790
0
                const char *additional_value =
1791
0
                    json_object_get_string(additional_v);
1792
1793
0
                if ((additional_key != NULL) && (additional_value != NULL)) {
1794
                  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
1795
                  // field: %s", additional_key);
1796
0
                  flow.addAdditionalField(
1797
0
                      additional_key, json_object_new_string(additional_value));
1798
0
                }
1799
0
                json_object_iter_next(&additional_it);
1800
0
              }
1801
0
            }
1802
0
            break;
1803
0
          case UNKNOWN_FLOW_ELEMENT:
1804
            /* Attempt to parse it as an nProbe mini field */
1805
0
            if (parseNProbeAgentField(&flow, key, &value, jvalue)) {
1806
0
              if (!flow.hasParsedeBPF()) {
1807
0
                flow.setParsedeBPF();
1808
0
                flow.absolute_packet_octet_counters = true;
1809
0
              }
1810
0
              break;
1811
0
            }
1812
0
          default:
1813
#ifdef NTOPNG_PRO
1814
            if (custom_app_maps ||
1815
                (custom_app_maps = new (std::nothrow) CustomAppMaps()))
1816
              custom_app_maps->checkCustomApp(key, &value, &flow);
1817
#endif
1818
0
            ntop->getTrace()->traceEvent(
1819
0
                TRACE_DEBUG, "Not handled ZMQ field %u/%s", key_id, key);
1820
0
            add_to_additional_fields = true;
1821
0
            break;
1822
0
        } /* switch */
1823
0
      }
1824
1825
0
      if (add_to_additional_fields) {
1826
        // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s",
1827
        // key);
1828
0
        flow.addAdditionalField(key, json_object_get(jvalue));
1829
0
      }
1830
1831
0
      if (additional_o) json_object_put(additional_o);
1832
0
    } /* if */
1833
1834
    /* Move to the next element */
1835
0
    json_object_iter_next(&it);
1836
0
  }  // while json_object_iter_equal
1837
1838
0
  if (preprocessFlow(&flow)) ret = 1;
1839
1840
0
  return ret;
1841
0
}
1842
1843
/* **************************************************** */
1844
1845
int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer,
1846
0
                                           u_int32_t source_id) {
1847
0
  ndpi_serialization_type kt, et;
1848
0
  ParsedFlow flow;
1849
0
  int ret = 0, rc;
1850
0
  bool recordFound = false;
1851
1852
  /* Reset data */
1853
0
  flow.source_id = source_id;
1854
0
  flow.direction = UNKNOWN_FLOW_DIRECTION;
1855
1856
0
  INTERFACE_PROFILING_SECTION_ENTER("Decode TLV", 9);
1857
1858
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Processing TLV record");
1859
0
  while ((et = ndpi_deserialize_get_item_type(deserializer, &kt)) !=
1860
0
         ndpi_serialization_unknown) {
1861
0
    ParsedValue value = {0};
1862
0
    u_int32_t pen = 0, key_id = 0;
1863
0
    u_int32_t v32 = 0;
1864
0
    int32_t i32 = 0;
1865
0
    float f = 0;
1866
0
    u_int64_t v64 = 0;
1867
0
    int64_t i64 = 0;
1868
0
    ndpi_string key, vs;
1869
0
    char key_str[64];
1870
0
    u_int8_t vbkp = 0;
1871
0
    bool add_to_additional_fields = false;
1872
0
    bool key_is_string = false, value_is_string = false;
1873
1874
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TLV key type = %u value type
1875
    // = %u", kt, et);
1876
1877
0
    if (et == ndpi_serialization_end_of_record) {
1878
0
      ndpi_deserialize_next(deserializer);
1879
0
      goto end_of_record;
1880
0
    }
1881
1882
0
    recordFound = true;
1883
1884
0
    switch (kt) {
1885
0
      case ndpi_serialization_uint32:
1886
0
        ndpi_deserialize_key_uint32(deserializer, &key_id);
1887
0
        break;
1888
0
      case ndpi_serialization_string:
1889
0
        ndpi_deserialize_key_string(deserializer, &key);
1890
0
        key_is_string = true;
1891
0
        break;
1892
0
      default:
1893
0
        ntop->getTrace()->traceEvent(
1894
0
            TRACE_WARNING,
1895
0
            "Unsupported TLV key type %u: please update both ntopng and the "
1896
0
            "probe to the same version",
1897
0
            kt);
1898
0
        ret = -1;
1899
0
        goto error;
1900
0
    }
1901
1902
0
    switch (et) {
1903
0
      case ndpi_serialization_uint32:
1904
0
        ndpi_deserialize_value_uint32(deserializer, &v32);
1905
0
        value.double_num = value.int_num = v32;
1906
0
        break;
1907
1908
0
      case ndpi_serialization_uint64:
1909
0
        ndpi_deserialize_value_uint64(deserializer, &v64);
1910
0
        value.double_num = value.int_num = v64;
1911
0
        break;
1912
1913
0
      case ndpi_serialization_int32:
1914
0
        ndpi_deserialize_value_int32(deserializer, &i32);
1915
0
        value.double_num = value.int_num = i32;
1916
0
        break;
1917
1918
0
      case ndpi_serialization_int64:
1919
0
        ndpi_deserialize_value_int64(deserializer, &i64);
1920
0
        value.double_num = value.int_num = i64;
1921
0
        break;
1922
1923
0
      case ndpi_serialization_float:
1924
0
        ndpi_deserialize_value_float(deserializer, &f);
1925
0
        value.double_num = f;
1926
0
        break;
1927
1928
0
      case ndpi_serialization_string:
1929
0
        ndpi_deserialize_value_string(deserializer, &vs);
1930
0
        value.string = vs.str;
1931
0
        value_is_string = true;
1932
0
        break;
1933
1934
0
      default:
1935
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n",
1936
0
                                     et);
1937
0
        ret = -1;
1938
0
        goto error;
1939
0
    }
1940
1941
0
    if (key_is_string) {
1942
0
      u_int8_t kbkp = key.str[key.str_len];
1943
0
      key.str[key.str_len] = '\0';
1944
0
      snprintf(key_str, sizeof(key_str), "%s", key.str);
1945
0
      getKeyId(key.str, key.str_len, &pen, &key_id);
1946
0
      key.str[key.str_len] = kbkp;
1947
0
    }
1948
1949
0
    if (value_is_string) {
1950
      /* Adding '\0' to the end of the string, backing up the character */
1951
0
      vbkp = vs.str[vs.str_len];
1952
0
      vs.str[vs.str_len] = '\0';
1953
0
    }
1954
1955
0
    switch (pen) {
1956
0
      case 0: /* No PEN */
1957
0
        rc = parsePENZeroField(&flow, key_id, &value);
1958
0
        if (rc) break;
1959
        /* Dont'break when rc == false for backward compatibility: attempt to
1960
         * parse Zero-PEN as Ntop-PEN */
1961
0
      case NTOP_PEN:
1962
0
        rc = parsePENNtopField(&flow, key_id, &value);
1963
0
        break;
1964
0
      case UNKNOWN_PEN:
1965
0
      default:
1966
0
        rc = false;
1967
0
        break;
1968
0
    }
1969
1970
0
    if (!key_is_string) {
1971
0
      if (pen)
1972
0
        snprintf(key_str, sizeof(key_str), "%u.%u", pen, key_id);
1973
0
      else
1974
0
        snprintf(key_str, sizeof(key_str), "%u", key_id);
1975
0
    }
1976
1977
#if 0
1978
    if(ntop->getTrace()->get_trace_level() >= TRACE_LEVEL_DEBUG) {
1979
      switch(et) {
1980
      case ndpi_serialization_uint32:
1981
      case ndpi_serialization_uint64:
1982
      case ndpi_serialization_int32:
1983
      case ndpi_serialization_int64:
1984
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %lld", key_str, key_id, pen, value.int_num);
1985
        break;
1986
      case ndpi_serialization_float:
1987
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %.3f", key_str, key_id, pen, value.double_num);
1988
        break;
1989
      case ndpi_serialization_string:
1990
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %s", key_str, key_id, pen, value.string);
1991
        break;
1992
      default:
1993
        ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: -", key_str, key_id, pen);
1994
        break;
1995
      }
1996
    }
1997
#endif
1998
1999
0
    if (!rc) { /* Not handled */
2000
0
      switch (key_id) {
2001
0
        case 0:  // json additional object added by Flow::serialize()
2002
0
          if (strcmp(key_str, "json") == 0 && value_is_string) {
2003
0
            json_object *additional_o = json_tokener_parse(vs.str);
2004
2005
0
            if (additional_o) {
2006
0
              struct json_object_iterator additional_it =
2007
0
                  json_object_iter_begin(additional_o);
2008
0
              struct json_object_iterator additional_itEnd =
2009
0
                  json_object_iter_end(additional_o);
2010
2011
0
              while (
2012
0
                  !json_object_iter_equal(&additional_it, &additional_itEnd)) {
2013
0
                const char *additional_key =
2014
0
                    json_object_iter_peek_name(&additional_it);
2015
0
                json_object *additional_v =
2016
0
                    json_object_iter_peek_value(&additional_it);
2017
0
                const char *additional_value =
2018
0
                    json_object_get_string(additional_v);
2019
2020
0
                if ((additional_key != NULL) && (additional_value != NULL)) {
2021
                  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
2022
                  // field: %s", additional_key);
2023
0
                  flow.addAdditionalField(
2024
0
                      additional_key, json_object_new_string(additional_value));
2025
0
                }
2026
0
                json_object_iter_next(&additional_it);
2027
0
              }
2028
2029
0
              json_object_put(additional_o);
2030
0
            }
2031
0
          }
2032
0
          break;
2033
0
        case UNKNOWN_FLOW_ELEMENT:
2034
#if 0  // TODO
2035
  /* Attempt to parse it as an nProbe mini field */
2036
  if(parseNProbeAgentField(&flow, key_str, &value)) {
2037
    if(!flow.hasParsedeBPF()) {
2038
      flow->setParsedeBPF();
2039
      flow.absolute_packet_octet_counters = true;
2040
    }
2041
    break;
2042
  }
2043
#endif
2044
0
        default:
2045
#ifdef NTOPNG_PRO
2046
          if (custom_app_maps ||
2047
              (custom_app_maps = new (std::nothrow) CustomAppMaps()))
2048
            custom_app_maps->checkCustomApp(key_str, &value, &flow);
2049
#endif
2050
0
          ntop->getTrace()->traceEvent(
2051
0
              TRACE_DEBUG, "Not handled ZMQ field %u.%u", pen, key_id);
2052
0
          add_to_additional_fields = true;
2053
0
          break;
2054
0
      } /* switch */
2055
0
    }
2056
2057
0
    if (add_to_additional_fields) {
2058
      // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s
2059
      // (Key-ID: %u PEN: %u)", key_str, key_id, pen);
2060
0
#if 1
2061
0
      flow.addAdditionalField(deserializer);
2062
#else
2063
      flow.addAdditionalField(
2064
          key_str, value_is_string ? json_object_new_string(value.string)
2065
                                   : json_object_new_int64(value.int_num));
2066
#endif
2067
0
    }
2068
2069
    /* Restoring backed up character at the end of the string in place of '\0'
2070
     */
2071
0
    if (value_is_string) vs.str[vs.str_len] = vbkp;
2072
2073
    /* Move to the next element */
2074
0
    ndpi_deserialize_next(deserializer);
2075
2076
0
  } /* while */
2077
2078
0
end_of_record:
2079
0
  if (recordFound) {
2080
0
    INTERFACE_PROFILING_SECTION_EXIT(9); /* Closes Decode TLV */
2081
0
    INTERFACE_PROFILING_SECTION_ENTER("processFlow", 10);
2082
2083
0
    if (preprocessFlow(&flow)) ret = 1;
2084
2085
0
    INTERFACE_PROFILING_SECTION_EXIT(10);
2086
0
  }
2087
2088
0
error:
2089
0
  return ret;
2090
0
}
2091
2092
/* **************************************************** */
2093
2094
u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload,
2095
                                           int payload_size, u_int32_t source_id,
2096
0
                                           u_int32_t msg_id) {
2097
0
  json_object *f = NULL;
2098
0
  enum json_tokener_error jerr = json_tokener_success;
2099
2100
0
#ifndef NTOPNG_PRO
2101
  /*
2102
    nProbe exports flows in TLV so this code will be removed in the future
2103
    Leaving here for old nProbes that will be discontinued soon
2104
  */
2105
0
  return(0);
2106
0
#endif
2107
2108
#if 0
2109
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "JSON: '%s' [len=%lu]", payload, strlen(payload));
2110
  printf("\n\n%s\n\n", payload);
2111
#endif
2112
2113
0
  if (payload) f = json_tokener_parse_verbose(payload, &jerr);
2114
2115
0
  if (f != NULL) {
2116
0
    int n = 0, rc;
2117
2118
0
    if (json_object_get_type(f) == json_type_array) {
2119
      /* Flow array */
2120
0
      int id, num_elements = json_object_array_length(f);
2121
2122
0
      for (id = 0; id < num_elements; id++) {
2123
0
        rc = parseSingleJSONFlow(json_object_array_get_idx(f, id), source_id);
2124
2125
0
        if (rc > 0) n++;
2126
0
      }
2127
2128
0
    } else {
2129
0
      rc = parseSingleJSONFlow(f, source_id);
2130
2131
0
      if (rc > 0) n++;
2132
0
    }
2133
2134
0
    json_object_put(f);
2135
0
    return n;
2136
0
  } else {
2137
    // if o != NULL
2138
0
    if (!once) {
2139
0
      ntop->getTrace()->traceEvent(TRACE_WARNING,
2140
0
           "Invalid message received: your nProbe sender is outdated, data "
2141
0
           "encrypted or invalid JSON?");
2142
0
      ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2143
0
           json_tokener_error_desc(jerr), payload_size, payload);
2144
0
    }
2145
2146
0
    once = true;
2147
0
    return 0;
2148
0
  }
2149
2150
0
  return 0;
2151
0
}
2152
2153
/* **************************************************** */
2154
2155
u_int8_t ZMQParserInterface::parseTLVFlow(const char *payload, int payload_size,
2156
                                          u_int32_t source_id, u_int32_t msg_id,
2157
0
                                          void *data) {
2158
0
  ndpi_deserializer deserializer;
2159
0
  ndpi_serialization_type kt;
2160
0
  int n = 0, rc;
2161
2162
0
  rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
2163
0
                                  payload_size);
2164
2165
0
  if (rc == -1) return 0;
2166
2167
0
  if (ndpi_deserialize_get_format(&deserializer) != ndpi_serialization_format_tlv) {
2168
0
    if (!once) {
2169
0
      ntop->getTrace()->traceEvent(
2170
0
          TRACE_WARNING,
2171
0
          "Invalid TLV message: the TLV generated by your probe does not match "
2172
0
          "the version supported "
2173
0
          "by ntopng, please update both the probe and ntopng to the latest "
2174
0
          "version available");
2175
0
      once = true;
2176
0
    }
2177
2178
0
    return 0;
2179
0
  }
2180
2181
0
  while (ndpi_deserialize_get_item_type(&deserializer, &kt) !=
2182
0
         ndpi_serialization_unknown) {
2183
0
    rc = parseSingleTLVFlow(&deserializer, source_id);
2184
2185
0
    if (rc < 0)
2186
0
      break;
2187
0
    else if (rc > 0)
2188
0
      n++;
2189
0
  }
2190
2191
0
  return n;
2192
0
}
2193
2194
/* **************************************************** */
2195
2196
bool ZMQParserInterface::parseContainerInfo(
2197
0
    json_object *jo, ContainerInfo *const container_info) {
2198
0
  json_object *obj, *obj2;
2199
2200
  /* Keep in sync with ZMQParserInterface::freeContainerInfo and
2201
   * ParsedeBPF::~ParsedeBPF */
2202
2203
0
  if (json_object_object_get_ex(jo, "ID", &obj))
2204
0
    container_info->id = strdup((char *)json_object_get_string(obj));
2205
2206
0
  if (json_object_object_get_ex(jo, "K8S", &obj)) {
2207
0
    if (json_object_object_get_ex(obj, "POD", &obj2))
2208
0
      container_info->data.k8s.pod =
2209
0
          strdup((char *)json_object_get_string(obj2));
2210
0
    if (json_object_object_get_ex(obj, "NS", &obj2))
2211
0
      container_info->data.k8s.ns =
2212
0
          strdup((char *)json_object_get_string(obj2));
2213
0
    container_info->data_type = container_info_data_type_k8s;
2214
0
  } else if (json_object_object_get_ex(jo, "DOCKER", &obj)) {
2215
0
    container_info->data_type = container_info_data_type_k8s;
2216
0
  } else
2217
0
    container_info->data_type = container_info_data_type_unknown;
2218
2219
0
  if (obj) {
2220
0
    if (json_object_object_get_ex(obj, "NAME", &obj2))
2221
0
      container_info->name = strdup((char *)json_object_get_string(obj2));
2222
0
  }
2223
2224
0
  return true;
2225
0
}
2226
2227
/* **************************************************** */
2228
2229
void ZMQParserInterface::freeContainerInfo(
2230
0
    ContainerInfo *const container_info) {
2231
0
  if (container_info->id) free(container_info->id);
2232
0
  if (container_info->name) free(container_info->name);
2233
0
  if (container_info->data.k8s.pod) free(container_info->data.k8s.pod);
2234
0
  if (container_info->data.k8s.ns) free(container_info->data.k8s.ns);
2235
0
}
2236
2237
/* **************************************************** */
2238
2239
u_int8_t ZMQParserInterface::parseJSONCounter(const char *payload,
2240
0
                                              int payload_size) {
2241
0
  json_object *o;
2242
0
  enum json_tokener_error jerr = json_tokener_success;
2243
0
  sFlowInterfaceStats stats;
2244
2245
0
  ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2246
2247
0
  memset(&stats, 0, sizeof(stats));
2248
0
  o = json_tokener_parse_verbose(payload, &jerr);
2249
2250
0
  if (o != NULL) {
2251
0
    struct json_object_iterator it = json_object_iter_begin(o);
2252
0
    struct json_object_iterator itEnd = json_object_iter_end(o);
2253
2254
    /* Reset data */
2255
0
    memset(&stats, 0, sizeof(stats));
2256
2257
0
    while (!json_object_iter_equal(&it, &itEnd)) {
2258
0
      const char *key = json_object_iter_peek_name(&it);
2259
0
      json_object *v = json_object_iter_peek_value(&it);
2260
0
      const char *value = json_object_get_string(v);
2261
2262
0
      if ((key != NULL) && (value != NULL)) {
2263
0
        if (!strcmp(key, "deviceIP"))
2264
0
          stats.deviceIP =
2265
0
              (u_int32_t)json_object_get_int64(v);  // ntohl(inet_addr(value));
2266
0
        else if (!strcmp(key, "samplesGenerated"))
2267
0
          stats.samplesGenerated = (u_int32_t)json_object_get_int64(v);
2268
0
        else if (!strcmp(key, "ifIndex"))
2269
0
          stats.ifIndex = (u_int32_t)json_object_get_int64(v);
2270
0
        else if (!strcmp(key, "ifName"))
2271
0
          stats.ifName = (char *)json_object_get_string(v);
2272
0
        else if (!strcmp(key, "ifType"))
2273
0
          stats.ifType = (u_int32_t)json_object_get_int64(v);
2274
0
        else if (!strcmp(key, "ifSpeed"))
2275
0
          stats.ifSpeed = (u_int32_t)json_object_get_int64(v);
2276
0
        else if (!strcmp(key, "ifDirection"))
2277
0
          stats.ifFullDuplex = (!strcmp(value, "Full")) ? true : false;
2278
0
        else if (!strcmp(key, "ifAdminStatus"))
2279
0
          stats.ifAdminStatus = (!strcmp(value, "Up")) ? true : false;
2280
0
        else if (!strcmp(key, "ifOperStatus"))
2281
0
          stats.ifOperStatus = (!strcmp(value, "Up")) ? true : false;
2282
0
        else if (!strcmp(key, "ifInOctets"))
2283
0
          stats.ifInOctets = json_object_get_int64(v);
2284
0
        else if (!strcmp(key, "ifInPackets"))
2285
0
          stats.ifInPackets = json_object_get_int64(v);
2286
0
        else if (!strcmp(key, "ifInErrors"))
2287
0
          stats.ifInErrors = json_object_get_int64(v);
2288
0
        else if (!strcmp(key, "ifOutOctets"))
2289
0
          stats.ifOutOctets = json_object_get_int64(v);
2290
0
        else if (!strcmp(key, "ifOutPackets"))
2291
0
          stats.ifOutPackets = json_object_get_int64(v);
2292
0
        else if (!strcmp(key, "ifOutErrors"))
2293
0
          stats.ifOutErrors = json_object_get_int64(v);
2294
0
        else if (!strcmp(key, "ifPromiscuousMode"))
2295
0
          stats.ifPromiscuousMode =
2296
0
              (((u_int32_t)json_object_get_int64(v)) == 1) ? true : false;
2297
0
        else if (strlen(key) >= 9 &&
2298
0
                 !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
2299
0
          if (parseContainerInfo(v, &stats.container_info))
2300
0
            stats.container_info_set = true;
2301
0
        }
2302
0
      } /* if */
2303
2304
      /* Move to the next element */
2305
0
      json_object_iter_next(&it);
2306
0
    }  // while json_object_iter_equal
2307
2308
    /* Process Flow */
2309
0
    processInterfaceStats(&stats);
2310
2311
0
    freeContainerInfo(&stats.container_info);
2312
0
    json_object_put(o);
2313
0
  } else {
2314
    // if o != NULL
2315
0
    if (!once) {
2316
0
      ntop->getTrace()->traceEvent(
2317
0
          TRACE_WARNING,
2318
0
          "Invalid message received: your nProbe sender is outdated, data "
2319
0
          "encrypted or invalid JSON?");
2320
0
      ntop->getTrace()->traceEvent(
2321
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2322
0
          json_tokener_error_desc(jerr), payload_size, payload);
2323
0
    }
2324
0
    once = true;
2325
0
    return -1;
2326
0
  }
2327
2328
0
  return 0;
2329
0
}
2330
2331
/* **************************************************** */
2332
2333
u_int8_t ZMQParserInterface::parseTLVCounter(const char *payload,
2334
0
                                             int payload_size) {
2335
0
  sFlowInterfaceStats stats;
2336
0
  ndpi_deserializer deserializer;
2337
0
  ndpi_serialization_type kt, et;
2338
0
  int rc, ret = -1;
2339
2340
0
  memset(&stats, 0, sizeof(stats));
2341
2342
0
  rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
2343
0
                                  payload_size);
2344
2345
0
  if (rc == -1) {
2346
0
    goto error;
2347
0
  }
2348
2349
0
  if (ndpi_deserialize_get_format(&deserializer) !=
2350
0
      ndpi_serialization_format_tlv) {
2351
0
    if (!once) {
2352
0
      ntop->getTrace()->traceEvent(
2353
0
          TRACE_WARNING,
2354
0
          "Invalid TLV message: the TLV generated by your probe does not match "
2355
0
          "the version supported "
2356
0
          "by ntopng, please update both the probe and ntopng to the latest "
2357
0
          "version available");
2358
0
      once = true;
2359
0
    }
2360
0
    goto error;
2361
0
  }
2362
2363
0
  while ((et = ndpi_deserialize_get_item_type(&deserializer, &kt)) !=
2364
0
         ndpi_serialization_unknown) {
2365
    /* Key */
2366
2367
0
    bool key_is_string = false;
2368
0
    ndpi_string key;
2369
0
    u_int32_t key_id = 0;
2370
2371
0
    switch (kt) {
2372
0
      case ndpi_serialization_uint32:
2373
0
        ndpi_deserialize_key_uint32(&deserializer, &key_id);
2374
0
        break;
2375
0
      case ndpi_serialization_string:
2376
0
        ndpi_deserialize_key_string(&deserializer, &key);
2377
0
        key_is_string = true;
2378
0
        break;
2379
0
      default:
2380
0
        ntop->getTrace()->traceEvent(
2381
0
            TRACE_WARNING,
2382
0
            "Unsupported TLV key type %u: "
2383
0
            "please update both ntopng and the probe to the same version",
2384
0
            kt);
2385
0
        goto error;
2386
0
    }
2387
2388
0
    if (key_is_string) {
2389
0
      u_int8_t kbkp = key.str[key.str_len];
2390
0
      key.str[key.str_len] = '\0';
2391
0
      bool found = getCounterId(key.str, key.str_len, &key_id);
2392
0
      if (!found) {
2393
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %s\n",
2394
0
                                     key.str);
2395
0
        key.str[key.str_len] = kbkp;
2396
0
        goto error;
2397
0
      }
2398
0
      key.str[key.str_len] = kbkp;
2399
0
    }
2400
2401
    /* Value */
2402
2403
0
    ParsedValue value = {0};
2404
0
    bool value_is_string = false;
2405
0
    ndpi_string vs;
2406
0
    u_int32_t v32 = 0;
2407
0
    u_int64_t v64 = 0;
2408
2409
0
    switch (et) {
2410
0
      case ndpi_serialization_uint32:
2411
0
        ndpi_deserialize_value_uint32(&deserializer, &v32);
2412
0
        value.int_num = v32;
2413
0
        break;
2414
2415
0
      case ndpi_serialization_uint64:
2416
0
        ndpi_deserialize_value_uint64(&deserializer, &v64);
2417
0
        value.int_num = v64;
2418
0
        break;
2419
2420
0
      case ndpi_serialization_string:
2421
0
        ndpi_deserialize_value_string(&deserializer, &vs);
2422
0
        value.string = vs.str;
2423
0
        value_is_string = true;
2424
0
        break;
2425
2426
0
      default:
2427
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n",
2428
0
                                     et);
2429
0
        goto error;
2430
0
    }
2431
2432
0
    u_int8_t vbkp;
2433
0
    if (value_is_string) {
2434
      /* Adding '\0' to the end of the string, backing up the character */
2435
0
      vbkp = vs.str[vs.str_len];
2436
0
      vs.str[vs.str_len] = '\0';
2437
0
    }
2438
2439
0
    switch (key_id) {
2440
0
      case SFLOW_DEVICE_IP:
2441
0
        if (value_is_string)
2442
0
          stats.deviceIP = ntohl(inet_addr((char *)value.string));
2443
0
        else
2444
0
          stats.deviceIP = (u_int32_t)value.int_num;
2445
0
        break;
2446
0
      case SFLOW_SAMPLES_GENERATED:
2447
0
        stats.samplesGenerated = (u_int32_t)value.int_num;
2448
0
        break;
2449
0
      case SFLOW_IF_INDEX:
2450
0
        stats.ifIndex = (u_int32_t)value.int_num;
2451
0
        break;
2452
0
      case SFLOW_IF_NAME:
2453
0
        stats.ifName = strdup((char *)value.string);
2454
0
        break;
2455
0
      case SFLOW_IF_TYPE:
2456
0
        stats.ifType = (u_int32_t)value.int_num;
2457
0
        break;
2458
0
      case SFLOW_IF_SPEED:
2459
0
        stats.ifSpeed = (u_int32_t)value.int_num;
2460
0
        break;
2461
0
      case SFLOW_IF_DIRECTION:
2462
0
        stats.ifFullDuplex = (!strcmp(value.string, "Full")) ? true : false;
2463
0
        break;
2464
0
      case SFLOW_IF_ADMIN_STATUS:
2465
0
        stats.ifAdminStatus = (!strcmp(value.string, "Up")) ? true : false;
2466
0
        break;
2467
0
      case SFLOW_IF_OPER_STATUS:
2468
0
        stats.ifOperStatus = (!strcmp(value.string, "Up")) ? true : false;
2469
0
        break;
2470
0
      case SFLOW_IF_IN_OCTETS:
2471
0
        stats.ifInOctets = value.int_num;
2472
0
        break;
2473
0
      case SFLOW_IF_IN_PACKETS:
2474
0
        stats.ifInPackets = value.int_num;
2475
0
        break;
2476
0
      case SFLOW_IF_IN_ERRORS:
2477
0
        stats.ifInErrors = value.int_num;
2478
0
        break;
2479
0
      case SFLOW_IF_OUT_OCTETS:
2480
0
        stats.ifOutOctets = value.int_num;
2481
0
        break;
2482
0
      case SFLOW_IF_OUT_PACKETS:
2483
0
        stats.ifOutPackets = value.int_num;
2484
0
        break;
2485
0
      case SFLOW_IF_OUT_ERRORS:
2486
0
        stats.ifOutErrors = value.int_num;
2487
0
        break;
2488
0
      case SFLOW_IF_PROMISCUOUS_MODE:
2489
0
        stats.ifPromiscuousMode = (value.int_num == 1) ? true : false;
2490
0
        break;
2491
0
      default:
2492
0
        ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %u\n",
2493
0
                                     key_id);
2494
0
        break;
2495
0
    }
2496
2497
    /* Restoring backed up character at the end of the string in place of '\0'
2498
     */
2499
0
    if (value_is_string) vs.str[vs.str_len] = vbkp;
2500
2501
0
    ndpi_deserialize_next(&deserializer);
2502
0
  }
2503
2504
  /* Process Flow */
2505
0
  processInterfaceStats(&stats);
2506
0
  ret = 0;
2507
2508
0
  if (stats.ifName) free(stats.ifName);
2509
2510
0
error:
2511
0
  return ret;
2512
0
}
2513
2514
/* **************************************************** */
2515
2516
/*
2517
 * Minimum set of fields expected by ntopng
2518
 * NOTE:
2519
 * - the following fields may or may not appear depending on the traffic:
2520
 *   "IPV4_SRC_ADDR", "IPV4_DST_ADDR", "IPV6_SRC_ADDR", "IPV6_DST_ADDR"
2521
 * - some fields may not appear when nprobe runs with --collector-passthrough
2522
 *   "L7_PROTO"
2523
 */
2524
static std::string mandatory_template_fields[] = {
2525
    "FIRST_SWITCHED",      "LAST_SWITCHED", "L4_SRC_PORT", "L4_DST_PORT",
2526
    "IP_PROTOCOL_VERSION", "PROTOCOL",      "IN_BYTES",    "IN_PKTS",
2527
    "OUT_BYTES",           "OUT_PKTS"};
2528
2529
u_int8_t ZMQParserInterface::parseTemplate(const char *payload,
2530
                                           int payload_size, u_int32_t source_id,
2531
0
                                           u_int32_t msg_id, void *data) {
2532
  /* The format that is currently defined for templates is a JSON as follows:
2533
2534
     [{"12/Apr/2023 23:53:04
2535
     [util.cPEN":0,"field":1,"len":4,"format":"formatted_uint","name":"IN_BYTES","descr":"Incoming
2536
     flow bytes
2537
     (src->dst)"},{"PEN":0,"field":2,"len":4,"format":"formatted_uint","name":"IN_PKTS","descr":"Incoming
2538
     flow packets (src->dst)"},]
2539
  */
2540
2541
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2542
2543
0
  ZMQ_Template zmq_template;
2544
0
  json_object *obj, *w, *z;
2545
0
  enum json_tokener_error jerr = json_tokener_success;
2546
2547
0
  memset(&zmq_template, 0, sizeof(zmq_template));
2548
0
  obj = json_tokener_parse_verbose(payload, &jerr);
2549
2550
0
  if (obj) {
2551
0
    if (json_object_get_type(obj) == json_type_array) {
2552
0
      int i, num_elements = json_object_array_length(obj);
2553
0
      std::set<std::string> mandatory_fields(
2554
0
          mandatory_template_fields,
2555
0
          mandatory_template_fields + sizeof(mandatory_template_fields) /
2556
0
                                          sizeof(mandatory_template_fields[0]));
2557
2558
0
      for (i = 0; i < num_elements; i++) {
2559
0
        memset(&zmq_template, 0, sizeof(zmq_template));
2560
2561
0
        w = json_object_array_get_idx(obj, i);
2562
2563
0
        if (json_object_object_get_ex(w, "PEN", &z))
2564
0
          zmq_template.pen = (u_int32_t)json_object_get_int(z);
2565
2566
0
        if (json_object_object_get_ex(w, "field", &z))
2567
0
          zmq_template.field = (u_int32_t)json_object_get_int(z);
2568
2569
0
        if (json_object_object_get_ex(w, "format", &z))
2570
0
          zmq_template.format = json_object_get_string(z);
2571
2572
0
        if (json_object_object_get_ex(w, "name", &z))
2573
0
          zmq_template.name = json_object_get_string(z);
2574
2575
0
        if (json_object_object_get_ex(w, "descr", &z))
2576
0
          zmq_template.descr = json_object_get_string(z);
2577
2578
0
        if (zmq_template.name) {
2579
0
          addMapping(zmq_template.name, zmq_template.field, zmq_template.pen,
2580
0
                     zmq_template.descr);
2581
2582
0
          mandatory_fields.erase(zmq_template.name);
2583
0
        }
2584
2585
        // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Template [PEN: %u][field:
2586
        // %u][format: %s][name: %s][descr: %s]",
2587
        //           zmq_template.pen, zmq_template.field,
2588
        // zmq_template.format, zmq_template.name, zmq_template.descr)
2589
0
        ;
2590
0
      }
2591
2592
0
      if (mandatory_fields.size() > 0) {
2593
0
        static bool template_warning_sent = 0;
2594
2595
0
        if (!template_warning_sent) {
2596
0
          std::set<std::string>::iterator it;
2597
2598
0
          ntop->getTrace()->traceEvent(
2599
0
              TRACE_WARNING,
2600
0
              "Some mandatory fields are missing in the ZMQ template:");
2601
0
          template_warning_sent = true;
2602
2603
0
          for (it = mandatory_fields.begin(); it != mandatory_fields.end();
2604
0
               ++it) {
2605
0
            ntop->getTrace()->traceEvent(TRACE_WARNING, "\t%s", (*it).c_str());
2606
0
          }
2607
0
        }
2608
0
      }
2609
0
    }
2610
0
    json_object_put(obj);
2611
0
  } else {
2612
    // if o != NULL
2613
0
    if (!once) {
2614
0
      ntop->getTrace()->traceEvent(
2615
0
          TRACE_WARNING,
2616
0
          "Invalid message received: your nProbe sender is outdated, data "
2617
0
          "encrypted or invalid JSON?");
2618
0
      ntop->getTrace()->traceEvent(
2619
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2620
0
          json_tokener_error_desc(jerr), payload_size, payload);
2621
0
    }
2622
0
    once = true;
2623
0
    return -1;
2624
0
  }
2625
2626
0
  return 0;
2627
0
}
2628
2629
/* **************************************************** */
2630
2631
void ZMQParserInterface::setFieldMap(
2632
0
    const ZMQ_FieldMap *const field_map) const {
2633
0
  char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
2634
0
  snprintf(hname, sizeof(hname), CONST_FIELD_MAP_CACHE_KEY, get_id(),
2635
0
           field_map->pen);
2636
0
  snprintf(key, sizeof(key), "%u", field_map->field);
2637
2638
0
  ntop->getRedis()->hashSet(hname, key, field_map->map);
2639
0
}
2640
2641
/* **************************************************** */
2642
2643
void ZMQParserInterface::setFieldValueMap(
2644
0
    const ZMQ_FieldValueMap *const field_value_map) const {
2645
0
  char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
2646
0
  snprintf(hname, sizeof(hname), CONST_FIELD_VALUE_MAP_CACHE_KEY, get_id(),
2647
0
           field_value_map->pen, field_value_map->field);
2648
0
  snprintf(key, sizeof(key), "%u", field_value_map->value);
2649
2650
0
  ntop->getRedis()->hashSet(hname, key, field_value_map->map);
2651
0
}
2652
2653
/* **************************************************** */
2654
2655
0
u_int8_t ZMQParserInterface::parseOptionFieldMap(json_object *const jo) {
2656
0
  int arraylen = json_object_array_length(jo);
2657
0
  json_object *w, *z;
2658
0
  ZMQ_FieldMap field_map;
2659
2660
0
  memset(&field_map, 0, sizeof(field_map));
2661
2662
0
  for (int i = 0; i < arraylen; i++) {
2663
0
    w = json_object_array_get_idx(jo, i);
2664
2665
0
    if (json_object_object_get_ex(w, "PEN", &z))
2666
0
      field_map.pen = (u_int32_t)json_object_get_int(z);
2667
2668
0
    if (json_object_object_get_ex(w, "field", &z)) {
2669
0
      field_map.field = (u_int32_t)json_object_get_int(z);
2670
2671
0
      if (json_object_object_get_ex(w, "map", &z)) {
2672
0
        field_map.map = json_object_to_json_string(z);
2673
2674
0
        setFieldMap(&field_map);
2675
2676
#ifdef CUSTOM_APP_DEBUG
2677
        ntop->getTrace()->traceEvent(
2678
            TRACE_NORMAL, "Option FieldMap [PEN: %u][field: %u][map: %s]",
2679
            field_map.pen, field_map.field, field_map.map);
2680
#endif
2681
0
      }
2682
0
    }
2683
0
  }
2684
2685
0
  return 0;
2686
0
}
2687
2688
/* **************************************************** */
2689
2690
u_int8_t ZMQParserInterface::parseOptionFieldValueMap(
2691
0
    json_object *const w) {
2692
0
  json_object *z;
2693
0
  ZMQ_FieldValueMap field_value_map;
2694
2695
0
  memset(&field_value_map, 0, sizeof(field_value_map));
2696
2697
0
  if (json_object_object_get_ex(w, "PEN", &z))
2698
0
    field_value_map.pen = (u_int32_t)json_object_get_int(z);
2699
2700
0
  if (json_object_object_get_ex(w, "field", &z)) {
2701
0
    field_value_map.field = (u_int32_t)json_object_get_int(z);
2702
2703
0
    if (json_object_object_get_ex(w, "value", &z)) {
2704
0
      field_value_map.value = (u_int32_t)json_object_get_int(z);
2705
2706
0
      if (json_object_object_get_ex(w, "map", &z)) {
2707
0
        field_value_map.map = json_object_to_json_string(z);
2708
2709
0
        setFieldValueMap(&field_value_map);
2710
2711
#ifdef CUSTOM_APP_DEBUG
2712
        ntop->getTrace()->traceEvent(
2713
            TRACE_NORMAL,
2714
            "Option FieldValueMap [PEN: %u][field: %u][value: %u][map: %s]",
2715
            field_value_map.pen, field_value_map.field, field_value_map.value,
2716
            field_value_map.map);
2717
#endif
2718
0
      }
2719
0
    }
2720
0
  }
2721
2722
0
  return 0;
2723
0
}
2724
2725
/* **************************************************** */
2726
2727
u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload,
2728
                                                 int payload_size,
2729
                                                 u_int32_t source_id,
2730
0
                                                 u_int32_t msg_id, void *data) {
2731
0
  enum json_tokener_error jerr = json_tokener_success;
2732
0
  json_object *o = json_tokener_parse_verbose(payload, &jerr);
2733
2734
0
  if (o != NULL) {
2735
0
    json_object *z;
2736
0
    ListeningPorts pinfo;
2737
0
    u_int16_t vlan_id = 0;
2738
    
2739
0
    if(ntop->getPrefs()->is_cloud_edition()
2740
0
       && ntop->getPrefs()->addVLANCloudToExporters()) {
2741
0
      if(json_object_object_get_ex(o, "instance-name", &z))
2742
0
  vlan_id = findVLANMapping(json_object_get_string(z));
2743
0
    }
2744
    
2745
    /* Parse port information */
2746
0
    if (json_object_object_get_ex(o, "listening-ports", &z)) {
2747
0
      enum json_type o_type = json_object_get_type(z);
2748
0
      if (o_type == json_type_object) {
2749
0
        pinfo.parsePorts(z);
2750
2751
        /* Parse list of IP addresses */
2752
0
        if (json_object_object_get_ex(o, "ip-addresses", &z)) {
2753
0
          enum json_type o_type = json_object_get_type(z);
2754
   
2755
0
          if (o_type == json_type_array) {
2756
0
            for (u_int i = 0; i < (u_int)json_object_array_length(z); i++) {
2757
0
              Host *h = NULL;
2758
0
              json_object *host = json_object_array_get_idx(z, i);
2759
0
              const char *ip_addr = json_object_get_string(host);
2760
2761
              // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Received listening
2762
              // ports for %s", ip_addr);
2763
              // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2764
2765
              /* Assign list of ports to the host */
2766
2767
0
              h = getHost((char *)ip_addr,
2768
0
        vlan_id,
2769
0
                          0 /* observationPointId */,
2770
0
        true /* inline */);
2771
0
              if (h) {
2772
0
                h->setListeningPorts(pinfo);
2773
0
              } else {
2774
    // ntop->getTrace()->traceEvent(TRACE_INFO, "Unable to find host  %s", (char *)ip_addr);
2775
0
        }
2776
0
            }
2777
0
          }
2778
0
        }
2779
0
      }
2780
0
    }
2781
2782
0
    json_object_put(o); /* Free memory */
2783
0
  }
2784
2785
0
  return (0);
2786
0
}
2787
2788
/* **************************************************** */
2789
2790
u_int8_t ZMQParserInterface::parseSNMPIntefaces(const char *payload,
2791
                                                int payload_size,
2792
                                                u_int32_t source_id,
2793
0
                                                u_int32_t msg_id, void *data) {
2794
0
  enum json_tokener_error jerr = json_tokener_success;
2795
0
  json_object *f = json_tokener_parse_verbose(payload, &jerr);
2796
2797
0
  if (f != NULL) {
2798
0
    struct json_object_iterator it = json_object_iter_begin(f);
2799
0
    struct json_object_iterator itEnd = json_object_iter_end(f);
2800
2801
0
    while (!json_object_iter_equal(&it, &itEnd)) {
2802
0
      const char *key = json_object_iter_peek_name(&it);
2803
0
      json_object *value = json_object_iter_peek_value(&it);
2804
0
      const char *rsp = json_object_to_json_string(value);
2805
0
      char redis_key[64];
2806
2807
      /*
2808
         Saving 'short' interface names
2809
2810
         Use lua/modules/snmp_mappings.lua to access them from Lua //
2811
       */
2812
0
      snprintf(redis_key, sizeof(redis_key), "cachedexporters.%s.ifnames", key);
2813
0
      ntop->getRedis()->set(redis_key, rsp);
2814
2815
0
      ntop->getTrace()->traceEvent(TRACE_INFO, "[JSON] %s = %s", redis_key,
2816
0
                                   rsp);
2817
2818
      /* Move to the next element */
2819
0
      json_object_iter_next(&it);
2820
0
    }  // while json_object_iter_equal
2821
2822
0
    json_object_put(f); /* Free memory */
2823
2824
0
    return (0);
2825
0
  } else
2826
0
    return (-1);
2827
0
}
2828
2829
/* **************************************************** */
2830
2831
u_int8_t ZMQParserInterface::parseOption(const char *payload, int payload_size,
2832
                                         u_int32_t source_id, u_int32_t msg_id,
2833
0
                                         void *data) {
2834
  /* The format that is currently defined for options is a JSON as follows:
2835
2836
     char opt[] = "
2837
     "{\"PEN\":8741, \"field\": 22, \"value\":1, \"map\":{\"name\":\"Skype\"}},"
2838
     "{\"PEN\":8741, \"field\": 22, \"value\":3, \"map\":{\"name\":\"Winni\"}}";
2839
2840
     parseOption(opt, strlen(opt), source_id, this);
2841
  */
2842
2843
  // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
2844
2845
0
  json_object *o;
2846
0
  enum json_tokener_error jerr = json_tokener_success;
2847
2848
0
  o = json_tokener_parse_verbose(payload, &jerr);
2849
2850
0
  if (o != NULL) {
2851
0
    parseOptionFieldValueMap(o);
2852
0
    json_object_put(o);
2853
0
  } else {
2854
    // if o != NULL
2855
0
    if (!once) {
2856
0
      ntop->getTrace()->traceEvent(
2857
0
          TRACE_WARNING,
2858
0
          "Invalid message received: your nProbe sender is outdated, "
2859
0
          "data encrypted or invalid JSON?");
2860
0
      ntop->getTrace()->traceEvent(
2861
0
          TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
2862
0
          json_tokener_error_desc(jerr), payload_size, payload);
2863
0
    }
2864
2865
0
    once = true;
2866
0
    return -1;
2867
0
  }
2868
2869
0
  return 0;
2870
0
}
2871
2872
/* **************************************** */
2873
2874
0
u_int32_t ZMQParserInterface::periodicStatsUpdateFrequency() const {
2875
0
  ZMQ_RemoteStats *zrs = zmq_remote_stats;
2876
0
  u_int32_t update_freq;
2877
0
  u_int32_t update_freq_min = ntop->getPrefs()->get_housekeeping_frequency();
2878
2879
0
  if (zrs)
2880
0
    update_freq =
2881
0
        min_val(max_val(zrs->remote_lifetime_timeout, zrs->remote_idle_timeout),
2882
0
                zrs->remote_collected_lifetime_timeout);
2883
0
  else
2884
0
    update_freq = update_freq_min;
2885
2886
0
  return max_val(update_freq, update_freq_min);
2887
0
}
2888
2889
/* **************************************** */
2890
2891
0
void ZMQParserInterface::setRemoteStats(ZMQ_RemoteStats *zrs) {
2892
0
  ZMQ_RemoteStats *last_zrs, *cumulative_zrs;
2893
0
  map<u_int32_t, ZMQ_RemoteStats *>::iterator it;
2894
0
  u_int32_t last_time = getTimeLastPktRcvdRemote();
2895
0
  struct timeval now;
2896
2897
0
  gettimeofday(&now, NULL);
2898
2899
  /* Store stats for the current exporter */
2900
2901
0
  lock.wrlock(__FILE__, __LINE__);
2902
2903
0
  if (source_id_last_zmq_remote_stats.find(zrs->source_id) ==
2904
0
      source_id_last_zmq_remote_stats.end()) {
2905
0
    last_zrs = (ZMQ_RemoteStats *)malloc(sizeof(ZMQ_RemoteStats));
2906
2907
0
    if (!last_zrs) {
2908
0
      lock.unlock(__FILE__, __LINE__);
2909
0
      return;
2910
0
    }
2911
2912
0
    source_id_last_zmq_remote_stats[zrs->source_id] = last_zrs;
2913
0
  } else
2914
0
    last_zrs = source_id_last_zmq_remote_stats[zrs->source_id];
2915
2916
0
  memcpy(last_zrs, zrs, sizeof(ZMQ_RemoteStats));
2917
2918
0
  lock.unlock(__FILE__, __LINE__);
2919
2920
0
  if (Utils::msTimevalDiff(&now, &last_zmq_remote_stats_update) < 1000) {
2921
    /* Do not update cumulative stats more frequently than once per second.
2922
     * Note: this also avoids concurrent access (use after free) of shadow */
2923
0
    return;
2924
0
  }
2925
2926
  /* Sum stats from all exporters */
2927
2928
0
  cumulative_zrs = (ZMQ_RemoteStats *)calloc(1, sizeof(ZMQ_RemoteStats));
2929
0
  if (!cumulative_zrs) return;
2930
2931
0
  lock.wrlock(__FILE__, __LINE__); /* Need write lock due to (*) */
2932
2933
0
  for (it = source_id_last_zmq_remote_stats.begin();
2934
0
       it != source_id_last_zmq_remote_stats.end();) {
2935
0
    ZMQ_RemoteStats *zrs_i = it->second;
2936
2937
0
    if (last_time > MAX_HASH_ENTRY_IDLE &&
2938
0
        zrs_i->remote_time < last_time - MAX_HASH_ENTRY_IDLE /* sec */) {
2939
      /* Do not account inactive exporters, release them */
2940
      // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Erased %s [local_time:
2941
      // %u][last_time: %u]", zrs_i->remote_ifname, zrs_i->local_time,
2942
      // last_time);
2943
0
      free(zrs_i);
2944
0
      source_id_last_zmq_remote_stats.erase(it++); /* (*) */
2945
0
    } else {
2946
0
      cumulative_zrs->num_exporters += zrs_i->num_exporters;
2947
0
      cumulative_zrs->remote_bytes += zrs_i->remote_bytes;
2948
0
      cumulative_zrs->remote_pkts += zrs_i->remote_pkts;
2949
0
      cumulative_zrs->num_flow_exports += zrs_i->num_flow_exports;
2950
0
      cumulative_zrs->remote_ifspeed =
2951
0
          max_val(cumulative_zrs->remote_ifspeed, zrs_i->remote_ifspeed);
2952
0
      cumulative_zrs->remote_time =
2953
0
          max_val(cumulative_zrs->remote_time, zrs_i->remote_time);
2954
0
      cumulative_zrs->local_time =
2955
0
          max_val(cumulative_zrs->local_time, zrs_i->local_time);
2956
0
      cumulative_zrs->avg_bps += zrs_i->avg_bps;
2957
0
      cumulative_zrs->avg_pps += zrs_i->avg_pps;
2958
0
      cumulative_zrs->remote_lifetime_timeout =
2959
0
          max_val(cumulative_zrs->remote_lifetime_timeout,
2960
0
                  zrs_i->remote_lifetime_timeout);
2961
0
      cumulative_zrs->remote_collected_lifetime_timeout =
2962
0
          max_val(cumulative_zrs->remote_collected_lifetime_timeout,
2963
0
                  zrs_i->remote_collected_lifetime_timeout);
2964
0
      cumulative_zrs->remote_idle_timeout = max_val(
2965
0
          cumulative_zrs->remote_idle_timeout, zrs_i->remote_idle_timeout);
2966
0
      cumulative_zrs->export_queue_full += zrs_i->export_queue_full;
2967
0
      cumulative_zrs->too_many_flows += zrs_i->too_many_flows;
2968
0
      cumulative_zrs->elk_flow_drops += zrs_i->elk_flow_drops;
2969
0
      cumulative_zrs->sflow_pkt_sample_drops += zrs_i->sflow_pkt_sample_drops;
2970
0
      cumulative_zrs->flow_collection_drops += zrs_i->flow_collection_drops;
2971
0
      cumulative_zrs->flow_collection_udp_socket_drops +=
2972
0
          zrs_i->flow_collection_udp_socket_drops;
2973
0
      cumulative_zrs->flow_collection.nf_ipfix_flows +=
2974
0
          zrs_i->flow_collection.nf_ipfix_flows;
2975
0
      cumulative_zrs->flow_collection.sflow_samples +=
2976
0
          zrs_i->flow_collection.sflow_samples;
2977
2978
0
      ++it;
2979
0
    }
2980
0
  }
2981
2982
0
  lock.unlock(__FILE__, __LINE__);
2983
2984
0
  ifSpeed = cumulative_zrs->remote_ifspeed;
2985
0
  last_pkt_rcvd = 0;
2986
0
  last_pkt_rcvd_remote = cumulative_zrs->remote_time;
2987
0
  last_remote_pps = cumulative_zrs->avg_pps;
2988
0
  last_remote_bps = cumulative_zrs->avg_bps;
2989
2990
0
  if (cumulative_zrs->flow_collection.sflow_samples > 0)
2991
0
    is_sampled_traffic = true;
2992
2993
  /* Recalculate the flow max idle according to the timeouts received */
2994
0
  flow_max_idle = min_val(cumulative_zrs->remote_lifetime_timeout,
2995
0
                          cumulative_zrs->remote_collected_lifetime_timeout) +
2996
0
                  10 /* Safe margin */;
2997
0
  updateFlowMaxIdle();
2998
2999
0
  if ((zmq_initial_pkts == 0) /* ntopng has been restarted */
3000
0
      || (cumulative_zrs->remote_bytes <
3001
0
          zmq_initial_bytes) /* nProbe has been restarted */
3002
0
  ) {
3003
    /* Start over */
3004
0
    zmq_initial_bytes = cumulative_zrs->remote_bytes,
3005
0
    zmq_initial_pkts = cumulative_zrs->remote_pkts;
3006
0
  }
3007
3008
0
  if (zmq_remote_initial_exported_flows == 0 /* ntopng has been restarted */
3009
0
      || cumulative_zrs->num_flow_exports <
3010
0
             zmq_remote_initial_exported_flows) /* nProbe has been restarted */
3011
0
    zmq_remote_initial_exported_flows = cumulative_zrs->num_flow_exports;
3012
3013
  /* Swap values */
3014
0
  if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
3015
0
  zmq_remote_stats_shadow = zmq_remote_stats;
3016
0
  zmq_remote_stats = cumulative_zrs;
3017
3018
0
  memcpy(&last_zmq_remote_stats_update, &now, sizeof(now));
3019
3020
  /*
3021
   * Don't override ethStats here, these stats are properly updated
3022
   * inside NetworkInterface::processFlow for ZMQ interfaces.
3023
   * Overriding values here may cause glitches and non-strictly-increasing
3024
   counters
3025
   * yielding negative rates.
3026
   ethStats.setNumBytes(cumulative_zrs->remote_bytes),
3027
   ethStats.setNumPackets(cumulative_zrs->remote_pkts);
3028
   *
3029
   */
3030
0
}
3031
3032
/* **************************************************** */
3033
3034
#ifdef NTOPNG_PRO
3035
bool ZMQParserInterface::getCustomAppDetails(u_int32_t remapped_app_id,
3036
                                             u_int32_t *const pen,
3037
                                             u_int32_t *const app_field,
3038
                                             u_int32_t *const app_id) {
3039
  return custom_app_maps && custom_app_maps->getCustomAppDetails(
3040
                                remapped_app_id, pen, app_field, app_id);
3041
}
3042
#endif
3043
3044
/* **************************************************** */
3045
3046
0
void ZMQParserInterface::lua(lua_State *vm) {
3047
0
  ZMQ_RemoteStats *zrs = zmq_remote_stats;
3048
0
  std::map<u_int32_t, ZMQ_RemoteStats *>::iterator it;
3049
3050
0
  NetworkInterface::lua(vm);
3051
3052
  /* ************************************* */
3053
3054
0
  lua_newtable(vm);
3055
3056
0
  lock.rdlock(__FILE__, __LINE__);
3057
3058
0
  for (it = source_id_last_zmq_remote_stats.begin();
3059
0
       it != source_id_last_zmq_remote_stats.end(); ++it) {
3060
0
    ZMQ_RemoteStats *zrs = it->second;
3061
3062
0
    lua_newtable(vm);
3063
3064
    // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s (%u)", zrs->remote_ifname,
3065
    // it->first);
3066
3067
0
    lua_push_str_table_entry(vm, "remote.name", zrs->remote_ifname);
3068
0
    lua_push_str_table_entry(vm, "remote.if_addr", zrs->remote_ifaddress);
3069
0
    lua_push_uint64_table_entry(vm, "remote.ifspeed", zrs->remote_ifspeed);
3070
0
    lua_push_str_table_entry(vm, "probe.ip", zrs->remote_probe_address);
3071
0
    lua_push_str_table_entry(vm, "probe.public_ip",
3072
0
                             zrs->remote_probe_public_address);
3073
0
    lua_push_str_table_entry(vm, "probe.probe_version",
3074
0
                             zrs->remote_probe_version);
3075
0
    lua_push_str_table_entry(vm, "probe.probe_os", zrs->remote_probe_os);
3076
0
    lua_push_str_table_entry(vm, "probe.probe_license",
3077
0
                             zrs->remote_probe_license);
3078
0
    lua_push_str_table_entry(vm, "probe.probe_edition",
3079
0
                             zrs->remote_probe_edition);
3080
0
    lua_push_str_table_entry(vm, "probe.probe_maintenance",
3081
0
                             zrs->remote_probe_maintenance);
3082
3083
0
    lua_pushstring(vm, std::to_string(it->first).c_str() /* The source_id as string (can't use integers or Lua will think it's an array ) */);
3084
0
    lua_insert(vm, -2);
3085
0
    lua_settable(vm, -3);
3086
0
  }
3087
3088
0
  lock.unlock(__FILE__, __LINE__);
3089
3090
0
  lua_pushstring(vm, "probes");
3091
0
  lua_insert(vm, -2);
3092
0
  lua_settable(vm, -3);
3093
3094
  /* ************************************* */
3095
3096
0
  if (zrs) {
3097
0
    lua_push_uint64_table_entry(
3098
0
        vm, "probe.remote_time",
3099
0
        zrs->remote_time); /* remote time when last event has been sent */
3100
0
    lua_push_uint64_table_entry(
3101
0
        vm, "probe.local_time",
3102
0
        zrs->local_time); /* local time when last event has been received */
3103
3104
0
    lua_push_uint64_table_entry(
3105
0
        vm, "zmq.num_flow_exports",
3106
0
        zrs->num_flow_exports - zmq_remote_initial_exported_flows);
3107
0
    lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
3108
3109
0
    if (zrs->export_queue_full > 0)
3110
0
      lua_push_uint64_table_entry(vm, "zmq.drops.export_queue_full",
3111
0
                                  zrs->export_queue_full);
3112
0
    if (zrs->flow_collection_drops)
3113
0
      lua_push_uint64_table_entry(vm, "zmq.drops.flow_collection_drops",
3114
0
                                  zrs->flow_collection_drops);
3115
0
    if (zrs->flow_collection_udp_socket_drops)
3116
0
      lua_push_uint64_table_entry(vm,
3117
0
                                  "zmq.drops.flow_collection_udp_socket_drops",
3118
0
                                  zrs->flow_collection_udp_socket_drops);
3119
3120
0
    lua_push_uint64_table_entry(vm, "timeout.lifetime",
3121
0
                                zrs->remote_lifetime_timeout);
3122
0
    lua_push_uint64_table_entry(vm, "timeout.collected_lifetime",
3123
0
                                zrs->remote_collected_lifetime_timeout);
3124
0
    lua_push_uint64_table_entry(vm, "timeout.idle", zrs->remote_idle_timeout);
3125
0
  }
3126
0
}
3127
3128
/* **************************************************** */
3129
3130
0
void ZMQParserInterface::loadVLANMappings() {
3131
0
  char **keys, **values, buf[64];
3132
0
  int rc;
3133
0
  Redis *redis = ntop->getRedis();
3134
3135
0
  top_vlan_id = 0;
3136
3137
0
  snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
3138
3139
0
  rc = redis->hashGetAll(buf, &keys, &values);
3140
3141
0
  if(rc > 0) {
3142
0
    for (int i = 0; i < rc; i++) {
3143
0
      if(values[i] && keys[i]) {
3144
0
  u_int16_t v = atoi(values[i]);
3145
3146
0
  if(v > top_vlan_id) top_vlan_id = v;
3147
0
  name_to_vlan[keys[i]] = v;
3148
0
      }
3149
3150
0
      if (values[i]) free(values[i]);
3151
0
      if (keys[i])   free(keys[i]);
3152
0
    }
3153
3154
0
    free(keys);
3155
0
    free(values);
3156
0
  }
3157
0
}
3158
3159
/* **************************************************** */
3160
3161
0
u_int16_t ZMQParserInterface::findVLANMapping(std::string name) {
3162
0
  std::unordered_map<std::string, u_int16_t>::iterator it = name_to_vlan.find(name);
3163
3164
0
  if(it != name_to_vlan.end())
3165
0
    return(it->second);
3166
0
  else if(top_vlan_id < 4095) {
3167
0
    char value[16], buf[64];
3168
0
    u_int16_t id = ++top_vlan_id;
3169
0
    Redis *redis = ntop->getRedis();
3170
3171
0
    if(id >= 4096) return(0 /* too many vlans */);
3172
3173
0
    snprintf(value, sizeof(value), "%u", id);
3174
3175
0
    snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
3176
0
    redis->hashSet(buf, name.c_str(), value);
3177
3178
    /* Add VLAN mapping */
3179
0
    redis->hashSet(NTOPNG_VLAN_ALIASES, value, name.c_str());
3180
3181
0
    name_to_vlan[name] = id;
3182
0
    ntop->getTrace()->traceEvent(TRACE_NORMAL, "Added %s = %d", name.c_str(), id);
3183
0
    return(id);
3184
0
  } else
3185
0
    return(0);
3186
0
}
3187
3188
/* **************************************************** */
3189
3190
#endif