/src/gitoxide/gix-features/src/parallel/mod.rs
Line | Count | Source |
1 | | //! Run computations in parallel, or not, based on the `parallel` feature toggle. |
2 | | //! |
3 | | //! ### `in_parallel`(…) |
4 | | //! |
5 | | //! The [`in_parallel(…)`][in_parallel()] is the typical fan-out-fan-in mode of parallelism, with thread local storage |
6 | | //! made available to a `consume(…)` function to process input. The result is sent to the [`Reduce`] running in the calling |
7 | | //! thread to aggregate the results into a single output, which is returned by [`in_parallel()`]. |
8 | | //! |
9 | | //! Interruptions can be achieved by letting the reducer's [`feed(…)`][Reduce::feed()] method fail. |
10 | | //! |
11 | | //! It gets a boost in usability as it allows threads to borrow variables from the stack, most commonly the repository itself |
12 | | //! or the data to work on. |
13 | | //! |
14 | | //! This mode of operation doesn't lend itself perfectly to being wrapped for `async` as it appears like a single long-running |
15 | | //! operation which runs as fast as possible, which is cancellable only by merit of stopping the input or stopping the output |
16 | | //! aggregation. |
17 | | //! |
18 | | //! ### `reduce::Stepwise` |
19 | | //! |
20 | | //! The [`Stepwise`][reduce::Stepwise] iterator works exactly as [`in_parallel()`] except that the processing of the output produced by |
21 | | //! `consume(I, &mut State) -> O` is made accessible by the `Iterator` trait's `next()` method. As produced work is not |
22 | | //! buffered, the owner of the iterator controls the progress made. |
23 | | //! |
24 | | //! Getting the final output of the [`Reduce`] is achieved through the consuming [`Stepwise::finalize()`][reduce::Stepwise::finalize()] method, which |
25 | | //! is functionally equivalent to calling [`in_parallel()`]. |
26 | | //! |
27 | | //! In an `async` context this means that progress is only made each time `next()` is called on the iterator, while merely dropping |
28 | | //! the iterator will wind down the computation without any result. |
29 | | //! |
30 | | //! #### Maintaining Safety |
31 | | //! |
32 | | //! In order to assure that threads don't outlive the data they borrow because their handles are leaked, we enforce |
33 | | //! the `'static` lifetime for its inputs, making it less intuitive to use. It is, however, possible to produce |
34 | | //! suitable input iterators as long as they can hold something on the heap. |
35 | | #[cfg(feature = "parallel")] |
36 | | mod in_parallel; |
37 | | #[cfg(feature = "parallel")] |
38 | | pub use in_parallel::{ |
39 | | build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope, |
40 | | }; |
41 | | |
42 | | mod serial; |
43 | | #[cfg(not(feature = "parallel"))] |
44 | | pub use serial::{build_thread, in_parallel, in_parallel_with_finalize, in_parallel_with_slice, join, threads, Scope}; |
45 | | |
46 | | mod in_order; |
47 | | pub use in_order::{InOrderIter, SequenceId}; |
48 | | |
49 | | mod eager_iter; |
50 | | pub use eager_iter::{EagerIter, EagerIterIf}; |
51 | | |
52 | | /// A no-op returning the input _(`desired_chunk_size`, `Some(thread_limit)`, `thread_limit)_ used |
53 | | /// when the `parallel` feature toggle is not set. |
54 | | #[cfg(not(feature = "parallel"))] |
55 | 0 | pub fn optimize_chunk_size_and_thread_limit( |
56 | 0 | desired_chunk_size: usize, |
57 | 0 | _num_items: Option<usize>, |
58 | 0 | thread_limit: Option<usize>, |
59 | 0 | _available_threads: Option<usize>, |
60 | 0 | ) -> (usize, Option<usize>, usize) { |
61 | 0 | (desired_chunk_size, thread_limit, num_threads(thread_limit)) |
62 | 0 | } Unexecuted instantiation: gix_features::parallel::optimize_chunk_size_and_thread_limit Unexecuted instantiation: gix_features::parallel::optimize_chunk_size_and_thread_limit |
63 | | |
/// Return the 'optimal' _(`size of chunks`, `amount of threads as Option`, `amount of threads`)_ to use in [`in_parallel()`] for the given
/// `desired_chunk_size`, `num_items`, `thread_limit` and `available_threads`.
///
/// * `desired_chunk_size` is the amount of items per chunk you think should be used.
/// * `num_items` is the total amount of items in the iteration, if `Some`.
///   Otherwise this knowledge will not affect the output of this function.
/// * `thread_limit` is the amount of threads to use at most, if `Some`.
///   Otherwise this knowledge will not affect the output of this function.
/// * `available_threads` is the total amount of threads available, if `Some`.
///   Otherwise the actual amount of available threads is determined by querying the system.
///
/// `Note` that this implementation is available only if the `parallel` feature toggle is set.
#[cfg(feature = "parallel")]
pub fn optimize_chunk_size_and_thread_limit(
    desired_chunk_size: usize,
    num_items: Option<usize>,
    thread_limit: Option<usize>,
    available_threads: Option<usize>,
) -> (usize, Option<usize>, usize) {
    // Resolve the effective thread budget: an explicit non-zero `thread_limit` wins,
    // otherwise use the provided or system-queried available parallelism (1 on failure).
    let detected_threads =
        available_threads.unwrap_or_else(|| std::thread::available_parallelism().map_or(1, Into::into));
    let effective_threads = match thread_limit {
        Some(limit) if limit != 0 => limit,
        _ => detected_threads,
    };

    // Keep chunks within a band that amortizes per-chunk overhead without starving threads.
    let (lower_bound, upper_bound) = (50, 1000);
    let (chunk_size, thread_limit) = match num_items {
        Some(items) => {
            // With a known workload, aim for at least two chunks per thread so stragglers
            // can be balanced, while clamping the chunk size into [1, upper_bound].
            let desired_chunks_per_thread_at_least = 2;
            let chunk_size =
                (items / (effective_threads * desired_chunks_per_thread_at_least)).clamp(1, upper_bound);
            let num_chunks = items / chunk_size;
            let threads = if num_chunks <= effective_threads {
                // Too few chunks to saturate all threads — shrink the thread count accordingly.
                (num_chunks / desired_chunks_per_thread_at_least).max(1)
            } else {
                effective_threads
            };
            (chunk_size, threads)
        }
        None => {
            // Without a known item count, clamp the caller's wish into the sane band,
            // unless only a single thread is used anyway.
            let chunk_size = if effective_threads == 1 {
                desired_chunk_size
            } else if desired_chunk_size < lower_bound {
                lower_bound
            } else {
                desired_chunk_size.min(upper_bound)
            };
            (chunk_size, effective_threads)
        }
    };
    (chunk_size, Some(thread_limit), thread_limit)
}
114 | | |
/// Always returns 1, available when the `parallel` feature toggle is unset.
///
/// `_thread_limit` is accepted only for signature compatibility with the parallel
/// implementation and is ignored.
#[cfg(not(feature = "parallel"))]
pub fn num_threads(_thread_limit: Option<usize>) -> usize {
    // Without the `parallel` feature, all work happens on the calling thread.
    1
}
120 | | |
/// Returns the amount of threads the system can effectively use as the amount of its logical cores.
///
/// A `thread_limit` of `Some(n)` with `n != 0` overrides the detected core count;
/// `None` or `Some(0)` fall back to the number of logical cores, or 1 if that
/// cannot be determined.
///
/// Only available with the `parallel` feature toggle set.
#[cfg(feature = "parallel")]
pub fn num_threads(thread_limit: Option<usize>) -> usize {
    let logical_cores = std::thread::available_parallelism().map_or(1, Into::into);
    match thread_limit {
        Some(limit) if limit != 0 => limit,
        _ => logical_cores,
    }
}
129 | | |
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`].
#[cfg(feature = "parallel")]
pub fn in_parallel_if<I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I> + Send,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S + Send + Clone,
    consume: impl FnMut(I, &mut S) -> O + Send + Clone,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // Check the thread budget first — if only one thread may be used, `&&` short-circuits
    // and `condition()` is never evaluated, matching the original evaluation order.
    let run_threaded = num_threads(thread_limit) > 1 && condition();
    if run_threaded {
        in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    } else {
        // Fall back to the serial implementation on the calling thread.
        serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
    }
}
153 | | |
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
///
/// For parameters, see the documentation of [`in_parallel()`]
///
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_if<I, S, O, R>(
    _condition: impl FnOnce() -> bool,
    input: impl Iterator<Item = I>,
    thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    consume: impl FnMut(I, &mut S) -> O,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // Without the `parallel` feature there is nothing to decide: the condition is
    // ignored and the work is always performed serially on the calling thread.
    serial::in_parallel(input, thread_limit, new_thread_state, consume, reducer)
}
175 | | |
176 | | /// |
177 | | pub mod reduce; |
178 | | pub use reduce::Reduce; |