Coverage Report

Created: 2026-01-09 06:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/neqo/neqo-transport/src/path.rs
Line
Count
Source
1
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4
// option. This file may not be copied, modified, or distributed
5
// except according to those terms.
6
7
use std::{
8
    cell::RefCell,
9
    fmt::{self, Display},
10
    net::SocketAddr,
11
    num::NonZeroUsize,
12
    rc::Rc,
13
    time::{Duration, Instant},
14
};
15
16
use neqo_common::{
17
    hex, qdebug, qinfo, qlog::Qlog, qtrace, qwarn, Buffer, DatagramBatch, Encoder, Tos,
18
};
19
use neqo_crypto::random;
20
21
use crate::{
22
    ackrate::{AckRate, PeerAckDelay},
23
    cid::{ConnectionId, ConnectionIdRef, ConnectionIdStore, RemoteConnectionIdEntry},
24
    ecn,
25
    frame::{FrameEncoder as _, FrameType},
26
    packet,
27
    pmtud::Pmtud,
28
    recovery::{self, sent},
29
    rtt::{RttEstimate, RttSource},
30
    sender::PacketSender,
31
    stateless_reset::Token as Srt,
32
    stats::FrameStats,
33
    ConnectionParameters, Stats,
34
};
35
36
/// The maximum number of paths that `Paths` will track.
37
const MAX_PATHS: usize = 15;
38
39
pub type PathRef = Rc<RefCell<Path>>;
40
41
/// A collection for network paths.
42
/// This holds a collection of paths that have been used for sending or
43
/// receiving, plus an additional "temporary" path that is held only while
44
/// processing a packet.
45
/// This structure limits its storage and will forget about paths if it
46
/// is exposed to too many paths.
47
#[derive(Debug)]
48
pub struct Paths {
49
    /// All of the paths.  All of these paths will be permanent.
50
    #[expect(clippy::struct_field_names, reason = "This is the best name.")]
51
    paths: Vec<PathRef>,
52
    /// This is the primary path.  This will only be `None` initially, so
53
    /// care needs to be taken regarding that only during the handshake.
54
    /// This path will also be in `paths`.
55
    primary: Option<PathRef>,
56
57
    /// The path that we would prefer to migrate to.
58
    migration_target: Option<PathRef>,
59
60
    /// Connection IDs that need to be retired.
61
    to_retire: Vec<u64>,
62
63
    /// `QLog` handler.
64
    qlog: Qlog,
65
66
    /// Whether PMTUD is enabled for this connection.
67
    pmtud: bool,
68
}
69
70
impl Paths {
71
    #[must_use]
72
2.44k
    pub fn new(pmtud: bool) -> Self {
73
2.44k
        Self {
74
2.44k
            paths: Vec::new(),
75
2.44k
            primary: None,
76
2.44k
            migration_target: None,
77
2.44k
            to_retire: Vec::new(),
78
2.44k
            qlog: Qlog::disabled(),
79
2.44k
            pmtud,
80
2.44k
        }
81
2.44k
    }
82
83
    /// Find the path for the given addresses.
84
    /// This might be a temporary path.
85
1.22k
    pub fn find_path(
86
1.22k
        &self,
87
1.22k
        local: SocketAddr,
88
1.22k
        remote: SocketAddr,
89
1.22k
        conn_params: &ConnectionParameters,
90
1.22k
        now: Instant,
91
1.22k
        stats: &mut Stats,
92
1.22k
    ) -> PathRef {
93
1.22k
        self.paths
94
1.22k
            .iter()
95
1.22k
            .find_map(|p| p.borrow().received_on(local, remote).then(|| Rc::clone(p)))
96
1.22k
            .unwrap_or_else(|| {
97
1.22k
                let mut p =
98
1.22k
                    Path::temporary(local, remote, conn_params, self.qlog.clone(), now, stats);
99
1.22k
                if let Some(primary) = self.primary.as_ref() {
100
0
                    p.prime_rtt(primary.borrow().rtt());
101
1.22k
                }
102
1.22k
                Rc::new(RefCell::new(p))
103
1.22k
            })
104
1.22k
    }
105
106
    /// Get a reference to the primary path, if one exists.
107
3.86k
    pub fn primary(&self) -> Option<PathRef> {
108
3.86k
        self.primary.clone()
109
3.86k
    }
110
111
    /// Returns true if the path is not permanent.
112
1.66k
    pub fn is_temporary(&self, path: &PathRef) -> bool {
113
        // Ask the path first, which is simpler.
114
1.66k
        path.borrow().is_temporary() || !self.paths.iter().any(|p| Rc::ptr_eq(p, path))
115
1.66k
    }
116
117
0
    fn retire(to_retire: &mut Vec<u64>, retired: &PathRef) {
118
0
        if let Some(cid) = &retired.borrow().remote_cid {
119
0
            let seqno = cid.sequence_number();
120
0
            if cid.connection_id().is_empty() {
121
0
                qdebug!("Connection ID {seqno} is zero-length, not retiring");
122
0
            } else {
123
0
                to_retire.push(seqno);
124
0
            }
125
0
        }
126
0
    }
127
128
    /// Adopt a temporary path as permanent.
129
    /// The first path that is made permanent is made primary.
130
1.66k
    pub fn make_permanent(
131
1.66k
        &mut self,
132
1.66k
        path: &PathRef,
133
1.66k
        local_cid: Option<ConnectionId>,
134
1.66k
        remote_cid: RemoteConnectionIdEntry,
135
1.66k
        now: Instant,
136
1.66k
    ) {
137
1.66k
        debug_assert!(self.is_temporary(path));
138
139
        // Make sure not to track too many paths.
140
        // This protects index 0, which contains the primary path.
141
1.66k
        if self.paths.len() >= MAX_PATHS {
142
0
            debug_assert_eq!(self.paths.len(), MAX_PATHS);
143
0
            let removed = self.paths.remove(1);
144
0
            Self::retire(&mut self.to_retire, &removed);
145
0
            if self
146
0
                .migration_target
147
0
                .as_ref()
148
0
                .is_some_and(|target| Rc::ptr_eq(target, &removed))
149
            {
150
0
                qinfo!(
151
                    "[{}] The migration target path had to be removed",
152
0
                    path.borrow()
153
                );
154
0
                self.migration_target = None;
155
0
            }
156
0
            debug_assert_eq!(Rc::strong_count(&removed), 1);
157
1.66k
        }
158
159
1.66k
        qdebug!("[{}] Make permanent", path.borrow());
160
1.66k
        path.borrow_mut().make_permanent(local_cid, remote_cid);
161
1.66k
        self.paths.push(Rc::clone(path));
162
1.66k
        if self.primary.is_none() {
163
1.66k
            assert!(self.select_primary(path, now).is_none());
164
0
        }
165
1.66k
    }
166
167
    /// Select a path as the primary.  Returns the old primary path.
168
    /// Using the old path is only necessary if this change in path is a reaction
169
    /// to a migration from a peer, in which case the old path needs to be probed.
170
    #[must_use]
171
1.66k
    fn select_primary(&mut self, path: &PathRef, now: Instant) -> Option<PathRef> {
172
1.66k
        qdebug!("[{}] set as primary path", path.borrow());
173
1.66k
        let old_path = self.primary.replace(Rc::clone(path)).inspect(|old| {
174
0
            old.borrow_mut().set_primary(false, now);
175
0
        });
176
177
        // Swap the primary path into slot 0, so that it is protected from eviction.
178
1.66k
        let idx = self
179
1.66k
            .paths
180
1.66k
            .iter()
181
1.66k
            .enumerate()
182
1.66k
            .find_map(|(i, p)| Rc::ptr_eq(p, path).then_some(i))?;
183
1.66k
        self.paths.swap(0, idx);
184
185
1.66k
        path.borrow_mut().set_primary(true, now);
186
1.66k
        old_path
187
1.66k
    }
188
189
    /// Migrate to the identified path.  If `force` is true, the path
190
    /// is forcibly marked as valid and the path is used immediately.
191
    /// Otherwise, migration will occur after probing succeeds.
192
    /// The path is always probed and will be abandoned if probing fails.
193
    /// Returns `true` if the path was migrated.
194
0
    pub fn migrate(
195
0
        &mut self,
196
0
        path: &PathRef,
197
0
        force: bool,
198
0
        now: Instant,
199
0
        stats: &mut Stats,
200
0
    ) -> bool {
201
0
        debug_assert!(!self.is_temporary(path));
202
0
        let baseline = self.primary().map_or_else(
203
0
            || ecn::Info::default().baseline(),
204
0
            |p| p.borrow().ecn_info.baseline(),
205
        );
206
0
        path.borrow_mut().set_ecn_baseline(baseline);
207
0
        path.borrow_mut().start_ecn(stats);
208
0
        if force || path.borrow().is_valid() {
209
0
            path.borrow_mut().set_valid(now);
210
0
            drop(self.select_primary(path, now));
211
0
            self.migration_target = None;
212
0
        } else {
213
0
            self.migration_target = Some(Rc::clone(path));
214
0
        }
215
0
        path.borrow_mut().probe(stats);
216
0
        self.migration_target.is_none()
217
0
    }
218
219
    /// Process elapsed time for active paths.
220
    /// Returns an true if there are viable paths remaining after tidying up.
221
    ///
222
    /// TODO(mt) - the paths should own the RTT estimator, so they can find the PTO
223
    /// for themselves.
224
441
    pub fn process_timeout(&mut self, now: Instant, pto: Duration, stats: &mut Stats) -> bool {
225
441
        let to_retire = &mut self.to_retire;
226
441
        let mut primary_failed = false;
227
441
        self.paths.retain(|p| {
228
441
            if p.borrow_mut().process_timeout(now, pto, stats) {
229
441
                true
230
            } else {
231
0
                qdebug!("[{}] Retiring path", p.borrow());
232
0
                if p.borrow().is_primary() {
233
0
                    primary_failed = true;
234
0
                }
235
0
                Self::retire(to_retire, p);
236
0
                false
237
            }
238
441
        });
239
240
441
        if primary_failed {
241
0
            self.primary = None;
242
            // Find a valid path to fall back to.
243
            #[expect(
244
                clippy::option_if_let_else,
245
                reason = "The alternative is less readable."
246
            )]
247
0
            if let Some(fallback) = self
248
0
                .paths
249
0
                .iter()
250
0
                .rev() // More recent paths are toward the end.
251
0
                .find(|p| p.borrow().is_valid())
252
            {
253
                // Need a clone as `fallback` is borrowed from `self`.
254
0
                let path = Rc::clone(fallback);
255
0
                qinfo!("[{}] Failing over after primary path failed", path.borrow());
256
0
                drop(self.select_primary(&path, now));
257
0
                true
258
            } else {
259
0
                false
260
            }
261
        } else {
262
            // See if the PMTUD raise timer wants to fire.
263
441
            if let Some(path) = self.primary() {
264
441
                path.borrow_mut()
265
441
                    .pmtud_mut()
266
441
                    .maybe_fire_raise_timer(now, stats);
267
441
            }
268
441
            true
269
        }
270
441
    }
271
272
    /// Get when the next call to `process_timeout()` should be scheduled.
273
0
    pub fn next_timeout(&self, pto: Duration) -> Option<Instant> {
274
0
        self.paths
275
0
            .iter()
276
0
            .filter_map(|p| p.borrow().next_timeout(pto))
277
0
            .min()
278
0
    }
279
280
    /// Set the identified path to be primary.
281
    /// This panics if `make_permanent` hasn't been called.
282
    /// If PMTUD is enabled, it will be started on the new primary path.
283
0
    pub fn handle_migration(
284
0
        &mut self,
285
0
        path: &PathRef,
286
0
        remote: SocketAddr,
287
0
        now: Instant,
288
0
        stats: &mut Stats,
289
0
    ) {
290
        // The update here needs to match the checks in `Path::received_on`.
291
        // Here, we update the remote port number to match the source port on the
292
        // datagram that was received.  This ensures that we send subsequent
293
        // packets back to the right place.
294
0
        path.borrow_mut().update_port(remote.port());
295
296
0
        if path.borrow().is_primary() {
297
            // Update when the path was last regarded as valid.
298
0
            path.borrow_mut().update(now);
299
0
            return;
300
0
        }
301
302
0
        if let Some(old_path) = self.select_primary(path, now) {
303
0
            // Need to probe the old path if the peer migrates.
304
0
            old_path.borrow_mut().probe(stats);
305
0
            // TODO(mt) - suppress probing if the path was valid within 3PTO.
306
0
        }
307
308
0
        if self.pmtud {
309
0
            path.borrow_mut().pmtud_mut().start(now, stats);
310
0
        }
311
0
    }
312
313
    /// Select a path to send on.  This will select the first path that has
314
    /// probes to send, then fall back to the primary path.
315
1.66k
    pub fn select_path(&self) -> Option<PathRef> {
316
1.66k
        self.paths
317
1.66k
            .iter()
318
1.66k
            .find_map(|p| p.borrow().has_probe().then(|| Rc::clone(p)))
319
1.66k
            .or_else(|| self.primary.clone())
320
1.66k
    }
321
322
    /// A `PATH_RESPONSE` was received.
323
    /// Returns `true` if migration occurred.
324
    /// If PMTUD is enabled and migration occurs, it will be started on the new primary path.
325
    #[must_use]
326
0
    pub fn path_response(&mut self, response: [u8; 8], now: Instant, stats: &mut Stats) -> bool {
327
        // TODO(mt) consider recording an RTT measurement here as we don't train
328
        // RTT for non-primary paths.
329
0
        for p in &self.paths {
330
0
            if p.borrow_mut().path_response(response, now, stats) {
331
                // The response was accepted.  If this path is one we intend
332
                // to migrate to, then migrate.
333
0
                if let Some(primary) = self
334
0
                    .migration_target
335
0
                    .take_if(|target| Rc::ptr_eq(target, p))
336
                {
337
0
                    drop(self.select_primary(&primary, now));
338
0
                    if self.pmtud {
339
0
                        primary.borrow_mut().pmtud_mut().start(now, stats);
340
0
                    }
341
0
                    return true;
342
0
                }
343
0
                break;
344
0
            }
345
        }
346
0
        false
347
0
    }
348
349
    /// Retire all of the connection IDs prior to the indicated sequence number.
350
    /// Keep active paths if possible by pulling new connection IDs from the provided store.
351
    /// One slightly non-obvious consequence of this is that if migration is being attempted
352
    /// and the new path cannot obtain a new connection ID, the migration attempt will fail.
353
0
    pub fn retire_cids(&mut self, retire_prior: u64, store: &mut ConnectionIdStore<Srt>) {
354
0
        let to_retire = &mut self.to_retire;
355
0
        let migration_target = &mut self.migration_target;
356
357
        // First, tell the store to release any connection IDs that are too old.
358
0
        let mut retired = store.retire_prior_to(retire_prior);
359
0
        to_retire.append(&mut retired);
360
361
0
        self.paths.retain(|p| {
362
0
            let mut path = p.borrow_mut();
363
0
            let Some(current) = path.remote_cid.as_ref() else {
364
0
                return true;
365
            };
366
0
            if current.sequence_number() < retire_prior && !current.connection_id().is_empty() {
367
0
                to_retire.push(current.sequence_number());
368
0
                let new_cid = store.next();
369
0
                let has_replacement = new_cid.is_some();
370
                // There must be a connection ID available for the primary path as we
371
                // keep that path at the first index.
372
0
                debug_assert!(!path.is_primary() || has_replacement);
373
0
                path.remote_cid = new_cid;
374
0
                if !has_replacement
375
0
                    && migration_target
376
0
                        .as_ref()
377
0
                        .is_some_and(|target| Rc::ptr_eq(target, p))
378
                {
379
0
                    qinfo!(
380
                        "[{path}] NEW_CONNECTION_ID with Retire Prior To forced migration to fail"
381
                    );
382
0
                    *migration_target = None;
383
0
                }
384
0
                has_replacement
385
            } else {
386
0
                true
387
            }
388
0
        });
389
0
    }
390
391
    /// Write out any `RETIRE_CONNECTION_ID` frames that are outstanding.
392
441
    pub fn write_frames<B: Buffer>(
393
441
        &mut self,
394
441
        builder: &mut packet::Builder<B>,
395
441
        tokens: &mut recovery::Tokens,
396
441
        stats: &mut FrameStats,
397
441
    ) {
398
441
        while let Some(seqno) = self.to_retire.pop() {
399
0
            if builder.remaining() < 1 + Encoder::varint_len(seqno) {
400
0
                self.to_retire.push(seqno);
401
0
                break;
402
0
            }
403
0
            builder.encode_frame(FrameType::RetireConnectionId, |b| {
404
0
                b.encode_varint(seqno);
405
0
            });
406
0
            tokens.push(recovery::Token::RetireConnectionId(seqno));
407
0
            stats.retire_connection_id += 1;
408
        }
409
410
441
        if let Some(path) = self.primary() {
411
441
            // Write out any ACK_FREQUENCY frames.
412
441
            path.borrow_mut().write_cc_frames(builder, tokens, stats);
413
441
        }
414
441
    }
415
416
0
    pub fn lost_retire_cid(&mut self, lost: u64) {
417
0
        self.to_retire.push(lost);
418
0
    }
419
420
0
    pub fn acked_retire_cid(&mut self, acked: u64) {
421
0
        self.to_retire.retain(|&seqno| seqno != acked);
422
0
    }
423
424
0
    pub fn lost_ack_frequency(&self, lost: &AckRate) {
425
0
        if let Some(path) = self.primary() {
426
0
            path.borrow_mut().lost_ack_frequency(lost);
427
0
        }
428
0
    }
429
430
0
    pub fn acked_ack_frequency(&self, acked: &AckRate) {
431
0
        if let Some(path) = self.primary() {
432
0
            path.borrow_mut().acked_ack_frequency(acked);
433
0
        }
434
0
    }
435
436
0
    pub fn acked_ecn(&self) {
437
0
        if let Some(path) = self.primary() {
438
0
            path.borrow_mut().acked_ecn();
439
0
        }
440
0
    }
441
442
0
    pub fn lost_ecn(&self, stats: &mut Stats) {
443
0
        if let Some(path) = self.primary() {
444
0
            path.borrow_mut().lost_ecn(stats);
445
0
        }
446
0
    }
447
448
0
    pub fn start_ecn(&self, stats: &mut Stats) {
449
0
        if let Some(path) = self.primary() {
450
0
            path.borrow_mut().start_ecn(stats);
451
0
        }
452
0
    }
453
454
    /// Get an estimate of the RTT on the primary path.
455
    #[cfg(test)]
456
    pub fn rtt(&self) -> Duration {
457
        // Rather than have this fail when there is no active path,
458
        // make a new RTT estimate and interrogate that.
459
        // That is more expensive, but it should be rare and breaking encapsulation
460
        // is worse, especially as this is only used in tests.
461
        self.primary().map_or_else(
462
            || RttEstimate::new(crate::DEFAULT_INITIAL_RTT).estimate(),
463
            |p| p.borrow().rtt().estimate(),
464
        )
465
    }
466
467
2.44k
    pub fn set_qlog(&mut self, qlog: Qlog) {
468
2.44k
        for p in &mut self.paths {
469
1.22k
            p.borrow_mut().set_qlog(qlog.clone());
470
1.22k
        }
471
2.44k
        self.qlog = qlog;
472
2.44k
    }
473
}
474
475
/// The state of a path with respect to address validation.
476
#[derive(Debug)]
477
enum ProbeState {
478
    /// The path was last valid at the indicated time.
479
    Valid,
480
    /// The path was previously valid, but a new probe is needed.
481
    ProbeNeeded { probe_count: usize },
482
    /// The path hasn't been validated, but a probe has been sent.
483
    Probing {
484
        /// The number of probes that have been sent.
485
        probe_count: usize,
486
        /// The probe that was last sent.
487
        data: [u8; 8],
488
        /// Whether the probe was sent in a datagram padded to the path MTU.
489
        mtu: bool,
490
        /// When the probe was sent.
491
        sent: Instant,
492
    },
493
    /// Validation failed the last time it was attempted.
494
    Failed,
495
}
496
497
impl ProbeState {
498
    /// Determine whether the current state requires probing.
499
1.66k
    const fn probe_needed(&self) -> bool {
500
1.66k
        matches!(self, Self::ProbeNeeded { .. })
501
1.66k
    }
502
}
503
504
/// A network path.
505
///
506
/// Paths are used a little bit strangely by connections:
507
/// they need to encapsulate all the state for a path (which
508
/// is normal), but that information is not propagated to the
509
/// `Paths` instance that holds them.  This is because the packet
510
/// processing where changes occur can't hold a reference to the
511
/// `Paths` instance that owns the `Path`.  Any changes to the
512
/// path are communicated to `Paths` afterwards.
513
#[derive(Debug)]
514
pub struct Path {
515
    /// A local socket address.
516
    local: SocketAddr,
517
    /// A remote socket address.
518
    remote: SocketAddr,
519
    /// The connection IDs that we use when sending on this path.
520
    /// This is only needed during the handshake.
521
    local_cid: Option<ConnectionId>,
522
    /// The current connection ID that we are using and its details.
523
    remote_cid: Option<RemoteConnectionIdEntry>,
524
525
    /// Whether this is the primary path.
526
    primary: bool,
527
    /// Whether the current path is considered valid.
528
    state: ProbeState,
529
    /// For a path that is not validated, this is `None`.  For a validated
530
    /// path, the time that the path was last valid.
531
    validated: Option<Instant>,
532
    /// A path challenge was received and `PATH_RESPONSE` has not been sent.
533
    challenge: Option<[u8; 8]>,
534
535
    /// The round trip time estimate for this path.
536
    rtt: RttEstimate,
537
    /// A packet sender for the path, which includes congestion control and a pacer.
538
    sender: PacketSender,
539
540
    /// The number of bytes received on this path.
541
    /// Note that this value might saturate on a long-lived connection,
542
    /// but we only use it before the path is validated.
543
    received_bytes: usize,
544
    /// The number of bytes sent on this path.
545
    sent_bytes: usize,
546
    /// The ECN-related state for this path (see RFC9000, Section 13.4 and Appendix A.4)
547
    ecn_info: ecn::Info,
548
    /// For logging of events.
549
    qlog: Qlog,
550
}
551
552
impl Path {
553
    /// The number of times that a path will be probed before it is considered failed.
554
    ///
555
    /// Note that with [`crate::ecn`], a path is probed [`Self::MAX_PROBES`] with ECN
556
    /// marks and [`Self::MAX_PROBES`] without.
557
    pub const MAX_PROBES: usize = 3;
558
559
    /// Create a path from addresses and a remote connection ID.
560
    /// This is used for migration and for new datagrams.
561
2.44k
    pub fn temporary(
562
2.44k
        local: SocketAddr,
563
2.44k
        remote: SocketAddr,
564
2.44k
        conn_params: &ConnectionParameters,
565
2.44k
        qlog: Qlog,
566
2.44k
        now: Instant,
567
2.44k
        stats: &mut Stats,
568
2.44k
    ) -> Self {
569
2.44k
        let iface_mtu = if conn_params.pmtud_iface_mtu_enabled() {
570
2.44k
            match mtu::interface_and_mtu(remote.ip()) {
571
0
                Ok((name, mtu)) => {
572
0
                    qdebug!(
573
                        "Outbound interface {name} for destination {ip} has MTU {mtu}",
574
0
                        ip = remote.ip()
575
                    );
576
0
                    stats.pmtud_iface_mtu = mtu;
577
0
                    Some(mtu)
578
                }
579
2.44k
                Err(e) => {
580
2.44k
                    qwarn!(
581
                        "Failed to determine outbound interface for destination {ip}: {e}",
582
0
                        ip = remote.ip()
583
                    );
584
2.44k
                    None
585
                }
586
            }
587
        } else {
588
0
            None
589
        };
590
2.44k
        let mut sender = PacketSender::new(conn_params, Pmtud::new(remote.ip(), iface_mtu), now);
591
2.44k
        sender.set_qlog(qlog.clone());
592
2.44k
        Self {
593
2.44k
            local,
594
2.44k
            remote,
595
2.44k
            local_cid: None,
596
2.44k
            remote_cid: None,
597
2.44k
            primary: false,
598
2.44k
            state: ProbeState::ProbeNeeded { probe_count: 0 },
599
2.44k
            validated: None,
600
2.44k
            challenge: None,
601
2.44k
            rtt: RttEstimate::new(conn_params.get_initial_rtt()),
602
2.44k
            sender,
603
2.44k
            received_bytes: 0,
604
2.44k
            sent_bytes: 0,
605
2.44k
            ecn_info: ecn::Info::default(),
606
2.44k
            qlog,
607
2.44k
        }
608
2.44k
    }
609
610
0
    pub fn set_ecn_baseline(&mut self, baseline: ecn::Count) {
611
0
        self.ecn_info.set_baseline(baseline);
612
0
    }
613
614
    /// Return the DSCP/ECN marking to use for outgoing packets on this path.
615
1.66k
    pub fn tos(&self) -> Tos {
616
1.66k
        self.ecn_info.ecn_mark().into()
617
1.66k
    }
618
619
    /// Whether this path is the primary or current path for the connection.
620
5.97k
    pub const fn is_primary(&self) -> bool {
621
5.97k
        self.primary
622
5.97k
    }
623
624
    /// Whether this path is a temporary one.
625
1.66k
    pub const fn is_temporary(&self) -> bool {
626
1.66k
        self.remote_cid.is_none()
627
1.66k
    }
628
629
    /// By adding a remote connection ID, we make the path permanent
630
    /// and one that we will later send packets on.
631
    /// If `local_cid` is `None`, the existing value will be kept.
632
1.66k
    pub(crate) fn make_permanent(
633
1.66k
        &mut self,
634
1.66k
        local_cid: Option<ConnectionId>,
635
1.66k
        remote_cid: RemoteConnectionIdEntry,
636
1.66k
    ) {
637
1.66k
        if self.local_cid.is_none() {
638
1.66k
            self.local_cid = local_cid;
639
1.66k
        }
640
1.66k
        self.remote_cid.replace(remote_cid);
641
1.66k
    }
642
643
    /// Determine if this path was the one that the provided datagram was received on.
644
0
    fn received_on(&self, local: SocketAddr, remote: SocketAddr) -> bool {
645
0
        self.local == local && self.remote == remote
646
0
    }
647
648
    /// Update the remote port number.  Any flexibility we allow in `received_on`
649
    /// need to be adjusted at this point.
650
0
    fn update_port(&mut self, port: u16) {
651
0
        self.remote.set_port(port);
652
0
    }
653
654
    /// Set whether this path is primary.
655
1.66k
    pub(crate) fn set_primary(&mut self, primary: bool, now: Instant) {
656
1.66k
        qtrace!("[{self}] Make primary {primary}");
657
1.66k
        debug_assert!(self.remote_cid.is_some());
658
1.66k
        self.primary = primary;
659
1.66k
        if !primary {
660
0
            self.sender.discard_in_flight(now);
661
1.66k
        }
662
1.66k
    }
663
664
    /// Set the current path as valid.  This updates the time that the path was
665
    /// last validated and cancels any path validation.
666
1.22k
    pub fn set_valid(&mut self, now: Instant) {
667
1.22k
        qdebug!("[{self}] Path validated {now:?}");
668
1.22k
        self.state = ProbeState::Valid;
669
1.22k
        self.validated = Some(now);
670
1.22k
    }
671
672
    /// Update the last use of this path, if it is valid.
673
    /// This will keep the path active slightly longer.
674
0
    pub fn update(&mut self, now: Instant) {
675
0
        if self.validated.is_some() {
676
0
            self.validated = Some(now);
677
0
        }
678
0
    }
679
680
    /// Get the PL MTU.
681
7.53k
    pub fn plpmtu(&self) -> usize {
682
7.53k
        self.pmtud().plpmtu()
683
7.53k
    }
684
685
    /// Get a reference to the PMTUD state.
686
12.5k
    pub fn pmtud(&self) -> &Pmtud {
687
12.5k
        self.sender.pmtud()
688
12.5k
    }
689
690
    /// Get the first local connection ID.
691
    /// Only do this for the primary path during the handshake.
692
3.32k
    pub const fn local_cid(&self) -> Option<&ConnectionId> {
693
3.32k
        self.local_cid.as_ref()
694
3.32k
    }
695
696
    /// Set the remote connection ID based on the peer's choice.
697
    /// This is only valid during the handshake.
698
0
    pub fn set_remote_cid(&mut self, cid: ConnectionIdRef) {
699
0
        if let Some(remote_cid) = self.remote_cid.as_mut() {
700
0
            remote_cid.update_cid(ConnectionId::from(cid));
701
0
        }
702
0
    }
703
704
    /// Access the remote connection ID.
705
3.76k
    pub fn remote_cid(&self) -> Option<&ConnectionId> {
706
3.76k
        self.remote_cid
707
3.76k
            .as_ref()
708
3.76k
            .map(super::cid::ConnectionIdEntry::connection_id)
709
3.76k
    }
710
711
    /// Set the stateless reset token for the connection ID that is currently in use.
712
0
    pub fn set_reset_token(&mut self, token: Srt) {
713
0
        if let Some(remote_cid) = self.remote_cid.as_mut() {
714
0
            remote_cid.set_stateless_reset_token(token);
715
0
        }
716
0
    }
717
718
    /// Determine if the provided token is a stateless reset token.
719
0
    pub fn is_stateless_reset(&self, token: &Srt) -> bool {
720
0
        self.remote_cid
721
0
            .as_ref()
722
0
            .is_some_and(|rcid| rcid.is_stateless_reset(token))
723
0
    }
724
725
    /// Make a datagram.
726
1.66k
    pub fn datagram_batch(
727
1.66k
        &mut self,
728
1.66k
        payload: Vec<u8>,
729
1.66k
        tos: Tos,
730
1.66k
        num_datagrams: usize,
731
1.66k
        datagram_size: usize,
732
1.66k
        stats: &mut Stats,
733
1.66k
    ) -> DatagramBatch {
734
        // Make sure to use the TOS value from before calling ecn::Info::on_packet_sent, which may
735
        // update the ECN state and can hence change it - this packet should still be sent
736
        // with the current value.
737
1.66k
        self.ecn_info.on_packet_sent(num_datagrams, stats);
738
1.66k
        DatagramBatch::new(
739
1.66k
            self.local,
740
1.66k
            self.remote,
741
1.66k
            tos,
742
1.66k
            NonZeroUsize::new(datagram_size).expect("datagram size cannot be zero"),
743
1.66k
            payload,
744
        )
745
1.66k
    }
746
747
    /// Get local address as `SocketAddr`
748
3.66k
    pub const fn local_address(&self) -> SocketAddr {
749
3.66k
        self.local
750
3.66k
    }
751
752
    /// Get remote address as `SocketAddr`
753
2.44k
    pub const fn remote_address(&self) -> SocketAddr {
754
2.44k
        self.remote
755
2.44k
    }
756
757
    /// Whether the path has been validated.
758
1.66k
    pub const fn is_valid(&self) -> bool {
759
1.66k
        self.validated.is_some()
760
1.66k
    }
761
762
    /// Handle a `PATH_RESPONSE` frame. Returns true if the response was accepted.
763
0
    pub fn path_response(&mut self, response: [u8; 8], now: Instant, stats: &mut Stats) -> bool {
764
0
        if let ProbeState::Probing { data, mtu, .. } = &mut self.state {
765
0
            if response == *data {
766
0
                let need_full_probe = !*mtu;
767
0
                self.set_valid(now);
768
0
                if need_full_probe {
769
0
                    qdebug!("[{self}] Sub-MTU probe successful, reset probe count");
770
0
                    self.probe(stats);
771
0
                }
772
0
                true
773
            } else {
774
0
                false
775
            }
776
        } else {
777
0
            false
778
        }
779
0
    }
780
781
    /// The path has been challenged.  This generates a response.
782
    /// This only generates a single response at a time.
783
0
    pub fn challenged(&mut self, challenge: [u8; 8]) {
784
0
        self.challenge = Some(challenge.to_owned());
785
0
    }
786
787
    /// At the next opportunity, send a probe.
788
    /// If the probe count has been exhausted already, marks the path as failed.
789
0
    fn probe(&mut self, stats: &mut Stats) {
790
0
        let probe_count = match &self.state {
791
0
            ProbeState::Probing { probe_count, .. } => *probe_count + 1,
792
0
            ProbeState::ProbeNeeded { probe_count, .. } => *probe_count,
793
0
            _ => 0,
794
        };
795
0
        self.state = if probe_count >= Self::MAX_PROBES {
796
0
            if self.ecn_info.is_marking() {
797
                // The path validation failure may be due to ECN blackholing, try again without ECN.
798
0
                qinfo!("[{self}] Possible ECN blackhole, disabling ECN and re-probing path");
799
0
                self.ecn_info
800
0
                    .disable_ecn(stats, ecn::ValidationError::BlackHole);
801
0
                ProbeState::ProbeNeeded { probe_count: 0 }
802
            } else {
803
0
                qinfo!("[{self}] Probing failed");
804
0
                ProbeState::Failed
805
            }
806
        } else {
807
0
            qdebug!("[{self}] Initiating probe");
808
0
            ProbeState::ProbeNeeded { probe_count }
809
        };
810
0
    }
811
812
    /// Returns true if this path have any probing frames to send.
813
1.66k
    pub const fn has_probe(&self) -> bool {
814
1.66k
        self.challenge.is_some() || self.state.probe_needed()
815
1.66k
    }
816
817
0
    pub fn write_frames<B: Buffer>(
818
0
        &mut self,
819
0
        builder: &mut packet::Builder<B>,
820
0
        stats: &mut FrameStats,
821
0
        mtu: bool, // Whether the packet we're writing into will be a full MTU.
822
0
        now: Instant,
823
0
    ) -> bool {
824
0
        if builder.remaining() < 9 {
825
0
            return false;
826
0
        }
827
        // Send PATH_RESPONSE.
828
0
        let resp_sent = if let Some(challenge) = self.challenge.take() {
829
0
            qtrace!("[{self}] Responding to path challenge {}", hex(challenge));
830
0
            builder.encode_frame(FrameType::PathResponse, |b| {
831
0
                b.encode(&challenge[..]);
832
0
            });
833
834
            // These frames are not retransmitted in the usual fashion.
835
0
            stats.path_response += 1;
836
837
0
            if builder.remaining() < 9 {
838
0
                return true;
839
0
            }
840
0
            true
841
        } else {
842
0
            false
843
        };
844
845
        // Send PATH_CHALLENGE.
846
0
        if let ProbeState::ProbeNeeded { probe_count } = self.state {
847
0
            qtrace!("[{self}] Initiating path challenge {probe_count}");
848
0
            let data = random::<8>();
849
0
            builder.encode_frame(FrameType::PathChallenge, |b| {
850
0
                b.encode(data);
851
0
            });
852
853
            // As above, no recovery token.
854
0
            stats.path_challenge += 1;
855
856
0
            self.state = ProbeState::Probing {
857
0
                probe_count,
858
0
                data,
859
0
                mtu,
860
0
                sent: now,
861
0
            };
862
0
            true
863
        } else {
864
0
            resp_sent
865
        }
866
0
    }
867
868
    /// Write `ACK_FREQUENCY` frames.
869
441
    pub fn write_cc_frames<B: Buffer>(
870
441
        &mut self,
871
441
        builder: &mut packet::Builder<B>,
872
441
        tokens: &mut recovery::Tokens,
873
441
        stats: &mut FrameStats,
874
441
    ) {
875
441
        self.rtt.write_frames(builder, tokens, stats);
876
441
    }
877
878
0
    pub fn lost_ack_frequency(&mut self, lost: &AckRate) {
879
0
        self.rtt.frame_lost(lost);
880
0
    }
881
882
0
    pub fn acked_ecn(&mut self) {
883
0
        self.ecn_info.acked_ecn();
884
0
    }
885
886
0
    pub fn lost_ecn(&mut self, stats: &mut Stats) {
887
0
        self.ecn_info.lost_ecn(stats);
888
0
    }
889
890
0
    pub fn start_ecn(&mut self, stats: &mut Stats) {
891
0
        self.ecn_info.start(stats);
892
0
    }
893
894
0
    pub fn acked_ack_frequency(&mut self, acked: &AckRate) {
895
0
        self.rtt.frame_acked(acked);
896
0
    }
897
898
    /// Process a timer for this path.
899
    /// This returns true if the path is viable and can be kept alive.
900
441
    pub fn process_timeout(&mut self, now: Instant, pto: Duration, stats: &mut Stats) -> bool {
901
441
        if let ProbeState::Probing { sent, .. } = &self.state {
902
0
            if now >= *sent + pto {
903
0
                self.probe(stats);
904
0
            }
905
441
        }
906
441
        if matches!(self.state, ProbeState::Failed) {
907
            // Retire failed paths immediately.
908
0
            false
909
441
        } else if self.primary {
910
            // Keep valid primary paths otherwise.
911
441
            true
912
0
        } else if matches!(self.state, ProbeState::Valid) {
913
            // Retire validated, non-primary paths.
914
            // Allow more than `2 * Self::MAX_PROBES` times the PTO so that an old
915
            // path remains around until after a previous path fails.
916
0
            let count = u32::try_from(2 * Self::MAX_PROBES + 1).expect("result fits in u32");
917
0
            self.validated
918
0
                .is_some_and(|validated| validated + (pto * count) > now)
919
        } else {
920
            // Keep paths that are being actively probed.
921
0
            true
922
        }
923
441
    }
924
925
    /// Return the next time that this path needs servicing.
926
    /// This only considers retransmissions of probes, not cleanup of the path.
927
    /// If there is no other activity, then there is no real need to schedule a
928
    /// timer to cleanup old paths.
929
0
    pub fn next_timeout(&self, pto: Duration) -> Option<Instant> {
930
0
        if let ProbeState::Probing { sent, .. } = &self.state {
931
0
            Some(*sent + pto)
932
        } else {
933
0
            None
934
        }
935
0
    }
936
937
    /// Get the RTT estimator for this path.
938
9.84k
    pub const fn rtt(&self) -> &RttEstimate {
939
9.84k
        &self.rtt
940
9.84k
    }
941
942
    /// Mutably borrow the RTT estimator for this path.
943
0
    pub fn rtt_mut(&mut self) -> &mut RttEstimate {
944
0
        &mut self.rtt
945
0
    }
946
947
    /// Mutably borrow the PMTUD discoverer for this path.
948
441
    pub fn pmtud_mut(&mut self) -> &mut Pmtud {
949
441
        self.sender.pmtud_mut()
950
441
    }
951
952
    /// Read-only access to the owned sender.
953
1.66k
    pub const fn sender(&self) -> &PacketSender {
954
1.66k
        &self.sender
955
1.66k
    }
956
957
    /// Pass on RTT configuration: the maximum acknowledgment delay of the peer,
958
    /// and maybe the minimum delay.
959
0
    pub fn set_ack_delay(
960
0
        &mut self,
961
0
        max_ack_delay: Duration,
962
0
        min_ack_delay: Option<Duration>,
963
0
        ack_ratio: u8,
964
0
    ) {
965
0
        let ack_delay = min_ack_delay.map_or_else(
966
0
            || PeerAckDelay::fixed(max_ack_delay),
967
0
            |m| {
968
0
                PeerAckDelay::flexible(
969
0
                    max_ack_delay,
970
0
                    m,
971
0
                    ack_ratio,
972
0
                    self.sender.cwnd(),
973
0
                    self.plpmtu(),
974
0
                    self.rtt.estimate(),
975
                )
976
0
            },
977
        );
978
0
        self.rtt.set_ack_delay(ack_delay);
979
0
    }
980
981
    /// Initialize the RTT for the path based on an existing estimate.
982
0
    pub fn prime_rtt(&mut self, rtt: &RttEstimate) {
983
0
        self.rtt.prime_rtt(rtt);
984
0
    }
985
986
    /// Record received bytes for the path.
987
1.22k
    pub fn add_received(&mut self, count: usize) {
988
1.22k
        self.received_bytes = self.received_bytes.saturating_add(count);
989
1.22k
    }
990
991
    /// Record sent bytes for the path.
992
1.66k
    pub fn add_sent(&mut self, count: usize) {
993
1.66k
        self.sent_bytes = self.sent_bytes.saturating_add(count);
994
1.66k
    }
995
996
    /// Record a packet as having been sent on this path.
997
2.10k
    pub fn packet_sent(&mut self, sent: &mut sent::Packet, now: Instant) {
998
2.10k
        if !self.is_primary() {
999
0
            sent.clear_primary_path();
1000
2.10k
        }
1001
2.10k
        self.sender.on_packet_sent(sent, self.rtt.estimate(), now);
1002
2.10k
    }
1003
1004
    /// Discard a packet that previously might have been in-flight.
1005
0
    pub fn discard_packet(&mut self, sent: &sent::Packet, now: Instant, stats: &mut Stats) {
1006
0
        if self.rtt.first_sample_time().is_none() {
1007
            // When discarding a packet there might not be a good RTT estimate.
1008
            // But discards only occur after receiving something, so that means
1009
            // that there is some RTT information, which is better than nothing.
1010
            // Two cases: 1. at the client when handling a Retry and
1011
            // 2. at the server when disposing the Initial packet number space.
1012
0
            qinfo!(
1013
                "[{self}] discarding a packet without an RTT estimate; guessing RTT={:?}",
1014
0
                now - sent.time_sent()
1015
            );
1016
0
            stats.rtt_init_guess = true;
1017
0
            self.rtt.update(
1018
0
                &mut self.qlog,
1019
0
                now - sent.time_sent(),
1020
0
                Duration::new(0, 0),
1021
0
                RttSource::Guesstimate,
1022
0
                now,
1023
            );
1024
0
        }
1025
1026
0
        self.sender.discard(sent, now);
1027
0
    }
1028
1029
    /// Record packets as acknowledged with the sender.
1030
0
    pub fn on_packets_acked(
1031
0
        &mut self,
1032
0
        acked_pkts: &[sent::Packet],
1033
0
        ack_ecn: Option<&ecn::Count>,
1034
0
        now: Instant,
1035
0
        stats: &mut Stats,
1036
0
    ) {
1037
0
        debug_assert!(self.is_primary());
1038
1039
0
        let ecn_ce_received = self.ecn_info.on_packets_acked(acked_pkts, ack_ecn, stats);
1040
0
        if ecn_ce_received {
1041
0
            let cwnd_reduced = self.sender.on_ecn_ce_received(
1042
0
                acked_pkts.first().expect("must be there"),
1043
0
                now,
1044
0
                &mut stats.cc,
1045
            );
1046
0
            if cwnd_reduced {
1047
0
                self.rtt.update_ack_delay(self.sender.cwnd(), self.plpmtu());
1048
0
            }
1049
0
        }
1050
1051
0
        self.sender
1052
0
            .on_packets_acked(acked_pkts, &self.rtt, now, stats);
1053
0
    }
1054
1055
    /// Record packets as lost with the sender.
1056
1.32k
    pub fn on_packets_lost(
1057
1.32k
        &mut self,
1058
1.32k
        prev_largest_acked_sent: Option<Instant>,
1059
1.32k
        confirmed: bool,
1060
1.32k
        lost_packets: &[sent::Packet],
1061
1.32k
        stats: &mut Stats,
1062
1.32k
        now: Instant,
1063
1.32k
    ) {
1064
1.32k
        debug_assert!(self.is_primary());
1065
1.32k
        let cwnd_reduced = self.sender.on_packets_lost(
1066
1.32k
            self.rtt.first_sample_time(),
1067
1.32k
            prev_largest_acked_sent,
1068
1.32k
            self.rtt.pto(confirmed), // Important: the base PTO, not adjusted.
1069
1.32k
            lost_packets,
1070
1.32k
            stats,
1071
1.32k
            now,
1072
        );
1073
1.32k
        if cwnd_reduced {
1074
0
            self.rtt.update_ack_delay(self.sender.cwnd(), self.plpmtu());
1075
1.32k
        }
1076
1.32k
    }
1077
1078
    /// Determine whether we should be setting a PTO for this path. This is true when either the
1079
    /// path is valid or when there is enough remaining in the amplification limit to fit a
1080
    /// full-sized path (i.e., the path MTU).
1081
0
    pub fn pto_possible(&self) -> bool {
1082
        // See the implementation of `amplification_limit` for details.
1083
0
        self.amplification_limit() >= self.plpmtu()
1084
0
    }
1085
1086
    /// Get the number of bytes that can be written to this path.
1087
1.66k
    pub fn amplification_limit(&self) -> usize {
1088
1.66k
        if matches!(self.state, ProbeState::Failed) {
1089
0
            0
1090
1.66k
        } else if self.is_valid() {
1091
1.22k
            usize::MAX
1092
        } else {
1093
441
            self.received_bytes
1094
441
                .checked_mul(3)
1095
441
                .map_or(usize::MAX, |limit| {
1096
441
                    let budget = if limit == 0 {
1097
                        // If we have received absolutely nothing thus far, then this endpoint
1098
                        // is the one initiating communication on this path.  Allow enough space for
1099
                        // probing.
1100
0
                        self.plpmtu() * 5
1101
                    } else {
1102
441
                        limit
1103
                    };
1104
441
                    budget.saturating_sub(self.sent_bytes)
1105
441
                })
1106
        }
1107
1.66k
    }
1108
1109
    /// Update the `QLog` instance.
1110
1.22k
    pub fn set_qlog(&mut self, qlog: Qlog) {
1111
1.22k
        self.sender.set_qlog(qlog);
1112
1.22k
    }
1113
}
1114
1115
impl Display for Path {
1116
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1117
0
        if self.is_primary() {
1118
0
            write!(f, "pri-")?; // primary
1119
0
        }
1120
0
        if !self.is_valid() {
1121
0
            write!(f, "unv-")?; // unvalidated
1122
0
        }
1123
0
        write!(f, "path")?;
1124
0
        if let Some(entry) = self.remote_cid.as_ref() {
1125
0
            write!(f, ":{}", entry.connection_id())?;
1126
0
        }
1127
0
        write!(f, " {}->{}", self.local, self.remote)?;
1128
0
        Ok(())
1129
0
    }
1130
}