Coverage Report

Created: 2026-02-26 07:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rust-brotli/src/enc/threading.rs
Line
Count
Source
1
use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2
use core::marker::PhantomData;
3
use core::ops::Range;
4
use core::{any, mem};
5
#[cfg(feature = "std")]
6
use std;
7
8
use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
9
use super::encode::{
10
    hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize,
11
    BrotliEncoderOperation, SanitizeParams,
12
};
13
use super::BrotliAlloc;
14
use crate::concat::{BroCatli, BroCatliResult};
15
use crate::enc::combined_alloc::{alloc_default, allocate};
16
use crate::enc::encode::BrotliEncoderStateStruct;
17
18
/// Unit error reported by [`OwnedRetriever`] when the shared value cannot be viewed or
/// reclaimed. For the `Arc<RwLock<_>>` implementation in this file that means the lock
/// was poisoned or the `Arc` still had other owners.
pub type PoisonedThreadError = ();
19
20
#[cfg(feature = "std")]
21
/// Opaque payload of a low-level thread failure when `std` is available; it is carried by
/// `BrotliEncoderThreadError::ThreadExecError`.
// NOTE(review): the type matches the error half of `std::thread::Result`, so this is
// presumably a panic payload from a joined thread -- confirm against the spawner impls.
pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
22
#[cfg(not(feature = "std"))]
23
/// Without `std` the low-level thread error carries no payload and is the unit type.
pub type LowLevelThreadError = ();
24
25
pub trait AnyBoxConstructor {
26
    fn new(data: LowLevelThreadError) -> Self;
27
}
28
29
/// A handle to pending work that can be consumed to obtain its outcome.
///
/// `T` is the value produced on success and `U` the error produced on failure.
pub trait Joinable<T, U>: Sized
where
    T: Send + 'static,
    U: Send + 'static,
{
    /// Consumes the handle and yields the result of the work it represents.
    fn join(self) -> Result<T, U>;
}
32
#[derive(Debug)]
33
pub enum BrotliEncoderThreadError {
34
    InsufficientOutputSpace,
35
    ConcatenationDidNotProcessFullFile,
36
    ConcatenationError(BroCatliResult),
37
    ConcatenationFinalizationError(BroCatliResult),
38
    OtherThreadPanic,
39
    ThreadExecError(LowLevelThreadError),
40
}
41
42
impl AnyBoxConstructor for BrotliEncoderThreadError {
43
    fn new(data: LowLevelThreadError) -> Self {
44
        BrotliEncoderThreadError::ThreadExecError(data)
45
    }
46
}
47
48
pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
49
where
50
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
51
{
52
    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
53
    data_size: usize,
54
}
55
pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
56
where
57
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
58
{
59
    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
60
    alloc: Alloc,
61
}
62
pub enum InternalSendAlloc<
63
    ReturnVal: Send + 'static,
64
    ExtraInput: Send + 'static,
65
    Alloc: BrotliAlloc + Send + 'static,
66
    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
67
> where
68
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
69
{
70
    A(Alloc, ExtraInput),
71
    Join(Join),
72
    SpawningOrJoining(PhantomData<ReturnVal>),
73
}
74
impl<
75
        ReturnVal: Send + 'static,
76
        ExtraInput: Send + 'static,
77
        Alloc: BrotliAlloc + Send + 'static,
78
        Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
79
    > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
80
where
81
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
82
{
83
0
    fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
84
0
        match *self {
85
0
            InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
86
0
            _ => panic!("Bad state for allocator"),
87
        }
88
0
    }
89
}
90
91
pub struct SendAlloc<
92
    ReturnValue: Send + 'static,
93
    ExtraInput: Send + 'static,
94
    Alloc: BrotliAlloc + Send + 'static,
95
    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
96
>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
97
//FIXME pub
98
where
99
    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
100
101
impl<
102
        ReturnValue: Send + 'static,
103
        ExtraInput: Send + 'static,
104
        Alloc: BrotliAlloc + Send + 'static,
105
        Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
106
    > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
107
where
108
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
109
{
110
0
    pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
111
0
        SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
112
0
    }
113
0
    pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
114
0
        match self.0 {
115
0
            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
116
            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
117
0
                (other, other_extra)
118
            }
119
        }
120
0
    }
121
0
    fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
122
0
        match self.0 {
123
0
            InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
124
            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
125
0
                panic!("Item permanently borrowed/leaked")
126
            }
127
        }
128
0
    }
129
0
    pub fn unwrap(self) -> (Alloc, ExtraInput) {
130
0
        match self.0 {
131
0
            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
132
            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
133
0
                panic!("Item permanently borrowed/leaked")
134
            }
135
        }
136
0
    }
137
0
    pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
138
0
        match mem::replace(
139
0
            &mut self.0,
140
0
            InternalSendAlloc::SpawningOrJoining(PhantomData),
141
0
        ) {
142
0
            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
143
            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
144
0
                panic!("Item permanently borrowed/leaked")
145
            }
146
        }
147
0
    }
148
}
149
150
/// Storage behind [`Owned`]: either the value itself or a marker that it was lent out.
pub enum InternalOwned<T> {
    // FIXME pub
    /// The value is present.
    Item(T),
    /// The value has been taken and not (yet) returned.
    Borrowed,
}

/// A value that may temporarily be moved elsewhere and handed back later.
pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub

impl<T> Owned<T> {
    /// Wraps `data` as a present value.
    pub fn new(data: T) -> Self {
        Owned(InternalOwned::Item(data))
    }

    /// Returns the held value, or `other` when the value is currently lent out.
    pub fn unwrap_or(self, other: T) -> T {
        match self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => other,
        }
    }

    /// Returns the held value.
    ///
    /// # Panics
    /// Panics if the value is currently lent out.
    pub fn unwrap(self) -> T {
        match self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }

    /// Borrows the held value.
    ///
    /// # Panics
    /// Panics if the value is currently lent out.
    pub fn view(&self) -> &T {
        match &self.0 {
            InternalOwned::Item(item) => item,
            InternalOwned::Borrowed => panic!("Item permanently borrowed"),
        }
    }
}
183
184
pub trait OwnedRetriever<U: Send + 'static> {
185
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
186
    fn unwrap(self) -> Result<U, PoisonedThreadError>;
187
}
188
189
#[cfg(feature = "std")]
190
impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
191
0
    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
192
0
        match self.read() {
193
0
            Ok(ref u) => Ok(f(u)),
194
0
            Err(_) => Err(PoisonedThreadError::default()),
195
        }
196
0
    }
197
0
    fn unwrap(self) -> Result<U, PoisonedThreadError> {
198
0
        match std::sync::Arc::try_unwrap(self) {
199
0
            Ok(rwlock) => match rwlock.into_inner() {
200
0
                Ok(u) => Ok(u),
201
0
                Err(_) => Err(PoisonedThreadError::default()),
202
            },
203
0
            Err(_) => Err(PoisonedThreadError::default()),
204
        }
205
0
    }
206
}
207
208
pub trait BatchSpawnable<
209
    ReturnValue: Send + 'static,
210
    ExtraInput: Send + 'static,
211
    Alloc: BrotliAlloc + Send + 'static,
212
    U: Send + 'static + Sync,
213
> where
214
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
215
{
216
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
217
    type FinalJoinHandle: OwnedRetriever<U>;
218
    // this function takes in an input slice
219
    // a SendAlloc per thread and converts them all into JoinHandle
220
    // the input is borrowed until the joins complete
221
    // owned is set to borrowed
222
    // the final join handle is a r/w lock which will return the SliceW to the owner
223
    // the FinalJoinHandle is only to be called when each individual JoinHandle has been examined
224
    // the function is called with the thread_index, the num_threads, a reference to the slice under a read lock,
225
    // and an allocator from the alloc_per_thread
226
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
227
    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
228
        &mut self,
229
        handle: &mut Self::FinalJoinHandle,
230
        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
231
        index: usize,
232
        num_threads: usize,
233
        f: F,
234
    );
235
}
236
237
pub trait BatchSpawnableLite<
238
    ReturnValue: Send + 'static,
239
    ExtraInput: Send + 'static,
240
    Alloc: BrotliAlloc + Send + 'static,
241
    U: Send + 'static + Sync,
242
> where
243
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
244
{
245
    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
246
    type FinalJoinHandle: OwnedRetriever<U>;
247
    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
248
    fn spawn(
249
        &mut self,
250
        handle: &mut Self::FinalJoinHandle,
251
        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
252
        index: usize,
253
        num_threads: usize,
254
        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
255
    );
256
}
257
/*
258
impl<ReturnValue:Send+'static,
259
     ExtraInput:Send+'static,
260
     Alloc:BrotliAlloc+Send+'static,
261
     U:Send+'static+Sync>
262
     BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> {
263
  type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle;
264
  type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle;
265
  fn batch_spawn(
266
    &mut self,
267
    input: &mut Owned<U>,
268
    alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>],
269
    f: fn(usize, usize, &U, Alloc) -> T,
270
  ) -> Self::FinalJoinHandle {
271
   <Self as BatchSpawnable<ReturnValue, ExtraInput,  Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f)
272
  }
273
}*/
274
275
0
pub fn CompressMultiSlice<
276
0
    Alloc: BrotliAlloc + Send + 'static,
277
0
    Spawner: BatchSpawnableLite<
278
0
        CompressionThreadResult<Alloc>,
279
0
        UnionHasher<Alloc>,
280
0
        Alloc,
281
0
        (
282
0
            <Alloc as Allocator<u8>>::AllocatedMemory,
283
0
            BrotliEncoderParams,
284
0
        ),
285
0
    >,
286
0
>(
287
0
    params: &BrotliEncoderParams,
288
0
    input_slice: &[u8],
289
0
    output: &mut [u8],
290
0
    alloc_per_thread: &mut [SendAlloc<
291
0
        CompressionThreadResult<Alloc>,
292
0
        UnionHasher<Alloc>,
293
0
        Alloc,
294
0
        Spawner::JoinHandle,
295
0
    >],
296
0
    thread_spawner: &mut Spawner,
297
0
) -> Result<usize, BrotliEncoderThreadError>
298
0
where
299
0
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
300
0
    <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
301
0
    <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
302
{
303
0
    let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
304
0
        let mut input = allocate::<u8, _>(alloc, input_slice.len());
305
0
        input.slice_mut().clone_from_slice(input_slice);
306
0
        input
307
    } else {
308
0
        alloc_default::<u8, Alloc>()
309
    };
310
0
    let mut owned_input = Owned::new(input);
311
0
    let ret = CompressMulti(
312
0
        params,
313
0
        &mut owned_input,
314
0
        output,
315
0
        alloc_per_thread,
316
0
        thread_spawner,
317
    );
318
0
    if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
319
0
        <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
320
0
    }
321
0
    ret
322
0
}
323
324
/// Returns the half-open range of a `file_size`-byte input assigned to `thread_index`
/// when the input is split into `num_threads` contiguous, nearly equal chunks.
///
/// The bounds equal `thread_index * file_size / num_threads` and
/// `(thread_index + 1) * file_size / num_threads`, but are computed from the quotient
/// and remainder of `file_size / num_threads` so that the intermediate product cannot
/// overflow `usize` for large inputs.
///
/// # Panics
/// Panics if `num_threads` is zero.
fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
    let per_thread = file_size / num_threads;
    let leftover = file_size % num_threads;
    // floor(k * file_size / n) == k * (file_size / n) + floor(k * (file_size % n) / n)
    let boundary = |chunk: usize| chunk * per_thread + (chunk * leftover) / num_threads;
    boundary(thread_index)..boundary(thread_index + 1)
}
327
328
0
fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
329
0
    hasher: UnionHasher<Alloc>,
330
0
    thread_index: usize,
331
0
    num_threads: usize,
332
0
    input_and_params: &(SliceW, BrotliEncoderParams),
333
0
    mut alloc: Alloc,
334
0
) -> CompressionThreadResult<Alloc>
335
0
where
336
0
    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
337
{
338
0
    let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
339
0
    let mut mem = allocate::<u8, _>(
340
0
        &mut alloc,
341
0
        BrotliEncoderMaxCompressedSize(range.end - range.start),
342
    );
343
0
    let mut state = BrotliEncoderStateStruct::new(alloc);
344
0
    state.params = input_and_params.1.clone();
345
0
    if thread_index != 0 {
346
0
        state.params.catable = true; // make sure we can concatenate this to the other work results
347
0
        state.params.magic_number = false; // no reason to pepper this around
348
0
    }
349
0
    state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in
350
0
    if thread_index != 0 {
351
0
        state.set_custom_dictionary_with_optional_precomputed_hasher(
352
0
            range.start,
353
0
            &input_and_params.0.slice()[..range.start],
354
0
            hasher,
355
0
            true,
356
0
        );
357
0
    }
358
0
    let mut out_offset = 0usize;
359
    let compression_result;
360
0
    let mut available_out = mem.len();
361
    loop {
362
0
        let mut next_in_offset = 0usize;
363
0
        let mut available_in = range.end - range.start;
364
0
        let result = state.compress_stream(
365
0
            BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
366
0
            &mut available_in,
367
0
            &input_and_params.0.slice()[range.clone()],
368
0
            &mut next_in_offset,
369
0
            &mut available_out,
370
0
            mem.slice_mut(),
371
0
            &mut out_offset,
372
0
            &mut None,
373
            &mut |_a, _b, _c, _d| (),
374
        );
375
0
        let new_range = range.start + next_in_offset..range.end;
376
0
        range = new_range;
377
0
        if result {
378
0
            compression_result = Ok(out_offset);
379
0
            break;
380
0
        } else if available_out == 0 {
381
0
            compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space??
382
0
            break;
383
0
        }
384
    }
385
0
    BrotliEncoderDestroyInstance(&mut state);
386
0
    match compression_result {
387
0
        Ok(size) => CompressionThreadResult::<Alloc> {
388
0
            compressed: Ok(CompressedFileChunk {
389
0
                data_backing: mem,
390
0
                data_size: size,
391
0
            }),
392
0
            alloc: state.m8,
393
0
        },
394
0
        Err(e) => {
395
0
            <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
396
0
            CompressionThreadResult::<Alloc> {
397
0
                compressed: Err(e),
398
0
                alloc: state.m8,
399
0
            }
400
        }
401
    }
402
0
}
403
404
0
pub fn CompressMulti<
405
0
    Alloc: BrotliAlloc + Send + 'static,
406
0
    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
407
0
    Spawner: BatchSpawnableLite<
408
0
        CompressionThreadResult<Alloc>,
409
0
        UnionHasher<Alloc>,
410
0
        Alloc,
411
0
        (SliceW, BrotliEncoderParams),
412
0
    >,
413
0
>(
414
0
    params: &BrotliEncoderParams,
415
0
    owned_input: &mut Owned<SliceW>,
416
0
    output: &mut [u8],
417
0
    alloc_per_thread: &mut [SendAlloc<
418
0
        CompressionThreadResult<Alloc>,
419
0
        UnionHasher<Alloc>,
420
0
        Alloc,
421
0
        Spawner::JoinHandle,
422
0
    >],
423
0
    thread_spawner: &mut Spawner,
424
0
) -> Result<usize, BrotliEncoderThreadError>
425
0
where
426
0
    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
427
0
    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
428
0
    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
429
{
430
0
    let num_threads = alloc_per_thread.len();
431
0
    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
432
0
    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
433
    // start thread spawner
434
0
    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
435
0
    if num_threads > 1 {
436
0
        // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items
437
0
        thread_spawner.spawn(
438
0
            &mut spawner_and_input,
439
0
            &mut alloc_per_thread[0],
440
0
            0,
441
0
            num_threads,
442
0
            compress_part,
443
0
        );
444
0
    }
445
    // populate all hashers at once, cloning them one by one
446
    let mut compression_last_thread_result;
447
0
    if num_threads > 1 && params.favor_cpu_efficiency {
448
0
        let mut local_params = params.clone();
449
0
        SanitizeParams(&mut local_params);
450
0
        let mut hasher = UnionHasher::Uninit;
451
0
        hasher_setup(
452
0
            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
453
0
            &mut hasher,
454
0
            &mut local_params,
455
0
            None, // No unwrappable custom dict used here.
456
0
            &[],
457
            0,
458
            0,
459
            false,
460
        );
461
0
        for thread_index in 1..num_threads {
462
0
            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
463
0
                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
464
0
                let overlap = hasher.StoreLookahead().wrapping_sub(1);
465
0
                if range.end - range.start > overlap {
466
0
                    hasher.BulkStoreRange(
467
0
                        input_and_params.0.slice(),
468
                        usize::MAX,
469
0
                        if range.start > overlap {
470
0
                            range.start - overlap
471
                        } else {
472
0
                            0
473
                        },
474
0
                        range.end - overlap,
475
                    );
476
0
                }
477
0
            });
478
0
            if let Err(_e) = res {
479
0
                return Err(BrotliEncoderThreadError::OtherThreadPanic);
480
0
            }
481
0
            if thread_index + 1 != num_threads {
482
0
                {
483
0
                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
484
0
                    *out_hasher = hasher.clone_with_alloc(alloc);
485
0
                }
486
0
                thread_spawner.spawn(
487
0
                    &mut spawner_and_input,
488
0
                    &mut alloc_per_thread[thread_index],
489
0
                    thread_index,
490
0
                    num_threads,
491
0
                    compress_part,
492
0
                );
493
0
            }
494
        }
495
0
        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
496
0
        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
497
0
        compress_part(hasher,
498
0
                      num_threads - 1,
499
0
                      num_threads,
500
0
                      input_and_params,
501
0
                      alloc,
502
        )
503
0
      });
504
    } else {
505
0
        if num_threads > 1 {
506
0
            for thread_index in 1..num_threads - 1 {
507
0
                thread_spawner.spawn(
508
0
                    &mut spawner_and_input,
509
0
                    &mut alloc_per_thread[thread_index],
510
0
                    thread_index,
511
0
                    num_threads,
512
0
                    compress_part,
513
0
                );
514
0
            }
515
0
        }
516
0
        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
517
0
        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
518
0
        compress_part(UnionHasher::Uninit,
519
0
                      num_threads - 1,
520
0
                      num_threads,
521
0
                      input_and_params,
522
0
                      alloc,
523
        )
524
0
      });
525
    }
526
0
    let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
527
0
    let mut out_file_size = 0usize;
528
0
    let mut bro_cat_li = BroCatli::new();
529
0
    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
530
0
        let mut cur_result = if index + 1 == num_threads {
531
0
            match mem::replace(&mut compression_last_thread_result, Err(())) {
532
0
                Ok(result) => result,
533
0
                Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
534
            }
535
        } else {
536
0
            match mem::replace(
537
0
                &mut thread.0,
538
0
                InternalSendAlloc::SpawningOrJoining(PhantomData),
539
0
            ) {
540
                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
541
0
                    panic!("Thread not properly spawned")
542
                }
543
0
                InternalSendAlloc::Join(join) => match join.join() {
544
0
                    Ok(result) => result,
545
0
                    Err(err) => {
546
0
                        return Err(err);
547
                    }
548
                },
549
            }
550
        };
551
0
        match cur_result.compressed {
552
0
            Ok(compressed_out) => {
553
0
                bro_cat_li.new_brotli_file();
554
0
                let mut in_offset = 0usize;
555
0
                let cat_result = bro_cat_li.stream(
556
0
                    &compressed_out.data_backing.slice()[..compressed_out.data_size],
557
0
                    &mut in_offset,
558
0
                    output,
559
0
                    &mut out_file_size,
560
                );
561
0
                match cat_result {
562
0
                    BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
563
0
                        compression_result = Ok(out_file_size);
564
0
                    }
565
0
                    BroCatliResult::NeedsMoreOutput => {
566
0
                        compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
567
0
                        // not enough space
568
0
                    }
569
0
                    err => {
570
0
                        compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
571
0
                        // misc error
572
0
                    }
573
                }
574
0
                <Alloc as Allocator<u8>>::free_cell(
575
0
                    &mut cur_result.alloc,
576
0
                    compressed_out.data_backing,
577
                );
578
            }
579
0
            Err(e) => {
580
0
                compression_result = Err(e);
581
0
            }
582
        }
583
0
        thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
584
    }
585
0
    compression_result?;
586
0
    match bro_cat_li.finish(output, &mut out_file_size) {
587
0
        BroCatliResult::Success => compression_result = Ok(out_file_size),
588
0
        err => {
589
0
            compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
590
0
                err,
591
0
            ))
592
        }
593
    }
594
0
    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
595
0
        *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning
596
0
    } else if compression_result.is_ok() {
597
0
        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
598
0
    }
599
0
    compression_result
600
0
}