Coverage Report

Created: 2026-01-16 06:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/samba/source3/smbd/notifyd/notifyd.c
Line
Count
Source
1
/*
2
 * Unix SMB/CIFS implementation.
3
 *
4
 * Copyright (C) Volker Lendecke 2014
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
 */
19
20
#include "replace.h"
21
#include <tevent.h>
22
#include "notifyd_private.h"
23
#include "lib/util/server_id.h"
24
#include "lib/util/data_blob.h"
25
#include "librpc/gen_ndr/notify.h"
26
#include "librpc/gen_ndr/messaging.h"
27
#include "librpc/gen_ndr/server_id.h"
28
#include "lib/dbwrap/dbwrap.h"
29
#include "lib/dbwrap/dbwrap_rbt.h"
30
#include "messages.h"
31
#include "tdb.h"
32
#include "util_tdb.h"
33
#include "notifyd.h"
34
#include "lib/util/server_id_db.h"
35
#include "lib/util/tevent_unix.h"
36
#include "lib/util/tevent_ntstatus.h"
37
#include "ctdbd_conn.h"
38
#include "ctdb_srvids.h"
39
#include "server_id_db_util.h"
40
#include "lib/util/iov_buf.h"
41
#include "messages_util.h"
42
43
#ifdef CLUSTER_SUPPORT
44
#include "ctdb_protocol.h"
45
#endif
46
47
struct notifyd_peer;
48
49
/*
50
 * All of notifyd's state
51
 */
52
53
struct notifyd_state {
	/* Event and messaging contexts handed in via notifyd_send() */
	struct tevent_context *ev;
	struct messaging_context *msg_ctx;
	/* ctdb connection from notifyd_send(); NULL means "no cluster" */
	struct ctdbd_connection *ctdbd_conn;

	/*
	 * Database of everything clients show interest in. Indexed by
	 * absolute path. The database keys are not 0-terminated
	 * to allow the critical operation, notifyd_trigger, to walk
	 * the structure from the top without adding intermediate 0s.
	 * The database records contain one
	 *
	 * struct notifyd_watcher
	 *
	 * followed by an array of
	 *
	 * struct notifyd_instance
	 *
	 * to be maintained and parsed by notifyd_parse_entry()
	 */
	struct db_context *entries;

	/*
	 * In the cluster case, this is the place where we store a log
	 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just forward
	 * them 1:1 to our peer notifyds in the cluster once a
	 * second or when the log grows too large.
	 */

	struct messaging_reclog *log;

	/*
	 * Array of companion notifyds in a cluster. Every notifyd
	 * broadcasts its messaging_reclog to every other notifyd in
	 * the cluster. This is done by making ctdb send a message to
	 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
	 * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
	 * has called register_with_ctdbd for this srvid will receive
	 * the broadcasts.
	 *
	 * Database replication happens via these broadcasts. Also,
	 * they serve as liveness indication. If a notifyd receives a
	 * broadcast from an unknown peer, it will create a peer entry
	 * for this srvid. Also, when we don't hear anything from a
	 * peer for a while, we will discard it.
	 */

	struct notifyd_peer **peers;
	size_t num_peers;

	/*
	 * Backend used to register filesystem watches and the context
	 * passed to it. notifyd_send() substitutes a no-op backend when
	 * the caller passes NULL for sys_notify_watch.
	 */
	sys_notify_watch_fn sys_notify_watch;
	struct sys_notify_context *sys_notify_ctx;
};
102
103
/*
 * One companion notifyd in the cluster, see the "peers" array in
 * struct notifyd_state.
 */
struct notifyd_peer {
	/* Back pointer to the notifyd this peer entry belongs to */
	struct notifyd_state *state;
	/* Server id of the peer, matched against the sender of a db reply */
	struct server_id pid;
	/* Record index taken from the first 8 bytes of the peer's db reply */
	uint64_t rec_index;
	/* Copy of the peer's database; NULL until a db reply was received */
	struct db_context *db;
	/* NOTE(review): presumably the time of the last broadcast seen
	 * from this peer -- the code using it is not in view, confirm */
	time_t last_broadcast;
};
110
111
static void notifyd_rec_change(struct messaging_context *msg_ctx,
112
             void *private_data, uint32_t msg_type,
113
             struct server_id src, DATA_BLOB *data);
114
static void notifyd_trigger(struct messaging_context *msg_ctx,
115
          void *private_data, uint32_t msg_type,
116
          struct server_id src, DATA_BLOB *data);
117
static void notifyd_get_db(struct messaging_context *msg_ctx,
118
         void *private_data, uint32_t msg_type,
119
         struct server_id src, DATA_BLOB *data);
120
121
#ifdef CLUSTER_SUPPORT
122
static void notifyd_got_db(struct messaging_context *msg_ctx,
123
         void *private_data, uint32_t msg_type,
124
         struct server_id src, DATA_BLOB *data);
125
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
126
             struct server_id src,
127
             struct messaging_reclog *log);
128
#endif
129
static void notifyd_sys_callback(struct sys_notify_context *ctx,
130
         void *private_data, struct notify_event *ev,
131
         uint32_t filter);
132
133
#ifdef CLUSTER_SUPPORT
134
static struct tevent_req *notifyd_broadcast_reclog_send(
135
  TALLOC_CTX *mem_ctx, struct tevent_context *ev,
136
  struct ctdbd_connection *ctdbd_conn, struct server_id src,
137
  struct messaging_reclog *log);
138
static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
139
140
static struct tevent_req *notifyd_clean_peers_send(
141
  TALLOC_CTX *mem_ctx, struct tevent_context *ev,
142
  struct notifyd_state *notifyd);
143
static int notifyd_clean_peers_recv(struct tevent_req *req);
144
#endif
145
146
static int sys_notify_watch_dummy(
147
  TALLOC_CTX *mem_ctx,
148
  struct sys_notify_context *ctx,
149
  const char *path,
150
  uint32_t *filter,
151
  uint32_t *subdir_filter,
152
  void (*callback)(struct sys_notify_context *ctx,
153
       void *private_data,
154
       struct notify_event *ev,
155
       uint32_t filter),
156
  void *private_data,
157
  void *handle_p)
158
0
{
159
0
  void **handle = handle_p;
160
0
  *handle = NULL;
161
0
  return 0;
162
0
}
163
164
#ifdef CLUSTER_SUPPORT
165
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
166
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
167
static int notifyd_snoop_broadcast(struct tevent_context *ev,
168
           uint32_t src_vnn, uint32_t dst_vnn,
169
           uint64_t dst_srvid,
170
           const uint8_t *msg, size_t msglen,
171
           void *private_data);
172
#endif
173
174
/*
 * Start the notify daemon as a tevent request.
 *
 * Allocates the notifyd state, opens the in-memory entries database,
 * registers the message handlers for MSG_SMB_NOTIFY_REC_CHANGE,
 * MSG_SMB_NOTIFY_TRIGGER and MSG_SMB_NOTIFY_GET_DB and claims the
 * exclusive name "notify-daemon" in the messaging names db.
 *
 * With a ctdb connection and CLUSTER_SUPPORT compiled in, the database
 * replication engine is started as well: a MSG_SMB_NOTIFY_DB handler,
 * the reclog broadcast and peer cleanup subrequests, and a ctdb
 * registration for CTDB_SRVID_SAMBA_NOTIFY_PROXY.
 *
 * A NULL sys_notify_watch is replaced by sys_notify_watch_dummy.
 *
 * Returns NULL if the request itself could not be allocated. On any
 * later setup failure the handlers registered so far are deregistered
 * again and the already-failed request is returned via
 * tevent_req_post().
 *
 * NOTE(review): failed messaging_register() calls are recorded with
 * tevent_req_nterror(), while notifyd_recv() extracts a unix errno --
 * confirm that callers get a meaningful error code in that case.
 */
struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
				struct messaging_context *msg_ctx,
				struct ctdbd_connection *ctdbd_conn,
				sys_notify_watch_fn sys_notify_watch,
				struct sys_notify_context *sys_notify_ctx)
{
	struct tevent_req *req;
#ifdef CLUSTER_SUPPORT
	struct tevent_req *subreq;
#endif
	struct notifyd_state *state;
	struct server_id_db *names_db;
	NTSTATUS status;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->msg_ctx = msg_ctx;
	state->ctdbd_conn = ctdbd_conn;

	/* Without a real backend, fall back to the no-op watcher */
	if (sys_notify_watch == NULL) {
		sys_notify_watch = sys_notify_watch_dummy;
	}

	state->sys_notify_watch = sys_notify_watch;
	state->sys_notify_ctx = sys_notify_ctx;

	state->entries = db_open_rbt(state);
	if (tevent_req_nomem(state->entries, req)) {
		return tevent_req_post(req, ev);
	}

	/* Nothing is registered yet, so there is nothing to unwind here */
	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
				    notifyd_rec_change);
	if (tevent_req_nterror(req, status)) {
		return tevent_req_post(req, ev);
	}

	/*
	 * From here on every failure unwinds the registrations done so
	 * far through the label chain at the end of the function.
	 */
	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
				    notifyd_trigger);
	if (tevent_req_nterror(req, status)) {
		goto deregister_rec_change;
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
				    notifyd_get_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_trigger;
	}

	names_db = messaging_names_db(msg_ctx);

	ret = server_id_db_set_exclusive(names_db, "notify-daemon");
	if (ret != 0) {
		DBG_DEBUG("server_id_db_set_exclusive() failed: %s\n",
			  strerror(ret));
		tevent_req_error(req, ret);
		goto deregister_get_db;
	}

	if (ctdbd_conn == NULL) {
		/*
		 * No cluster around, skip the database replication
		 * engine
		 */
		return req;
	}

#ifdef CLUSTER_SUPPORT
	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
				    notifyd_got_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_get_db;
	}

	state->log = talloc_zero(state, struct messaging_reclog);
	if (tevent_req_nomem(state->log, req)) {
		goto deregister_db;
	}

	/* The broadcast subrequest is allocated as a child of the log */
	subreq = notifyd_broadcast_reclog_send(
		state->log, ev, ctdbd_conn,
		messaging_server_id(msg_ctx),
		state->log);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq,
				notifyd_broadcast_reclog_finished,
				req);

	subreq = notifyd_clean_peers_send(state, ev, state);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
				req);

	ret = register_with_ctdbd(ctdbd_conn,
				  CTDB_SRVID_SAMBA_NOTIFY_PROXY,
				  notifyd_snoop_broadcast, state);
	if (ret != 0) {
		tevent_req_error(req, ret);
		goto deregister_db;
	}
#endif

	return req;

	/* Error unwinding: deregister in reverse order of registration */
#ifdef CLUSTER_SUPPORT
deregister_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
#endif
deregister_get_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
deregister_trigger:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
deregister_rec_change:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
	return tevent_req_post(req, ev);
}
298
299
#ifdef CLUSTER_SUPPORT
300
301
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
302
{
303
  struct tevent_req *req = tevent_req_callback_data(
304
    subreq, struct tevent_req);
305
  int ret;
306
307
  ret = notifyd_broadcast_reclog_recv(subreq);
308
  TALLOC_FREE(subreq);
309
  tevent_req_error(req, ret);
310
}
311
312
static void notifyd_clean_peers_finished(struct tevent_req *subreq)
313
{
314
  struct tevent_req *req = tevent_req_callback_data(
315
    subreq, struct tevent_req);
316
  int ret;
317
318
  ret = notifyd_clean_peers_recv(subreq);
319
  TALLOC_FREE(subreq);
320
  tevent_req_error(req, ret);
321
}
322
323
#endif
324
325
/*
 * Collect the result of a request created by notifyd_send(). The value
 * returned is whatever tevent_req_simple_recv_unix() reports for req.
 */
int notifyd_recv(struct tevent_req *req)
{
	int result = tevent_req_simple_recv_unix(req);

	return result;
}
329
330
static bool notifyd_apply_rec_change(
331
  const struct server_id *client,
332
  const char *path, size_t pathlen,
333
  const struct notify_instance *chg,
334
  struct db_context *entries,
335
  sys_notify_watch_fn sys_notify_watch,
336
  struct sys_notify_context *sys_notify_ctx,
337
  struct messaging_context *msg_ctx)
338
0
{
339
0
  struct db_record *rec = NULL;
340
0
  struct notifyd_watcher watcher = {};
341
0
  struct notifyd_instance *instances = NULL;
342
0
  size_t num_instances;
343
0
  size_t i;
344
0
  struct notifyd_instance *instance = NULL;
345
0
  TDB_DATA value;
346
0
  NTSTATUS status;
347
0
  bool ok = false;
348
0
  bool new_watcher = false;
349
350
0
  if (pathlen == 0) {
351
0
    DBG_WARNING("pathlen==0\n");
352
0
    return false;
353
0
  }
354
0
  if (path[pathlen-1] != '\0') {
355
0
    DBG_WARNING("path not 0-terminated\n");
356
0
    return false;
357
0
  }
358
359
0
  DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
360
0
      "private_data=%p\n",
361
0
      path,
362
0
      chg->filter,
363
0
      chg->subdir_filter,
364
0
      chg->private_data);
365
366
0
  rec = dbwrap_fetch_locked(
367
0
    entries, entries,
368
0
    make_tdb_data((const uint8_t *)path, pathlen-1));
369
370
0
  if (rec == NULL) {
371
0
    DBG_WARNING("dbwrap_fetch_locked failed\n");
372
0
    goto fail;
373
0
  }
374
375
0
  num_instances = 0;
376
0
  value = dbwrap_record_get_value(rec);
377
378
0
  if (value.dsize != 0) {
379
0
    ok = notifyd_parse_entry(value.dptr,
380
0
           value.dsize,
381
0
           &watcher,
382
0
           NULL,
383
0
           &num_instances);
384
0
    if (!ok) {
385
0
      goto fail;
386
0
    }
387
0
  }
388
389
  /*
390
   * Overallocate by one instance to avoid a realloc when adding
391
   */
392
0
  instances = talloc_array(rec, struct notifyd_instance,
393
0
         num_instances + 1);
394
0
  if (instances == NULL) {
395
0
    DBG_WARNING("talloc failed\n");
396
0
    goto fail;
397
0
  }
398
399
0
  if (num_instances > 0) {
400
0
    struct notifyd_instance *tmp = NULL;
401
0
    size_t num_tmp = 0;
402
403
0
    ok = notifyd_parse_entry(value.dptr,
404
0
           value.dsize,
405
0
           NULL,
406
0
           &tmp,
407
0
           &num_tmp);
408
0
    if (!ok) {
409
0
      goto fail;
410
0
    }
411
412
0
    memcpy(instances,
413
0
           tmp,
414
0
           sizeof(struct notifyd_instance) * num_tmp);
415
0
  }
416
417
0
  for (i=0; i<num_instances; i++) {
418
0
    instance = &instances[i];
419
420
0
    if (server_id_equal(&instance->client, client) &&
421
0
        (instance->instance.private_data == chg->private_data)) {
422
0
      break;
423
0
    }
424
0
  }
425
426
0
  if (i < num_instances) {
427
0
    instance->instance = *chg;
428
0
  } else {
429
    /*
430
     * We've overallocated for one instance
431
     */
432
0
    instance = &instances[num_instances];
433
434
0
    *instance = (struct notifyd_instance) {
435
0
      .client = *client,
436
0
      .instance = *chg,
437
0
    };
438
439
0
    num_instances += 1;
440
0
  }
441
442
  /*
443
   * Calculate an intersection of the instances filters for the watcher.
444
   */
445
0
  if (instance->instance.filter > 0) {
446
0
    uint32_t filter = instance->instance.filter;
447
448
0
    if ((watcher.filter & filter) != filter) {
449
0
      watcher.filter |= filter;
450
451
0
      new_watcher = true;
452
0
    }
453
0
  }
454
455
  /*
456
   * Calculate an intersection of the instances subdir_filters for the
457
   * watcher.
458
   */
459
0
  if (instance->instance.subdir_filter > 0) {
460
0
    uint32_t subdir_filter = instance->instance.subdir_filter;
461
462
0
    if ((watcher.subdir_filter & subdir_filter) != subdir_filter) {
463
0
      watcher.subdir_filter |= subdir_filter;
464
465
0
      new_watcher = true;
466
0
    }
467
0
  }
468
469
0
  if ((instance->instance.filter == 0) &&
470
0
      (instance->instance.subdir_filter == 0)) {
471
0
    uint32_t tmp_filter = 0;
472
0
    uint32_t tmp_subdir_filter = 0;
473
474
    /* This is a delete request */
475
0
    *instance = instances[num_instances-1];
476
0
    num_instances -= 1;
477
478
0
    for (i = 0; i < num_instances; i++) {
479
0
      struct notifyd_instance *tmp = &instances[i];
480
481
0
      tmp_filter |= tmp->instance.filter;
482
0
      tmp_subdir_filter |= tmp->instance.subdir_filter;
483
0
    }
484
485
    /*
486
     * If the filter has changed, register a new watcher with the
487
     * changed filter.
488
     */
489
0
    if (watcher.filter != tmp_filter ||
490
0
        watcher.subdir_filter != tmp_subdir_filter)
491
0
    {
492
0
      watcher.filter = tmp_filter;
493
0
      watcher.subdir_filter = tmp_subdir_filter;
494
495
0
      new_watcher = true;
496
0
    }
497
0
  }
498
499
0
  if (new_watcher) {
500
    /*
501
     * In case we removed all notify instances, we want to remove
502
     * the watcher. We won't register a new one, if no filters are
503
     * set anymore.
504
     */
505
506
0
    TALLOC_FREE(watcher.sys_watch);
507
508
0
    watcher.sys_filter = watcher.filter;
509
0
    watcher.sys_subdir_filter = watcher.subdir_filter;
510
511
    /*
512
     * Only register a watcher if we have filter.
513
     */
514
0
    if (watcher.filter != 0 || watcher.subdir_filter != 0) {
515
0
      int ret = sys_notify_watch(entries,
516
0
               sys_notify_ctx,
517
0
               path,
518
0
               &watcher.sys_filter,
519
0
               &watcher.sys_subdir_filter,
520
0
               notifyd_sys_callback,
521
0
               msg_ctx,
522
0
               &watcher.sys_watch);
523
0
      if (ret != 0) {
524
0
        DBG_WARNING("sys_notify_watch for [%s] "
525
0
              "returned %s\n",
526
0
              path,
527
0
              strerror(errno));
528
0
      }
529
0
    }
530
0
  }
531
532
0
  DBG_DEBUG("%s has %zu instances\n", path, num_instances);
533
534
0
  if (num_instances == 0) {
535
0
    TALLOC_FREE(watcher.sys_watch);
536
537
0
    status = dbwrap_record_delete(rec);
538
0
    if (!NT_STATUS_IS_OK(status)) {
539
0
      DBG_WARNING("dbwrap_record_delete returned %s\n",
540
0
            nt_errstr(status));
541
0
      goto fail;
542
0
    }
543
0
  } else {
544
0
    struct TDB_DATA iov[2] = {
545
0
      {
546
0
        .dptr = (uint8_t *)&watcher,
547
0
        .dsize = sizeof(struct notifyd_watcher),
548
0
      },
549
0
      {
550
0
        .dptr = (uint8_t *)instances,
551
0
        .dsize = sizeof(struct notifyd_instance) *
552
0
           num_instances,
553
0
      },
554
0
    };
555
556
0
    status = dbwrap_record_storev(rec, iov, ARRAY_SIZE(iov), 0);
557
0
    if (!NT_STATUS_IS_OK(status)) {
558
0
      DBG_WARNING("dbwrap_record_storev returned %s\n",
559
0
            nt_errstr(status));
560
0
      goto fail;
561
0
    }
562
0
  }
563
564
0
  ok = true;
565
0
fail:
566
0
  TALLOC_FREE(rec);
567
0
  return ok;
568
0
}
569
570
static void notifyd_sys_callback(struct sys_notify_context *ctx,
571
         void *private_data, struct notify_event *ev,
572
         uint32_t filter)
573
0
{
574
0
  struct messaging_context *msg_ctx = talloc_get_type_abort(
575
0
    private_data, struct messaging_context);
576
0
  struct notify_trigger_msg msg;
577
0
  struct iovec iov[4];
578
0
  char slash = '/';
579
580
0
  msg = (struct notify_trigger_msg) {
581
0
    .when = timespec_current(),
582
0
    .action = ev->action,
583
0
    .filter = filter,
584
0
  };
585
586
0
  iov[0].iov_base = &msg;
587
0
  iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
588
0
  iov[1].iov_base = discard_const_p(char, ev->dir);
589
0
  iov[1].iov_len = strlen(ev->dir);
590
0
  iov[2].iov_base = &slash;
591
0
  iov[2].iov_len = 1;
592
0
  iov[3].iov_base = discard_const_p(char, ev->path);
593
0
  iov[3].iov_len = strlen(ev->path)+1;
594
595
0
  messaging_send_iov(
596
0
    msg_ctx, messaging_server_id(msg_ctx),
597
0
    MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
598
0
}
599
600
static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
601
             struct notify_rec_change_msg **pmsg,
602
             size_t *pathlen)
603
0
{
604
0
  struct notify_rec_change_msg *msg;
605
606
0
  if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
607
0
    DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
608
0
    return false;
609
0
  }
610
611
0
  *pmsg = msg = (struct notify_rec_change_msg *)buf;
612
0
  *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
613
614
0
  DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
615
0
      "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
616
0
      msg->instance.filter,
617
0
      msg->instance.subdir_filter,
618
0
      msg->instance.private_data,
619
0
      (int)(*pathlen),
620
0
      msg->path);
621
622
0
  return true;
623
0
}
624
625
/*
 * Handler for MSG_SMB_NOTIFY_REC_CHANGE: parse the message and apply
 * it to the entries database. If a reclog and a ctdb connection exist
 * (and CLUSTER_SUPPORT is compiled in), the raw message is appended to
 * the reclog, which is broadcast once it holds 100 records.
 */
static void notifyd_rec_change(struct messaging_context *msg_ctx,
			       void *private_data, uint32_t msg_type,
			       struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct notify_rec_change_msg *msg = NULL;
	struct notify_instance instance;
	struct server_id_buf idbuf;
	size_t pathlen = 0;
	bool ok;

	DBG_DEBUG("Got %zu bytes from %s\n", data->length,
		  server_id_str_buf(src, &idbuf));

	if (!notifyd_parse_rec_change(data->data, data->length,
				      &msg, &pathlen)) {
		return;
	}

	memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */

	ok = notifyd_apply_rec_change(&src,
				      msg->path,
				      pathlen,
				      &instance,
				      state->entries,
				      state->sys_notify_watch,
				      state->sys_notify_ctx,
				      state->msg_ctx);
	if (!ok) {
		DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
		return;
	}

	/* Nothing to replicate without a reclog and a ctdb connection */
	if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
		return;
	}

#ifdef CLUSTER_SUPPORT
	{
		struct messaging_reclog *log = state->log;
		struct iovec iov = {
			.iov_base = data->data,
			.iov_len = data->length,
		};
		struct messaging_rec **recs = NULL;
		struct messaging_rec *rec = NULL;

		/* Grow the record array by one slot */
		recs = talloc_realloc(log, log->recs, struct messaging_rec *,
				      log->num_recs+1);
		if (recs == NULL) {
			DBG_WARNING("talloc_realloc failed, ignoring\n");
			return;
		}
		log->recs = recs;

		rec = messaging_rec_create(
			log->recs, src, messaging_server_id(msg_ctx),
			msg_type, &iov, 1, NULL, 0);
		log->recs[log->num_recs] = rec;

		if (rec == NULL) {
			DBG_WARNING("messaging_rec_create failed, ignoring\n");
			return;
		}

		log->num_recs += 1;

		if (log->num_recs < 100) {
			return;
		}

		/*
		 * Don't let the log grow too large
		 */
		notifyd_broadcast_reclog(state->ctdbd_conn,
					 messaging_server_id(msg_ctx), log);
	}
#endif
}
700
701
/*
 * State handed from notifyd_trigger() to notifyd_trigger_parser() for
 * every database record that is looked at.
 */
struct notifyd_trigger_state {
	/* Messaging context used to notify the interested clients */
	struct messaging_context *msg_ctx;
	/* The trigger message being processed (points into the raw data) */
	struct notify_trigger_msg *msg;
	/* True while the directory looked at is not the last path component;
	 * selects subdir_filter instead of filter */
	bool recursive;
	/* True if the trigger came from our node, but not from ourselves;
	 * then the instance filters are masked with the watcher's sys
	 * filters */
	bool covered_by_sys_notify;
};
707
708
static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
709
           void *private_data);
710
711
/*
 * Handler for MSG_SMB_NOTIFY_TRIGGER.
 *
 * The message carries an absolute, 0-terminated path. For every
 * directory prefix of that path (cut at each '/' after the first
 * character) the matching record in our entries database is passed to
 * notifyd_trigger_parser(). If the trigger originated on our own node,
 * the databases received from peer notifyds are searched as well.
 *
 * Messages that are too short, not 0-terminated or whose path does not
 * start with '/' are logged and ignored.
 */
static void notifyd_trigger(struct messaging_context *msg_ctx,
			    void *private_data, uint32_t msg_type,
			    struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id my_id = messaging_server_id(msg_ctx);
	struct notifyd_trigger_state tstate = { .msg_ctx = msg_ctx };
	const char *path;
	const char *p, *next_p;

	if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
		DBG_WARNING("message too short, ignoring: %zu\n",
			    data->length);
		return;
	}
	if (data->data[data->length-1] != 0) {
		DBG_WARNING("path not 0-terminated, ignoring\n");
		return;
	}

	/* Same node, but a different process than ourselves */
	tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
	tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);

	tstate.msg = (struct notify_trigger_msg *)data->data;
	path = tstate.msg->path;

	DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
		  "path=%s\n",
		  tstate.msg->action,
		  tstate.msg->filter,
		  path);

	if (path[0] != '/') {
		DBG_WARNING("path %s does not start with /, ignoring\n",
			    path);
		return;
	}

	for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
		ptrdiff_t path_len = p - path;
		TDB_DATA key;
		size_t i;

		/* Another '/' ahead means the event is below a subdirectory */
		next_p = strchr(p+1, '/');
		tstate.recursive = (next_p != NULL);

		DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);

		/* Database keys are path prefixes without 0-terminator */
		key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
				   .dsize = path_len };

		dbwrap_parse_record(state->entries, key,
				    notifyd_trigger_parser, &tstate);

		if (state->peers == NULL) {
			continue;
		}

		/* Only triggers from our own node are matched against peers */
		if (src.vnn != my_id.vnn) {
			continue;
		}

		for (i=0; i<state->num_peers; i++) {
			if (state->peers[i]->db == NULL) {
				/*
				 * Inactive peer, did not get a db yet
				 */
				continue;
			}
			dbwrap_parse_record(state->peers[i]->db, key,
					    notifyd_trigger_parser, &tstate);
		}
	}
}
788
789
static void notifyd_send_delete(struct messaging_context *msg_ctx,
790
        TDB_DATA key,
791
        struct notifyd_instance *instance);
792
793
/*
 * dbwrap_parse_record() callback used by notifyd_trigger(): "key" is a
 * directory prefix of the triggered path, "data" the record stored for
 * it. Every instance in the record whose effective filter matches the
 * trigger's filter is sent a MSG_PVFS_NOTIFY message carrying the path
 * relative to the directory. If sending fails because a local client
 * is gone, a delete request for that instance is issued instead.
 */
static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
				   void *private_data)

{
	struct notifyd_trigger_state *tstate = private_data;
	struct notify_event_msg msg = {
		.action = tstate->msg->action,
		.when = tstate->msg->when,
	};
	struct notifyd_watcher watcher = {};
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t dir_len = key.dsize;
	struct iovec iov[2];
	size_t idx;

	if (!notifyd_parse_entry(data.dptr,
				 data.dsize,
				 &watcher,
				 &instances,
				 &num_instances)) {
		DBG_DEBUG("Could not parse notifyd_entry\n");
		return;
	}

	DBG_DEBUG("Found %zu instances for %.*s\n",
		  num_instances,
		  (int)key.dsize,
		  (char *)key.dptr);

	/* Fixed header, then the path behind "<key>/" including its 0 */
	iov[0].iov_base = &msg;
	iov[0].iov_len = offsetof(struct notify_event_msg, path);
	iov[1].iov_base = tstate->msg->path + dir_len + 1;
	iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;

	for (idx = 0; idx < num_instances; idx++) {
		struct notifyd_instance *instance = &instances[idx];
		struct server_id_buf idbuf;
		uint32_t wanted;
		NTSTATUS status;
		bool client_gone;

		/*
		 * Pick the filter matching the depth of the event and,
		 * if the event is covered by sys_notify, restrict it to
		 * what the watcher has registered.
		 */
		if (tstate->recursive) {
			wanted = instance->instance.subdir_filter;
			if (tstate->covered_by_sys_notify) {
				wanted &= watcher.sys_subdir_filter;
			}
		} else {
			wanted = instance->instance.filter;
			if (tstate->covered_by_sys_notify) {
				wanted &= watcher.sys_filter;
			}
		}

		if ((wanted & tstate->msg->filter) == 0) {
			continue;
		}

		msg.private_data = instance->instance.private_data;

		status = messaging_send_iov(
			tstate->msg_ctx, instance->client,
			MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);

		DBG_DEBUG("messaging_send_iov to %s returned %s\n",
			  server_id_str_buf(instance->client, &idbuf),
			  nt_errstr(status));

		client_gone =
			NT_STATUS_EQUAL(status,
					NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
			procid_is_local(&instance->client);
		if (client_gone) {
			/*
			 * That process has died
			 */
			notifyd_send_delete(tstate->msg_ctx, key, instance);
			continue;
		}

		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("messaging_send_iov returned %s\n",
				    nt_errstr(status));
		}
	}
}
879
880
/*
881
 * Send a delete request to ourselves to properly discard a notify
882
 * record for an smbd that has died.
883
 */
884
885
static void notifyd_send_delete(struct messaging_context *msg_ctx,
886
        TDB_DATA key,
887
        struct notifyd_instance *instance)
888
0
{
889
0
  struct notify_rec_change_msg msg = {
890
0
    .instance.private_data = instance->instance.private_data
891
0
  };
892
0
  uint8_t nul = 0;
893
0
  struct iovec iov[3];
894
0
  NTSTATUS status;
895
896
  /*
897
   * Send a rec_change to ourselves to delete a dead entry
898
   */
899
900
0
  iov[0] = (struct iovec) {
901
0
    .iov_base = &msg,
902
0
    .iov_len = offsetof(struct notify_rec_change_msg, path) };
903
0
  iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
904
0
  iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
905
906
0
  status = messaging_send_iov(msg_ctx,
907
0
            instance->client,
908
0
            MSG_SMB_NOTIFY_REC_CHANGE,
909
0
            iov,
910
0
            ARRAY_SIZE(iov),
911
0
            NULL,
912
0
            0);
913
914
0
  if (!NT_STATUS_IS_OK(status)) {
915
0
    DBG_WARNING("messaging_send_iov failed: %s\n",
916
0
          nt_errstr(status));
917
0
  }
918
0
}
919
920
/*
 * Handler for MSG_SMB_NOTIFY_GET_DB: marshall the entries database and
 * send it back to the requester as MSG_SMB_NOTIFY_DB. The reply starts
 * with 8 bytes holding the reclog's current record index, or
 * UINT64_MAX if there is no reclog, followed by the marshalled
 * database.
 */
static void notifyd_get_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id_buf self_buf, peer_buf;
	uint8_t index_buf[sizeof(uint64_t)];
	uint64_t rec_index;
	struct iovec iov[2];
	uint8_t *buf = NULL;
	size_t needed;
	size_t written;
	NTSTATUS status;

	/* First call only computes the size required */
	needed = dbwrap_marshall(state->entries, NULL, 0);

	buf = talloc_array(talloc_tos(), uint8_t, needed);
	if (buf == NULL) {
		DBG_WARNING("talloc_array(%zu) failed\n", needed);
		return;
	}

	written = dbwrap_marshall(state->entries, buf, needed);

	if (written != talloc_get_size(buf)) {
		DBG_DEBUG("dbsize changed: %zu->%zu\n",
			  talloc_get_size(buf),
			  written);
		TALLOC_FREE(buf);
		return;
	}

	rec_index = (state->log != NULL) ? state->log->rec_index : UINT64_MAX;
	SBVAL(index_buf, 0, rec_index);

	iov[0].iov_base = index_buf;
	iov[0].iov_len = sizeof(index_buf);
	iov[1].iov_base = buf;
	iov[1].iov_len = written;

	DBG_DEBUG("Sending %zu bytes to %s->%s\n",
		  iov_buflen(iov, ARRAY_SIZE(iov)),
		  server_id_str_buf(messaging_server_id(msg_ctx), &self_buf),
		  server_id_str_buf(src, &peer_buf));

	status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
				    iov, ARRAY_SIZE(iov), NULL, 0);
	TALLOC_FREE(buf);

	if (NT_STATUS_IS_OK(status)) {
		return;
	}

	DBG_WARNING("messaging_send_iov failed: %s\n",
		    nt_errstr(status));
}
975
976
#ifdef CLUSTER_SUPPORT
977
978
static int notifyd_add_proxy_syswatches(struct db_record *rec,
979
          void *private_data);
980
981
/*
 * Messaging handler: "src" sent us its database.
 *
 * The payload layout matches what notifyd_get_db() sends: an 8-byte
 * record index (read with BVAL) followed by a marshalled database.
 * The database is unpacked into a fresh rbt db owned by the peer, and
 * notifyd_add_proxy_syswatches() is run over every record.
 *
 * A message from a server id that is not in state->peers is ignored.
 * A short payload, a failed db_open_rbt() or a failed unmarshall
 * frees the peer (TALLOC_FREE on the peer object).
 *
 * msg_ctx and msg_type are not looked at.
 */
static void notifyd_got_db(struct messaging_context *msg_ctx,
         void *private_data, uint32_t msg_type,
         struct server_id src, DATA_BLOB *data)
{
  struct notifyd_state *state = talloc_get_type_abort(
    private_data, struct notifyd_state);
  struct notifyd_peer *p = NULL;
  struct server_id_buf idbuf;
  NTSTATUS status;
  /*
   * Initialised so that it is never read uninitialised should the
   * traverse below fail without filling it in.
   */
  int count = 0;
  size_t i;

  for (i=0; i<state->num_peers; i++) {
    if (server_id_equal(&src, &state->peers[i]->pid)) {
      p = state->peers[i];
      break;
    }
  }

  if (p == NULL) {
    DBG_DEBUG("Did not find peer for db from %s\n",
        server_id_str_buf(src, &idbuf));
    return;
  }

  /* Need at least the 8-byte record index */
  if (data->length < 8) {
    DBG_DEBUG("Got short db length %zu from %s\n", data->length,
         server_id_str_buf(src, &idbuf));
    TALLOC_FREE(p);
    return;
  }

  p->rec_index = BVAL(data->data, 0);

  p->db = db_open_rbt(p);
  if (p->db == NULL) {
    DBG_DEBUG("db_open_rbt failed\n");
    TALLOC_FREE(p);
    return;
  }

  status = dbwrap_unmarshall(p->db, data->data + 8,
           data->length - 8);
  if (!NT_STATUS_IS_OK(status)) {
    DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
        nt_errstr(status),
        server_id_str_buf(src, &idbuf));
    TALLOC_FREE(p);
    return;
  }

  status = dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches,
              state, &count);
  if (!NT_STATUS_IS_OK(status)) {
    /*
     * The peer is kept, as before; we just must not report a
     * record count that was never set.
     */
    DBG_WARNING("dbwrap_traverse_read returned %s for db %s\n",
          nt_errstr(status),
          server_id_str_buf(src, &idbuf));
    return;
  }

  DBG_DEBUG("Database from %s contained %d records\n",
      server_id_str_buf(src, &idbuf),
      count);
}
1039
1040
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
1041
             struct server_id src,
1042
             struct messaging_reclog *log)
1043
{
1044
  enum ndr_err_code ndr_err;
1045
  uint8_t msghdr[MESSAGE_HDR_LENGTH];
1046
  DATA_BLOB blob;
1047
  struct iovec iov[2];
1048
  int ret;
1049
1050
  if (log == NULL) {
1051
    return;
1052
  }
1053
1054
  DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
1055
      log->rec_index,
1056
      log->num_recs);
1057
1058
  message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
1059
      (struct server_id) {0 });
1060
  iov[0] = (struct iovec) { .iov_base = msghdr,
1061
          .iov_len = sizeof(msghdr) };
1062
1063
  ndr_err = ndr_push_struct_blob(
1064
    &blob, log, log,
1065
    (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1066
  if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1067
    DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
1068
          ndr_errstr(ndr_err));
1069
    goto done;
1070
  }
1071
  iov[1] = (struct iovec) { .iov_base = blob.data,
1072
          .iov_len = blob.length };
1073
1074
  ret = ctdbd_messaging_send_iov(
1075
    ctdbd_conn, CTDB_BROADCAST_CONNECTED,
1076
    CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1077
  TALLOC_FREE(blob.data);
1078
  if (ret != 0) {
1079
    DBG_WARNING("ctdbd_messaging_send failed: %s\n",
1080
          strerror(ret));
1081
    goto done;
1082
  }
1083
1084
  log->rec_index += 1;
1085
1086
done:
1087
  log->num_recs = 0;
1088
  TALLOC_FREE(log->recs);
1089
}
1090
1091
/*
 * Per-request state of the periodic record log broadcaster
 * (notifyd_broadcast_reclog_send/_next/_recv).
 */
struct notifyd_broadcast_reclog_state {
1092
  struct tevent_context *ev; /* used to arm the wakeup timer */
1093
  struct ctdbd_connection *ctdbd_conn; /* passed to notifyd_broadcast_reclog() */
1094
  struct server_id src; /* passed to notifyd_broadcast_reclog() as source */
1095
  struct messaging_reclog *log; /* the log handed to notifyd_broadcast_reclog() */
1096
};
1097
1098
static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1099
1100
static struct tevent_req *notifyd_broadcast_reclog_send(
1101
  TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1102
  struct ctdbd_connection *ctdbd_conn, struct server_id src,
1103
  struct messaging_reclog *log)
1104
{
1105
  struct tevent_req *req, *subreq;
1106
  struct notifyd_broadcast_reclog_state *state;
1107
1108
  req = tevent_req_create(mem_ctx, &state,
1109
        struct notifyd_broadcast_reclog_state);
1110
  if (req == NULL) {
1111
    return NULL;
1112
  }
1113
  state->ev = ev;
1114
  state->ctdbd_conn = ctdbd_conn;
1115
  state->src = src;
1116
  state->log = log;
1117
1118
  subreq = tevent_wakeup_send(state, state->ev,
1119
            timeval_current_ofs_msec(1000));
1120
  if (tevent_req_nomem(subreq, req)) {
1121
    return tevent_req_post(req, ev);
1122
  }
1123
  tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1124
  return req;
1125
}
1126
1127
static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1128
{
1129
  struct tevent_req *req = tevent_req_callback_data(
1130
    subreq, struct tevent_req);
1131
  struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1132
    req, struct notifyd_broadcast_reclog_state);
1133
  bool ok;
1134
1135
  ok = tevent_wakeup_recv(subreq);
1136
  TALLOC_FREE(subreq);
1137
  if (!ok) {
1138
    tevent_req_oom(req);
1139
    return;
1140
  }
1141
1142
  notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1143
1144
  subreq = tevent_wakeup_send(state, state->ev,
1145
            timeval_current_ofs_msec(1000));
1146
  if (tevent_req_nomem(subreq, req)) {
1147
    return;
1148
  }
1149
  tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1150
}
1151
1152
/*
 * Collect the result of the record log broadcaster request: returns
 * whatever tevent_req_simple_recv_unix() reports for "req".
 */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
  int err = tevent_req_simple_recv_unix(req);

  return err;
}
1156
1157
/*
 * Per-request state of the periodic dead-peer sweep
 * (notifyd_clean_peers_send/_next/_recv).
 */
struct notifyd_clean_peers_state {
1158
  struct tevent_context *ev; /* used to arm the wakeup timer */
1159
  struct notifyd_state *notifyd; /* whose peers array gets swept */
1160
};
1161
1162
static void notifyd_clean_peers_next(struct tevent_req *subreq);
1163
1164
static struct tevent_req *notifyd_clean_peers_send(
1165
  TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1166
  struct notifyd_state *notifyd)
1167
{
1168
  struct tevent_req *req, *subreq;
1169
  struct notifyd_clean_peers_state *state;
1170
1171
  req = tevent_req_create(mem_ctx, &state,
1172
        struct notifyd_clean_peers_state);
1173
  if (req == NULL) {
1174
    return NULL;
1175
  }
1176
  state->ev = ev;
1177
  state->notifyd = notifyd;
1178
1179
  subreq = tevent_wakeup_send(state, state->ev,
1180
            timeval_current_ofs_msec(30000));
1181
  if (tevent_req_nomem(subreq, req)) {
1182
    return tevent_req_post(req, ev);
1183
  }
1184
  tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1185
  return req;
1186
}
1187
1188
static void notifyd_clean_peers_next(struct tevent_req *subreq)
1189
{
1190
  struct tevent_req *req = tevent_req_callback_data(
1191
    subreq, struct tevent_req);
1192
  struct notifyd_clean_peers_state *state = tevent_req_data(
1193
    req, struct notifyd_clean_peers_state);
1194
  struct notifyd_state *notifyd = state->notifyd;
1195
  size_t i;
1196
  bool ok;
1197
  time_t now = time(NULL);
1198
1199
  ok = tevent_wakeup_recv(subreq);
1200
  TALLOC_FREE(subreq);
1201
  if (!ok) {
1202
    tevent_req_oom(req);
1203
    return;
1204
  }
1205
1206
  i = 0;
1207
  while (i < notifyd->num_peers) {
1208
    struct notifyd_peer *p = notifyd->peers[i];
1209
1210
    if ((now - p->last_broadcast) > 60) {
1211
      struct server_id_buf idbuf;
1212
1213
      /*
1214
       * Haven't heard for more than 60 seconds. Call this
1215
       * peer dead
1216
       */
1217
1218
      DBG_DEBUG("peer %s died\n",
1219
          server_id_str_buf(p->pid, &idbuf));
1220
      /*
1221
       * This implicitly decrements notifyd->num_peers
1222
       */
1223
      TALLOC_FREE(p);
1224
    } else {
1225
      i += 1;
1226
    }
1227
  }
1228
1229
  subreq = tevent_wakeup_send(state, state->ev,
1230
            timeval_current_ofs_msec(30000));
1231
  if (tevent_req_nomem(subreq, req)) {
1232
    return;
1233
  }
1234
  tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1235
}
1236
1237
/*
 * Collect the result of the dead-peer sweep request: returns whatever
 * tevent_req_simple_recv_unix() reports for "req".
 */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
  int err = tevent_req_simple_recv_unix(req);

  return err;
}
1241
1242
static int notifyd_add_proxy_syswatches(struct db_record *rec,
1243
          void *private_data)
1244
{
1245
  struct notifyd_state *state = talloc_get_type_abort(
1246
    private_data, struct notifyd_state);
1247
  struct db_context *db = dbwrap_record_get_db(rec);
1248
  TDB_DATA key = dbwrap_record_get_key(rec);
1249
  TDB_DATA value = dbwrap_record_get_value(rec);
1250
  struct notifyd_watcher watcher = {};
1251
  char path[key.dsize+1];
1252
  bool ok;
1253
  int ret;
1254
1255
  memcpy(path, key.dptr, key.dsize);
1256
  path[key.dsize] = '\0';
1257
1258
  /* This is a remote database, we just need the watcher. */
1259
  ok = notifyd_parse_entry(value.dptr, value.dsize, &watcher, NULL, NULL);
1260
  if (!ok) {
1261
    DBG_WARNING("Could not parse notifyd entry for %s\n", path);
1262
    return 0;
1263
  }
1264
1265
  watcher.sys_watch = NULL;
1266
  watcher.sys_filter = watcher.filter;
1267
  watcher.sys_subdir_filter = watcher.subdir_filter;
1268
1269
  ret = state->sys_notify_watch(db,
1270
              state->sys_notify_ctx,
1271
              path,
1272
              &watcher.filter,
1273
              &watcher.subdir_filter,
1274
              notifyd_sys_callback,
1275
              state->msg_ctx,
1276
              &watcher.sys_watch);
1277
  if (ret != 0) {
1278
    DBG_WARNING("inotify_watch returned %s\n", strerror(errno));
1279
  }
1280
1281
  memcpy(value.dptr, &watcher, sizeof(struct notifyd_watcher));
1282
1283
  return 0;
1284
}
1285
1286
static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1287
{
1288
  TDB_DATA key = dbwrap_record_get_key(rec);
1289
  TDB_DATA value = dbwrap_record_get_value(rec);
1290
  struct notifyd_watcher watcher = {};
1291
  bool ok;
1292
1293
  ok = notifyd_parse_entry(value.dptr, value.dsize, &watcher, NULL, NULL);
1294
  if (!ok) {
1295
    DBG_WARNING("Could not parse notifyd entry for %.*s\n",
1296
          (int)key.dsize, (char *)key.dptr);
1297
    return 0;
1298
  }
1299
  TALLOC_FREE(watcher.sys_watch);
1300
1301
  return 0;
1302
}
1303
1304
static int notifyd_peer_destructor(struct notifyd_peer *p)
1305
{
1306
  struct notifyd_state *state = p->state;
1307
  size_t i;
1308
1309
  if (p->db != NULL) {
1310
    dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1311
             NULL, NULL);
1312
  }
1313
1314
  for (i = 0; i<state->num_peers; i++) {
1315
    if (p == state->peers[i]) {
1316
      state->peers[i] = state->peers[state->num_peers-1];
1317
      state->num_peers -= 1;
1318
      break;
1319
    }
1320
  }
1321
  return 0;
1322
}
1323
1324
static struct notifyd_peer *notifyd_peer_new(
1325
  struct notifyd_state *state, struct server_id pid)
1326
{
1327
  struct notifyd_peer *p, **tmp;
1328
1329
  tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1330
           state->num_peers+1);
1331
  if (tmp == NULL) {
1332
    return NULL;
1333
  }
1334
  state->peers = tmp;
1335
1336
  p = talloc_zero(state->peers, struct notifyd_peer);
1337
  if (p == NULL) {
1338
    return NULL;
1339
  }
1340
  p->state = state;
1341
  p->pid = pid;
1342
1343
  state->peers[state->num_peers] = p;
1344
  state->num_peers += 1;
1345
1346
  talloc_set_destructor(p, notifyd_peer_destructor);
1347
1348
  return p;
1349
}
1350
1351
static void notifyd_apply_reclog(struct notifyd_peer *peer,
1352
         const uint8_t *msg, size_t msglen)
1353
{
1354
  struct notifyd_state *state = peer->state;
1355
  DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1356
         .length = msglen };
1357
  struct server_id_buf idbuf;
1358
  struct messaging_reclog *log;
1359
  enum ndr_err_code ndr_err;
1360
  uint32_t i;
1361
1362
  if (peer->db == NULL) {
1363
    /*
1364
     * No db yet
1365
     */
1366
    return;
1367
  }
1368
1369
  log = talloc(peer, struct messaging_reclog);
1370
  if (log == NULL) {
1371
    DBG_DEBUG("talloc failed\n");
1372
    return;
1373
  }
1374
1375
  ndr_err = ndr_pull_struct_blob_all(
1376
    &blob, log, log,
1377
    (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1378
  if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1379
    DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
1380
        ndr_errstr(ndr_err));
1381
    goto fail;
1382
  }
1383
1384
  DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
1385
      log->num_recs,
1386
      log->rec_index,
1387
      server_id_str_buf(peer->pid, &idbuf));
1388
1389
  if (log->rec_index != peer->rec_index) {
1390
    DBG_INFO("Got rec index %"PRIu64" from %s, "
1391
       "expected %"PRIu64"\n",
1392
       log->rec_index,
1393
       server_id_str_buf(peer->pid, &idbuf),
1394
       peer->rec_index);
1395
    goto fail;
1396
  }
1397
1398
  for (i=0; i<log->num_recs; i++) {
1399
    struct messaging_rec *r = log->recs[i];
1400
    struct notify_rec_change_msg *chg;
1401
    size_t pathlen;
1402
    bool ok;
1403
    struct notify_instance instance;
1404
1405
    ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1406
                &chg, &pathlen);
1407
    if (!ok) {
1408
      DBG_INFO("notifyd_parse_rec_change failed\n");
1409
      goto fail;
1410
    }
1411
1412
    /* avoid SIGBUS */
1413
    memcpy(&instance, &chg->instance, sizeof(instance));
1414
1415
    ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1416
                &instance, peer->db,
1417
                state->sys_notify_watch,
1418
                state->sys_notify_ctx,
1419
                state->msg_ctx);
1420
    if (!ok) {
1421
      DBG_INFO("notifyd_apply_rec_change failed\n");
1422
      goto fail;
1423
    }
1424
  }
1425
1426
  peer->rec_index += 1;
1427
  peer->last_broadcast = time(NULL);
1428
1429
  TALLOC_FREE(log);
1430
  return;
1431
1432
fail:
1433
  DBG_DEBUG("Dropping peer %s\n",
1434
      server_id_str_buf(peer->pid, &idbuf));
1435
  TALLOC_FREE(peer);
1436
}
1437
1438
/*
1439
 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1440
 * messages) broadcasts by other notifyds. Several cases:
1441
 *
1442
 * We don't know the source. This creates a new peer. Creating a peer
1443
 * involves asking the peer for its full database. We assume ordered
1444
 * messages, so the new database will arrive before the next broadcast
1445
 * will.
1446
 *
1447
 * We know the source and the log index matches. We will apply the log
1448
 * locally to our peer's db as if we had received it from a local
1449
 * client.
1450
 *
1451
 * We know the source but the log index does not match. This means we
1452
 * lost a message. We just drop the whole peer and wait for the next
1453
 * broadcast, which will then trigger a fresh database pull.
1454
 */
1455
1456
static int notifyd_snoop_broadcast(struct tevent_context *ev,
1457
           uint32_t src_vnn, uint32_t dst_vnn,
1458
           uint64_t dst_srvid,
1459
           const uint8_t *msg, size_t msglen,
1460
           void *private_data)
1461
{
1462
  struct notifyd_state *state = talloc_get_type_abort(
1463
    private_data, struct notifyd_state);
1464
  struct server_id my_id = messaging_server_id(state->msg_ctx);
1465
  struct notifyd_peer *p;
1466
  uint32_t i;
1467
  uint32_t msg_type;
1468
  struct server_id src, dst;
1469
  struct server_id_buf idbuf;
1470
  NTSTATUS status;
1471
1472
  if (msglen < MESSAGE_HDR_LENGTH) {
1473
    DBG_DEBUG("Got short broadcast\n");
1474
    return 0;
1475
  }
1476
  message_hdr_get(&msg_type, &src, &dst, msg);
1477
1478
  if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1479
    DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
1480
    return 0;
1481
  }
1482
  if (server_id_equal(&src, &my_id)) {
1483
    DBG_DEBUG("Ignoring my own broadcast\n");
1484
    return 0;
1485
  }
1486
1487
  DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1488
       server_id_str_buf(src, &idbuf));
1489
1490
  for (i=0; i<state->num_peers; i++) {
1491
    if (server_id_equal(&state->peers[i]->pid, &src)) {
1492
1493
      DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);
1494
1495
      notifyd_apply_reclog(state->peers[i],
1496
               msg + MESSAGE_HDR_LENGTH,
1497
               msglen - MESSAGE_HDR_LENGTH);
1498
      return 0;
1499
    }
1500
  }
1501
1502
  DBG_DEBUG("Creating new peer for %s\n",
1503
       server_id_str_buf(src, &idbuf));
1504
1505
  p = notifyd_peer_new(state, src);
1506
  if (p == NULL) {
1507
    DBG_DEBUG("notifyd_peer_new failed\n");
1508
    return 0;
1509
  }
1510
1511
  status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1512
            NULL, 0);
1513
  if (!NT_STATUS_IS_OK(status)) {
1514
    DBG_DEBUG("messaging_send_buf failed: %s\n",
1515
        nt_errstr(status));
1516
    TALLOC_FREE(p);
1517
    return 0;
1518
  }
1519
1520
  return 0;
1521
}
1522
#endif