/src/rust-brotli/src/enc/threading.rs
Line | Count | Source |
1 | | use alloc::{Allocator, SliceWrapper, SliceWrapperMut}; |
2 | | use core::marker::PhantomData; |
3 | | use core::ops::Range; |
4 | | use core::{any, mem}; |
5 | | #[cfg(feature = "std")] |
6 | | use std; |
7 | | |
8 | | use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher}; |
9 | | use super::encode::{ |
10 | | hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize, |
11 | | BrotliEncoderOperation, SanitizeParams, |
12 | | }; |
13 | | use super::BrotliAlloc; |
14 | | use crate::concat::{BroCatli, BroCatliResult}; |
15 | | use crate::enc::combined_alloc::{alloc_default, allocate}; |
16 | | use crate::enc::encode::BrotliEncoderStateStruct; |
17 | | |
/// Error returned by [`OwnedRetriever`] when the shared value cannot be viewed or reclaimed.
///
/// This is the unit type, so it carries no further detail about the failure.
pub type PoisonedThreadError = ();
19 | | |
/// Payload stored in [`BrotliEncoderThreadError::ThreadExecError`] when the `std` feature is
/// enabled: a boxed, sendable `Any` value.
#[cfg(feature = "std")]
pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
/// Payload stored in [`BrotliEncoderThreadError::ThreadExecError`] when the `std` feature is
/// disabled: the unit type.
#[cfg(not(feature = "std"))]
pub type LowLevelThreadError = ();
24 | | |
/// Types that can be built from a [`LowLevelThreadError`] payload.
pub trait AnyBoxConstructor {
    /// Creates `Self` from the given low-level thread error payload.
    fn new(data: LowLevelThreadError) -> Self;
}
28 | | |
/// A handle that can be consumed to obtain a result of type `T` or an error of type `U`.
pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
    /// Consumes the handle and yields the outcome it stands for.
    fn join(self) -> Result<T, U>;
}
/// Errors produced by the multi-threaded compression helpers in this module.
#[derive(Debug)]
pub enum BrotliEncoderThreadError {
    /// An output buffer ran out of room, either while a part was being compressed or while
    /// the compressed parts were being concatenated.
    InsufficientOutputSpace,
    /// Not constructed anywhere in this module.
    // NOTE(review): presumably reserved for callers/other modules — confirm before removing.
    ConcatenationDidNotProcessFullFile,
    /// Concatenating a compressed part reported the wrapped `BroCatliResult`.
    ConcatenationError(BroCatliResult),
    /// Finishing the concatenated stream reported the wrapped `BroCatliResult`.
    ConcatenationFinalizationError(BroCatliResult),
    /// The shared input could not be viewed or reclaimed through its `OwnedRetriever`.
    OtherThreadPanic,
    /// A low-level thread error payload, wrapped via `AnyBoxConstructor::new`.
    ThreadExecError(LowLevelThreadError),
}
41 | | |
42 | | impl AnyBoxConstructor for BrotliEncoderThreadError { |
43 | | fn new(data: LowLevelThreadError) -> Self { |
44 | | BrotliEncoderThreadError::ThreadExecError(data) |
45 | | } |
46 | | } |
47 | | |
/// Compressed output of one part of the input, together with the allocation that holds it.
pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// Allocation containing the compressed bytes.
    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
    /// Number of leading bytes of `data_backing` that hold compressed data.
    data_size: usize,
}
/// What a compression work item hands back: its outcome and the allocator it was given.
pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The compressed chunk on success, or the error that stopped compression.
    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
    /// The allocator, returned so the caller can free the chunk and reuse the allocator.
    alloc: Alloc,
}
/// State of one per-thread slot: either it still holds its allocator and extra input, or it
/// holds a join handle, or its contents are temporarily moved out.
pub enum InternalSendAlloc<
    ReturnVal: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
{
    /// The allocator and the extra input are present and available.
    A(Alloc, ExtraInput),
    /// A join handle that yields the work item's `ReturnVal`.
    // NOTE(review): presumably stored by the spawner implementation — confirm against it.
    Join(Join),
    /// Placeholder left behind while the previous contents are moved out.
    SpawningOrJoining(PhantomData<ReturnVal>),
}
74 | | impl< |
75 | | ReturnVal: Send + 'static, |
76 | | ExtraInput: Send + 'static, |
77 | | Alloc: BrotliAlloc + Send + 'static, |
78 | | Join: Joinable<ReturnVal, BrotliEncoderThreadError>, |
79 | | > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join> |
80 | | where |
81 | | <Alloc as Allocator<u8>>::AllocatedMemory: Send, |
82 | | { |
83 | 0 | fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) { |
84 | 0 | match *self { |
85 | 0 | InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra), |
86 | 0 | _ => panic!("Bad state for allocator"), |
87 | | } |
88 | 0 | } |
89 | | } |
90 | | |
/// Per-thread slot wrapping an [`InternalSendAlloc`] state.
///
/// The inner field is public for now; the original author marked that with a FIXME.
pub struct SendAlloc<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
//FIXME pub
where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
100 | | |
101 | | impl< |
102 | | ReturnValue: Send + 'static, |
103 | | ExtraInput: Send + 'static, |
104 | | Alloc: BrotliAlloc + Send + 'static, |
105 | | Join: Joinable<ReturnValue, BrotliEncoderThreadError>, |
106 | | > SendAlloc<ReturnValue, ExtraInput, Alloc, Join> |
107 | | where |
108 | | <Alloc as Allocator<u8>>::AllocatedMemory: Send, |
109 | | { |
110 | 0 | pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self { |
111 | 0 | SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input)) |
112 | 0 | } |
113 | 0 | pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) { |
114 | 0 | match self.0 { |
115 | 0 | InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input), |
116 | | InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => { |
117 | 0 | (other, other_extra) |
118 | | } |
119 | | } |
120 | 0 | } |
121 | 0 | fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) { |
122 | 0 | match self.0 { |
123 | 0 | InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input), |
124 | | InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => { |
125 | 0 | panic!("Item permanently borrowed/leaked") |
126 | | } |
127 | | } |
128 | 0 | } |
129 | 0 | pub fn unwrap(self) -> (Alloc, ExtraInput) { |
130 | 0 | match self.0 { |
131 | 0 | InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input), |
132 | | InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => { |
133 | 0 | panic!("Item permanently borrowed/leaked") |
134 | | } |
135 | | } |
136 | 0 | } |
137 | 0 | pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) { |
138 | 0 | match mem::replace( |
139 | 0 | &mut self.0, |
140 | 0 | InternalSendAlloc::SpawningOrJoining(PhantomData), |
141 | 0 | ) { |
142 | 0 | InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input), |
143 | | InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => { |
144 | 0 | panic!("Item permanently borrowed/leaked") |
145 | | } |
146 | | } |
147 | 0 | } |
148 | | } |
149 | | |
/// Either an owned value or a marker saying the value is currently lent out.
pub enum InternalOwned<T> {
    // FIXME pub
    /// The value is present.
    Item(T),
    /// The value has been taken and not (yet) handed back.
    Borrowed,
}

/// Wrapper around [`InternalOwned`] with convenience accessors.
pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub
impl<T> Owned<T> {
    /// Wraps `data` as a present item.
    pub fn new(data: T) -> Self {
        Self(InternalOwned::Item(data))
    }

    /// Returns the held item, or `other` when the item is lent out.
    pub fn unwrap_or(self, other: T) -> T {
        match self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => other,
        }
    }

    /// Returns the held item.
    ///
    /// # Panics
    ///
    /// Panics if the item is lent out.
    pub fn unwrap(self) -> T {
        match self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }

    /// Borrows the held item.
    ///
    /// # Panics
    ///
    /// Panics if the item is lent out.
    pub fn view(&self) -> &T {
        match &self.0 {
            InternalOwned::Item(value) => value,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }
}
183 | | |
/// Access to a shared value of type `U` that can be inspected in place and later reclaimed.
pub trait OwnedRetriever<U: Send + 'static> {
    /// Runs `f` on a shared reference to the value and returns its result, or
    /// `PoisonedThreadError` if the value cannot be accessed.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
    /// Consumes the retriever and returns the value, or `PoisonedThreadError` if it cannot be
    /// reclaimed.
    fn unwrap(self) -> Result<U, PoisonedThreadError>;
}
188 | | |
#[cfg(feature = "std")]
impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
    /// Takes the read lock and applies `f` to the protected value.
    ///
    /// Fails with `PoisonedThreadError` when the lock is poisoned.
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
        self.read()
            .map(|guard| f(&*guard))
            .map_err(|_| PoisonedThreadError::default())
    }

    /// Reclaims the protected value.
    ///
    /// Fails with `PoisonedThreadError` when other `Arc` clones are still alive or when the
    /// lock is poisoned.
    fn unwrap(self) -> Result<U, PoisonedThreadError> {
        let rwlock =
            std::sync::Arc::try_unwrap(self).map_err(|_| PoisonedThreadError::default())?;
        rwlock
            .into_inner()
            .map_err(|_| PoisonedThreadError::default())
    }
}
207 | | |
/// Spawner interface that runs a generic closure `F` once per work item over a shared input.
pub trait BatchSpawnable<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle used to obtain the `ReturnValue` of one spawned work item.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input is viewed and finally reclaimed.
    type FinalJoinHandle: OwnedRetriever<U>;
    // Author's notes on the intended contract (not enforced by this trait definition):
    // - the spawner takes the input and one SendAlloc per thread and converts each
    //   SendAlloc into a JoinHandle;
    // - the input stays borrowed until the joins complete, and `input` is set to Borrowed;
    // - the final join handle is a read/write lock that returns the input to its owner;
    // - the FinalJoinHandle should only be unwrapped once every individual JoinHandle has
    //   been examined;
    // - the function is called with the thread index, the number of threads, a reference
    //   to the input under a read lock, and the allocator taken from the SendAlloc.
    /// Takes the input out of `input` and returns the handle that shares it with work items.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`, running `f` with the contents of `alloc`.
    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: F,
    );
}
236 | | |
/// Variant of [`BatchSpawnable`] whose `spawn` takes a plain function pointer instead of a
/// generic closure type.
pub trait BatchSpawnableLite<
    ReturnValue: Send + 'static,
    ExtraInput: Send + 'static,
    Alloc: BrotliAlloc + Send + 'static,
    U: Send + 'static + Sync,
> where
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
{
    /// Handle used to obtain the `ReturnValue` of one spawned work item.
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
    /// Handle through which the shared input is viewed and finally reclaimed.
    type FinalJoinHandle: OwnedRetriever<U>;
    /// Takes the input out of `input` and returns the handle that shares it with work items.
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
    /// Starts work item `index` of `num_threads`, running `f` with the contents of
    /// `alloc_per_thread`.
    // NOTE(review): callers in this module expect the slot to hold a `Join` handle afterwards
    // — confirm against the implementations.
    fn spawn(
        &mut self,
        handle: &mut Self::FinalJoinHandle,
        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
        index: usize,
        num_threads: usize,
        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
    );
}
257 | | /* |
258 | | impl<ReturnValue:Send+'static, |
259 | | ExtraInput:Send+'static, |
260 | | Alloc:BrotliAlloc+Send+'static, |
261 | | U:Send+'static+Sync> |
262 | | BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> { |
263 | | type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle; |
264 | | type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle; |
265 | | fn batch_spawn( |
266 | | &mut self, |
267 | | input: &mut Owned<U>, |
268 | | alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>], |
269 | | f: fn(usize, usize, &U, Alloc) -> T, |
270 | | ) -> Self::FinalJoinHandle { |
271 | | <Self as BatchSpawnable<ReturnValue, ExtraInput, Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f) |
272 | | } |
273 | | }*/ |
274 | | |
275 | 0 | pub fn CompressMultiSlice< |
276 | 0 | Alloc: BrotliAlloc + Send + 'static, |
277 | 0 | Spawner: BatchSpawnableLite< |
278 | 0 | CompressionThreadResult<Alloc>, |
279 | 0 | UnionHasher<Alloc>, |
280 | 0 | Alloc, |
281 | 0 | ( |
282 | 0 | <Alloc as Allocator<u8>>::AllocatedMemory, |
283 | 0 | BrotliEncoderParams, |
284 | 0 | ), |
285 | 0 | >, |
286 | 0 | >( |
287 | 0 | params: &BrotliEncoderParams, |
288 | 0 | input_slice: &[u8], |
289 | 0 | output: &mut [u8], |
290 | 0 | alloc_per_thread: &mut [SendAlloc< |
291 | 0 | CompressionThreadResult<Alloc>, |
292 | 0 | UnionHasher<Alloc>, |
293 | 0 | Alloc, |
294 | 0 | Spawner::JoinHandle, |
295 | 0 | >], |
296 | 0 | thread_spawner: &mut Spawner, |
297 | 0 | ) -> Result<usize, BrotliEncoderThreadError> |
298 | 0 | where |
299 | 0 | <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync, |
300 | 0 | <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync, |
301 | 0 | <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync, |
302 | | { |
303 | 0 | let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 { |
304 | 0 | let mut input = allocate::<u8, _>(alloc, input_slice.len()); |
305 | 0 | input.slice_mut().clone_from_slice(input_slice); |
306 | 0 | input |
307 | | } else { |
308 | 0 | alloc_default::<u8, Alloc>() |
309 | | }; |
310 | 0 | let mut owned_input = Owned::new(input); |
311 | 0 | let ret = CompressMulti( |
312 | 0 | params, |
313 | 0 | &mut owned_input, |
314 | 0 | output, |
315 | 0 | alloc_per_thread, |
316 | 0 | thread_spawner, |
317 | | ); |
318 | 0 | if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 { |
319 | 0 | <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap()); |
320 | 0 | } |
321 | 0 | ret |
322 | 0 | } |
323 | | |
/// Returns the half-open byte range of a `file_size`-byte input assigned to work item
/// `thread_index` out of `num_threads`.
///
/// The ranges for `thread_index` in `0..num_threads` are contiguous and together cover
/// `0..file_size`. Boundaries are computed in 128-bit arithmetic so that
/// `thread_index * file_size` cannot overflow `usize` for large inputs (which would otherwise
/// happen readily on 32-bit targets).
///
/// # Panics
///
/// Panics if `num_threads` is zero (division by zero).
fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
    // For `index <= num_threads` the quotient is at most `file_size`, so narrowing back to
    // `usize` is lossless for every in-contract call.
    let boundary = |index: usize| -> usize {
        ((index as u128 * file_size as u128) / num_threads as u128) as usize
    };
    boundary(thread_index)..boundary(thread_index + 1)
}
327 | | |
328 | 0 | fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>( |
329 | 0 | hasher: UnionHasher<Alloc>, |
330 | 0 | thread_index: usize, |
331 | 0 | num_threads: usize, |
332 | 0 | input_and_params: &(SliceW, BrotliEncoderParams), |
333 | 0 | mut alloc: Alloc, |
334 | 0 | ) -> CompressionThreadResult<Alloc> |
335 | 0 | where |
336 | 0 | <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static, |
337 | | { |
338 | 0 | let mut range = get_range(thread_index, num_threads, input_and_params.0.len()); |
339 | 0 | let mut mem = allocate::<u8, _>( |
340 | 0 | &mut alloc, |
341 | 0 | BrotliEncoderMaxCompressedSize(range.end - range.start), |
342 | | ); |
343 | 0 | let mut state = BrotliEncoderStateStruct::new(alloc); |
344 | 0 | state.params = input_and_params.1.clone(); |
345 | 0 | if thread_index != 0 { |
346 | 0 | state.params.catable = true; // make sure we can concatenate this to the other work results |
347 | 0 | state.params.magic_number = false; // no reason to pepper this around |
348 | 0 | } |
349 | 0 | state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in |
350 | 0 | if thread_index != 0 { |
351 | 0 | state.set_custom_dictionary_with_optional_precomputed_hasher( |
352 | 0 | range.start, |
353 | 0 | &input_and_params.0.slice()[..range.start], |
354 | 0 | hasher, |
355 | 0 | true, |
356 | 0 | ); |
357 | 0 | } |
358 | 0 | let mut out_offset = 0usize; |
359 | | let compression_result; |
360 | 0 | let mut available_out = mem.len(); |
361 | | loop { |
362 | 0 | let mut next_in_offset = 0usize; |
363 | 0 | let mut available_in = range.end - range.start; |
364 | 0 | let result = state.compress_stream( |
365 | 0 | BrotliEncoderOperation::BROTLI_OPERATION_FINISH, |
366 | 0 | &mut available_in, |
367 | 0 | &input_and_params.0.slice()[range.clone()], |
368 | 0 | &mut next_in_offset, |
369 | 0 | &mut available_out, |
370 | 0 | mem.slice_mut(), |
371 | 0 | &mut out_offset, |
372 | 0 | &mut None, |
373 | | &mut |_a, _b, _c, _d| (), |
374 | | ); |
375 | 0 | let new_range = range.start + next_in_offset..range.end; |
376 | 0 | range = new_range; |
377 | 0 | if result { |
378 | 0 | compression_result = Ok(out_offset); |
379 | 0 | break; |
380 | 0 | } else if available_out == 0 { |
381 | 0 | compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space?? |
382 | 0 | break; |
383 | 0 | } |
384 | | } |
385 | 0 | BrotliEncoderDestroyInstance(&mut state); |
386 | 0 | match compression_result { |
387 | 0 | Ok(size) => CompressionThreadResult::<Alloc> { |
388 | 0 | compressed: Ok(CompressedFileChunk { |
389 | 0 | data_backing: mem, |
390 | 0 | data_size: size, |
391 | 0 | }), |
392 | 0 | alloc: state.m8, |
393 | 0 | }, |
394 | 0 | Err(e) => { |
395 | 0 | <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem); |
396 | 0 | CompressionThreadResult::<Alloc> { |
397 | 0 | compressed: Err(e), |
398 | 0 | alloc: state.m8, |
399 | 0 | } |
400 | | } |
401 | | } |
402 | 0 | } |
403 | | |
404 | 0 | pub fn CompressMulti< |
405 | 0 | Alloc: BrotliAlloc + Send + 'static, |
406 | 0 | SliceW: SliceWrapper<u8> + Send + 'static + Sync, |
407 | 0 | Spawner: BatchSpawnableLite< |
408 | 0 | CompressionThreadResult<Alloc>, |
409 | 0 | UnionHasher<Alloc>, |
410 | 0 | Alloc, |
411 | 0 | (SliceW, BrotliEncoderParams), |
412 | 0 | >, |
413 | 0 | >( |
414 | 0 | params: &BrotliEncoderParams, |
415 | 0 | owned_input: &mut Owned<SliceW>, |
416 | 0 | output: &mut [u8], |
417 | 0 | alloc_per_thread: &mut [SendAlloc< |
418 | 0 | CompressionThreadResult<Alloc>, |
419 | 0 | UnionHasher<Alloc>, |
420 | 0 | Alloc, |
421 | 0 | Spawner::JoinHandle, |
422 | 0 | >], |
423 | 0 | thread_spawner: &mut Spawner, |
424 | 0 | ) -> Result<usize, BrotliEncoderThreadError> |
425 | 0 | where |
426 | 0 | <Alloc as Allocator<u8>>::AllocatedMemory: Send, |
427 | 0 | <Alloc as Allocator<u16>>::AllocatedMemory: Send, |
428 | 0 | <Alloc as Allocator<u32>>::AllocatedMemory: Send, |
429 | | { |
430 | 0 | let num_threads = alloc_per_thread.len(); |
431 | 0 | let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed)); |
432 | 0 | let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone())); |
433 | | // start thread spawner |
434 | 0 | let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair); |
435 | 0 | if num_threads > 1 { |
436 | 0 | // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items |
437 | 0 | thread_spawner.spawn( |
438 | 0 | &mut spawner_and_input, |
439 | 0 | &mut alloc_per_thread[0], |
440 | 0 | 0, |
441 | 0 | num_threads, |
442 | 0 | compress_part, |
443 | 0 | ); |
444 | 0 | } |
445 | | // populate all hashers at once, cloning them one by one |
446 | | let mut compression_last_thread_result; |
447 | 0 | if num_threads > 1 && params.favor_cpu_efficiency { |
448 | 0 | let mut local_params = params.clone(); |
449 | 0 | SanitizeParams(&mut local_params); |
450 | 0 | let mut hasher = UnionHasher::Uninit; |
451 | 0 | hasher_setup( |
452 | 0 | alloc_per_thread[num_threads - 1].0.unwrap_input().0, |
453 | 0 | &mut hasher, |
454 | 0 | &mut local_params, |
455 | 0 | None, // No unwrappable custom dict used here. |
456 | 0 | &[], |
457 | | 0, |
458 | | 0, |
459 | | false, |
460 | | ); |
461 | 0 | for thread_index in 1..num_threads { |
462 | 0 | let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| { |
463 | 0 | let range = get_range(thread_index - 1, num_threads, input_and_params.0.len()); |
464 | 0 | let overlap = hasher.StoreLookahead().wrapping_sub(1); |
465 | 0 | if range.end - range.start > overlap { |
466 | 0 | hasher.BulkStoreRange( |
467 | 0 | input_and_params.0.slice(), |
468 | | usize::MAX, |
469 | 0 | if range.start > overlap { |
470 | 0 | range.start - overlap |
471 | | } else { |
472 | 0 | 0 |
473 | | }, |
474 | 0 | range.end - overlap, |
475 | | ); |
476 | 0 | } |
477 | 0 | }); |
478 | 0 | if let Err(_e) = res { |
479 | 0 | return Err(BrotliEncoderThreadError::OtherThreadPanic); |
480 | 0 | } |
481 | 0 | if thread_index + 1 != num_threads { |
482 | 0 | { |
483 | 0 | let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut(); |
484 | 0 | *out_hasher = hasher.clone_with_alloc(alloc); |
485 | 0 | } |
486 | 0 | thread_spawner.spawn( |
487 | 0 | &mut spawner_and_input, |
488 | 0 | &mut alloc_per_thread[thread_index], |
489 | 0 | thread_index, |
490 | 0 | num_threads, |
491 | 0 | compress_part, |
492 | 0 | ); |
493 | 0 | } |
494 | | } |
495 | 0 | let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default(); |
496 | 0 | compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> { |
497 | 0 | compress_part(hasher, |
498 | 0 | num_threads - 1, |
499 | 0 | num_threads, |
500 | 0 | input_and_params, |
501 | 0 | alloc, |
502 | | ) |
503 | 0 | }); |
504 | | } else { |
505 | 0 | if num_threads > 1 { |
506 | 0 | for thread_index in 1..num_threads - 1 { |
507 | 0 | thread_spawner.spawn( |
508 | 0 | &mut spawner_and_input, |
509 | 0 | &mut alloc_per_thread[thread_index], |
510 | 0 | thread_index, |
511 | 0 | num_threads, |
512 | 0 | compress_part, |
513 | 0 | ); |
514 | 0 | } |
515 | 0 | } |
516 | 0 | let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default(); |
517 | 0 | compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> { |
518 | 0 | compress_part(UnionHasher::Uninit, |
519 | 0 | num_threads - 1, |
520 | 0 | num_threads, |
521 | 0 | input_and_params, |
522 | 0 | alloc, |
523 | | ) |
524 | 0 | }); |
525 | | } |
526 | 0 | let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); |
527 | 0 | let mut out_file_size = 0usize; |
528 | 0 | let mut bro_cat_li = BroCatli::new(); |
529 | 0 | for (index, thread) in alloc_per_thread.iter_mut().enumerate() { |
530 | 0 | let mut cur_result = if index + 1 == num_threads { |
531 | 0 | match mem::replace(&mut compression_last_thread_result, Err(())) { |
532 | 0 | Ok(result) => result, |
533 | 0 | Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic), |
534 | | } |
535 | | } else { |
536 | 0 | match mem::replace( |
537 | 0 | &mut thread.0, |
538 | 0 | InternalSendAlloc::SpawningOrJoining(PhantomData), |
539 | 0 | ) { |
540 | | InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => { |
541 | 0 | panic!("Thread not properly spawned") |
542 | | } |
543 | 0 | InternalSendAlloc::Join(join) => match join.join() { |
544 | 0 | Ok(result) => result, |
545 | 0 | Err(err) => { |
546 | 0 | return Err(err); |
547 | | } |
548 | | }, |
549 | | } |
550 | | }; |
551 | 0 | match cur_result.compressed { |
552 | 0 | Ok(compressed_out) => { |
553 | 0 | bro_cat_li.new_brotli_file(); |
554 | 0 | let mut in_offset = 0usize; |
555 | 0 | let cat_result = bro_cat_li.stream( |
556 | 0 | &compressed_out.data_backing.slice()[..compressed_out.data_size], |
557 | 0 | &mut in_offset, |
558 | 0 | output, |
559 | 0 | &mut out_file_size, |
560 | | ); |
561 | 0 | match cat_result { |
562 | 0 | BroCatliResult::Success | BroCatliResult::NeedsMoreInput => { |
563 | 0 | compression_result = Ok(out_file_size); |
564 | 0 | } |
565 | 0 | BroCatliResult::NeedsMoreOutput => { |
566 | 0 | compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); |
567 | 0 | // not enough space |
568 | 0 | } |
569 | 0 | err => { |
570 | 0 | compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err)); |
571 | 0 | // misc error |
572 | 0 | } |
573 | | } |
574 | 0 | <Alloc as Allocator<u8>>::free_cell( |
575 | 0 | &mut cur_result.alloc, |
576 | 0 | compressed_out.data_backing, |
577 | | ); |
578 | | } |
579 | 0 | Err(e) => { |
580 | 0 | compression_result = Err(e); |
581 | 0 | } |
582 | | } |
583 | 0 | thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit); |
584 | | } |
585 | 0 | compression_result?; |
586 | 0 | match bro_cat_li.finish(output, &mut out_file_size) { |
587 | 0 | BroCatliResult::Success => compression_result = Ok(out_file_size), |
588 | 0 | err => { |
589 | 0 | compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError( |
590 | 0 | err, |
591 | 0 | )) |
592 | | } |
593 | | } |
594 | 0 | if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() { |
595 | 0 | *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning |
596 | 0 | } else if compression_result.is_ok() { |
597 | 0 | compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic); |
598 | 0 | } |
599 | 0 | compression_result |
600 | 0 | } |