Coverage Report

Created: 2018-09-25 14:53

/src/mozilla-central/netwerk/sctp/datachannel/DataChannel.cpp
Line
Count
Source (jump to first uncovered line)
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5
 * You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include <algorithm>
8
#include <stdio.h>
9
#include <stdlib.h>
10
#if !defined(__Userspace_os_Windows)
11
#include <arpa/inet.h>
12
#endif
13
// usrsctp.h expects to have errno definitions prior to its inclusion.
14
#include <errno.h>
15
16
#define SCTP_DEBUG 1
17
#define SCTP_STDINT_INCLUDE <stdint.h>
18
19
#ifdef _MSC_VER
20
// Disable "warning C4200: nonstandard extension used : zero-sized array in
21
//          struct/union"
22
// ...which the third-party file usrsctp.h runs afoul of.
23
#pragma warning(push)
24
#pragma warning(disable:4200)
25
#endif
26
27
#include "usrsctp.h"
28
29
#ifdef _MSC_VER
30
#pragma warning(pop)
31
#endif
32
33
#include "DataChannelLog.h"
34
35
#include "nsServiceManagerUtils.h"
36
#include "nsIObserverService.h"
37
#include "nsIObserver.h"
38
#include "nsIPrefBranch.h"
39
#include "nsIPrefService.h"
40
#include "mozilla/Services.h"
41
#include "mozilla/Sprintf.h"
42
#include "nsProxyRelease.h"
43
#include "nsThread.h"
44
#include "nsThreadUtils.h"
45
#include "nsAutoPtr.h"
46
#include "nsNetUtil.h"
47
#include "nsNetCID.h"
48
#include "mozilla/StaticPtr.h"
49
#include "mozilla/StaticMutex.h"
50
#include "mozilla/Unused.h"
51
#ifdef MOZ_PEERCONNECTION
52
#include "mtransport/runnable_utils.h"
53
#endif
54
55
0
#define DATACHANNEL_LOG(args) LOG(args)
56
#include "DataChannel.h"
57
#include "DataChannelProtocol.h"
58
59
// Let us turn on and off important assertions in non-debug builds
60
#ifdef DEBUG
61
#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
62
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
63
0
#define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
64
#endif
65
66
static bool sctp_initialized;
67
68
namespace mozilla {
69
70
LazyLogModule gDataChannelLog("DataChannel");
71
static LazyLogModule gSCTPLog("SCTP");
72
73
0
#define SCTP_LOG(args) MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
74
75
class DataChannelConnectionShutdown : public nsITimerCallback
76
{
77
public:
78
  explicit DataChannelConnectionShutdown(DataChannelConnection* aConnection)
79
    : mConnection(aConnection)
80
0
  {
81
0
    mTimer = NS_NewTimer(); // we'll crash if this fails
82
0
    mTimer->InitWithCallback(this, 30*1000, nsITimer::TYPE_ONE_SHOT);
83
0
  }
84
85
  NS_IMETHODIMP Notify(nsITimer* aTimer) override;
86
87
  NS_DECL_THREADSAFE_ISUPPORTS
88
89
private:
90
  virtual ~DataChannelConnectionShutdown()
91
0
  {
92
0
    mTimer->Cancel();
93
0
  }
94
95
  RefPtr<DataChannelConnection> mConnection;
96
  nsCOMPtr<nsITimer> mTimer;
97
};
98
99
class DataChannelShutdown;
100
101
StaticRefPtr<DataChannelShutdown> sDataChannelShutdown;
102
103
class DataChannelShutdown : public nsIObserver
104
{
105
public:
106
  // This needs to be tied to some object that is guaranteed to be
107
  // around (singleton likely) unless we want to shutdown sctp whenever
108
  // we're not using it (and in which case we'd keep a refcnt'd object
109
  // ref'd by each DataChannelConnection to release the SCTP usrlib via
110
  // sctp_finish). Right now, the single instance of this class is
111
  // owned by the observer service and a StaticRefPtr.
112
113
  NS_DECL_ISUPPORTS
114
115
0
  DataChannelShutdown() = default;
116
117
  void Init()
118
0
    {
119
0
      nsCOMPtr<nsIObserverService> observerService =
120
0
        mozilla::services::GetObserverService();
121
0
      if (!observerService)
122
0
        return;
123
0
124
0
      nsresult rv = observerService->AddObserver(this,
125
0
                                                 "xpcom-will-shutdown",
126
0
                                                 false);
127
0
      MOZ_ASSERT(rv == NS_OK);
128
0
      (void) rv;
129
0
    }
130
131
  NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
132
                     const char16_t* aData) override
133
0
  {
134
0
    // Note: MainThread
135
0
    if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
136
0
      LOG(("Shutting down SCTP"));
137
0
      if (sctp_initialized) {
138
0
        usrsctp_finish();
139
0
        sctp_initialized = false;
140
0
      }
141
0
      nsCOMPtr<nsIObserverService> observerService =
142
0
        mozilla::services::GetObserverService();
143
0
      if (!observerService)
144
0
        return NS_ERROR_FAILURE;
145
0
146
0
      nsresult rv = observerService->RemoveObserver(this,
147
0
                                                    "xpcom-will-shutdown");
148
0
      MOZ_ASSERT(rv == NS_OK);
149
0
      (void) rv;
150
0
151
0
      {
152
0
        StaticMutexAutoLock lock(sLock);
153
0
        sConnections = nullptr; // clears as well
154
0
      }
155
0
      sDataChannelShutdown = nullptr;
156
0
    }
157
0
    return NS_OK;
158
0
  }
159
160
  void CreateConnectionShutdown(DataChannelConnection* aConnection)
161
0
  {
162
0
    StaticMutexAutoLock lock(sLock);
163
0
    if (!sConnections) {
164
0
      sConnections = new nsTArray<RefPtr<DataChannelConnectionShutdown>>();
165
0
    }
166
0
    sConnections->AppendElement(new DataChannelConnectionShutdown(aConnection));
167
0
  }
168
169
  void RemoveConnectionShutdown(DataChannelConnectionShutdown* aConnectionShutdown)
170
0
  {
171
0
    StaticMutexAutoLock lock(sLock);
172
0
    if (sConnections) {
173
0
      sConnections->RemoveElement(aConnectionShutdown);
174
0
    }
175
0
  }
176
177
private:
178
  // The only instance of DataChannelShutdown is owned by the observer
179
  // service, so there is no need to call RemoveObserver here.
180
0
  virtual ~DataChannelShutdown() = default;
181
182
  // protects sConnections
183
  static StaticMutex sLock;
184
  static StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>> sConnections;
185
};
186
187
StaticMutex DataChannelShutdown::sLock;
188
StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>> DataChannelShutdown::sConnections;
189
190
NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
191
192
NS_IMPL_ISUPPORTS(DataChannelConnectionShutdown, nsITimerCallback)
193
194
NS_IMETHODIMP
195
DataChannelConnectionShutdown::Notify(nsITimer* aTimer)
196
0
{
197
0
  // safely release reference to ourself
198
0
  RefPtr<DataChannelConnectionShutdown> grip(this);
199
0
  // Might not be set. We don't actually use the |this| pointer in
200
0
  // RemoveConnectionShutdown right now, which makes this a bit gratuitous
201
0
  // anyway...
202
0
  if (sDataChannelShutdown) {
203
0
    sDataChannelShutdown->RemoveConnectionShutdown(this);
204
0
  }
205
0
  return NS_OK;
206
0
}
207
208
OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
209
                         size_t length)
210
  : mLength(length)
211
  , mData(data)
212
0
{
213
0
  mInfo = &info;
214
0
  mPos = 0;
215
0
}
216
217
void OutgoingMsg::Advance(size_t offset)
218
0
{
219
0
  mPos += offset;
220
0
  if (mPos > mLength) {
221
0
    mPos = mLength;
222
0
  }
223
0
}
224
225
BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg)
226
0
{
227
0
  size_t length = msg.GetLeft();
228
0
  auto *tmp = new uint8_t[length]; // infallible malloc!
229
0
  memcpy(tmp, msg.GetData(), length);
230
0
  mLength = length;
231
0
  mData = tmp;
232
0
  mInfo = new sctp_sendv_spa;
233
0
  *mInfo = msg.GetInfo();
234
0
  mPos = 0;
235
0
}
236
237
BufferedOutgoingMsg::~BufferedOutgoingMsg()
238
0
{
239
0
  delete mInfo;
240
0
  delete mData;
241
0
}
242
243
static int
244
receive_cb(struct socket* sock, union sctp_sockstore addr,
245
           void *data, size_t datalen,
246
           struct sctp_rcvinfo rcv, int flags, void *ulp_info)
247
0
{
248
0
  DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
249
0
  return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
250
0
}
251
252
static
253
DataChannelConnection *
254
GetConnectionFromSocket(struct socket* sock)
255
0
{
256
0
  struct sockaddr *addrs = nullptr;
257
0
  int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
258
0
  if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
259
0
    return nullptr;
260
0
  }
261
0
  // usrsctp_getladdrs() returns the addresses bound to this socket, which
262
0
  // contains the SctpDataMediaChannel* as sconn_addr.  Read the pointer,
263
0
  // then free the list of addresses once we have the pointer.  We only open
264
0
  // AF_CONN sockets, and they should all have the sconn_addr set to the
265
0
  // pointer that created them, so [0] is as good as any other.
266
0
  struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
267
0
  DataChannelConnection *connection =
268
0
    reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
269
0
  usrsctp_freeladdrs(addrs);
270
0
271
0
  return connection;
272
0
}
273
274
// called when the buffer empties to the threshold value
275
static int
276
threshold_event(struct socket* sock, uint32_t sb_free)
277
0
{
278
0
  DataChannelConnection *connection = GetConnectionFromSocket(sock);
279
0
  if (connection) {
280
0
    connection->SendDeferredMessages();
281
0
  } else {
282
0
    LOG(("Can't find connection for socket %p", sock));
283
0
  }
284
0
  return 0;
285
0
}
286
287
static void
288
debug_printf(const char *format, ...)
289
0
{
290
0
  va_list ap;
291
0
  char buffer[1024];
292
0
293
0
  if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
294
0
    va_start(ap, format);
295
#ifdef _WIN32
296
    if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
297
#else
298
0
    if (VsprintfLiteral(buffer, format, ap) > 0) {
299
0
#endif
300
0
      SCTP_LOG(("%s", buffer));
301
0
    }
302
0
    va_end(ap);
303
0
  }
304
0
}
305
306
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
307
                                             nsIEventTarget *aTarget)
308
  : NeckoTargetHolder(aTarget)
309
  , mLock("netwerk::sctp::DataChannelConnection")
310
  , mSendInterleaved(false)
311
  , mPpidFragmentation(false)
312
  , mMaxMessageSizeSet(false)
313
  , mMaxMessageSize(0)
314
  , mAllocateEven(false)
315
0
{
316
0
  mCurrentStream = 0;
317
0
  mState = CLOSED;
318
0
  mSocket = nullptr;
319
0
  mMasterSocket = nullptr;
320
0
  mListener = listener;
321
0
  mDtls = nullptr;
322
0
  mLocalPort = 0;
323
0
  mRemotePort = 0;
324
0
  mPendingType = PENDING_NONE;
325
0
  LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
326
0
  mInternalIOThread = nullptr;
327
0
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
328
0
  mShutdown = false;
329
0
#endif
330
0
}
331
332
DataChannelConnection::~DataChannelConnection()
333
0
{
334
0
  LOG(("Deleting DataChannelConnection %p", (void *) this));
335
0
  // This may die on the MainThread, or on the STS thread
336
0
  ASSERT_WEBRTC(mState == CLOSED);
337
0
  MOZ_ASSERT(!mMasterSocket);
338
0
  MOZ_ASSERT(mPending.GetSize() == 0);
339
0
  MOZ_ASSERT(!mDtls);
340
0
341
0
  // Already disconnected from sigslot/mTransportFlow
342
0
  // TransportFlows must be released from the STS thread
343
0
  if (!IsSTSThread()) {
344
0
    ASSERT_WEBRTC(NS_IsMainThread());
345
0
346
0
    if (mInternalIOThread) {
347
0
      // Avoid spinning the event thread from here (which if we're mainthread
348
0
      // is in the event loop already)
349
0
      nsCOMPtr<nsIRunnable> r = WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
350
0
                                             &nsIThread::Shutdown);
351
0
      Dispatch(r.forget());
352
0
    }
353
0
  } else {
354
0
    // on STS, safe to call shutdown
355
0
    if (mInternalIOThread) {
356
0
      mInternalIOThread->Shutdown();
357
0
    }
358
0
  }
359
0
}
360
361
void
362
DataChannelConnection::Destroy()
363
0
{
364
0
  // Though it's probably ok to do this and close the sockets;
365
0
  // if we really want it to do true clean shutdowns it can
366
0
  // create a dependant Internal object that would remain around
367
0
  // until the network shut down the association or timed out.
368
0
  LOG(("Destroying DataChannelConnection %p", (void *) this));
369
0
  ASSERT_WEBRTC(NS_IsMainThread());
370
0
  CloseAll();
371
0
372
0
  MutexAutoLock lock(mLock);
373
0
  // If we had a pending reset, we aren't waiting for it - clear the list so
374
0
  // we can deregister this DataChannelConnection without leaking.
375
0
  ClearResets();
376
0
377
0
  MOZ_ASSERT(mSTS);
378
0
  ASSERT_WEBRTC(NS_IsMainThread());
379
0
  // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
380
0
  // the usrsctp_close() calls can move back here (and just proxy the
381
0
  // disconnect_all())
382
0
  RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
383
0
                                   &DataChannelConnection::DestroyOnSTS,
384
0
                                   mSocket, mMasterSocket),
385
0
                NS_DISPATCH_NORMAL);
386
0
387
0
  // These will be released on STS
388
0
  mSocket = nullptr;
389
0
  mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
390
0
391
0
  // We can't get any more new callbacks from the SCTP library
392
0
  // All existing callbacks have refs to DataChannelConnection
393
0
394
0
  // nsDOMDataChannel objects have refs to DataChannels that have refs to us
395
0
}
396
397
void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
398
                                         struct socket *aSocket)
399
0
{
400
0
  if (aSocket && aSocket != aMasterSocket)
401
0
    usrsctp_close(aSocket);
402
0
  if (aMasterSocket)
403
0
    usrsctp_close(aMasterSocket);
404
0
405
0
  usrsctp_deregister_address(static_cast<void *>(this));
406
0
  LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
407
0
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
408
0
  mShutdown = true;
409
0
#endif
410
0
411
0
  disconnect_all();
412
0
413
0
  // we may have queued packet sends on STS after this; dispatch to ourselves before finishing here
414
0
  // so we can be sure there aren't anymore runnables active that can try to touch the flow.
415
0
  // DON'T use RUN_ON_THREAD, it queue-jumps!
416
0
  mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(this),
417
0
                              &DataChannelConnection::DestroyOnSTSFinal),
418
0
                 NS_DISPATCH_NORMAL);
419
0
}
420
421
void DataChannelConnection::DestroyOnSTSFinal()
422
0
{
423
0
  mTransportFlow = nullptr;
424
0
  mDtls = nullptr;
425
0
  sDataChannelShutdown->CreateConnectionShutdown(this);
426
0
}
427
428
bool
429
DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aMaxMessageSizeSet,
430
                            uint64_t aMaxMessageSize)
431
0
{
432
0
  struct sctp_initmsg initmsg;
433
0
  struct sctp_assoc_value av;
434
0
  struct sctp_event event;
435
0
  socklen_t len;
436
0
437
0
  uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
438
0
                            SCTP_PEER_ADDR_CHANGE,
439
0
                            SCTP_REMOTE_ERROR,
440
0
                            SCTP_SHUTDOWN_EVENT,
441
0
                            SCTP_ADAPTATION_INDICATION,
442
0
                            SCTP_PARTIAL_DELIVERY_EVENT,
443
0
                            SCTP_SEND_FAILED_EVENT,
444
0
                            SCTP_STREAM_RESET_EVENT,
445
0
                            SCTP_STREAM_CHANGE_EVENT};
446
0
  {
447
0
    ASSERT_WEBRTC(NS_IsMainThread());
448
0
    // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
449
0
450
0
    mSendInterleaved = false;
451
0
    mPpidFragmentation = false;
452
0
    mMaxMessageSizeSet = false;
453
0
    SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
454
0
455
0
    if (!sctp_initialized) {
456
0
      LOG(("sctp_init"));
457
0
#ifdef MOZ_PEERCONNECTION
458
0
      usrsctp_init(0,
459
0
                   DataChannelConnection::SctpDtlsOutput,
460
0
                   debug_printf
461
0
                  );
462
#else
463
      MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
464
#endif
465
466
0
      // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
467
0
      if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
468
0
        usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
469
0
      }
470
0
471
0
      // Do not send ABORTs in response to INITs (1).
472
0
      // Do not send ABORTs for received Out of the Blue packets (2).
473
0
      usrsctp_sysctl_set_sctp_blackhole(2);
474
0
475
0
      // Disable the Explicit Congestion Notification extension (currently not supported by the
476
0
      // Firefox code)
477
0
      usrsctp_sysctl_set_sctp_ecn_enable(0);
478
0
479
0
      // Enable interleaving messages for different streams (incoming)
480
0
      // See: https://tools.ietf.org/html/rfc6458#section-8.1.20
481
0
      usrsctp_sysctl_set_sctp_default_frag_interleave(2);
482
0
483
0
      sctp_initialized = true;
484
0
485
0
      sDataChannelShutdown = new DataChannelShutdown();
486
0
      sDataChannelShutdown->Init();
487
0
    }
488
0
  }
489
0
490
0
  // XXX FIX! make this a global we get once
491
0
  // Find the STS thread
492
0
  nsresult rv;
493
0
  mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
494
0
  MOZ_ASSERT(NS_SUCCEEDED(rv));
495
0
496
0
  // Open sctp with a callback
497
0
  if ((mMasterSocket = usrsctp_socket(
498
0
         AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
499
0
         usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
500
0
    return false;
501
0
  }
502
0
503
0
  // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
504
0
  // in associations for normal IO
505
0
  if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
506
0
    LOG(("Couldn't set non_blocking on SCTP socket"));
507
0
    // We can't handle connect() safely if it will block, not that this will
508
0
    // even happen.
509
0
    goto error_cleanup;
510
0
  }
511
0
512
0
  // Make sure when we close the socket, make sure it doesn't call us back again!
513
0
  // This would cause it try to use an invalid DataChannelConnection pointer
514
0
  struct linger l;
515
0
  l.l_onoff = 1;
516
0
  l.l_linger = 0;
517
0
  if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
518
0
                         (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
519
0
    LOG(("Couldn't set SO_LINGER on SCTP socket"));
520
0
    // unsafe to allow it to continue if this fails
521
0
    goto error_cleanup;
522
0
  }
523
0
524
0
  // XXX Consider disabling this when we add proper SDP negotiation.
525
0
  // We may want to leave enabled for supporting 'cloning' of SDP offers, which
526
0
  // implies re-use of the same pseudo-port number, or forcing a renegotiation.
527
0
  {
528
0
    const int option_value = 1;
529
0
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
530
0
                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
531
0
      LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
532
0
    }
533
0
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
534
0
                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
535
0
      LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
536
0
    }
537
0
  }
538
0
539
0
  // Set explicit EOR
540
0
  {
541
0
    const int option_value = 1;
542
0
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
543
0
                           (const void *)&option_value, (socklen_t)sizeof(option_value)) < 0) {
544
0
      LOG(("*** failed enable explicit EOR mode %d", errno));
545
0
      goto error_cleanup;
546
0
    }
547
0
  }
548
0
549
0
  // Enable ndata
550
0
  // TODO: Bug 1381145, enable this once ndata has been deployed
551
#if 0
552
  av.assoc_id = SCTP_FUTURE_ASSOC;
553
  av.assoc_value = 1;
554
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
555
                         (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
556
    LOG(("*** failed enable ndata errno %d", errno));
557
    goto error_cleanup;
558
  }
559
#endif
560
561
0
  av.assoc_id = SCTP_ALL_ASSOC;
562
0
  av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
563
0
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
564
0
                         (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
565
0
    LOG(("*** failed enable stream reset errno %d", errno));
566
0
    goto error_cleanup;
567
0
  }
568
0
569
0
  /* Enable the events of interest. */
570
0
  memset(&event, 0, sizeof(event));
571
0
  event.se_assoc_id = SCTP_ALL_ASSOC;
572
0
  event.se_on = 1;
573
0
  for (unsigned short event_type : event_types) {
574
0
    event.se_type = event_type;
575
0
    if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
576
0
      LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
577
0
      goto error_cleanup;
578
0
    }
579
0
  }
580
0
581
0
  // Update number of streams
582
0
  mStreams.AppendElements(aNumStreams);
583
0
  for (uint32_t i = 0; i < aNumStreams; ++i) {
584
0
    mStreams[i] = nullptr;
585
0
  }
586
0
  memset(&initmsg, 0, sizeof(initmsg));
587
0
  len = sizeof(initmsg);
588
0
  if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
589
0
    LOG(("*** failed getsockopt SCTP_INITMSG"));
590
0
    goto error_cleanup;
591
0
  }
592
0
  LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
593
0
       initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
594
0
  initmsg.sinit_num_ostreams  = aNumStreams;
595
0
  initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
596
0
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
597
0
                         (socklen_t)sizeof(initmsg)) < 0) {
598
0
    LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
599
0
    goto error_cleanup;
600
0
  }
601
0
602
0
  mSocket = nullptr;
603
0
  usrsctp_register_address(static_cast<void *>(this));
604
0
  LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
605
0
  return true;
606
0
607
0
error_cleanup:
608
0
  usrsctp_close(mMasterSocket);
609
0
  mMasterSocket = nullptr;
610
0
  return false;
611
0
}
612
613
void
614
DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize)
615
0
{
616
0
  MutexAutoLock lock(mLock); // TODO: Needed?
617
0
618
0
  if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
619
0
    // Don't overwrite already set MMS with default values
620
0
    return;
621
0
  }
622
0
623
0
  mMaxMessageSizeSet = aMaxMessageSizeSet;
624
0
  mMaxMessageSize = aMaxMessageSize;
625
0
626
0
  bool ppidFragmentationEnforced = false;
627
0
  nsresult rv;
628
0
  nsCOMPtr<nsIPrefService> prefs = do_GetService("@mozilla.org/preferences-service;1", &rv);
629
0
  if (!NS_WARN_IF(NS_FAILED(rv))) {
630
0
    nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
631
0
632
0
    if (branch) {
633
0
      if (!NS_FAILED(branch->GetBoolPref(
634
0
          "media.peerconnection.sctp.force_ppid_fragmentation", &mPpidFragmentation))) {
635
0
        // Ensure that forced on/off PPID fragmentation does not get overridden when Firefox has
636
0
        // been detected.
637
0
        mMaxMessageSizeSet = true;
638
0
        ppidFragmentationEnforced = true;
639
0
      }
640
0
641
0
      int32_t temp;
642
0
      if (!NS_FAILED(branch->GetIntPref(
643
0
          "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
644
0
        if (temp >= 0) {
645
0
          mMaxMessageSize = (uint64_t)temp;
646
0
        }
647
0
      }
648
0
    }
649
0
  }
650
0
651
0
  // Fix remote MMS. This code exists, so future implementations of RTCSctpTransport.maxMessageSize
652
0
  // can simply provide that value from GetMaxMessageSize.
653
0
654
0
  // TODO: Bug 1382779, once resolved, can be increased to min(Uint8ArrayMaxSize, UINT32_MAX)
655
0
  // TODO: Bug 1381146, once resolved, can be increased to whatever we support then (hopefully
656
0
  //       SIZE_MAX)
657
0
  if (mMaxMessageSize == 0 || mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
658
0
    mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
659
0
  }
660
0
661
0
  LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
662
0
       mPpidFragmentation ? "yes" : "no", ppidFragmentationEnforced ? "yes" : "no"));
663
0
  LOG(("Maximum message size (outgoing data): %" PRIu64 " (set=%s, enforced=%s)",
664
0
       mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
665
0
       aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
666
0
}
667
668
uint64_t
669
DataChannelConnection::GetMaxMessageSize()
670
0
{
671
0
  return mMaxMessageSize;
672
0
}
673
674
#ifdef MOZ_PEERCONNECTION
675
void
676
DataChannelConnection::SetEvenOdd()
677
0
{
678
0
  ASSERT_WEBRTC(IsSTSThread());
679
0
680
0
  MOZ_ASSERT(mDtls);  // DTLS is mandatory
681
0
  mAllocateEven = (mDtls->role() == TransportLayerDtls::CLIENT);
682
0
}
683
684
bool
685
DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
686
0
{
687
0
  LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
688
0
689
0
  MOZ_ASSERT(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
690
0
  if (NS_WARN_IF(!aFlow)) {
691
0
    return false;
692
0
  }
693
0
694
0
  mTransportFlow = aFlow;
695
0
  mLocalPort = localport;
696
0
  mRemotePort = remoteport;
697
0
  mState = CONNECTING;
698
0
699
0
  RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
700
0
                                   &DataChannelConnection::SetSignals),
701
0
                NS_DISPATCH_NORMAL);
702
0
  return true;
703
0
}
704
705
void
706
DataChannelConnection::SetSignals()
707
0
{
708
0
  ASSERT_WEBRTC(IsSTSThread());
709
0
  mDtls = static_cast<TransportLayerDtls*>(mTransportFlow->GetLayer("dtls"));
710
0
  ASSERT_WEBRTC(mDtls);
711
0
  LOG(("Setting transport signals, state: %d", mDtls->state()));
712
0
  mDtls->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
713
0
  // SignalStateChange() doesn't call you with the initial state
714
0
  mDtls->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
715
0
  CompleteConnect(mDtls, mDtls->state());
716
0
}
717
718
void
719
DataChannelConnection::CompleteConnect(TransportLayer *layer, TransportLayer::State state)
720
0
{
721
0
  LOG(("Data transport state: %d", state));
722
0
  MutexAutoLock lock(mLock);
723
0
  ASSERT_WEBRTC(IsSTSThread());
724
0
  // We should abort connection on TS_ERROR.
725
0
  // Note however that the association will also fail (perhaps with a delay) and
726
0
  // notify us in that way
727
0
  if (state != TransportLayer::TS_OPEN || !mMasterSocket)
728
0
    return;
729
0
730
0
  struct sockaddr_conn addr;
731
0
  memset(&addr, 0, sizeof(addr));
732
0
  addr.sconn_family = AF_CONN;
733
#if defined(__Userspace_os_Darwin)
734
  addr.sconn_len = sizeof(addr);
735
#endif
736
  addr.sconn_port = htons(mLocalPort);
737
0
  addr.sconn_addr = static_cast<void *>(this);
738
0
739
0
  LOG(("Calling usrsctp_bind"));
740
0
  int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
741
0
                       sizeof(addr));
742
0
  if (r < 0) {
743
0
    LOG(("usrsctp_bind failed: %d", r));
744
0
  } else {
745
0
    // This is the remote addr
746
0
    addr.sconn_port = htons(mRemotePort);
747
0
    LOG(("Calling usrsctp_connect"));
748
0
    r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
749
0
                        sizeof(addr));
750
0
    if (r >= 0 || errno == EINPROGRESS) {
751
0
      struct sctp_paddrparams paddrparams;
752
0
      socklen_t opt_len;
753
0
754
0
      memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
755
0
      memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
756
0
      opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
757
0
      r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
758
0
                             &paddrparams, &opt_len);
759
0
      if (r < 0) {
760
0
        LOG(("usrsctp_getsockopt failed: %d", r));
761
0
      } else {
762
0
        // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
763
0
        paddrparams.spp_pathmtu = 1200; // safe for either
764
0
        paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
765
0
        paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
766
0
        opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
767
0
        r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
768
0
                               &paddrparams, opt_len);
769
0
        if (r < 0) {
770
0
          LOG(("usrsctp_getsockopt failed: %d", r));
771
0
        } else {
772
0
          LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
773
0
        }
774
0
      }
775
0
    }
776
0
    if (r < 0) {
777
0
      if (errno == EINPROGRESS) {
778
0
        // non-blocking
779
0
        return;
780
0
      }
781
0
      LOG(("usrsctp_connect failed: %d", errno));
782
0
      mState = CLOSED;
783
0
    } else {
784
0
      // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
785
0
      // This also avoids issues with calling TransportFlow stuff on Mainthread
786
0
      return;
787
0
    }
788
0
  }
789
0
  // Note: currently this doesn't actually notify the application
790
0
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
791
0
             DataChannelOnMessageAvailable::ON_CONNECTION,
792
0
             this)));
793
0
}
794
795
// Process any pending Opens
796
void
797
DataChannelConnection::ProcessQueuedOpens()
798
0
{
799
0
  // The nsDeque holds channels with an AddRef applied.  Another reference
800
0
  // (may) be held by the DOMDataChannel, unless it's been GC'd.  No other
801
0
  // references should exist.
802
0
803
0
  // Can't copy nsDeque's.  Move into temp array since any that fail will
804
0
  // go back to mPending
805
0
  nsDeque temp;
806
0
  DataChannel *temp_channel; // really already_AddRefed<>
807
0
  while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
808
0
    temp.Push(static_cast<void *>(temp_channel));
809
0
  }
810
0
811
0
  RefPtr<DataChannel> channel;
812
0
  // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
813
0
  while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
814
0
    if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
815
0
      LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
816
0
      channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
817
0
      // OpenFinish returns a reference itself, so we need to take it can Release it
818
0
      channel = OpenFinish(channel.forget()); // may reset the flag and re-push
819
0
    } else {
820
0
      NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
821
0
    }
822
0
  }
823
0
824
0
}
825
826
void
827
DataChannelConnection::SctpDtlsInput(TransportLayer *layer, MediaPacket& packet)
828
0
{
829
0
  if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
830
0
    char *buf;
831
0
832
0
    if ((buf = usrsctp_dumppacket((void *)packet.data(),
833
0
                                  packet.len(),
834
0
                                  SCTP_DUMP_INBOUND)) != nullptr) {
835
0
      SCTP_LOG(("%s", buf));
836
0
      usrsctp_freedumpbuffer(buf);
837
0
    }
838
0
  }
839
0
  // Pass the data to SCTP
840
0
  MutexAutoLock lock(mLock);
841
0
  usrsctp_conninput(static_cast<void *>(this), packet.data(), packet.len(), 0);
842
0
}
843
844
int
845
DataChannelConnection::SendPacket(nsAutoPtr<MediaPacket> packet)
846
0
{
847
0
  //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
848
0
  if (mDtls) {
849
0
    return mDtls->SendPacket(*packet) < 0 ? 1 : 0;
850
0
  }
851
0
  return 0;
852
0
}
853
854
/* static */
855
int
856
DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
857
                                      uint8_t tos, uint8_t set_df)
858
0
{
859
0
  DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
860
0
  MOZ_DIAGNOSTIC_ASSERT(!peer->mShutdown);
861
0
862
0
  if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
863
0
    char *buf;
864
0
865
0
    if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
866
0
      SCTP_LOG(("%s", buf));
867
0
      usrsctp_freedumpbuffer(buf);
868
0
    }
869
0
  }
870
0
871
0
  // We're async proxying even if on the STSThread because this is called
872
0
  // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
873
0
  // SCTP has an option for Apple, on IP connections only, to release at least
874
0
  // one of the locks before calling a packet output routine; with changes to
875
0
  // the underlying SCTP stack this might remove the need to use an async proxy.
876
0
  nsAutoPtr<MediaPacket> packet(new MediaPacket);
877
0
  packet->Copy(static_cast<const uint8_t*>(buffer), length);
878
0
879
0
  // XXX It might be worthwhile to add an assertion against the thread
880
0
  // somehow getting into the DataChannel/SCTP code again, as
881
0
  // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
882
0
  // needs to be a per-thread check, not a global.
883
0
  peer->mSTS->Dispatch(WrapRunnable(
884
0
                         RefPtr<DataChannelConnection>(peer),
885
0
                         &DataChannelConnection::SendPacket, packet),
886
0
                                 NS_DISPATCH_NORMAL);
887
0
  return 0; // cheat!  Packets can always be dropped later anyways
888
0
}
889
#endif
890
891
#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
892
// listen for incoming associations
893
// Blocks! - Don't call this from main thread!
894
895
#error This code will not work as-is since SetEvenOdd() runs on Mainthread
896
897
bool
898
DataChannelConnection::Listen(unsigned short port)
899
{
900
  struct sockaddr_in addr;
901
  socklen_t addr_len;
902
903
  NS_WARNING_ASSERTION(!NS_IsMainThread(),
904
                       "Blocks, do not call from main thread!!!");
905
906
  /* Acting as the 'server' */
907
  memset((void *)&addr, 0, sizeof(addr));
908
#ifdef HAVE_SIN_LEN
909
  addr.sin_len = sizeof(struct sockaddr_in);
910
#endif
911
  addr.sin_family = AF_INET;
912
  addr.sin_port = htons(port);
913
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
914
  LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
915
  mState = CONNECTING;
916
  if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
917
    LOG(("***Failed userspace_bind"));
918
    return false;
919
  }
920
  if (usrsctp_listen(mMasterSocket, 1) < 0) {
921
    LOG(("***Failed userspace_listen"));
922
    return false;
923
  }
924
925
  LOG(("Accepting connection"));
926
  addr_len = 0;
927
  if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
928
    LOG(("***Failed accept"));
929
    return false;
930
  }
931
  mState = OPEN;
932
933
  struct linger l;
934
  l.l_onoff = 1;
935
  l.l_linger = 0;
936
  if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
937
                         (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
938
    LOG(("Couldn't set SO_LINGER on SCTP socket"));
939
  }
940
941
  SetEvenOdd();
942
943
  // Notify Connection open
944
  // XXX We need to make sure connection sticks around until the message is delivered
945
  LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
946
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
947
             DataChannelOnMessageAvailable::ON_CONNECTION,
948
             this, (DataChannel *) nullptr)));
949
  return true;
950
}
951
952
// Blocks! - Don't call this from main thread!
953
bool
954
DataChannelConnection::Connect(const char *addr, unsigned short port)
955
{
956
  struct sockaddr_in addr4;
957
  struct sockaddr_in6 addr6;
958
959
  NS_WARNING_ASSERTION(!NS_IsMainThread(),
960
                       "Blocks, do not call from main thread!!!");
961
962
  /* Acting as the connector */
963
  LOG(("Connecting to %s, port %u", addr, port));
964
  memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
965
  memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
966
#ifdef HAVE_SIN_LEN
967
  addr4.sin_len = sizeof(struct sockaddr_in);
968
#endif
969
#ifdef HAVE_SIN6_LEN
970
  addr6.sin6_len = sizeof(struct sockaddr_in6);
971
#endif
972
  addr4.sin_family = AF_INET;
973
  addr6.sin6_family = AF_INET6;
974
  addr4.sin_port = htons(port);
975
  addr6.sin6_port = htons(port);
976
  mState = CONNECTING;
977
978
#if !defined(__Userspace_os_Windows)
979
  if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
980
    if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
981
      LOG(("*** Failed userspace_connect"));
982
      return false;
983
    }
984
  } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
985
    if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
986
      LOG(("*** Failed userspace_connect"));
987
      return false;
988
    }
989
  } else {
990
    LOG(("*** Illegal destination address."));
991
  }
992
#else
993
  {
994
    struct sockaddr_storage ss;
995
    int sslen = sizeof(ss);
996
997
    if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
998
      addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
999
      if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
1000
        LOG(("*** Failed userspace_connect"));
1001
        return false;
1002
      }
1003
    } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
1004
      addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
1005
      if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
1006
        LOG(("*** Failed userspace_connect"));
1007
        return false;
1008
      }
1009
    } else {
1010
      LOG(("*** Illegal destination address."));
1011
    }
1012
  }
1013
#endif
1014
1015
  mSocket = mMasterSocket;
1016
1017
  LOG(("connect() succeeded!  Entering connected mode"));
1018
  mState = OPEN;
1019
1020
  SetEvenOdd();
1021
1022
  // Notify Connection open
1023
  // XXX We need to make sure connection sticks around until the message is delivered
1024
  LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
1025
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1026
             DataChannelOnMessageAvailable::ON_CONNECTION,
1027
             this, (DataChannel *) nullptr)));
1028
  return true;
1029
}
1030
#endif
1031
1032
DataChannel *
1033
DataChannelConnection::FindChannelByStream(uint16_t stream)
1034
0
{
1035
0
  return mStreams.SafeElementAt(stream);
1036
0
}
1037
1038
uint16_t
1039
DataChannelConnection::FindFreeStream()
1040
0
{
1041
0
  uint32_t i, j, limit;
1042
0
1043
0
  limit = mStreams.Length();
1044
0
  if (limit > MAX_NUM_STREAMS)
1045
0
    limit = MAX_NUM_STREAMS;
1046
0
1047
0
  for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
1048
0
    if (!mStreams[i]) {
1049
0
      // Verify it's not still in the process of closing
1050
0
      for (j = 0; j < mStreamsResetting.Length(); ++j) {
1051
0
        if (mStreamsResetting[j] == i) {
1052
0
          break;
1053
0
        }
1054
0
      }
1055
0
      if (j == mStreamsResetting.Length())
1056
0
        break;
1057
0
    }
1058
0
  }
1059
0
  if (i >= limit) {
1060
0
    return INVALID_STREAM;
1061
0
  }
1062
0
  return i;
1063
0
}
1064
1065
uint32_t
1066
DataChannelConnection::UpdateCurrentStreamIndex()
1067
0
{
1068
0
  if (mCurrentStream == mStreams.Length() - 1) {
1069
0
      mCurrentStream = 0;
1070
0
  } else {
1071
0
    ++mCurrentStream;
1072
0
  }
1073
0
1074
0
  return mCurrentStream;
1075
0
}
1076
1077
uint32_t
1078
DataChannelConnection::GetCurrentStreamIndex()
1079
0
{
1080
0
  // Fix current stream index (in case #streams decreased)
1081
0
  if (mCurrentStream >= mStreams.Length()) {
1082
0
    mCurrentStream = 0;
1083
0
  }
1084
0
1085
0
  return mCurrentStream;
1086
0
}
1087
1088
bool
1089
DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
1090
0
{
1091
0
  struct sctp_status status;
1092
0
  struct sctp_add_streams sas;
1093
0
  uint32_t outStreamsNeeded;
1094
0
  socklen_t len;
1095
0
1096
0
  if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
1097
0
    aNeeded = MAX_NUM_STREAMS - mStreams.Length();
1098
0
  }
1099
0
  if (aNeeded <= 0) {
1100
0
    return false;
1101
0
  }
1102
0
1103
0
  len = (socklen_t)sizeof(struct sctp_status);
1104
0
  if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
1105
0
    LOG(("***failed: getsockopt SCTP_STATUS"));
1106
0
    return false;
1107
0
  }
1108
0
  outStreamsNeeded = aNeeded; // number to add
1109
0
1110
0
  // Note: if multiple channel opens happen when we don't have enough space,
1111
0
  // we'll call RequestMoreStreams() multiple times
1112
0
  memset(&sas, 0, sizeof(sas));
1113
0
  sas.sas_instrms = 0;
1114
0
  sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
1115
0
  // Doesn't block, we get an event when it succeeds or fails
1116
0
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
1117
0
                         (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
1118
0
    if (errno == EALREADY) {
1119
0
      LOG(("Already have %u output streams", outStreamsNeeded));
1120
0
      return true;
1121
0
    }
1122
0
1123
0
    LOG(("***failed: setsockopt ADD errno=%d", errno));
1124
0
    return false;
1125
0
  }
1126
0
  LOG(("Requested %u more streams", outStreamsNeeded));
1127
0
  // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
1128
0
  // values are larger than mStreams.Length()
1129
0
  return true;
1130
0
}
1131
1132
// Returns a POSIX error code.
1133
int
1134
DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len, uint16_t stream)
1135
0
{
1136
0
  struct sctp_sendv_spa info = {0};
1137
0
1138
0
  // General flags
1139
0
  info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
1140
0
1141
0
  // Set stream identifier, protocol identifier and flags
1142
0
  info.sendv_sndinfo.snd_sid = stream;
1143
0
  info.sendv_sndinfo.snd_flags = SCTP_EOR;
1144
0
  info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
1145
0
1146
0
  // Create message instance and send
1147
0
  // Note: Main-thread IO, but doesn't block
1148
#if (UINT32_MAX > SIZE_MAX)
1149
  if (len > SIZE_MAX) {
1150
    return EMSGSIZE;
1151
  }
1152
#endif
1153
  OutgoingMsg msg(info, data, (size_t)len);
1154
0
  bool buffered;
1155
0
  int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
1156
0
1157
0
  // Set pending type (if buffered)
1158
0
  if (!error && buffered && !mPendingType) {
1159
0
    mPendingType = PENDING_DCEP;
1160
0
  }
1161
0
  return error;
1162
0
}
1163
1164
// Returns a POSIX error code.
1165
int
1166
DataChannelConnection::SendOpenAckMessage(uint16_t stream)
1167
0
{
1168
0
  struct rtcweb_datachannel_ack ack;
1169
0
1170
0
  memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
1171
0
  ack.msg_type = DATA_CHANNEL_ACK;
1172
0
1173
0
  return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
1174
0
}
1175
1176
// Returns a POSIX error code.
1177
int
1178
DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
1179
                                              const nsACString& protocol,
1180
                                              uint16_t stream, bool unordered,
1181
                                              uint16_t prPolicy, uint32_t prValue)
1182
0
{
1183
0
  const int label_len = label.Length(); // not including nul
1184
0
  const int proto_len = protocol.Length(); // not including nul
1185
0
  // careful - request struct include one char for the label
1186
0
  const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
1187
0
                        label_len + proto_len;
1188
0
  struct rtcweb_datachannel_open_request *req =
1189
0
    (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
1190
0
1191
0
  memset(req, 0, req_size);
1192
0
  req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
1193
0
  switch (prPolicy) {
1194
0
  case SCTP_PR_SCTP_NONE:
1195
0
    req->channel_type = DATA_CHANNEL_RELIABLE;
1196
0
    break;
1197
0
  case SCTP_PR_SCTP_TTL:
1198
0
    req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
1199
0
    break;
1200
0
  case SCTP_PR_SCTP_RTX:
1201
0
    req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
1202
0
    break;
1203
0
  default:
1204
0
    free(req);
1205
0
    return EINVAL;
1206
0
  }
1207
0
  if (unordered) {
1208
0
    // Per the current types, all differ by 0x80 between ordered and unordered
1209
0
    req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
1210
0
  }
1211
0
1212
0
  req->reliability_param = htonl(prValue);
1213
0
  req->priority = htons(0); /* XXX: add support */
1214
0
  req->label_length = htons(label_len);
1215
0
  req->protocol_length = htons(proto_len);
1216
0
  memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
1217
0
  memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
1218
0
1219
0
  // TODO: req_size is an int... that looks hairy
1220
0
  int error = SendControlMessage((const uint8_t *)req, req_size, stream);
1221
0
1222
0
  free(req);
1223
0
  return error;
1224
0
}
1225
1226
// XXX This should use a separate thread (outbound queue) which should
1227
// select() to know when to *try* to send data to the socket again.
1228
// Alternatively, it can use a timeout, but that's guaranteed to be wrong
1229
// (just not sure in what direction).  We could re-implement NSPR's
1230
// PR_POLL_WRITE/etc handling... with a lot of work.
1231
1232
// Better yet, use the SCTP stack's notifications on buffer state to avoid
1233
// filling the SCTP's buffers.
1234
1235
// returns if we're still blocked (true)
1236
bool
1237
DataChannelConnection::SendDeferredMessages()
1238
0
{
1239
0
  RefPtr<DataChannel> channel; // we may null out the refs to this
1240
0
1241
0
  // This may block while something is modifying channels, but should not block for IO
1242
0
  mLock.AssertCurrentThreadOwns();
1243
0
1244
0
  LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
1245
0
  if (!mPendingType) {
1246
0
    return false;
1247
0
  }
1248
0
1249
0
  // Send pending control messages
1250
0
  // Note: If ndata is not active, check if DCEP messages are currently outstanding. These need to
1251
0
  //       be sent first before other streams can be used for sending.
1252
0
  if (!mBufferedControl.IsEmpty() && (mSendInterleaved || mPendingType == PENDING_DCEP)) {
1253
0
    if (SendBufferedMessages(mBufferedControl)) {
1254
0
      return true;
1255
0
    }
1256
0
1257
0
    // Note: There may or may not be pending data messages
1258
0
    mPendingType = PENDING_DATA;
1259
0
  }
1260
0
1261
0
  bool blocked = false;
1262
0
  uint32_t i = GetCurrentStreamIndex();
1263
0
  uint32_t end = i;
1264
0
  do {
1265
0
    channel = mStreams[i];
1266
0
    if (!channel || channel->mBufferedData.IsEmpty()) {
1267
0
      i = UpdateCurrentStreamIndex();
1268
0
      continue;
1269
0
    }
1270
0
1271
0
    // Clear if closing/closed
1272
0
    if (channel->mState == CLOSED || channel->mState == CLOSING) {
1273
0
      channel->mBufferedData.Clear();
1274
0
      i = UpdateCurrentStreamIndex();
1275
0
      continue;
1276
0
    }
1277
0
1278
0
    size_t bufferedAmount = channel->GetBufferedAmountLocked();
1279
0
    size_t threshold = channel->mBufferedThreshold;
1280
0
    bool wasOverThreshold = bufferedAmount >= threshold;
1281
0
1282
0
    // Send buffered data messages
1283
0
    // Warning: This will fail in case ndata is inactive and a previously deallocated data channel
1284
0
    //          has not been closed properly. If you ever see that no messages can be sent on any
1285
0
    //          channel, this is likely the cause (an explicit EOR message partially sent whose
1286
0
    //          remaining chunks are still being waited for).
1287
0
    blocked = SendBufferedMessages(channel->mBufferedData);
1288
0
    bufferedAmount = channel->GetBufferedAmountLocked();
1289
0
1290
0
    // can never fire with default threshold of 0
1291
0
    if (wasOverThreshold && bufferedAmount < threshold) {
1292
0
      LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
1293
0
           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1294
0
      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1295
0
                 DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
1296
0
                 this, channel)));
1297
0
    }
1298
0
1299
0
    if (bufferedAmount == 0) {
1300
0
      // buffered-to-not-buffered transition; tell the DOM code in case this makes it
1301
0
      // available for GC
1302
0
      LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
1303
0
           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1304
0
      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1305
0
                 DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
1306
0
                 this, channel)));
1307
0
    }
1308
0
1309
0
    // Update current stream index
1310
0
    // Note: If ndata is not active, the outstanding data messages on this stream need to be sent
1311
0
    //       first before other streams can be used for sending.
1312
0
    if (mSendInterleaved || !blocked) {
1313
0
      i = UpdateCurrentStreamIndex();
1314
0
    }
1315
0
  } while (!blocked && i != end);
1316
0
1317
0
  if (!blocked) {
1318
0
    mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
1319
0
  }
1320
0
  return blocked;
1321
0
}
1322
1323
1324
// Called with mLock locked!
1325
// buffer MUST have at least one item!
1326
// returns if we're still blocked (true)
1327
bool
1328
DataChannelConnection::SendBufferedMessages(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer)
1329
0
{
1330
0
  do {
1331
0
    // Re-send message
1332
0
    int error = SendMsgInternal(*buffer[0]);
1333
0
    switch (error) {
1334
0
      case 0:
1335
0
        buffer.RemoveElementAt(0);
1336
0
        break;
1337
0
      case EAGAIN:
1338
#if (EAGAIN != EWOULDBLOCK)
1339
      case EWOULDBLOCK:
1340
#endif
1341
        return true;
1342
0
      default:
1343
0
        buffer.RemoveElementAt(0);
1344
0
        LOG(("error on sending: %d", error));
1345
0
        break;
1346
0
    }
1347
0
  } while (!buffer.IsEmpty());
1348
0
1349
0
  return false;
1350
0
}
1351
1352
// Caller must ensure that length <= SIZE_MAX
1353
void
1354
DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
1355
                                                uint32_t length, uint16_t stream)
1356
0
{
1357
0
  RefPtr<DataChannel> channel;
1358
0
  uint32_t prValue;
1359
0
  uint16_t prPolicy;
1360
0
  uint32_t flags;
1361
0
1362
0
  mLock.AssertCurrentThreadOwns();
1363
0
1364
0
  const size_t requiredLength =
1365
0
    (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length);
1366
0
  if (((size_t)length) != requiredLength) {
1367
0
    LOG(("%s: Inconsistent length: %u, should be %zu",
1368
0
         __FUNCTION__, length, requiredLength));
1369
0
    if (((size_t)length) < requiredLength)
1370
0
      return;
1371
0
  }
1372
0
1373
0
  LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length, sizeof(*req)));
1374
0
1375
0
  switch (req->channel_type) {
1376
0
    case DATA_CHANNEL_RELIABLE:
1377
0
    case DATA_CHANNEL_RELIABLE_UNORDERED:
1378
0
      prPolicy = SCTP_PR_SCTP_NONE;
1379
0
      break;
1380
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1381
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1382
0
      prPolicy = SCTP_PR_SCTP_RTX;
1383
0
      break;
1384
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1385
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1386
0
      prPolicy = SCTP_PR_SCTP_TTL;
1387
0
      break;
1388
0
    default:
1389
0
      LOG(("Unknown channel type %d", req->channel_type));
1390
0
      /* XXX error handling */
1391
0
      return;
1392
0
  }
1393
0
  prValue = ntohl(req->reliability_param);
1394
0
  flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1395
0
1396
0
  if ((channel = FindChannelByStream(stream))) {
1397
0
    if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1398
0
      LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
1399
0
           stream, channel->mState));
1400
0
     /* XXX: some error handling */
1401
0
    } else {
1402
0
      LOG(("Open for externally negotiated channel %u", stream));
1403
0
      // XXX should also check protocol, maybe label
1404
0
      if (prPolicy != channel->mPrPolicy ||
1405
0
          prValue != channel->mPrValue ||
1406
0
          flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
1407
0
      {
1408
0
        LOG(("WARNING: external negotiation mismatch with OpenRequest:"
1409
0
             "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1410
0
             stream, prPolicy, channel->mPrPolicy,
1411
0
             prValue, channel->mPrValue, flags, channel->mFlags));
1412
0
      }
1413
0
    }
1414
0
    return;
1415
0
  }
1416
0
  if (stream >= mStreams.Length()) {
1417
0
    LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream, mStreams.Length()));
1418
0
    return;
1419
0
  }
1420
0
1421
0
  nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1422
0
  nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
1423
0
                                           ntohs(req->protocol_length)));
1424
0
1425
0
  channel = new DataChannel(this,
1426
0
                            stream,
1427
0
                            DataChannel::CONNECTING,
1428
0
                            label,
1429
0
                            protocol,
1430
0
                            prPolicy, prValue,
1431
0
                            flags,
1432
0
                            nullptr, nullptr);
1433
0
  mStreams[stream] = channel;
1434
0
1435
0
  channel->mState = DataChannel::WAITING_TO_OPEN;
1436
0
1437
0
  LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1438
0
       channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
1439
0
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1440
0
             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
1441
0
             this, channel)));
1442
0
1443
0
  LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1444
0
1445
0
  int error = SendOpenAckMessage(stream);
1446
0
  if (error) {
1447
0
    LOG(("SendOpenRequest failed, error = %d", error));
1448
0
    // Close the channel, inform the user
1449
0
    CloseInt(channel);
1450
0
    // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1451
0
    return;
1452
0
  }
1453
0
1454
0
  // Now process any queued data messages for the channel (which will
1455
0
  // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1456
0
  // more that come in before that happens)
1457
0
  DeliverQueuedData(stream);
1458
0
}
1459
1460
// NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1461
// That would make this code moot.  Keep it for now for backwards compatibility.
1462
void
1463
DataChannelConnection::DeliverQueuedData(uint16_t stream)
1464
0
{
1465
0
  mLock.AssertCurrentThreadOwns();
1466
0
1467
0
  uint32_t i = 0;
1468
0
  while (i < mQueuedData.Length()) {
1469
0
    // Careful! we may modify the array length from within the loop!
1470
0
    if (mQueuedData[i]->mStream == stream) {
1471
0
      LOG(("Delivering queued data for stream %u, length %u",
1472
0
           stream, mQueuedData[i]->mLength));
1473
0
      // Deliver the queued data
1474
0
      HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
1475
0
                        mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
1476
0
                        mQueuedData[i]->mFlags);
1477
0
      mQueuedData.RemoveElementAt(i);
1478
0
      continue; // don't bump index since we removed the element
1479
0
    }
1480
0
    i++;
1481
0
  }
1482
0
}
1483
1484
// Caller must ensure that length <= SIZE_MAX
1485
void
1486
DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
1487
                                            uint32_t length, uint16_t stream)
1488
0
{
1489
0
  DataChannel *channel;
1490
0
1491
0
  mLock.AssertCurrentThreadOwns();
1492
0
1493
0
  channel = FindChannelByStream(stream);
1494
0
  if (NS_WARN_IF(!channel)) {
1495
0
    return;
1496
0
  }
1497
0
1498
0
  LOG(("OpenAck received for stream %u, waiting=%d", stream,
1499
0
       (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1500
0
1501
0
  channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1502
0
}
1503
1504
// Caller must ensure that length <= SIZE_MAX
1505
void
1506
DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
1507
0
{
1508
0
  /* XXX: Send an error message? */
1509
0
  LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid, length, stream));
1510
0
  // XXX Log to JS error console if possible
1511
0
}
1512
1513
uint8_t
1514
DataChannelConnection::BufferMessage(nsACString& recvBuffer, const void *data,
1515
                                     uint32_t length, uint32_t ppid, int flags)
1516
0
{
1517
0
  const char *buffer = (const char *) data;
1518
0
  uint8_t bufferFlags = 0;
1519
0
1520
0
  if ((flags & MSG_EOR) &&
1521
0
      ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
1522
0
      ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
1523
0
    bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
1524
0
1525
0
    // Return directly if nothing has been buffered
1526
0
    if (recvBuffer.IsEmpty()) {
1527
0
      return bufferFlags;
1528
0
    }
1529
0
  }
1530
0
1531
0
  // Ensure it doesn't blow up our buffer
1532
0
  // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the new buffer is capable
1533
0
  //       of holding.
1534
0
  if (((uint64_t) recvBuffer.Length()) + ((uint64_t) length) > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1535
0
    bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
1536
0
    return bufferFlags;
1537
0
  }
1538
0
1539
0
  // Copy & add to receive buffer
1540
0
  recvBuffer.Append(buffer, length);
1541
0
  bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
1542
0
  return bufferFlags;
1543
0
}
1544
1545
void
1546
DataChannelConnection::HandleDataMessage(const void *data, size_t length, uint32_t ppid,
1547
                                         uint16_t stream, int flags)
1548
0
{
1549
0
  DataChannel *channel;
1550
0
  const char *buffer = (const char *) data;
1551
0
1552
0
  mLock.AssertCurrentThreadOwns();
1553
0
  channel = FindChannelByStream(stream);
1554
0
1555
0
  // Note: Until we support SIZE_MAX sized messages, we need this check
1556
0
#if (SIZE_MAX > UINT32_MAX)
1557
0
  if (length > UINT32_MAX) {
1558
0
    LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
1559
0
         length, UINT32_MAX));
1560
0
    CloseInt(channel);
1561
0
    return;
1562
0
  }
1563
0
#endif
1564
0
  uint32_t data_length = (uint32_t)length;
1565
0
1566
0
  // XXX A closed channel may trip this... check
1567
0
  // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1568
0
  // That would make this code moot.  Keep it for now for backwards compatibility.
1569
0
  if (!channel) {
1570
0
    // In the updated 0-RTT open case, the sender can send data immediately
1571
0
    // after Open, and doesn't set the in-order bit (since we don't have a
1572
0
    // response or ack).  Also, with external negotiation, data can come in
1573
0
    // before we're told about the external negotiation.  We need to buffer
1574
0
    // data until either a) Open comes in, if the ordering get messed up,
1575
0
    // or b) the app tells us this channel was externally negotiated.  When
1576
0
    // these occur, we deliver the data.
1577
0
1578
0
    // Since this is rare and non-performance, keep a single list of queued
1579
0
    // data messages to deliver once the channel opens.
1580
0
    LOG(("Queuing data for stream %u, length %u", stream, data_length));
1581
0
    // Copies data
1582
0
    mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, flags, data, data_length));
1583
0
    return;
1584
0
  }
1585
0
1586
0
  // Ignore incoming data in case the channel is closed
1587
0
  if (channel->mState == CLOSED) {
1588
0
    return;
1589
0
  }
1590
0
1591
0
  bool is_binary = true;
1592
0
  uint8_t bufferFlags;
1593
0
  int32_t type;
1594
0
  const char* info = "";
1595
0
1596
0
  if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
1597
0
      ppid == DATA_CHANNEL_PPID_DOMSTRING) {
1598
0
    is_binary = false;
1599
0
  }
1600
0
  if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1601
0
    NS_WARNING("DataChannel message aborted by fragment type change!");
1602
0
    // TODO: Maybe closing would be better as this is a hard to detect protocol violation?
1603
0
    channel->mRecvBuffer.Truncate(0);
1604
0
  }
1605
0
  channel->mIsRecvBinary = is_binary;
1606
0
1607
0
  // Remaining chunks of previously truncated message (due to the buffer being full)?
1608
0
  if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
1609
0
    LOG(("DataChannel: Ignoring partial message of length %u, buffer full and closing",
1610
0
         data_length));
1611
0
    // Only unblock if unordered
1612
0
    if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) && (flags & MSG_EOR)) {
1613
0
      channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1614
0
    }
1615
0
  }
1616
0
1617
0
  // Buffer message until complete
1618
0
  bufferFlags = BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
1619
0
  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1620
0
    LOG(("DataChannel: Buffered message would become too large to handle, closing channel"));
1621
0
    channel->mRecvBuffer.Truncate(0);
1622
0
    channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1623
0
    CloseInt(channel);
1624
0
    return;
1625
0
  }
1626
0
  if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1627
0
    LOG(("DataChannel: Partial %s message of length %u (total %u) on channel id %u",
1628
0
         is_binary ? "binary" : "string", data_length, channel->mRecvBuffer.Length(),
1629
0
         channel->mStream));
1630
0
    return; // Not ready to notify application
1631
0
  }
1632
0
  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1633
0
    data_length = channel->mRecvBuffer.Length();
1634
0
  }
1635
0
1636
0
  // Complain about large messages (only complain - we can handle it)
1637
0
  if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1638
0
    LOG(("DataChannel: Received message of length %u is > announced maximum message size (%u)",
1639
0
         data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
1640
0
  }
1641
0
1642
0
  switch (ppid) {
1643
0
    case DATA_CHANNEL_PPID_DOMSTRING:
1644
0
      LOG(("DataChannel: Received string message of length %u on channel %u",
1645
0
           data_length, channel->mStream));
1646
0
      type = DataChannelOnMessageAvailable::ON_DATA_STRING;
1647
0
      if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1648
0
        info = " (string fragmented)";
1649
0
      }
1650
0
      // else send using recvData normally
1651
0
1652
0
      // WebSockets checks IsUTF8() here; we can try to deliver it
1653
0
      break;
1654
0
1655
0
    case DATA_CHANNEL_PPID_BINARY:
1656
0
      LOG(("DataChannel: Received binary message of length %u on channel id %u",
1657
0
           data_length, channel->mStream));
1658
0
      type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
1659
0
      if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1660
0
        info = " (binary fragmented)";
1661
0
      }
1662
0
1663
0
      // else send using recvData normally
1664
0
      break;
1665
0
1666
0
    default:
1667
0
      NS_ERROR("Unknown data PPID");
1668
0
      return;
1669
0
  }
1670
0
1671
0
  // Notify onmessage
1672
0
  LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
1673
0
       (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING" : "BINARY",
1674
0
       info, channel));
1675
0
  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1676
0
    channel->SendOrQueue(new DataChannelOnMessageAvailable(
1677
0
                           type, this, channel, channel->mRecvBuffer));
1678
0
    channel->mRecvBuffer.Truncate(0);
1679
0
  } else {
1680
0
    nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
1681
0
    channel->SendOrQueue(new DataChannelOnMessageAvailable(
1682
0
                           type, this, channel, recvData));
1683
0
  }
1684
0
}
1685
1686
void
1687
DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length, uint32_t ppid,
1688
                                         uint16_t stream, int flags)
1689
0
{
1690
0
  const struct rtcweb_datachannel_open_request *req;
1691
0
  const struct rtcweb_datachannel_ack *ack;
1692
0
1693
0
  // Note: Until we support SIZE_MAX sized messages, we need this check
1694
0
#if (SIZE_MAX > UINT32_MAX)
1695
0
  if (length > UINT32_MAX) {
1696
0
    LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length, UINT32_MAX));
1697
0
    Stop();
1698
0
    return;
1699
0
  }
1700
0
#endif
1701
0
  uint32_t data_length = (uint32_t)length;
1702
0
1703
0
  mLock.AssertCurrentThreadOwns();
1704
0
1705
0
  // Buffer message until complete
1706
0
  const uint8_t bufferFlags = BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
1707
0
  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1708
0
    LOG(("DataChannel: Buffered message would become too large to handle, closing connection"));
1709
0
    mRecvBuffer.Truncate(0);
1710
0
    Stop();
1711
0
    return;
1712
0
  }
1713
0
  if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1714
0
    LOG(("Buffered partial DCEP message of length %u", data_length));
1715
0
    return;
1716
0
  }
1717
0
  if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1718
0
    buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
1719
0
    data_length = mRecvBuffer.Length();
1720
0
  }
1721
0
1722
0
  req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1723
0
  LOG(("Handling DCEP message of length %u", data_length));
1724
0
1725
0
  // Ensure minimum message size (ack is the smallest DCEP message)
1726
0
  if ((size_t)data_length < sizeof(*ack)) {
1727
0
    LOG(("Ignored invalid DCEP message (too short)"));
1728
0
    return;
1729
0
  }
1730
0
1731
0
  switch (req->msg_type) {
1732
0
    case DATA_CHANNEL_OPEN_REQUEST:
1733
0
      // structure includes a possibly-unused char label[1] (in a packed structure)
1734
0
      if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
1735
0
        return;
1736
0
      }
1737
0
1738
0
      HandleOpenRequestMessage(req, data_length, stream);
1739
0
      break;
1740
0
    case DATA_CHANNEL_ACK:
1741
0
      // >= sizeof(*ack) checked above
1742
0
1743
0
      ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1744
0
      HandleOpenAckMessage(ack, data_length, stream);
1745
0
      break;
1746
0
    default:
1747
0
      HandleUnknownMessage(ppid, data_length, stream);
1748
0
      break;
1749
0
  }
1750
0
1751
0
  // Reset buffer
1752
0
  mRecvBuffer.Truncate(0);
1753
0
}
1754
1755
// Called with mLock locked!
1756
void
1757
DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid,
1758
                                     uint16_t stream, int flags)
1759
0
{
1760
0
  mLock.AssertCurrentThreadOwns();
1761
0
1762
0
  switch (ppid) {
1763
0
    case DATA_CHANNEL_PPID_CONTROL:
1764
0
      HandleDCEPMessage(buffer, length, ppid, stream, flags);
1765
0
      break;
1766
0
    case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
1767
0
    case DATA_CHANNEL_PPID_DOMSTRING:
1768
0
    case DATA_CHANNEL_PPID_BINARY_PARTIAL:
1769
0
    case DATA_CHANNEL_PPID_BINARY:
1770
0
      HandleDataMessage(buffer, length, ppid, stream, flags);
1771
0
      break;
1772
0
    default:
1773
0
      LOG(("Message of length %zu PPID %u on stream %u received (%s).",
1774
0
           length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
1775
0
      break;
1776
0
  }
1777
0
}
1778
1779
void
1780
DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
1781
0
{
1782
0
  uint32_t i, n;
1783
0
1784
0
  switch (sac->sac_state) {
1785
0
  case SCTP_COMM_UP:
1786
0
    LOG(("Association change: SCTP_COMM_UP"));
1787
0
    if (mState == CONNECTING) {
1788
0
      mSocket = mMasterSocket;
1789
0
      mState = OPEN;
1790
0
1791
0
      // Check for older Firefox by looking at the amount of incoming streams
1792
0
      LOG(("Negotiated number of incoming streams: %" PRIu16, sac->sac_inbound_streams));
1793
0
      if (!mMaxMessageSizeSet
1794
0
          && sac->sac_inbound_streams == WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
1795
0
        LOG(("Older Firefox detected, using PPID-based fragmentation"));
1796
0
        mPpidFragmentation = true;
1797
0
      }
1798
0
1799
0
      SetEvenOdd();
1800
0
1801
0
      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1802
0
                 DataChannelOnMessageAvailable::ON_CONNECTION,
1803
0
                 this)));
1804
0
      LOG(("DTLS connect() succeeded!  Entering connected mode"));
1805
0
1806
0
      // Open any streams pending...
1807
0
      ProcessQueuedOpens();
1808
0
1809
0
    } else if (mState == OPEN) {
1810
0
      LOG(("DataConnection Already OPEN"));
1811
0
    } else {
1812
0
      LOG(("Unexpected state: %d", mState));
1813
0
    }
1814
0
    break;
1815
0
  case SCTP_COMM_LOST:
1816
0
    LOG(("Association change: SCTP_COMM_LOST"));
1817
0
    // This association is toast, so also close all the channels -- from mainthread!
1818
0
    Stop();
1819
0
    break;
1820
0
  case SCTP_RESTART:
1821
0
    LOG(("Association change: SCTP_RESTART"));
1822
0
    break;
1823
0
  case SCTP_SHUTDOWN_COMP:
1824
0
    LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1825
0
    Stop();
1826
0
    break;
1827
0
  case SCTP_CANT_STR_ASSOC:
1828
0
    LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1829
0
    break;
1830
0
  default:
1831
0
    LOG(("Association change: UNKNOWN"));
1832
0
    break;
1833
0
  }
1834
0
  LOG(("Association change: streams (in/out) = (%u/%u)",
1835
0
       sac->sac_inbound_streams, sac->sac_outbound_streams));
1836
0
1837
0
  if (NS_WARN_IF(!sac)) {
1838
0
    return;
1839
0
  }
1840
0
1841
0
  n = sac->sac_length - sizeof(*sac);
1842
0
  if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
1843
0
    if (n > 0) {
1844
0
      for (i = 0; i < n; ++i) {
1845
0
        switch (sac->sac_info[i]) {
1846
0
        case SCTP_ASSOC_SUPPORTS_PR:
1847
0
          LOG(("Supports: PR"));
1848
0
          break;
1849
0
        case SCTP_ASSOC_SUPPORTS_AUTH:
1850
0
          LOG(("Supports: AUTH"));
1851
0
          break;
1852
0
        case SCTP_ASSOC_SUPPORTS_ASCONF:
1853
0
          LOG(("Supports: ASCONF"));
1854
0
          break;
1855
0
        case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1856
0
          LOG(("Supports: MULTIBUF"));
1857
0
          break;
1858
0
        case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1859
0
          LOG(("Supports: RE-CONFIG"));
1860
0
          break;
1861
#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
1862
        case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
1863
          LOG(("Supports: NDATA"));
1864
          // TODO: This should probably be set earlier above in 'case SCTP_COMM_UP' but we also
1865
          //       need this for 'SCTP_RESTART'.
1866
          mSendInterleaved = true;
1867
          break;
1868
#endif
1869
0
        default:
1870
0
          LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1871
0
          break;
1872
0
        }
1873
0
      }
1874
0
    }
1875
0
  } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1876
0
              (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
1877
0
    LOG(("Association: ABORT ="));
1878
0
    for (i = 0; i < n; ++i) {
1879
0
      LOG((" 0x%02x", sac->sac_info[i]));
1880
0
    }
1881
0
  }
1882
0
  if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1883
0
      (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1884
0
      (sac->sac_state == SCTP_COMM_LOST)) {
1885
0
    return;
1886
0
  }
1887
0
}
1888
1889
void
1890
DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
1891
0
{
1892
0
  const char *addr = "";
1893
0
#if !defined(__Userspace_os_Windows)
1894
0
  char addr_buf[INET6_ADDRSTRLEN];
1895
0
  struct sockaddr_in *sin;
1896
0
  struct sockaddr_in6 *sin6;
1897
0
#endif
1898
0
1899
0
  switch (spc->spc_aaddr.ss_family) {
1900
0
  case AF_INET:
1901
0
#if !defined(__Userspace_os_Windows)
1902
0
    sin = (struct sockaddr_in *)&spc->spc_aaddr;
1903
0
    addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1904
0
#endif
1905
0
    break;
1906
0
  case AF_INET6:
1907
0
#if !defined(__Userspace_os_Windows)
1908
0
    sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1909
0
    addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1910
0
#endif
1911
0
    break;
1912
0
  case AF_CONN:
1913
0
    addr = "DTLS connection";
1914
0
    break;
1915
0
  default:
1916
0
    break;
1917
0
  }
1918
0
  LOG(("Peer address %s is now ", addr));
1919
0
  switch (spc->spc_state) {
1920
0
  case SCTP_ADDR_AVAILABLE:
1921
0
    LOG(("SCTP_ADDR_AVAILABLE"));
1922
0
    break;
1923
0
  case SCTP_ADDR_UNREACHABLE:
1924
0
    LOG(("SCTP_ADDR_UNREACHABLE"));
1925
0
    break;
1926
0
  case SCTP_ADDR_REMOVED:
1927
0
    LOG(("SCTP_ADDR_REMOVED"));
1928
0
    break;
1929
0
  case SCTP_ADDR_ADDED:
1930
0
    LOG(("SCTP_ADDR_ADDED"));
1931
0
    break;
1932
0
  case SCTP_ADDR_MADE_PRIM:
1933
0
    LOG(("SCTP_ADDR_MADE_PRIM"));
1934
0
    break;
1935
0
  case SCTP_ADDR_CONFIRMED:
1936
0
    LOG(("SCTP_ADDR_CONFIRMED"));
1937
0
    break;
1938
0
  default:
1939
0
    LOG(("UNKNOWN"));
1940
0
    break;
1941
0
  }
1942
0
  LOG((" (error = 0x%08x).\n", spc->spc_error));
1943
0
}
1944
1945
void
1946
DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
1947
0
{
1948
0
  size_t i, n;
1949
0
1950
0
  n = sre->sre_length - sizeof(struct sctp_remote_error);
1951
0
  LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1952
0
  for (i = 0; i < n; ++i) {
1953
0
    LOG((" 0x%02x", sre-> sre_data[i]));
1954
0
  }
1955
0
}
1956
1957
void
1958
DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
1959
0
{
1960
0
  LOG(("Shutdown event."));
1961
0
  /* XXX: notify all channels. */
1962
0
  // Attempts to actually send anything will fail
1963
0
}
1964
1965
void
1966
DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
1967
0
{
1968
0
  LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
1969
0
}
1970
1971
void
1972
DataChannelConnection::HandlePartialDeliveryEvent(const struct sctp_pdapi_event *spde)
1973
0
{
1974
0
  // Note: Be aware that stream and sequence number being u32 instead of u16 is
1975
0
  //       a bug in the SCTP API. This may change in the future.
1976
0
1977
0
  LOG(("Partial delivery event: "));
1978
0
  switch (spde->pdapi_indication) {
1979
0
    case SCTP_PARTIAL_DELIVERY_ABORTED:
1980
0
      LOG(("delivery aborted "));
1981
0
      break;
1982
0
    default:
1983
0
      LOG(("??? "));
1984
0
      break;
1985
0
  }
1986
0
  LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags, spde->pdapi_stream,
1987
0
       spde->pdapi_seq));
1988
0
1989
0
  // Validate stream ID
1990
0
  if (spde->pdapi_stream >= UINT16_MAX) {
1991
0
    LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n", spde->pdapi_stream));
1992
0
    return;
1993
0
  }
1994
0
1995
0
  // Find channel and reset buffer
1996
0
  DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
1997
0
  if (channel) {
1998
0
    LOG(("Abort partially delivered message of %u bytes\n", channel->mRecvBuffer.Length()));
1999
0
    channel->mRecvBuffer.Truncate(0);
2000
0
  }
2001
0
}
2002
2003
void
2004
DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
2005
0
{
2006
0
  size_t i, n;
2007
0
2008
0
  if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
2009
0
    LOG(("Unsent "));
2010
0
  }
2011
0
   if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
2012
0
    LOG(("Sent "));
2013
0
  }
2014
0
  if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
2015
0
    LOG(("(flags = %x) ", ssfe->ssfe_flags));
2016
0
  }
2017
0
  LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
2018
0
       ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
2019
0
       ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
2020
0
  n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
2021
0
  for (i = 0; i < n; ++i) {
2022
0
    LOG((" 0x%02x", ssfe->ssfe_data[i]));
2023
0
  }
2024
0
}
2025
2026
void
2027
DataChannelConnection::ClearResets()
2028
0
{
2029
0
  // Clear all pending resets
2030
0
  if (!mStreamsResetting.IsEmpty()) {
2031
0
    LOG(("Clearing resets for %zu streams", mStreamsResetting.Length()));
2032
0
  }
2033
0
2034
0
  for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
2035
0
    RefPtr<DataChannel> channel;
2036
0
    channel = FindChannelByStream(mStreamsResetting[i]);
2037
0
    if (channel) {
2038
0
      LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
2039
0
      mStreams[channel->mStream] = nullptr;
2040
0
    }
2041
0
  }
2042
0
  mStreamsResetting.Clear();
2043
0
}
2044
2045
void
2046
DataChannelConnection::ResetOutgoingStream(uint16_t stream)
2047
0
{
2048
0
  uint32_t i;
2049
0
2050
0
  mLock.AssertCurrentThreadOwns();
2051
0
  LOG(("Connection %p: Resetting outgoing stream %u",
2052
0
       (void *) this, stream));
2053
0
  // Rarely has more than a couple items and only for a short time
2054
0
  for (i = 0; i < mStreamsResetting.Length(); ++i) {
2055
0
    if (mStreamsResetting[i] == stream) {
2056
0
      return;
2057
0
    }
2058
0
  }
2059
0
  mStreamsResetting.AppendElement(stream);
2060
0
}
2061
2062
void
2063
DataChannelConnection::SendOutgoingStreamReset()
2064
0
{
2065
0
  struct sctp_reset_streams *srs;
2066
0
  uint32_t i;
2067
0
  size_t len;
2068
0
2069
0
  LOG(("Connection %p: Sending outgoing stream reset for %zu streams",
2070
0
       (void *) this, mStreamsResetting.Length()));
2071
0
  mLock.AssertCurrentThreadOwns();
2072
0
  if (mStreamsResetting.IsEmpty()) {
2073
0
    LOG(("No streams to reset"));
2074
0
    return;
2075
0
  }
2076
0
  len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
2077
0
  srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
2078
0
  memset(srs, 0, len);
2079
0
  srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
2080
0
  srs->srs_number_streams = mStreamsResetting.Length();
2081
0
  for (i = 0; i < mStreamsResetting.Length(); ++i) {
2082
0
    srs->srs_stream_list[i] = mStreamsResetting[i];
2083
0
  }
2084
0
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
2085
0
    LOG(("***failed: setsockopt RESET, errno %d", errno));
2086
0
    // if errno == EALREADY, this is normal - we can't send another reset
2087
0
    // with one pending.
2088
0
    // When we get an incoming reset (which may be a response to our
2089
0
    // outstanding one), see if we have any pending outgoing resets and
2090
0
    // send them
2091
0
  } else {
2092
0
    mStreamsResetting.Clear();
2093
0
  }
2094
0
  free(srs);
2095
0
}
2096
2097
void
2098
DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
2099
0
{
2100
0
  uint32_t n, i;
2101
0
  RefPtr<DataChannel> channel; // since we may null out the ref to the channel
2102
0
2103
0
  if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
2104
0
      !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
2105
0
    n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
2106
0
    for (i = 0; i < n; ++i) {
2107
0
      if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
2108
0
        channel = FindChannelByStream(strrst->strreset_stream_list[i]);
2109
0
        if (channel) {
2110
0
          // The other side closed the channel
2111
0
          // We could be in three states:
2112
0
          // 1. Normal state (input and output streams (OPEN)
2113
0
          //    Notify application, send a RESET in response on our
2114
0
          //    outbound channel.  Go to CLOSED
2115
0
          // 2. We sent our own reset (CLOSING); either they crossed on the
2116
0
          //    wire, or this is a response to our Reset.
2117
0
          //    Go to CLOSED
2118
0
          // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
2119
0
          //    I believe this is impossible, as we don't have an input stream yet.
2120
0
2121
0
          LOG(("Incoming: Channel %u  closed, state %d",
2122
0
               channel->mStream, channel->mState));
2123
0
          ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
2124
0
                        channel->mState == DataChannel::CLOSING ||
2125
0
                        channel->mState == DataChannel::CONNECTING ||
2126
0
                        channel->mState == DataChannel::WAITING_TO_OPEN);
2127
0
          if (channel->mState == DataChannel::OPEN ||
2128
0
              channel->mState == DataChannel::WAITING_TO_OPEN) {
2129
0
            // Mark the stream for reset (the reset is sent below)
2130
0
            ResetOutgoingStream(channel->mStream);
2131
0
          }
2132
0
          mStreams[channel->mStream] = nullptr;
2133
0
2134
0
          LOG(("Disconnected DataChannel %p from connection %p",
2135
0
               (void *) channel.get(), (void *) channel->mConnection.get()));
2136
0
          // This sends ON_CHANNEL_CLOSED to mainthread
2137
0
          channel->StreamClosedLocked();
2138
0
        } else {
2139
0
          LOG(("Can't find incoming channel %d",i));
2140
0
        }
2141
0
      }
2142
0
    }
2143
0
  }
2144
0
2145
0
  // Process any pending resets now:
2146
0
  if (!mStreamsResetting.IsEmpty()) {
2147
0
    LOG(("Sending %zu pending resets", mStreamsResetting.Length()));
2148
0
    SendOutgoingStreamReset();
2149
0
  }
2150
0
}
2151
2152
void
2153
DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
2154
0
{
2155
0
  uint16_t stream;
2156
0
  RefPtr<DataChannel> channel;
2157
0
2158
0
  if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
2159
0
    LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
2160
0
         mStreams.Length(),
2161
0
         strchg->strchange_instrms,
2162
0
         strchg->strchange_outstrms));
2163
0
    // XXX FIX! notify pending opens of failure
2164
0
    return;
2165
0
  }
2166
0
  if (strchg->strchange_instrms > mStreams.Length()) {
2167
0
    LOG(("Other side increased streams from %zu to %u",
2168
0
         mStreams.Length(), strchg->strchange_instrms));
2169
0
  }
2170
0
  if (strchg->strchange_outstrms > mStreams.Length() ||
2171
0
      strchg->strchange_instrms > mStreams.Length()) {
2172
0
    uint16_t old_len = mStreams.Length();
2173
0
    uint16_t new_len = std::max(strchg->strchange_outstrms,
2174
0
                                strchg->strchange_instrms);
2175
0
    LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
2176
0
         old_len, new_len, new_len - old_len,
2177
0
         strchg->strchange_instrms));
2178
0
    // make sure both are the same length
2179
0
    mStreams.AppendElements(new_len - old_len);
2180
0
    LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
2181
0
    for (size_t i = old_len; i < mStreams.Length(); ++i) {
2182
0
      mStreams[i] = nullptr;
2183
0
    }
2184
0
    // Re-process any channels waiting for streams.
2185
0
    // Linear search, but we don't increase channels often and
2186
0
    // the array would only get long in case of an app error normally
2187
0
2188
0
    // Make sure we request enough streams if there's a big jump in streams
2189
0
    // Could make a more complex API for OpenXxxFinish() and avoid this loop
2190
0
    size_t num_needed = mPending.GetSize();
2191
0
    LOG(("%zu of %d new streams already needed", num_needed,
2192
0
         new_len - old_len));
2193
0
    num_needed -= (new_len - old_len); // number we added
2194
0
    if (num_needed > 0) {
2195
0
      if (num_needed < 16)
2196
0
        num_needed = 16;
2197
0
      LOG(("Not enough new streams, asking for %zu more", num_needed));
2198
0
      // TODO: parameter is an int32_t but we pass size_t
2199
0
      RequestMoreStreams(num_needed);
2200
0
    } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
2201
0
      LOG(("Requesting %d output streams to match partner",
2202
0
           strchg->strchange_instrms - strchg->strchange_outstrms));
2203
0
      RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
2204
0
    }
2205
0
2206
0
    ProcessQueuedOpens();
2207
0
  }
2208
0
  // else probably not a change in # of streams
2209
0
2210
0
  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2211
0
    channel = mStreams[i];
2212
0
    if (!channel)
2213
0
      continue;
2214
0
2215
0
    if ((channel->mState == CONNECTING) &&
2216
0
        (channel->mStream == INVALID_STREAM)) {
2217
0
      if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
2218
0
          (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
2219
0
        /* XXX: Signal to the other end. */
2220
0
        channel->mState = CLOSED;
2221
0
        Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2222
0
                   DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2223
0
                   channel)));
2224
0
        // maybe fire onError (bug 843625)
2225
0
      } else {
2226
0
        stream = FindFreeStream();
2227
0
        if (stream != INVALID_STREAM) {
2228
0
          channel->mStream = stream;
2229
0
          mStreams[stream] = channel;
2230
0
2231
0
          // Send open request
2232
0
          int error = SendOpenRequestMessage(
2233
0
              channel->mLabel, channel->mProtocol, channel->mStream,
2234
0
              !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
2235
0
              channel->mPrValue);
2236
0
          if (error) {
2237
0
            LOG(("SendOpenRequest failed, error = %d", error));
2238
0
            // Close the channel, inform the user
2239
0
            mStreams[channel->mStream] = nullptr;
2240
0
            channel->mState = CLOSED;
2241
0
            // Don't need to reset; we didn't open it
2242
0
            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2243
0
                       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2244
0
                       channel)));
2245
0
          } else {
2246
0
            channel->mState = OPEN;
2247
0
            channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2248
0
            LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2249
0
            Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2250
0
                       DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2251
0
                       channel)));
2252
0
          }
2253
0
        } else {
2254
0
          /* We will not find more ... */
2255
0
          break;
2256
0
        }
2257
0
      }
2258
0
    }
2259
0
  }
2260
0
}
2261
2262
// Called with mLock locked!
2263
void
2264
DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
2265
0
{
2266
0
  mLock.AssertCurrentThreadOwns();
2267
0
  if (notif->sn_header.sn_length != (uint32_t)n) {
2268
0
    return;
2269
0
  }
2270
0
  switch (notif->sn_header.sn_type) {
2271
0
  case SCTP_ASSOC_CHANGE:
2272
0
    HandleAssociationChangeEvent(&(notif->sn_assoc_change));
2273
0
    break;
2274
0
  case SCTP_PEER_ADDR_CHANGE:
2275
0
    HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
2276
0
    break;
2277
0
  case SCTP_REMOTE_ERROR:
2278
0
    HandleRemoteErrorEvent(&(notif->sn_remote_error));
2279
0
    break;
2280
0
  case SCTP_SHUTDOWN_EVENT:
2281
0
    HandleShutdownEvent(&(notif->sn_shutdown_event));
2282
0
    break;
2283
0
  case SCTP_ADAPTATION_INDICATION:
2284
0
    HandleAdaptationIndication(&(notif->sn_adaptation_event));
2285
0
    break;
2286
0
  case SCTP_AUTHENTICATION_EVENT:
2287
0
    LOG(("SCTP_AUTHENTICATION_EVENT"));
2288
0
    break;
2289
0
  case SCTP_SENDER_DRY_EVENT:
2290
0
    //LOG(("SCTP_SENDER_DRY_EVENT"));
2291
0
    break;
2292
0
  case SCTP_NOTIFICATIONS_STOPPED_EVENT:
2293
0
    LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
2294
0
    break;
2295
0
  case SCTP_PARTIAL_DELIVERY_EVENT:
2296
0
    HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
2297
0
    break;
2298
0
  case SCTP_SEND_FAILED_EVENT:
2299
0
    HandleSendFailedEvent(&(notif->sn_send_failed_event));
2300
0
    break;
2301
0
  case SCTP_STREAM_RESET_EVENT:
2302
0
    HandleStreamResetEvent(&(notif->sn_strreset_event));
2303
0
    break;
2304
0
  case SCTP_ASSOC_RESET_EVENT:
2305
0
    LOG(("SCTP_ASSOC_RESET_EVENT"));
2306
0
    break;
2307
0
  case SCTP_STREAM_CHANGE_EVENT:
2308
0
    HandleStreamChangeEvent(&(notif->sn_strchange_event));
2309
0
    break;
2310
0
  default:
2311
0
    LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
2312
0
    break;
2313
0
   }
2314
0
 }
2315
2316
int
2317
DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
2318
                                       struct sctp_rcvinfo rcv, int flags)
2319
0
{
2320
0
  ASSERT_WEBRTC(!NS_IsMainThread());
2321
0
2322
0
  if (!data) {
2323
0
    LOG(("ReceiveCallback: SCTP has finished shutting down"));
2324
0
  } else {
2325
0
    mLock.AssertCurrentThreadOwns();
2326
0
    if (flags & MSG_NOTIFICATION) {
2327
0
      HandleNotification(static_cast<union sctp_notification *>(data), datalen);
2328
0
    } else {
2329
0
      HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
2330
0
    }
2331
0
  }
2332
0
  // sctp allocates 'data' with malloc(), and expects the receiver to free
2333
0
  // it (presumably with free).
2334
0
  // XXX future optimization: try to deliver messages without an internal
2335
0
  // alloc/copy, and if so delay the free until later.
2336
0
  free(data);
2337
0
  // usrsctp defines the callback as returning an int, but doesn't use it
2338
0
  return 1;
2339
0
}
2340
2341
already_AddRefed<DataChannel>
2342
DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
2343
                            Type type, bool inOrder,
2344
                            uint32_t prValue, DataChannelListener *aListener,
2345
                            nsISupports *aContext, bool aExternalNegotiated,
2346
                            uint16_t aStream)
2347
0
{
2348
0
  // aStream == INVALID_STREAM to have the protocol allocate
2349
0
  uint16_t prPolicy = SCTP_PR_SCTP_NONE;
2350
0
  uint32_t flags;
2351
0
2352
0
  LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
2353
0
       PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
2354
0
       type, inOrder, prValue, aListener, aContext,
2355
0
       aExternalNegotiated ? "true" : "false", aStream));
2356
0
  switch (type) {
2357
0
    case DATA_CHANNEL_RELIABLE:
2358
0
      prPolicy = SCTP_PR_SCTP_NONE;
2359
0
      break;
2360
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
2361
0
      prPolicy = SCTP_PR_SCTP_RTX;
2362
0
      break;
2363
0
    case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
2364
0
      prPolicy = SCTP_PR_SCTP_TTL;
2365
0
      break;
2366
0
    default:
2367
0
      LOG(("ERROR: unsupported channel type: %u", type));
2368
0
      MOZ_ASSERT(false);
2369
0
      return nullptr;
2370
0
  }
2371
0
  if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
2372
0
    return nullptr;
2373
0
  }
2374
0
2375
0
  // Don't look past currently-negotiated streams
2376
0
  if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
2377
0
    LOG(("ERROR: external negotiation of already-open channel %u", aStream));
2378
0
    // XXX How do we indicate this up to the application?  Probably the
2379
0
    // caller's job, but we may need to return an error code.
2380
0
    return nullptr;
2381
0
  }
2382
0
2383
0
  flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
2384
0
2385
0
  RefPtr<DataChannel> channel(new DataChannel(this,
2386
0
                                                aStream,
2387
0
                                                DataChannel::CONNECTING,
2388
0
                                                label, protocol,
2389
0
                                                prPolicy, prValue,
2390
0
                                                flags,
2391
0
                                                aListener, aContext));
2392
0
  if (aExternalNegotiated) {
2393
0
    channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
2394
0
  }
2395
0
2396
0
  MutexAutoLock lock(mLock); // OpenFinish assumes this
2397
0
  return OpenFinish(channel.forget());
2398
0
}
2399
2400
// Separate routine so we can also call it to finish up from pending opens
2401
already_AddRefed<DataChannel>
2402
DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
2403
0
{
2404
0
  RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
2405
0
  // Normally 1 reference if called from ::Open(), or 2 if called from
2406
0
  // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2407
0
  uint16_t stream = channel->mStream;
2408
0
  bool queue = false;
2409
0
2410
0
  mLock.AssertCurrentThreadOwns();
2411
0
2412
0
  // Cases we care about:
2413
0
  // Pre-negotiated:
2414
0
  //    Not Open:
2415
0
  //      Doesn't fit:
2416
0
  //         -> change initial ask or renegotiate after open
2417
0
  //      -> queue open
2418
0
  //    Open:
2419
0
  //      Doesn't fit:
2420
0
  //         -> RequestMoreStreams && queue
2421
0
  //      Does fit:
2422
0
  //         -> open
2423
0
  // Not negotiated:
2424
0
  //    Not Open:
2425
0
  //      -> queue open
2426
0
  //    Open:
2427
0
  //      -> Try to get a stream
2428
0
  //      Doesn't fit:
2429
0
  //         -> RequestMoreStreams && queue
2430
0
  //      Does fit:
2431
0
  //         -> open
2432
0
  // So the Open cases are basically the same
2433
0
  // Not Open cases are simply queue for non-negotiated, and
2434
0
  // either change the initial ask or possibly renegotiate after open.
2435
0
2436
0
  if (mState == OPEN) {
2437
0
    if (stream == INVALID_STREAM) {
2438
0
      stream = FindFreeStream(); // may be INVALID_STREAM if we need more
2439
0
    }
2440
0
    if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2441
0
      // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
2442
0
      // to avoid going back immediately for more if the ask to N, N+1, etc
2443
0
      int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
2444
0
                            (stream-((int32_t)mStreams.Length())) + 16;
2445
0
      if (!RequestMoreStreams(more_needed)) {
2446
0
        // Something bad happened... we're done
2447
0
        goto request_error_cleanup;
2448
0
      }
2449
0
      queue = true;
2450
0
    }
2451
0
  } else {
2452
0
    // not OPEN
2453
0
    if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2454
0
        mState == CLOSED) {
2455
0
      // Update number of streams for init message
2456
0
      struct sctp_initmsg initmsg;
2457
0
      socklen_t len = sizeof(initmsg);
2458
0
      int32_t total_needed = stream+16;
2459
0
2460
0
      memset(&initmsg, 0, sizeof(initmsg));
2461
0
      if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
2462
0
        LOG(("*** failed getsockopt SCTP_INITMSG"));
2463
0
        goto request_error_cleanup;
2464
0
      }
2465
0
      LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2466
0
           initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2467
0
      initmsg.sinit_num_ostreams  = total_needed;
2468
0
      initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2469
0
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
2470
0
                             (socklen_t)sizeof(initmsg)) < 0) {
2471
0
        LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2472
0
        goto request_error_cleanup;
2473
0
      }
2474
0
2475
0
      int32_t old_len = mStreams.Length();
2476
0
      mStreams.AppendElements(total_needed - old_len);
2477
0
      for (int32_t i = old_len; i < total_needed; ++i) {
2478
0
        mStreams[i] = nullptr;
2479
0
      }
2480
0
    }
2481
0
    // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2482
0
    // is called, if needed
2483
0
    queue = true;
2484
0
  }
2485
0
  if (queue) {
2486
0
    LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2487
0
    // Also serves to mark we told the app
2488
0
    channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2489
0
    // we need a ref for the nsDeQue and one to return
2490
0
    DataChannel* rawChannel = channel;
2491
0
    rawChannel->AddRef();
2492
0
    mPending.Push(rawChannel);
2493
0
    return channel.forget();
2494
0
  }
2495
0
2496
0
  MOZ_ASSERT(stream != INVALID_STREAM);
2497
0
  // just allocated (& OPEN), or externally negotiated
2498
0
  mStreams[stream] = channel; // holds a reference
2499
0
  channel->mStream = stream;
2500
0
2501
#ifdef TEST_QUEUED_DATA
2502
  // It's painful to write a test for this...
2503
  channel->mState = OPEN;
2504
  channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2505
  SendDataMsgInternalOrBuffer(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING);
2506
#endif
2507
2508
0
  if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2509
0
    // Don't send unordered until this gets cleared
2510
0
    channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2511
0
  }
2512
0
2513
0
  if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2514
0
    int error = SendOpenRequestMessage(
2515
0
        channel->mLabel, channel->mProtocol, stream,
2516
0
        !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED), channel->mPrPolicy,
2517
0
        channel->mPrValue);
2518
0
    if (error) {
2519
0
      LOG(("SendOpenRequest failed, error = %d", error));
2520
0
      if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2521
0
        // We already returned the channel to the app.
2522
0
        NS_ERROR("Failed to send open request");
2523
0
        Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2524
0
                   DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2525
0
                   channel)));
2526
0
      }
2527
0
      // If we haven't returned the channel yet, it will get destroyed when we exit
2528
0
      // this function.
2529
0
      mStreams[stream] = nullptr;
2530
0
      channel->mStream = INVALID_STREAM;
2531
0
      // we'll be destroying the channel
2532
0
      channel->mState = CLOSED;
2533
0
      return nullptr;
2534
0
      /* NOTREACHED */
2535
0
    }
2536
0
  }
2537
0
  // Either externally negotiated or we sent Open
2538
0
  channel->mState = OPEN;
2539
0
  channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2540
0
  // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
2541
0
  LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2542
0
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2543
0
             DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2544
0
             channel)));
2545
0
2546
0
  return channel.forget();
2547
0
2548
0
request_error_cleanup:
2549
0
  channel->mState = CLOSED;
2550
0
  if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2551
0
    // We already returned the channel to the app.
2552
0
    NS_ERROR("Failed to request more streams");
2553
0
    Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2554
0
               DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2555
0
               channel)));
2556
0
    return channel.forget();
2557
0
  }
2558
0
  // we'll be destroying the channel, but it never really got set up
2559
0
  // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2560
0
  // Dispatch it to ourselves
2561
0
  return nullptr;
2562
0
}
2563
2564
// Requires mLock to be locked!
2565
// Returns a POSIX error code directly instead of setting errno.
2566
int
2567
DataChannelConnection::SendMsgInternal(OutgoingMsg &msg)
2568
0
{
2569
0
  auto &info = msg.GetInfo().sendv_sndinfo;
2570
0
  int error;
2571
0
2572
0
  // EOR set?
2573
0
  bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
2574
0
2575
0
  // Send until buffer is empty
2576
0
  size_t left = msg.GetLeft();
2577
0
  do {
2578
0
    size_t length;
2579
0
2580
0
    // Carefully chunk the buffer
2581
0
    if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
2582
0
      length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
2583
0
2584
0
      // Unset EOR flag
2585
0
      info.snd_flags &= ~SCTP_EOR;
2586
0
    } else {
2587
0
      length = left;
2588
0
2589
0
      // Set EOR flag
2590
0
      if (eor_set) {
2591
0
        info.snd_flags |= SCTP_EOR;
2592
0
      }
2593
0
    }
2594
0
2595
0
    // Send (or try at least)
2596
0
    // SCTP will return EMSGSIZE if the message is bigger than the buffer
2597
0
    // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
2598
0
    // by carefully crafting small enough message chunks.
2599
0
    ssize_t written = usrsctp_sendv(
2600
0
        mSocket, msg.GetData(), length, nullptr, 0,
2601
0
        (void *)&msg.GetInfo(), (socklen_t)sizeof(struct sctp_sendv_spa),
2602
0
        SCTP_SENDV_SPA, 0);
2603
0
    if (written < 0) {
2604
0
      error = errno;
2605
0
      goto out;
2606
0
    }
2607
0
    LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)",
2608
0
         (size_t)written, length, left - (size_t)written));
2609
0
2610
0
    // TODO: Remove once resolved (https://github.com/sctplab/usrsctp/issues/132)
2611
0
    if (written == 0) {
2612
0
      LOG(("@tuexen: usrsctp_sendv returned 0"));
2613
0
      error = EAGAIN;
2614
0
      goto out;
2615
0
    }
2616
0
2617
0
    // If not all bytes have been written, this obviously means that usrsctp's buffer is full
2618
0
    // and we need to try again later.
2619
0
    if ((size_t)written < length) {
2620
0
      msg.Advance((size_t)written);
2621
0
      error = EAGAIN;
2622
0
      goto out;
2623
0
    }
2624
0
2625
0
    // Update buffer position
2626
0
    msg.Advance((size_t)written);
2627
0
2628
0
    // Get amount of bytes left in the buffer
2629
0
    left = msg.GetLeft();
2630
0
  } while (left > 0);
2631
0
2632
0
  // Done
2633
0
  error = 0;
2634
0
2635
0
out:
2636
0
  // Reset EOR flag
2637
0
  if (eor_set) {
2638
0
    info.snd_flags |= SCTP_EOR;
2639
0
  }
2640
0
2641
0
  return error;
2642
0
}
2643
2644
// Requires mLock to be locked!
2645
// Returns a POSIX error code directly instead of setting errno.
2646
// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
2647
int
2648
DataChannelConnection::SendMsgInternalOrBuffer(nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer,
2649
                                               OutgoingMsg &msg, bool &buffered)
2650
0
{
2651
0
  NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
2652
0
2653
0
  int error = 0;
2654
0
  bool need_buffering = false;
2655
0
2656
0
  // Note: Main-thread IO, but doesn't block!
2657
0
  // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
2658
0
  // (more than the buffersize) queue data onto another thread to do the
2659
0
  // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
2660
0
2661
0
  // Avoid a race between buffer-full-failure (where we have to add the
2662
0
  // packet to the buffered-data queue) and the buffer-now-only-half-full
2663
0
  // callback, which happens on a different thread.  Otherwise we might
2664
0
  // fail here, then before we add it to the queue get the half-full
2665
0
  // callback, find nothing to do, then on this thread add it to the
2666
0
  // queue - which would sit there.  Also, if we later send more data, it
2667
0
  // would arrive ahead of the buffered message, but if the buffer ever
2668
0
  // got to 1/2 full, the message would get sent - but at a semi-random
2669
0
  // time, after other data it was supposed to be in front of.
2670
0
2671
0
  // Must lock before empty check for similar reasons!
2672
0
  mLock.AssertCurrentThreadOwns();
2673
0
  if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
2674
0
    error = SendMsgInternal(msg);
2675
0
    switch (error) {
2676
0
      case 0:
2677
0
        break;
2678
0
      case EAGAIN:
2679
#if (EAGAIN != EWOULDBLOCK)
2680
      case EWOULDBLOCK:
2681
#endif
2682
        need_buffering = true;
2683
0
        break;
2684
0
      default:
2685
0
        LOG(("error %d on sending", error));
2686
0
        break;
2687
0
    }
2688
0
  } else {
2689
0
    need_buffering = true;
2690
0
  }
2691
0
2692
0
  if (need_buffering) {
2693
0
    // queue data for resend!  And queue any further data for the stream until it is...
2694
0
    auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
2695
0
    buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
2696
0
    LOG(("Queued %zu buffers (left=%zu, total=%zu)",
2697
0
         buffer.Length(), msg.GetLeft(), msg.GetLength()));
2698
0
    buffered = true;
2699
0
    return 0;
2700
0
  }
2701
0
2702
0
  buffered = false;
2703
0
  return error;
2704
0
}
2705
2706
// Caller must ensure that length <= UINT32_MAX
2707
// Returns a POSIX error code.
2708
int
2709
DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel, const uint8_t *data,
2710
                                                   size_t len, uint32_t ppid)
2711
0
{
2712
0
  if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
2713
0
    return EINVAL; // TODO: Find a better error code
2714
0
  }
2715
0
2716
0
  struct sctp_sendv_spa info = {0};
2717
0
2718
0
  // General flags
2719
0
  info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2720
0
2721
0
  // Set stream identifier, protocol identifier and flags
2722
0
  info.sendv_sndinfo.snd_sid = channel.mStream;
2723
0
  info.sendv_sndinfo.snd_flags = SCTP_EOR;
2724
0
  info.sendv_sndinfo.snd_ppid = htonl(ppid);
2725
0
2726
0
  // Unordered?
2727
0
  // To avoid problems where an in-order OPEN is lost and an
2728
0
  // out-of-order data message "beats" it, require data to be in-order
2729
0
  // until we get an ACK.
2730
0
  if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2731
0
      !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2732
0
    info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
2733
0
  }
2734
0
2735
0
  // Partial reliability policy
2736
0
  if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
2737
0
    info.sendv_prinfo.pr_policy = channel.mPrPolicy;
2738
0
    info.sendv_prinfo.pr_value = channel.mPrValue;
2739
0
    info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2740
0
  }
2741
0
2742
0
  // Create message instance and send
2743
0
  OutgoingMsg msg(info, data, len);
2744
0
  MutexAutoLock lock(mLock);
2745
0
  bool buffered;
2746
0
  int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
2747
0
2748
0
  // Set pending type and stream index (if buffered)
2749
0
  if (!error && buffered && !mPendingType) {
2750
0
    mPendingType = PENDING_DATA;
2751
0
    mCurrentStream = channel.mStream;
2752
0
  }
2753
0
  return error;
2754
0
}
2755
2756
// Caller must ensure that length <= UINT32_MAX
2757
// Returns a POSIX error code.
2758
int
2759
DataChannelConnection::SendDataMsg(DataChannel &channel, const uint8_t *data, size_t len,
2760
                                   uint32_t ppidPartial, uint32_t ppidFinal)
2761
0
{
2762
0
  // We *really* don't want to do this from main thread! - and
2763
0
  // SendDataMsgInternalOrBuffer avoids blocking.
2764
0
2765
0
  if (mPpidFragmentation) {
2766
0
    // TODO: Bug 1381136, remove this block and all other code that uses PPIDs for fragmentation
2767
0
    //       and reassembly once older Firefoxes without EOR are no longer supported as target
2768
0
    //       clients.
2769
0
2770
0
    // Use the deprecated PPID-level fragmentation if enabled. Should be enabled
2771
0
    // in case we can be certain that the other peer is an older Firefox browser
2772
0
    // that does support PPID-level fragmentation/reassembly.
2773
0
2774
0
    // PPID-level fragmentation can only be applied on reliable data channels.
2775
0
    if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2776
0
        channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
2777
0
        !(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2778
0
      LOG(("Sending data message (total=%zu) using deprecated PPID-based chunks", len));
2779
0
2780
0
      size_t left = len;
2781
0
      while (left > 0) {
2782
0
        // Note: For correctness, chunkLen should also consider mMaxMessageSize as minimum but as
2783
0
        //       this block is going to be removed soon, I see no need for it.
2784
0
        size_t chunkLen = std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2785
0
        left -= chunkLen;
2786
0
        uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
2787
0
2788
0
        // Send the chunk
2789
0
        // Note that these might end up being deferred and queued.
2790
0
        LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid %u",
2791
0
             chunkLen, left, len, ppid));
2792
0
        int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
2793
0
        if (error) {
2794
0
          LOG(("*** send chunk fail %d", error));
2795
0
          return error;
2796
0
        }
2797
0
2798
0
        // Update data position
2799
0
        data += chunkLen;
2800
0
      }
2801
0
2802
0
      // Sending chunks complete
2803
0
      LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
2804
0
           (size_t)(len+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT));
2805
0
      return 0;
2806
0
    }
2807
0
2808
0
    // Cannot do PPID-based fragmentaton on unreliable channels
2809
0
    NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2810
0
                         "Sending too-large data on unreliable channel!");
2811
0
  } else {
2812
0
    if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
2813
0
      LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len, mMaxMessageSize));
2814
0
      return EMSGSIZE;
2815
0
    }
2816
0
  }
2817
0
2818
0
  // This will use EOR-based fragmentation if the message is too large (> 64 KiB)
2819
0
  return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
2820
0
}
2821
2822
class ReadBlobRunnable : public Runnable {
2823
public:
2824
  ReadBlobRunnable(DataChannelConnection* aConnection,
2825
                   uint16_t aStream,
2826
                   nsIInputStream* aBlob)
2827
    : Runnable("ReadBlobRunnable")
2828
    , mConnection(aConnection)
2829
    , mStream(aStream)
2830
    , mBlob(aBlob)
2831
0
  {}
2832
2833
0
  NS_IMETHOD Run() override {
2834
0
    // ReadBlob() is responsible to releasing the reference
2835
0
    DataChannelConnection *self = mConnection;
2836
0
    self->ReadBlob(mConnection.forget(), mStream, mBlob);
2837
0
    return NS_OK;
2838
0
  }
2839
2840
private:
2841
  // Make sure the Connection doesn't die while there are jobs outstanding.
2842
  // Let it die (if released by PeerConnectionImpl while we're running)
2843
  // when we send our runnable back to MainThread.  Then ~DataChannelConnection
2844
  // can send the IOThread to MainThread to die in a runnable, avoiding
2845
  // unsafe event loop recursion.  Evil.
2846
  RefPtr<DataChannelConnection> mConnection;
2847
  uint16_t mStream;
2848
  // Use RefCount for preventing the object is deleted when SendBlob returns.
2849
  RefPtr<nsIInputStream> mBlob;
2850
};
2851
2852
// Returns a POSIX error code.
2853
int
2854
DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
2855
0
{
2856
0
  DataChannel *channel = mStreams[stream];
2857
0
  if (NS_WARN_IF(!channel)) {
2858
0
    return EINVAL; // TODO: Find a better error code
2859
0
  }
2860
0
2861
0
  // Spawn a thread to send the data
2862
0
  if (!mInternalIOThread) {
2863
0
    nsresult rv = NS_NewNamedThread("DataChannel IO",
2864
0
                                    getter_AddRefs(mInternalIOThread));
2865
0
    if (NS_FAILED(rv)) {
2866
0
      return EINVAL; // TODO: Find a better error code
2867
0
    }
2868
0
  }
2869
0
2870
0
  mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2871
0
  return 0;
2872
0
}
2873
2874
class DataChannelBlobSendRunnable : public Runnable
2875
{
2876
public:
2877
  DataChannelBlobSendRunnable(
2878
    already_AddRefed<DataChannelConnection>& aConnection,
2879
    uint16_t aStream)
2880
    : Runnable("DataChannelBlobSendRunnable")
2881
    , mConnection(aConnection)
2882
    , mStream(aStream)
2883
0
  {}
2884
2885
  ~DataChannelBlobSendRunnable() override
2886
0
  {
2887
0
    if (!NS_IsMainThread() && mConnection) {
2888
0
      MOZ_ASSERT(false);
2889
0
      // explicitly leak the connection if destroyed off mainthread
2890
0
      Unused << mConnection.forget().take();
2891
0
    }
2892
0
  }
2893
2894
  NS_IMETHOD Run() override
2895
0
  {
2896
0
    ASSERT_WEBRTC(NS_IsMainThread());
2897
0
2898
0
    mConnection->SendBinaryMsg(mStream, mData);
2899
0
    mConnection = nullptr;
2900
0
    return NS_OK;
2901
0
  }
2902
2903
  // explicitly public so we can avoid allocating twice and copying
2904
  nsCString mData;
2905
2906
private:
2907
  // Note: we can be destroyed off the target thread, so be careful not to let this
2908
  // get Released()ed on the temp thread!
2909
  RefPtr<DataChannelConnection> mConnection;
2910
  uint16_t mStream;
2911
};
2912
2913
void
2914
DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
2915
                                uint16_t aStream, nsIInputStream* aBlob)
2916
0
{
2917
0
  // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2918
0
  // it off mainthread; if PeerConnectionImpl has released then we want
2919
0
  // ~DataChannelConnection() to run on MainThread
2920
0
2921
0
  // XXX to do this safely, we must enqueue these atomically onto the
2922
0
  // output socket.  We need a sender thread(s?) to enqueue data into the
2923
0
  // socket and to avoid main-thread IO that might block.  Even on a
2924
0
  // background thread, we may not want to block on one stream's data.
2925
0
  // I.e. run non-blocking and service multiple channels.
2926
0
2927
0
  // Must not let Dispatching it cause the DataChannelConnection to get
2928
0
  // released on the wrong thread.  Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
2929
0
  // will occasionally cause aThis to get released on this thread.  Also, an explicit Runnable
2930
0
  // lets us avoid copying the blob data an extra time.
2931
0
  RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
2932
0
                                                                                 aStream);
2933
0
  // avoid copying the blob data by passing the mData from the runnable
2934
0
  if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) {
2935
0
    // Bug 966602:  Doesn't return an error to the caller via onerror.
2936
0
    // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
2937
0
    // aThis is now owned by the runnable; release it there
2938
0
    NS_ReleaseOnMainThreadSystemGroup(
2939
0
      "DataChannelBlobSendRunnable", runnable.forget());
2940
0
    return;
2941
0
  }
2942
0
  aBlob->Close();
2943
0
  Dispatch(runnable.forget());
2944
0
}
2945
2946
void
2947
DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
2948
0
{
2949
0
  ASSERT_WEBRTC(NS_IsMainThread());
2950
0
  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2951
0
    if (mStreams[i]) {
2952
0
      aStreamList->push_back(mStreams[i]->mStream);
2953
0
    }
2954
0
  }
2955
0
}
2956
2957
// Returns a POSIX error code.
2958
int
2959
DataChannelConnection::SendDataMsgCommon(uint16_t stream, const nsACString &aMsg,
2960
                                         bool isBinary)
2961
0
{
2962
0
  ASSERT_WEBRTC(NS_IsMainThread());
2963
0
  // We really could allow this from other threads, so long as we deal with
2964
0
  // asynchronosity issues with channels closing, in particular access to
2965
0
  // mStreams, and issues with the association closing (access to mSocket).
2966
0
2967
0
  const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
2968
0
  uint32_t len     = aMsg.Length();
2969
#if (UINT32_MAX > SIZE_MAX)
2970
  if (len > SIZE_MAX) {
2971
    return EMSGSIZE;
2972
  }
2973
#endif
2974
  DataChannel *channelPtr;
2975
0
2976
0
  LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
2977
0
  // XXX if we want more efficiency, translate flags once at open time
2978
0
  channelPtr = mStreams[stream];
2979
0
  if (NS_WARN_IF(!channelPtr)) {
2980
0
    return EINVAL; // TODO: Find a better error code
2981
0
  }
2982
0
2983
0
  auto &channel = *channelPtr;
2984
0
2985
0
  if (isBinary) {
2986
0
    return SendDataMsg(channel, data, len,
2987
0
                       DATA_CHANNEL_PPID_BINARY_PARTIAL, DATA_CHANNEL_PPID_BINARY);
2988
0
  }
2989
0
  return SendDataMsg(channel, data, len,
2990
0
                     DATA_CHANNEL_PPID_DOMSTRING_PARTIAL, DATA_CHANNEL_PPID_DOMSTRING);
2991
0
}
2992
2993
void
2994
DataChannelConnection::Stop()
2995
0
{
2996
0
  // Note: This will call 'CloseAll' from the main thread
2997
0
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2998
0
             DataChannelOnMessageAvailable::ON_DISCONNECTED,
2999
0
             this)));
3000
0
}
3001
3002
void
3003
DataChannelConnection::Close(DataChannel *aChannel)
3004
0
{
3005
0
  MutexAutoLock lock(mLock);
3006
0
  CloseInt(aChannel);
3007
0
}
3008
3009
// So we can call Close() with the lock already held
3010
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
3011
void
3012
DataChannelConnection::CloseInt(DataChannel *aChannel)
3013
0
{
3014
0
  MOZ_ASSERT(aChannel);
3015
0
  RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
3016
0
3017
0
  mLock.AssertCurrentThreadOwns();
3018
0
  LOG(("Connection %p/Channel %p: Closing stream %u",
3019
0
       channel->mConnection.get(), channel.get(), channel->mStream));
3020
0
  // re-test since it may have closed before the lock was grabbed
3021
0
  if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
3022
0
    LOG(("Channel already closing/closed (%u)", aChannel->mState));
3023
0
    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
3024
0
      // called from CloseAll()
3025
0
      // we're not going to hang around waiting any more
3026
0
      mStreams[channel->mStream] = nullptr;
3027
0
    }
3028
0
    return;
3029
0
  }
3030
0
  aChannel->mBufferedData.Clear();
3031
0
  if (channel->mStream != INVALID_STREAM) {
3032
0
    ResetOutgoingStream(channel->mStream);
3033
0
    if (mState == CLOSED) { // called from CloseAll()
3034
0
      // Let resets accumulate then send all at once in CloseAll()
3035
0
      // we're not going to hang around waiting
3036
0
      mStreams[channel->mStream] = nullptr;
3037
0
    } else {
3038
0
      SendOutgoingStreamReset();
3039
0
    }
3040
0
  }
3041
0
  aChannel->mState = CLOSING;
3042
0
  if (mState == CLOSED) {
3043
0
    // we're not going to hang around waiting
3044
0
    channel->StreamClosedLocked();
3045
0
  }
3046
0
  // At this point when we leave here, the object is a zombie held alive only by the DOM object
3047
0
}
3048
3049
void DataChannelConnection::CloseAll()
3050
0
{
3051
0
  LOG(("Closing all channels (connection %p)", (void*) this));
3052
0
  // Don't need to lock here
3053
0
3054
0
  // Make sure no more channels will be opened
3055
0
  {
3056
0
    MutexAutoLock lock(mLock);
3057
0
    mState = CLOSED;
3058
0
  }
3059
0
3060
0
  // Close current channels
3061
0
  // If there are runnables, they hold a strong ref and keep the channel
3062
0
  // and/or connection alive (even if in a CLOSED state)
3063
0
  bool closed_some = false;
3064
0
  for (uint32_t i = 0; i < mStreams.Length(); ++i) {
3065
0
    if (mStreams[i]) {
3066
0
      mStreams[i]->Close();
3067
0
      closed_some = true;
3068
0
    }
3069
0
  }
3070
0
3071
0
  // Clean up any pending opens for channels
3072
0
  RefPtr<DataChannel> channel;
3073
0
  while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
3074
0
    LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
3075
0
    channel->Close(); // also releases the ref on each iteration
3076
0
    closed_some = true;
3077
0
  }
3078
0
  // It's more efficient to let the Resets queue in shutdown and then
3079
0
  // SendOutgoingStreamReset() here.
3080
0
  if (closed_some) {
3081
0
    MutexAutoLock lock(mLock);
3082
0
    SendOutgoingStreamReset();
3083
0
  }
3084
0
}
3085
3086
DataChannel::~DataChannel()
3087
0
{
3088
0
  // NS_ASSERTION since this is more "I think I caught all the cases that
3089
0
  // can cause this" than a true kill-the-program assertion.  If this is
3090
0
  // wrong, nothing bad happens.  A worst it's a leak.
3091
0
  NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
3092
0
}
3093
3094
void
3095
DataChannel::Close()
3096
0
{
3097
0
  if (mConnection) {
3098
0
    // ensure we don't get deleted
3099
0
    RefPtr<DataChannelConnection> connection(mConnection);
3100
0
    connection->Close(this);
3101
0
  }
3102
0
}
3103
3104
// Used when disconnecting from the DataChannelConnection
3105
void
3106
DataChannel::StreamClosedLocked()
3107
0
{
3108
0
  mConnection->mLock.AssertCurrentThreadOwns();
3109
0
  ENSURE_DATACONNECTION;
3110
0
3111
0
  LOG(("Destroying Data channel %u", mStream));
3112
0
  MOZ_ASSERT_IF(mStream != INVALID_STREAM,
3113
0
                !mConnection->FindChannelByStream(mStream));
3114
0
  mStream = INVALID_STREAM;
3115
0
  mState = CLOSED;
3116
0
  mMainThreadEventTarget->Dispatch(
3117
0
    do_AddRef(new DataChannelOnMessageAvailable(
3118
0
                DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
3119
0
                mConnection, this)));
3120
0
  // We leave mConnection live until the DOM releases us, to avoid races
3121
0
}
3122
3123
void
3124
DataChannel::ReleaseConnection()
3125
0
{
3126
0
  ASSERT_WEBRTC(NS_IsMainThread());
3127
0
  mConnection = nullptr;
3128
0
}
3129
3130
void
3131
DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
3132
0
{
3133
0
  MutexAutoLock mLock(mListenerLock);
3134
0
  mContext = aContext;
3135
0
  mListener = aListener;
3136
0
}
3137
3138
void
3139
DataChannel::SendErrnoToErrorResult(int error, ErrorResult& aRv)
3140
{
3141
  switch (error) {
3142
    case 0:
3143
      break;
3144
    case EMSGSIZE:
3145
      aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
3146
      break;
3147
    default:
3148
      aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
3149
      break;
3150
  }
3151
}
3152
3153
void
3154
DataChannel::SendMsg(const nsACString &aMsg, ErrorResult& aRv)
3155
0
{
3156
0
  if (!EnsureValidStream(aRv)) {
3157
0
    return;
3158
0
  }
3159
0
3160
0
  SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
3161
0
}
3162
3163
void
3164
DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult& aRv)
3165
0
{
3166
0
  if (!EnsureValidStream(aRv)) {
3167
0
    return;
3168
0
  }
3169
0
3170
0
  SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
3171
0
}
3172
3173
void
3174
DataChannel::SendBinaryStream(nsIInputStream *aBlob,ErrorResult& aRv)
3175
0
{
3176
0
  if (!EnsureValidStream(aRv)) {
3177
0
    return;
3178
0
  }
3179
0
3180
0
  SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
3181
0
}
3182
3183
dom::Nullable<uint16_t>
3184
DataChannel::GetMaxPacketLifeTime() const
3185
0
{
3186
0
  if (mPrPolicy == SCTP_PR_SCTP_TTL) {
3187
0
    return dom::Nullable<uint16_t>(mPrValue);
3188
0
  }
3189
0
  return dom::Nullable<uint16_t>();
3190
0
}
3191
3192
dom::Nullable<uint16_t>
3193
DataChannel::GetMaxRetransmits() const
3194
0
{
3195
0
  if (mPrPolicy == SCTP_PR_SCTP_RTX) {
3196
0
    return dom::Nullable<uint16_t>(mPrValue);
3197
0
  }
3198
0
  return dom::Nullable<uint16_t>();
3199
0
}
3200
3201
// May be called from another (i.e. Main) thread!
3202
void
3203
DataChannel::AppReady()
3204
0
{
3205
0
  ENSURE_DATACONNECTION;
3206
0
3207
0
  MutexAutoLock lock(mConnection->mLock);
3208
0
3209
0
  mFlags |= DATA_CHANNEL_FLAGS_READY;
3210
0
  if (mState == WAITING_TO_OPEN) {
3211
0
    mState = OPEN;
3212
0
    mMainThreadEventTarget->Dispatch(
3213
0
      do_AddRef(new DataChannelOnMessageAvailable(
3214
0
                  DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
3215
0
                  this)));
3216
0
    for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
3217
0
      nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
3218
0
      MOZ_ASSERT(runnable);
3219
0
      mMainThreadEventTarget->Dispatch(runnable.forget());
3220
0
    }
3221
0
  } else {
3222
0
    NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
3223
0
  }
3224
0
  mQueuedMessages.Clear();
3225
0
  mQueuedMessages.Compact();
3226
0
  // We never use it again...  We could even allocate the array in the odd
3227
0
  // cases we need it.
3228
0
}
3229
3230
size_t
3231
DataChannel::GetBufferedAmountLocked() const
3232
0
{
3233
0
  size_t buffered = 0;
3234
0
3235
0
  for (auto &msg : mBufferedData) {
3236
0
    buffered += msg->GetLeft();
3237
0
  }
3238
0
  // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
3239
0
  // amount from the SCTP stack for a single stream.  It is on their to-do
3240
0
  // list, and once we import a stack with support for that, we'll need to
3241
0
  // add it to what we buffer.  Also we'll need to ask for notification of a per-
3242
0
  // stream buffer-low event and merge that into the handling of buffer-low
3243
0
  // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
3244
0
3245
0
  return buffered;
3246
0
}
3247
3248
uint32_t
3249
DataChannel::GetBufferedAmountLowThreshold()
3250
0
{
3251
0
  return mBufferedThreshold;
3252
0
}
3253
3254
// Never fire immediately, as it's defined to fire on transitions, not state
3255
void
3256
DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
3257
0
{
3258
0
  mBufferedThreshold = aThreshold;
3259
0
}
3260
3261
// Called with mLock locked!
3262
void
3263
DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
3264
0
{
3265
0
  if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
3266
0
      (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
3267
0
    mQueuedMessages.AppendElement(aMessage);
3268
0
  } else {
3269
0
    nsCOMPtr<nsIRunnable> runnable = aMessage;
3270
0
    mMainThreadEventTarget->Dispatch(runnable.forget());
3271
0
  }
3272
0
}
3273
3274
bool
3275
DataChannel::EnsureValidStream(ErrorResult& aRv)
3276
0
{
3277
0
  MOZ_ASSERT(mConnection);
3278
0
  if (mConnection && mStream != INVALID_STREAM) {
3279
0
    return true;
3280
0
  }
3281
0
  aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
3282
0
  return false;
3283
0
}
3284
3285
} // namespace mozilla