Coverage Report

Created: 2026-03-31 07:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/gitoxide/gix-features/src/parallel/mod.rs
Line
Count
Source
1
//! Run computations in parallel, or not, based on the `parallel` feature toggle.
2
//!
3
//! ### `in_parallel`(…)
4
//!
5
//! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage
6
//! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling
7
//! thread to aggregate the results into a single output, which is returned by [`in_parallel()`].
8
//!
9
//! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail.
10
//!
11
//! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself
12
//! or the data to work on.
13
//!
14
//! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running
15
//! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output
16
//! aggregation.
17
//!
18
//! ### `reduce::Stepwise`
19
//!
20
//! The [`Stepwise`][reduce::Stepwise] iterator works exactly like [`in_parallel()`] except that the processing of the output produced by
21
//! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not
22
//! buffered, the owner of the iterator controls the progress made.
23
//!
24
//! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which
25
//! is functionally equivalent to calling [`in_parallel()`].
26
//!
27
//! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping
28
//! the iterator will wind down the computation without any result.
29
//!
30
//! #### Maintaining Safety
31
//!
32
//! In order to ensure that threads don't outlive the data they borrow because their handles are leaked, we enforce
33
//! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce
34
//! suitable input iterators as long as they can hold something on the heap.
35
#[cfg(feature = "parallel")]
36
mod in_parallel;
37
#[cfg(feature = "parallel")]
38
pub use in_parallel::{
39
    build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope,
40
};
41
42
mod serial;
43
#[cfg(not(feature = "parallel"))]
44
pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope};
45
46
mod in_order;
47
pub use in_order::{InOrderIter, SequenceId};
48
49
mod eager_iter;
50
pub use eager_iter::{EagerIter, EagerIterIf};
51
52
/// A no-op returning _(`desired_chunk_size`, `thread_limit`, `num_threads(thread_limit)`)_ from its input, used
53
/// when the `parallel` feature toggle is not set.
54
#[cfg(not(feature = "parallel"))]
55
0
pub fn optimize_chunk_size_and_thread_limit(
56
0
    desired_chunk_size: usize,
57
0
    _num_items: Option<usize>,
58
0
    thread_limit: Option<usize>,
59
0
    _available_threads: Option<usize>,
60
0
) -> (usize, Option<usize>, usize) {
61
0
    (desired_chunk_size, thread_limit, num_threads(thread_limit))
62
0
}
Unexecuted instantiation: gix_features::parallel::optimize_chunk_size_and_thread_limit
Unexecuted instantiation: gix_features::parallel::optimize_chunk_size_and_thread_limit
63
64
/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
65
/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
66
///
67
/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
68
/// * `num_items` is the total amount of items in the iteration, if `Some`.
69
///   Otherwise this knowledge will not affect the output of this function.
70
/// * `thread_limit` is the amount of threads to use at most, if `Some`.
71
///   Otherwise, or if the limit is `0`, this knowledge will not affect the output of this function.
72
/// * `available_threads` is the total amount of threads available, if `Some`.
73
///   Otherwise the actual amount of available threads is determined by querying the system.
74
///
75
/// Note that this implementation is only available if the `parallel` feature toggle is set.
76
#[cfg(feature = "parallel")]
77
pub fn optimize_chunk_size_and_thread_limit(
78
    desired_chunk_size: usize,
79
    num_items: Option<usize>,
80
    thread_limit: Option<usize>,
81
    available_threads: Option<usize>,
82
) -> (usize, Option<usize>, usize) {
83
    let available_threads =
84
        available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into));
85
    let available_threads = thread_limit.map_or(available_threads, |l| if l == 0 { available_threads } else { l });
86
87
    let (lower, upper) = (50, 1000);
88
    let (chunk_size, thread_limit) = num_items.map_or(
89
        {
90
            let chunk_size = if available_threads == 1 {
91
                desired_chunk_size
92
            } else if desired_chunk_size < lower {
93
                lower
94
            } else {
95
                desired_chunk_size.min(upper)
96
            };
97
            (chunk_size, available_threads)
98
        },
99
        |num_items| {
100
            let desired_chunks_per_thread_at_least = 2;
101
            let items = num_items;
102
            let chunk_size = (items / (available_threads * desired_chunks_per_thread_at_least)).clamp(1, upper);
103
            let num_chunks = items / chunk_size;
104
            let thread_limit = if num_chunks <= available_threads {
105
                (num_chunks / desired_chunks_per_thread_at_least).max(1)
106
            } else {
107
                available_threads
108
            };
109
            (chunk_size, thread_limit)
110
        },
111
    );
112
    (chunk_size, Some(thread_limit), thread_limit)
113
}
114
115
/// Always returns 1, available when the `parallel` feature toggle is unset.
116
#[cfg(not(feature = "parallel"))]
117
0
pub fn num_threads(_thread_limit: Option<usize>) -> usize {
118
0
    1
119
0
}
Unexecuted instantiation: gix_features::parallel::num_threads
Unexecuted instantiation: gix_features::parallel::num_threads
Unexecuted instantiation: gix_features::parallel::num_threads
120
121
/// Returns `thread_limit` if it is `Some` and non-zero, or otherwise the amount of threads the system can effectively use, which is the amount of its logical cores.
122
///
123
/// Only available with the `parallel` feature toggle set.
124
#[cfg(feature = "parallel")]
125
pub fn num_threads(thread_limit: Option<usize>) -> usize {
126
    let logical_cores = std::thread::available_parallelism().map_or(1, Into::into);
127
    thread_limit.map_or(logical_cores, |l| if l == 0 { logical_cores } else { l })
128
}
129
130
/// Run [`in_parallel()`] only if more than one thread can be used and the given `condition()` returns true, and run the serial implementation otherwise.
131
///
132
/// For parameters, see the documentation of [`in_parallel()`]
133
#[cfg(feature = "parallel")]
134
pub fn in_parallel_if<I, S, O, R>(
135
    condition: impl FnOnce() -> bool,
136
    input: impl Iterator<Item = I> + Send,
137
    thread_limit: Option<usize>,
138
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
139
    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
140
    reducer: R,
141
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
142
where
143
    R: Reduce<Input = O>,
144
    I: Send,
145
    O: Send,
146
{
147
    if num_threads(thread_limit) > 1 && condition() {
148
        in_parallel(input, thread_limit, new_thread_state, consume, reducer)
149
    } else {
150
        serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
151
    }
152
}
153
154
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
155
///
156
/// For parameters, see the documentation of [`in_parallel()`]
157
///
158
/// Note that the non-parallel version never calls `condition()` and is equivalent to [`in_parallel()`].
159
#[cfg(not(feature = "parallel"))]
160
0
pub fn in_parallel_if<I, S, O, R>(
161
0
    _condition: impl FnOnce() -> bool,
162
0
    input: impl Iterator<Item = I>,
163
0
    thread_limit: Option<usize>,
164
0
    new_thread_state: impl FnOnce(usize) -> S,
165
0
    consume: impl FnMut(I, &mut S) -> O,
166
0
    reducer: R,
167
0
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
168
0
where
169
0
    R: Reduce<Input = O>,
170
0
    I: Send,
171
0
    O: Send,
172
{
173
0
    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
174
0
}
Unexecuted instantiation: gix_features::parallel::in_parallel_if::<_, _, _, _, _, _, _, _>
Unexecuted instantiation: gix_features::parallel::in_parallel_if::<_, _, _, _, _, _, _, _>
175
176
///
177
pub mod reduce;
178
pub use reduce::Reduce;