Coverage Report

Created: 2026-04-03 06:39

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/c-blosc/blosc/blosc.c
Line
Count
Source
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Author: Francesc Alted <francesc@blosc.org>
5
  Creation date: 2009-05-20
6
7
  See LICENSE.txt for details about copyright and rights to use.
8
**********************************************************************/
9
10
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <errno.h>
14
#include <string.h>
15
#include <sys/types.h>
16
#include <assert.h>
17
18
#include "fastcopy.h"
19
20
#if defined(USING_CMAKE)
21
  #include "config.h"
22
#endif /*  USING_CMAKE */
23
#include "blosc.h"
24
#include "shuffle.h"
25
#include "blosclz.h"
26
#if defined(HAVE_LZ4)
27
  #include "lz4.h"
28
  #include "lz4hc.h"
29
#endif /*  HAVE_LZ4 */
30
#if defined(HAVE_SNAPPY)
31
  #include "snappy-c.h"
32
#endif /*  HAVE_SNAPPY */
33
#if defined(HAVE_ZLIB)
34
  #include "zlib.h"
35
#endif /*  HAVE_ZLIB */
36
#if defined(HAVE_ZSTD)
37
  #include "zstd.h"
38
#endif /*  HAVE_ZSTD */
39
40
#if defined(_WIN32) && !defined(__MINGW32__)
41
  #include <windows.h>
42
  #include <malloc.h>
43
44
  /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45
  #if defined(_MSC_VER) && _MSC_VER < 1600
46
    #include "win32/stdint-windows.h"
47
  #else
48
    #include <stdint.h>
49
  #endif
50
51
  #include <process.h>
52
  #define getpid _getpid
53
#else
54
  #include <stdint.h>
55
  #include <unistd.h>
56
  #include <inttypes.h>
57
#endif  /* _WIN32 */
58
59
/* Include the win32/pthread.h library for all the Windows builds. See #224. */
60
#if defined(_WIN32)
61
  #include "win32/pthread.h"
62
  #include "win32/pthread.c"
63
#else
64
  #include <pthread.h>
65
#endif
66
67
68
/* Some useful units */
69
0
#define KB 1024
70
#define MB (1024 * (KB))
71
72
/* Minimum buffer size to be compressed */
73
35.0k
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
74
75
/* The maximum number of splits in a block for compression */
76
11.6k
#define MAX_SPLITS 16            /* Cannot be larger than 128 */
77
78
/* The size of L1 cache.  32 KB is quite common nowadays. */
79
0
#define L1 (32 * (KB))
80
81
/* Have problems using posix barriers when symbol value is 200112L */
82
/* This requires more investigation, but will work for the moment */
83
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
84
#define _POSIX_BARRIERS_MINE
85
#endif
86
/* Synchronization variables */
87
88
89
struct blosc_context {
90
  int32_t compress;               /* 1 if we are doing compression 0 if decompress */
91
92
  const uint8_t* src;
93
  uint8_t* dest;                  /* The current pos in the destination buffer */
94
  uint8_t* header_flags;          /* Flags for header */
95
  int compversion;                /* Compressor version byte, only used during decompression */
96
  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
97
  int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
98
  int32_t nblocks;                /* Number of total blocks in buffer */
99
  int32_t leftover;               /* Extra bytes at end of buffer */
100
  int32_t blocksize;              /* Length of the block in bytes */
101
  int32_t typesize;               /* Type size */
102
  int32_t num_output_bytes;       /* Counter for the number of output bytes */
103
  int32_t destsize;               /* Maximum size for destination buffer */
104
  uint8_t* bstarts;               /* Start of the buffer past header info */
105
  int32_t compcode;               /* Compressor code to use */
106
  int clevel;                     /* Compression level (1-9) */
107
  /* Function to use for decompression.  Only used when decompression */
108
  int (*decompress_func)(const void* input, int compressed_length, void* output,
109
                         int maxout);
110
111
  /* Threading */
112
  int32_t numthreads;
113
  int32_t threads_started;
114
  int32_t end_threads;
115
  pthread_t threads[BLOSC_MAX_THREADS];
116
  int32_t tids[BLOSC_MAX_THREADS];
117
  pthread_mutex_t count_mutex;
118
  #ifdef _POSIX_BARRIERS_MINE
119
  pthread_barrier_t barr_init;
120
  pthread_barrier_t barr_finish;
121
  #else
122
  int32_t count_threads;
123
  pthread_mutex_t count_threads_mutex;
124
  pthread_cond_t count_threads_cv;
125
  #endif
126
  #if !defined(_WIN32)
127
  pthread_attr_t ct_attr;            /* creation time attrs for threads */
128
  #endif
129
  int32_t thread_giveup_code;               /* error code when give up */
130
  int32_t thread_nblock;                    /* block counter */
131
};
132
133
struct thread_context {
134
  struct blosc_context* parent_context;
135
  int32_t tid;
136
  uint8_t* tmp;
137
  uint8_t* tmp2;
138
  uint8_t* tmp3;
139
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
140
};
141
142
/* Global context for non-contextual API */
143
static struct blosc_context* g_global_context;
144
static pthread_mutex_t* global_comp_mutex;
145
static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
146
static int32_t g_threads = 1;
147
static int32_t g_force_blocksize = 0;
148
static int32_t g_initlib = 0;
149
static int32_t g_atfork_registered = 0;
150
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
151
152
153
154
/* Wrapped function to adjust the number of threads used by blosc */
155
int blosc_set_nthreads_(struct blosc_context*);
156
157
/* Releases the global threadpool */
158
int blosc_release_threadpool(struct blosc_context* context);
159
160
/* Macros for synchronization */
161
162
/* Wait until all threads are initialized */
163
#ifdef _POSIX_BARRIERS_MINE
164
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \
165
0
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
166
0
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
167
0
    printf("Could not wait on barrier (init): %d\n", rc); \
168
0
    return((RET_VAL));                            \
169
0
  }
170
#else
171
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)   \
172
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
173
  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
174
    CONTEXT_PTR->count_threads++;  \
175
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
176
  } \
177
  else { \
178
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
179
  } \
180
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
181
#endif
182
183
/* Wait for all threads to finish */
184
#ifdef _POSIX_BARRIERS_MINE
185
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)   \
186
0
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
187
0
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
188
0
    printf("Could not wait on barrier (finish)\n"); \
189
0
    return((RET_VAL));                              \
190
0
  }
191
#else
192
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                           \
193
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
194
  if (CONTEXT_PTR->count_threads > 0) { \
195
    CONTEXT_PTR->count_threads--; \
196
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
197
  } \
198
  else { \
199
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
200
  } \
201
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
202
#endif
203
204
205
/* A function for aligned malloc that is portable */
206
static uint8_t *my_malloc(size_t size)
207
23.4k
{
208
23.4k
  void *block = NULL;
209
23.4k
  int res = 0;
210
211
/* Do an alignment to 32 bytes because AVX2 is supported */
212
#if defined(_WIN32)
213
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
214
  block = (void *)_aligned_malloc(size, 32);
215
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
216
  /* Platform does have an implementation of posix_memalign */
217
23.4k
  res = posix_memalign(&block, 32, size);
218
#else
219
  block = malloc(size);
220
#endif  /* _WIN32 */
221
222
23.4k
  if (block == NULL || res != 0) {
223
0
    printf("Error allocating memory!");
224
0
    return NULL;
225
0
  }
226
227
23.4k
  return (uint8_t *)block;
228
23.4k
}
229
230
231
/* Release memory booked by my_malloc */
232
static void my_free(void *block)
233
23.4k
{
234
#if defined(_WIN32)
235
    _aligned_free(block);
236
#else
237
23.4k
    free(block);
238
23.4k
#endif  /* _WIN32 */
239
23.4k
}
240
241
242
/* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
243
static int32_t sw32_(const uint8_t *pa)
244
250k
{
245
250k
  int32_t idest;
246
250k
  uint8_t *dest = (uint8_t *)&idest;
247
250k
  int i = 1;                    /* for big/little endian detection */
248
250k
  char *p = (char *)&i;
249
250
250k
  if (p[0] != 1) {
251
    /* big endian */
252
0
    dest[0] = pa[3];
253
0
    dest[1] = pa[2];
254
0
    dest[2] = pa[1];
255
0
    dest[3] = pa[0];
256
0
  }
257
250k
  else {
258
    /* little endian */
259
250k
    dest[0] = pa[0];
260
250k
    dest[1] = pa[1];
261
250k
    dest[2] = pa[2];
262
250k
    dest[3] = pa[3];
263
250k
  }
264
250k
  return idest;
265
250k
}
266
267
268
/* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
269
static void _sw32(uint8_t* dest, int32_t a)
270
435k
{
271
435k
  uint8_t *pa = (uint8_t *)&a;
272
435k
  int i = 1;                    /* for big/little endian detection */
273
435k
  char *p = (char *)&i;
274
275
435k
  if (p[0] != 1) {
276
    /* big endian */
277
0
    dest[0] = pa[3];
278
0
    dest[1] = pa[2];
279
0
    dest[2] = pa[1];
280
0
    dest[3] = pa[0];
281
0
  }
282
435k
  else {
283
    /* little endian */
284
435k
    dest[0] = pa[0];
285
435k
    dest[1] = pa[1];
286
435k
    dest[2] = pa[2];
287
435k
    dest[3] = pa[3];
288
435k
  }
289
435k
}
290
291
/*
292
 * Conversion routines between compressor and compression libraries
293
 */
294
295
/* Return the library code associated with the compressor name */
296
static int compname_to_clibcode(const char *compname)
297
0
{
298
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299
0
    return BLOSC_BLOSCLZ_LIB;
300
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301
0
    return BLOSC_LZ4_LIB;
302
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303
0
    return BLOSC_LZ4_LIB;
304
0
  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305
0
    return BLOSC_SNAPPY_LIB;
306
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307
0
    return BLOSC_ZLIB_LIB;
308
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309
0
    return BLOSC_ZSTD_LIB;
310
0
  return -1;
311
0
}
312
313
/* Return the library name associated with the compressor code */
314
static const char *clibcode_to_clibname(int clibcode)
315
0
{
316
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317
0
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318
0
  if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319
0
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320
0
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321
0
  return NULL;                  /* should never happen */
322
0
}
323
324
325
/*
326
 * Conversion routines between compressor names and compressor codes
327
 */
328
329
/* Get the compressor name associated with the compressor code */
330
int blosc_compcode_to_compname(int compcode, const char **compname)
331
0
{
332
0
  int code = -1;    /* -1 means non-existent compressor code */
333
0
  const char *name = NULL;
334
335
  /* Map the compressor code */
336
0
  if (compcode == BLOSC_BLOSCLZ)
337
0
    name = BLOSC_BLOSCLZ_COMPNAME;
338
0
  else if (compcode == BLOSC_LZ4)
339
0
    name = BLOSC_LZ4_COMPNAME;
340
0
  else if (compcode == BLOSC_LZ4HC)
341
0
    name = BLOSC_LZ4HC_COMPNAME;
342
0
  else if (compcode == BLOSC_SNAPPY)
343
0
    name = BLOSC_SNAPPY_COMPNAME;
344
0
  else if (compcode == BLOSC_ZLIB)
345
0
    name = BLOSC_ZLIB_COMPNAME;
346
0
  else if (compcode == BLOSC_ZSTD)
347
0
    name = BLOSC_ZSTD_COMPNAME;
348
349
0
  *compname = name;
350
351
  /* Guess if there is support for this code */
352
0
  if (compcode == BLOSC_BLOSCLZ)
353
0
    code = BLOSC_BLOSCLZ;
354
0
#if defined(HAVE_LZ4)
355
0
  else if (compcode == BLOSC_LZ4)
356
0
    code = BLOSC_LZ4;
357
0
  else if (compcode == BLOSC_LZ4HC)
358
0
    code = BLOSC_LZ4HC;
359
0
#endif /*  HAVE_LZ4 */
360
#if defined(HAVE_SNAPPY)
361
  else if (compcode == BLOSC_SNAPPY)
362
    code = BLOSC_SNAPPY;
363
#endif /*  HAVE_SNAPPY */
364
0
#if defined(HAVE_ZLIB)
365
0
  else if (compcode == BLOSC_ZLIB)
366
0
    code = BLOSC_ZLIB;
367
0
#endif /*  HAVE_ZLIB */
368
0
#if defined(HAVE_ZSTD)
369
0
  else if (compcode == BLOSC_ZSTD)
370
0
    code = BLOSC_ZSTD;
371
0
#endif /*  HAVE_ZSTD */
372
373
0
  return code;
374
0
}
375
376
/* Get the compressor code for the compressor name. -1 if it is not available */
377
int blosc_compname_to_compcode(const char *compname)
378
15.5k
{
379
15.5k
  int code = -1;  /* -1 means non-existent compressor code */
380
381
15.5k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382
607
    code = BLOSC_BLOSCLZ;
383
607
  }
384
14.9k
#if defined(HAVE_LZ4)
385
14.9k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386
555
    code = BLOSC_LZ4;
387
555
  }
388
14.4k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389
2.43k
    code = BLOSC_LZ4HC;
390
2.43k
  }
391
11.9k
#endif /*  HAVE_LZ4 */
392
#if defined(HAVE_SNAPPY)
393
  else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394
    code = BLOSC_SNAPPY;
395
  }
396
#endif /*  HAVE_SNAPPY */
397
11.9k
#if defined(HAVE_ZLIB)
398
11.9k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399
1.94k
    code = BLOSC_ZLIB;
400
1.94k
  }
401
10.0k
#endif /*  HAVE_ZLIB */
402
10.0k
#if defined(HAVE_ZSTD)
403
10.0k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404
8.93k
    code = BLOSC_ZSTD;
405
8.93k
  }
406
15.5k
#endif /*  HAVE_ZSTD */
407
408
15.5k
return code;
409
15.5k
}
410
411
412
#if defined(HAVE_LZ4)
413
static int lz4_wrap_compress(const char* input, size_t input_length,
414
                             char* output, size_t maxout, int accel)
415
11.8k
{
416
11.8k
  int cbytes;
417
11.8k
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
418
11.8k
                             accel);
419
11.8k
  return cbytes;
420
11.8k
}
421
422
static int lz4hc_wrap_compress(const char* input, size_t input_length,
423
                               char* output, size_t maxout, int clevel)
424
24.7k
{
425
24.7k
  int cbytes;
426
24.7k
  if (input_length > (size_t)(UINT32_C(2)<<30))
427
0
    return -1;   /* input larger than 2 GB is not supported */
428
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
429
   * but levels larger than 9 do not buy much compression. */
430
24.7k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
431
24.7k
                           clevel);
432
24.7k
  return cbytes;
433
24.7k
}
434
435
static int lz4_wrap_decompress(const void* input, int compressed_length,
436
                               void* output, int maxout)
437
4.39k
{
438
4.39k
  return LZ4_decompress_safe(input, output, compressed_length, maxout);
439
4.39k
}
440
441
#endif /* HAVE_LZ4 */
442
443
#if defined(HAVE_SNAPPY)
444
static int snappy_wrap_compress(const char* input, size_t input_length,
445
                                char* output, size_t maxout)
446
{
447
  snappy_status status;
448
  size_t cl = maxout;
449
  status = snappy_compress(input, input_length, output, &cl);
450
  if (status != SNAPPY_OK){
451
    return 0;
452
  }
453
  return (int)cl;
454
}
455
456
static int snappy_wrap_decompress(const void* input, int compressed_length,
457
                                  void* output, int maxout)
458
{
459
  snappy_status status;
460
  size_t ul = maxout;
461
  status = snappy_uncompress(input, compressed_length, output, &ul);
462
  if (status != SNAPPY_OK){
463
    return 0;
464
  }
465
  return (int)ul;
466
}
467
#endif /* HAVE_SNAPPY */
468
469
#if defined(HAVE_ZLIB)
470
/* zlib is not very respectful with sharing name space with others.
471
 Fortunately, its names do not collide with those already in blosc. */
472
static int zlib_wrap_compress(const char* input, size_t input_length,
473
                              char* output, size_t maxout, int clevel)
474
25.0k
{
475
25.0k
  int status;
476
25.0k
  uLongf cl = maxout;
477
25.0k
  status = compress2(
478
25.0k
             (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
479
25.0k
  if (status != Z_OK){
480
5.34k
    return 0;
481
5.34k
  }
482
19.7k
  return (int)cl;
483
25.0k
}
484
485
static int zlib_wrap_decompress(const void* input, int compressed_length,
486
4.63k
                                void* output, int maxout) {
487
4.63k
  int status;
488
4.63k
  uLongf ul = maxout;
489
4.63k
  status = uncompress(
490
4.63k
             (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
491
4.63k
  if (status != Z_OK){
492
1.29k
    return 0;
493
1.29k
  }
494
3.34k
  return (int)ul;
495
4.63k
}
496
#endif /*  HAVE_ZLIB */
497
498
#if defined(HAVE_ZSTD)
499
static int zstd_wrap_compress(const char* input, size_t input_length,
500
127k
                              char* output, size_t maxout, int clevel) {
501
127k
  size_t code;
502
127k
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
503
  /* Make the level 8 close enough to maxCLevel */
504
127k
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
505
127k
  code = ZSTD_compress(
506
127k
      (void*)output, maxout, (void*)input, input_length, clevel);
507
127k
  if (ZSTD_isError(code)) {
508
22.5k
    return 0;
509
22.5k
  }
510
104k
  return (int)code;
511
127k
}
512
513
static int zstd_wrap_decompress(const void* input, int compressed_length,
514
10.0k
                                void* output, int maxout) {
515
10.0k
  size_t code;
516
10.0k
  code = ZSTD_decompress(
517
10.0k
      (void*)output, maxout, (void*)input, compressed_length);
518
10.0k
  if (ZSTD_isError(code)) {
519
4.31k
    return 0;
520
4.31k
  }
521
5.77k
  return (int)code;
522
10.0k
}
523
#endif /*  HAVE_ZSTD */
524
525
9.01k
static int initialize_decompress_func(struct blosc_context* context) {
526
9.01k
  int8_t header_flags = *(context->header_flags);
527
9.01k
  int32_t compformat = (header_flags & 0xe0) >> 5;
528
9.01k
  int compversion = context->compversion;
529
530
9.01k
  if (compformat == BLOSC_BLOSCLZ_FORMAT) {
531
677
    if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
532
7
      return -9;
533
7
    }
534
670
    context->decompress_func = &blosclz_decompress;
535
670
    return 0;
536
677
  }
537
8.34k
#if defined(HAVE_LZ4)
538
8.34k
  if (compformat == BLOSC_LZ4_FORMAT) {
539
1.23k
    if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
540
6
      return -9;
541
6
    }
542
1.22k
    context->decompress_func = &lz4_wrap_decompress;
543
1.22k
    return 0;
544
1.23k
  }
545
7.10k
#endif /*  HAVE_LZ4 */
546
#if defined(HAVE_SNAPPY)
547
  if (compformat == BLOSC_SNAPPY_FORMAT) {
548
    if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
549
      return -9;
550
    }
551
    context->decompress_func = &snappy_wrap_decompress;
552
    return 0;
553
  }
554
#endif /*  HAVE_SNAPPY */
555
7.10k
#if defined(HAVE_ZLIB)
556
7.10k
  if (compformat == BLOSC_ZLIB_FORMAT) {
557
2.01k
    if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
558
8
      return -9;
559
8
    }
560
2.00k
    context->decompress_func = &zlib_wrap_decompress;
561
2.00k
    return 0;
562
2.01k
  }
563
5.09k
#endif /*  HAVE_ZLIB */
564
5.09k
#if defined(HAVE_ZSTD)
565
5.09k
  if (compformat == BLOSC_ZSTD_FORMAT) {
566
5.08k
    if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
567
7
      return -9;
568
7
    }
569
5.08k
    context->decompress_func = &zstd_wrap_decompress;
570
5.08k
    return 0;
571
5.08k
  }
572
3
#endif /*  HAVE_ZSTD */
573
3
  return -5; /* signals no decompression support */
574
5.09k
}
575
576
/* Compute acceleration for blosclz */
577
196k
static int get_accel(const struct blosc_context* context) {
578
196k
  int32_t clevel = context->clevel;
579
580
196k
  if (context->compcode == BLOSC_LZ4) {
581
    /* This acceleration setting based on discussions held in:
582
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
583
     */
584
11.8k
    return (10 - clevel);
585
11.8k
  }
586
184k
  return 1;
587
196k
}
588
589
590
/* Shuffle & compress a single block */
591
static int blosc_c(const struct blosc_context* context, int32_t blocksize,
592
                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
593
                   const uint8_t *src, uint8_t *dest, uint8_t *tmp,
594
                   uint8_t *tmp2)
595
196k
{
596
196k
  int8_t header_flags = *(context->header_flags);
597
196k
  int dont_split = (header_flags & 0x10) >> 4;
598
196k
  int32_t j, neblock, nsplits;
599
196k
  int32_t cbytes;                   /* number of compressed bytes in split */
600
196k
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
601
196k
  int32_t maxout;
602
196k
  int32_t typesize = context->typesize;
603
196k
  const uint8_t *_tmp = src;
604
196k
  const char *compname;
605
196k
  int accel;
606
196k
  int bscount;
607
196k
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
608
196k
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
609
107k
                      (blocksize >= typesize));
610
611
196k
  if (doshuffle) {
612
    /* Byte shuffling only makes sense if typesize > 1 */
613
0
    blosc_internal_shuffle(typesize, blocksize, src, tmp);
614
0
    _tmp = tmp;
615
0
  }
616
  /* We don't allow more than 1 filter at the same time (yet) */
617
196k
  else if (dobitshuffle) {
618
107k
    bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
619
107k
    if (bscount < 0)
620
0
      return bscount;
621
107k
    _tmp = tmp;
622
107k
  }
623
624
  /* Calculate acceleration for different compressors */
625
196k
  accel = get_accel(context);
626
627
  /* The number of splits for this block */
628
196k
  if (!dont_split && !leftoverblock) {
629
12.5k
    nsplits = typesize;
630
12.5k
  }
631
184k
  else {
632
184k
    nsplits = 1;
633
184k
  }
634
196k
  neblock = blocksize / nsplits;
635
391k
  for (j = 0; j < nsplits; j++) {
636
196k
    dest += sizeof(int32_t);
637
196k
    ntbytes += (int32_t)sizeof(int32_t);
638
196k
    ctbytes += (int32_t)sizeof(int32_t);
639
196k
    maxout = neblock;
640
    #if defined(HAVE_SNAPPY)
641
    if (context->compcode == BLOSC_SNAPPY) {
642
      /* TODO perhaps refactor this to keep the value stashed somewhere */
643
      maxout = snappy_max_compressed_length(neblock);
644
    }
645
    #endif /*  HAVE_SNAPPY */
646
196k
    if (ntbytes+maxout > maxbytes) {
647
9.32k
      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
648
9.32k
      if (maxout <= 0) {
649
7
        return 0;                  /* non-compressible block */
650
7
      }
651
9.32k
    }
652
196k
    if (context->compcode == BLOSC_BLOSCLZ) {
653
7.60k
      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
654
7.60k
                                dest, maxout, !dont_split);
655
7.60k
    }
656
189k
    #if defined(HAVE_LZ4)
657
189k
    else if (context->compcode == BLOSC_LZ4) {
658
11.8k
      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
659
11.8k
                                 (char *)dest, (size_t)maxout, accel);
660
11.8k
    }
661
177k
    else if (context->compcode == BLOSC_LZ4HC) {
662
24.7k
      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
663
24.7k
                                   (char *)dest, (size_t)maxout,
664
24.7k
                                   context->clevel);
665
24.7k
    }
666
152k
    #endif /* HAVE_LZ4 */
667
    #if defined(HAVE_SNAPPY)
668
    else if (context->compcode == BLOSC_SNAPPY) {
669
      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
670
                                    (char *)dest, (size_t)maxout);
671
    }
672
    #endif /* HAVE_SNAPPY */
673
152k
    #if defined(HAVE_ZLIB)
674
152k
    else if (context->compcode == BLOSC_ZLIB) {
675
25.0k
      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
676
25.0k
                                  (char *)dest, (size_t)maxout,
677
25.0k
                                  context->clevel);
678
25.0k
    }
679
127k
    #endif /* HAVE_ZLIB */
680
127k
    #if defined(HAVE_ZSTD)
681
127k
    else if (context->compcode == BLOSC_ZSTD) {
682
127k
      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
683
127k
                                  (char*)dest, (size_t)maxout, context->clevel);
684
127k
    }
685
0
    #endif /* HAVE_ZSTD */
686
687
0
    else {
688
0
      blosc_compcode_to_compname(context->compcode, &compname);
689
0
      if (compname == NULL) {
690
0
          compname = "(null)";
691
0
      }
692
0
      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
693
0
      fprintf(stderr, "compression support.  Please use one having it.");
694
0
      return -5;    /* signals no compression support */
695
0
    }
696
697
196k
    if (cbytes > maxout) {
698
      /* Buffer overrun caused by compression (should never happen) */
699
0
      return -1;
700
0
    }
701
196k
    else if (cbytes < 0) {
702
      /* cbytes should never be negative */
703
0
      return -2;
704
0
    }
705
196k
    else if (cbytes == 0 || cbytes == neblock) {
706
      /* The compressor has been unable to compress data at all. */
707
      /* Before doing the copy, check that we are not running into a
708
         buffer overflow. */
709
42.8k
      if ((ntbytes+neblock) > maxbytes) {
710
1.44k
        return 0;    /* Non-compressible data */
711
1.44k
      }
712
41.3k
      fastcopy(dest, _tmp + j * neblock, neblock);
713
41.3k
      cbytes = neblock;
714
41.3k
    }
715
195k
    _sw32(dest - 4, cbytes);
716
195k
    dest += cbytes;
717
195k
    ntbytes += cbytes;
718
195k
    ctbytes += cbytes;
719
195k
  }  /* Closes j < nsplits */
720
721
195k
  return ctbytes;
722
196k
}
723
724
/* Decompress & unshuffle a single block */
725
static int blosc_d(struct blosc_context* context, int32_t blocksize,
726
                   int32_t leftoverblock, const uint8_t* base_src,
727
                   int32_t src_offset, uint8_t* dest, uint8_t* tmp,
728
51.4k
                   uint8_t* tmp2) {
729
51.4k
  int8_t header_flags = *(context->header_flags);
730
51.4k
  int dont_split = (header_flags & 0x10) >> 4;
731
51.4k
  int32_t j, neblock, nsplits;
732
51.4k
  int32_t nbytes;                /* number of decompressed bytes in split */
733
51.4k
  const int32_t compressedsize = context->compressedsize;
734
51.4k
  int32_t cbytes;                /* number of compressed bytes in split */
735
51.4k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
736
51.4k
  uint8_t *_tmp = dest;
737
51.4k
  int32_t typesize = context->typesize;
738
51.4k
  int bscount;
739
51.4k
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
740
51.4k
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
741
37.1k
                      (blocksize >= typesize));
742
51.4k
  const uint8_t* src;
743
744
51.4k
  if (doshuffle || dobitshuffle) {
745
38.5k
    _tmp = tmp;
746
38.5k
  }
747
748
  /* The number of splits for this block */
749
51.4k
  if (!dont_split &&
750
      /* For compatibility with before the introduction of the split flag */
751
9.59k
      ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
752
1.52k
      !leftoverblock) {
753
961
    nsplits = typesize;
754
961
  }
755
50.5k
  else {
756
50.5k
    nsplits = 1;
757
50.5k
  }
758
759
51.4k
  neblock = blocksize / nsplits;
760
95.7k
  for (j = 0; j < nsplits; j++) {
761
    /* Validate src_offset */
762
51.5k
    if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
763
359
      return -1;
764
359
    }
765
51.1k
    cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
766
51.1k
    src_offset += sizeof(int32_t);
767
    /* Validate cbytes */
768
51.1k
    if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
769
165
      return -1;
770
165
    }
771
50.9k
    src = base_src + src_offset;
772
    /* Uncompress */
773
50.9k
    if (cbytes == neblock) {
774
30.3k
      fastcopy(_tmp, src, neblock);
775
30.3k
      nbytes = neblock;
776
30.3k
    }
777
20.6k
    else {
778
20.6k
      nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
779
      /* Check that decompressed bytes number is correct */
780
20.6k
      if (nbytes != neblock) {
781
6.68k
        return -2;
782
6.68k
      }
783
20.6k
    }
784
44.2k
    src_offset += cbytes;
785
44.2k
    _tmp += nbytes;
786
44.2k
    ntbytes += nbytes;
787
44.2k
  } /* Closes j < nsplits */
788
789
44.2k
  if (doshuffle) {
790
4.04k
    blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
791
4.04k
  }
792
40.2k
  else if (dobitshuffle) {
793
30.3k
    bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
794
30.3k
    if (bscount < 0)
795
0
      return bscount;
796
30.3k
  }
797
798
  /* Return the number of uncompressed bytes */
799
44.2k
  return ntbytes;
800
44.2k
}
801
802
/* Serial version for compression/decompression */
803
static int serial_blosc(struct blosc_context* context)
804
23.4k
{
805
23.4k
  int32_t j, bsize, leftoverblock;
806
23.4k
  int32_t cbytes;
807
808
23.4k
  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
809
23.4k
  int32_t ntbytes = context->num_output_bytes;
810
811
23.4k
  uint8_t *tmp = my_malloc(context->blocksize + ebsize);
812
23.4k
  uint8_t *tmp2 = tmp + context->blocksize;
813
814
264k
  for (j = 0; j < context->nblocks; j++) {
815
249k
    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
816
196k
      _sw32(context->bstarts + j * 4, ntbytes);
817
196k
    }
818
249k
    bsize = context->blocksize;
819
249k
    leftoverblock = 0;
820
249k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
821
10.0k
      bsize = context->leftover;
822
10.0k
      leftoverblock = 1;
823
10.0k
    }
824
249k
    if (context->compress) {
825
196k
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
826
        /* We want to memcpy only */
827
0
        fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
828
0
                 context->src + j * context->blocksize, bsize);
829
0
        cbytes = bsize;
830
0
      }
831
196k
      else {
832
        /* Regular compression */
833
196k
        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
834
196k
                         context->destsize, context->src+j*context->blocksize,
835
196k
                         context->dest+ntbytes, tmp, tmp2);
836
196k
        if (cbytes == 0) {
837
1.45k
          ntbytes = 0;              /* incompressible data */
838
1.45k
          break;
839
1.45k
        }
840
196k
      }
841
196k
    }
842
52.8k
    else {
843
52.8k
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
844
        /* We want to memcpy only */
845
1.40k
        fastcopy(context->dest + j * context->blocksize,
846
1.40k
                 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
847
1.40k
        cbytes = bsize;
848
1.40k
      }
849
51.4k
      else {
850
        /* Regular decompression */
851
51.4k
        cbytes = blosc_d(context, bsize, leftoverblock, context->src,
852
51.4k
                         sw32_(context->bstarts + j * 4),
853
51.4k
                         context->dest + j * context->blocksize, tmp, tmp2);
854
51.4k
      }
855
52.8k
    }
856
248k
    if (cbytes < 0) {
857
7.20k
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
858
7.20k
      break;
859
7.20k
    }
860
240k
    ntbytes += cbytes;
861
240k
  }
862
863
  /* Free temporaries */
864
23.4k
  my_free(tmp);
865
866
23.4k
  return ntbytes;
867
23.4k
}
868
869
870
/* Threaded version for compression/decompression */
871
static int parallel_blosc(struct blosc_context* context)
872
0
{
873
0
  int rc;
874
0
  (void)rc;  // just to avoid 'unused-variable' warning
875
876
  /* Check whether we need to restart threads */
877
0
  if (blosc_set_nthreads_(context) < 0) {
878
0
    return -1;
879
0
  }
880
881
  /* Set sentinels */
882
0
  context->thread_giveup_code = 1;
883
0
  context->thread_nblock = -1;
884
885
  /* Synchronization point for all threads (wait for initialization) */
886
0
  WAIT_INIT(-1, context);
887
888
  /* Synchronization point for all threads (wait for finalization) */
889
0
  WAIT_FINISH(-1, context);
890
891
0
  if (context->thread_giveup_code > 0) {
892
    /* Return the total bytes (de-)compressed in threads */
893
0
    return context->num_output_bytes;
894
0
  }
895
0
  else {
896
    /* Compression/decompression gave up.  Return error code. */
897
0
    return context->thread_giveup_code;
898
0
  }
899
0
}
900
901
902
/* Do the compression or decompression of the buffer depending on the
903
   global params. */
904
static int do_job(struct blosc_context* context)
905
23.4k
{
906
23.4k
  int32_t ntbytes;
907
908
  /* Run the serial version when nthreads is 1 or when the buffers are
909
     not much larger than blocksize */
910
23.4k
  if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
911
23.4k
    ntbytes = serial_blosc(context);
912
23.4k
  }
913
0
  else {
914
0
    ntbytes = parallel_blosc(context);
915
0
  }
916
917
23.4k
  return ntbytes;
918
23.4k
}
919
920
921
/* Whether a codec is meant for High Compression Ratios */
922
0
#define HCR(codec) (  \
923
0
             ((codec) == BLOSC_LZ4HC) ||                  \
924
0
             ((codec) == BLOSC_ZLIB) ||                   \
925
0
             ((codec) == BLOSC_ZSTD) ? 1 : 0 )
926
927
928
/* Conditions for splitting a block before compressing with a codec. */
929
28.9k
static int split_block(int compcode, int typesize, int blocksize) {
930
28.9k
  int splitblock = -1;
931
932
28.9k
  switch (g_splitmode) {
933
11.0k
    case BLOSC_ALWAYS_SPLIT:
934
11.0k
      splitblock = 1;
935
11.0k
      break;
936
7.39k
    case BLOSC_NEVER_SPLIT:
937
7.39k
      splitblock = 0;
938
7.39k
      break;
939
4.07k
    case BLOSC_AUTO_SPLIT:
940
      /* Normally all the compressors designed for speed benefit from a
941
         split.  However, in conducted benchmarks LZ4 seems that it runs
942
         faster if we don't split, which is quite surprising. */
943
4.07k
      splitblock= (((compcode == BLOSC_BLOSCLZ) ||
944
4.00k
                    (compcode == BLOSC_SNAPPY)) &&
945
76
                   (typesize <= MAX_SPLITS) &&
946
76
                   (blocksize / typesize) >= MIN_BUFFERSIZE);
947
4.07k
      break;
948
6.42k
    case BLOSC_FORWARD_COMPAT_SPLIT:
949
      /* The zstd support was introduced at the same time than the split flag, so
950
       * there should be not a problem with not splitting bloscks with it */
951
6.42k
      splitblock = ((compcode != BLOSC_ZSTD) &&
952
1.97k
                    (typesize <= MAX_SPLITS) &&
953
1.97k
                    (blocksize / typesize) >= MIN_BUFFERSIZE);
954
6.42k
      break;
955
0
    default:
956
0
      fprintf(stderr, "Split mode %d not supported", g_splitmode);
957
28.9k
  }
958
28.9k
  return splitblock;
959
28.9k
}
960
961
962
static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
963
                                 int32_t typesize, int32_t nbytes,
964
                                 int32_t forced_blocksize)
965
14.4k
{
966
14.4k
  int32_t blocksize;
967
968
  /* Protection against very small buffers */
969
14.4k
  if (nbytes < (int32_t)typesize) {
970
0
    return 1;
971
0
  }
972
973
14.4k
  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
974
975
14.4k
  if (forced_blocksize) {
976
14.4k
    blocksize = forced_blocksize;
977
    /* Check that forced blocksize is not too small */
978
14.4k
    if (blocksize < MIN_BUFFERSIZE) {
979
0
      blocksize = MIN_BUFFERSIZE;
980
0
    }
981
    /* Check that forced blocksize is not too large */
982
14.4k
    if (blocksize > BLOSC_MAX_BLOCKSIZE) {
983
0
      blocksize = BLOSC_MAX_BLOCKSIZE;
984
0
    }
985
14.4k
  }
986
0
  else if (nbytes >= L1) {
987
0
    blocksize = L1;
988
989
    /* For HCR codecs, increase the block sizes by a factor of 2 because they
990
       are meant for compressing large blocks (i.e. they show a big overhead
991
       when compressing small ones). */
992
0
    if (HCR(context->compcode)) {
993
0
      blocksize *= 2;
994
0
    }
995
996
0
    switch (clevel) {
997
0
      case 0:
998
        /* Case of plain copy */
999
0
        blocksize /= 4;
1000
0
        break;
1001
0
      case 1:
1002
0
        blocksize /= 2;
1003
0
        break;
1004
0
      case 2:
1005
0
        blocksize *= 1;
1006
0
        break;
1007
0
      case 3:
1008
0
        blocksize *= 2;
1009
0
        break;
1010
0
      case 4:
1011
0
      case 5:
1012
0
        blocksize *= 4;
1013
0
        break;
1014
0
      case 6:
1015
0
      case 7:
1016
0
      case 8:
1017
0
        blocksize *= 8;
1018
0
        break;
1019
0
      case 9:
1020
0
        blocksize *= 8;
1021
0
        if (HCR(context->compcode)) {
1022
0
          blocksize *= 2;
1023
0
        }
1024
0
        break;
1025
0
      default:
1026
0
        assert(0);
1027
0
        break;
1028
0
    }
1029
0
  }
1030
1031
  /* Enlarge the blocksize for splittable codecs */
1032
14.4k
  if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1033
6.53k
    if (blocksize > (1 << 18)) {
1034
      /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
1035
0
      blocksize = (1 << 18);
1036
0
    }
1037
6.53k
    blocksize *= typesize;
1038
6.53k
    if (blocksize < (1 << 16)) {
1039
      /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1040
6.53k
      blocksize = (1 << 16);
1041
6.53k
    }
1042
6.53k
    if (blocksize > 1024 * 1024) {
1043
      /* But do not exceed 1 MB per thread (having this capacity in L3 is normal in modern CPUs) */
1044
0
      blocksize = 1024 * 1024;
1045
0
    }
1046
1047
6.53k
  }
1048
1049
  /* Check that blocksize is not too large */
1050
14.4k
  if (blocksize > (int32_t)nbytes) {
1051
8.42k
    blocksize = nbytes;
1052
8.42k
  }
1053
1054
  /* blocksize *must absolutely* be a multiple of the typesize */
1055
14.4k
  if (blocksize > typesize) {
1056
14.4k
    blocksize = blocksize / typesize * typesize;
1057
14.4k
  }
1058
1059
14.4k
  return blocksize;
1060
14.4k
}
1061
1062
static int initialize_context_compression(struct blosc_context* context,
1063
                          int clevel,
1064
                          int doshuffle,
1065
                          size_t typesize,
1066
                          size_t sourcesize,
1067
                          const void* src,
1068
                          void* dest,
1069
                          size_t destsize,
1070
                          int32_t compressor,
1071
                          int32_t blocksize,
1072
                          int32_t numthreads,
1073
                          int warnlvl)
1074
14.4k
{
1075
  /* Check buffer size limits and clamp destsize */
1076
14.4k
  if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1077
0
    if (warnlvl > 0) {
1078
0
      fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1079
0
              BLOSC_MAX_BUFFERSIZE);
1080
0
    }
1081
0
    return 0;
1082
0
  }
1083
14.4k
  if (destsize < BLOSC_MAX_OVERHEAD) {
1084
19
    if (warnlvl > 0) {
1085
0
      fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1086
0
              BLOSC_MAX_OVERHEAD);
1087
0
    }
1088
19
    return 0;
1089
19
  }
1090
14.4k
  if (destsize - BLOSC_MAX_OVERHEAD > sourcesize) {
1091
0
    destsize = sourcesize + BLOSC_MAX_OVERHEAD;
1092
0
  }
1093
1094
  /* Compression level */
1095
14.4k
  if (clevel < 0 || clevel > 9) {
1096
0
    if (warnlvl > 0) {
1097
0
      fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1098
0
    }
1099
0
    return -10;
1100
0
  }
1101
1102
  /* Shuffle */
1103
14.4k
  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1104
0
    if (warnlvl > 0) {
1105
0
      fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1106
0
    }
1107
0
    return -10;
1108
0
  }
1109
1110
  /* Check typesize limits */
1111
14.4k
  if (typesize <= 0) {
1112
0
    if (warnlvl > 0) {
1113
0
      fprintf(stderr, "`typesize` parameter must be greater than 0!\n");
1114
0
    }
1115
0
    return -10;
1116
0
  }
1117
14.4k
  if (typesize > BLOSC_MAX_TYPESIZE) {
1118
    /* If typesize is too large, treat buffer as an 1-byte stream. */
1119
0
    typesize = 1;
1120
0
  }
1121
1122
  /* Set parameters */
1123
14.4k
  context->compress = 1;
1124
14.4k
  context->src = (const uint8_t*)src;
1125
14.4k
  context->dest = (uint8_t *)(dest);
1126
14.4k
  context->num_output_bytes = 0;
1127
  // previous checks ensure the following size_t to int32_t casts don't overflow
1128
14.4k
  context->destsize = (int32_t)destsize;
1129
14.4k
  context->sourcesize = (int32_t)sourcesize;
1130
14.4k
  context->typesize = (int32_t)typesize;
1131
14.4k
  context->compcode = compressor;
1132
14.4k
  context->numthreads = numthreads;
1133
14.4k
  context->end_threads = 0;
1134
14.4k
  context->clevel = clevel;
1135
1136
  /* Get the blocksize */
1137
14.4k
  context->blocksize = compute_blocksize(context, clevel, context->typesize, context->sourcesize, blocksize);
1138
1139
  /* Compute number of blocks in buffer */
1140
14.4k
  context->nblocks = context->sourcesize / context->blocksize;
1141
14.4k
  context->leftover = context->sourcesize % context->blocksize;
1142
14.4k
  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1143
1144
14.4k
  return 1;
1145
14.4k
}
1146
1147
1148
static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1149
14.4k
{
1150
14.4k
  int32_t compformat;
1151
14.4k
  int dont_split;
1152
1153
  /* Write version header for this block */
1154
14.4k
  context->dest[0] = BLOSC_VERSION_FORMAT;           /* blosc format version */
1155
1156
  /* Write compressor format */
1157
14.4k
  compformat = -1;
1158
14.4k
  switch (context->compcode)
1159
14.4k
  {
1160
603
  case BLOSC_BLOSCLZ:
1161
603
    compformat = BLOSC_BLOSCLZ_FORMAT;
1162
603
    context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1163
603
    break;
1164
1165
0
#if defined(HAVE_LZ4)
1166
552
  case BLOSC_LZ4:
1167
552
    compformat = BLOSC_LZ4_FORMAT;
1168
552
    context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
1169
552
    break;
1170
2.43k
  case BLOSC_LZ4HC:
1171
2.43k
    compformat = BLOSC_LZ4HC_FORMAT;
1172
2.43k
    context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1173
2.43k
    break;
1174
0
#endif /* HAVE_LZ4 */
1175
1176
#if defined(HAVE_SNAPPY)
1177
  case BLOSC_SNAPPY:
1178
    compformat = BLOSC_SNAPPY_FORMAT;
1179
    context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
1180
    break;
1181
#endif /* HAVE_SNAPPY */
1182
1183
0
#if defined(HAVE_ZLIB)
1184
1.94k
  case BLOSC_ZLIB:
1185
1.94k
    compformat = BLOSC_ZLIB_FORMAT;
1186
1.94k
    context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
1187
1.94k
    break;
1188
0
#endif /* HAVE_ZLIB */
1189
1190
0
#if defined(HAVE_ZSTD)
1191
8.93k
  case BLOSC_ZSTD:
1192
8.93k
    compformat = BLOSC_ZSTD_FORMAT;
1193
8.93k
    context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
1194
8.93k
    break;
1195
0
#endif /* HAVE_ZSTD */
1196
1197
0
  default:
1198
0
  {
1199
0
    const char *compname;
1200
0
    compname = clibcode_to_clibname(compformat);
1201
0
    if (compname == NULL) {
1202
0
        compname = "(null)";
1203
0
    }
1204
0
    fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1205
0
    fprintf(stderr, "compression support.  Please use one having it.");
1206
0
    return -5;    /* signals no compression support */
1207
0
    break;
1208
0
  }
1209
14.4k
  }
1210
1211
14.4k
  context->header_flags = context->dest+2;  /* flags */
1212
14.4k
  context->dest[2] = 0;  /* zeroes flags */
1213
14.4k
  context->dest[3] = (uint8_t)context->typesize;  /* type size */
1214
14.4k
  _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
1215
14.4k
  _sw32(context->dest + 8, context->blocksize);  /* block size */
1216
14.4k
  context->bstarts = context->dest + 16;  /* starts for every block */
1217
14.4k
  context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */
1218
1219
14.4k
  if (context->clevel == 0) {
1220
    /* Compression level 0 means buffer to be memcpy'ed */
1221
8
    *(context->header_flags) |= BLOSC_MEMCPYED;
1222
8
    context->num_output_bytes = 16;      /* space just for header */
1223
8
  }
1224
1225
14.4k
  if (context->sourcesize < MIN_BUFFERSIZE) {
1226
    /* Buffer is too small.  Try memcpy'ing. */
1227
49
    *(context->header_flags) |= BLOSC_MEMCPYED;
1228
49
    context->num_output_bytes = 16;      /* space just for header */
1229
49
  }
1230
1231
14.4k
  if (doshuffle == BLOSC_SHUFFLE) {
1232
    /* Byte-shuffle is active */
1233
3.46k
    *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
1234
3.46k
  }
1235
1236
14.4k
  if (doshuffle == BLOSC_BITSHUFFLE) {
1237
    /* Bit-shuffle is active */
1238
6.41k
    *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
1239
6.41k
  }
1240
1241
14.4k
  dont_split = !split_block(context->compcode, context->typesize,
1242
14.4k
                            context->blocksize);
1243
14.4k
  *(context->header_flags) |= dont_split << 4;  /* dont_split is in bit 4 */
1244
14.4k
  *(context->header_flags) |= compformat << 5;  /* compressor format starts at bit 5 */
1245
1246
14.4k
  return 1;
1247
14.4k
}
1248
1249
1250
int blosc_compress_context(struct blosc_context* context)
1251
14.4k
{
1252
14.4k
  int32_t ntbytes = 0;
1253
1254
14.4k
  if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1255
52
      (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1256
52
    return 0;   /* data cannot be copied without overrun destination */
1257
52
  }
1258
1259
  /* Do the actual compression */
1260
14.4k
  ntbytes = do_job(context);
1261
14.4k
  if (ntbytes < 0) {
1262
0
    return -1;
1263
0
  }
1264
14.4k
  if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1265
    /* Last chance for fitting `src` buffer in `dest`.  Update flags and force a copy. */
1266
0
    *(context->header_flags) |= BLOSC_MEMCPYED;
1267
0
    context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* reset the output bytes in previous step */
1268
0
    ntbytes = do_job(context);
1269
0
    if (ntbytes < 0) {
1270
0
      return -1;
1271
0
    }
1272
0
  }
1273
1274
  /* Set the number of compressed bytes in header */
1275
14.4k
  _sw32(context->dest + 12, ntbytes);
1276
1277
14.4k
  assert(ntbytes <= context->destsize);
1278
14.4k
  return ntbytes;
1279
14.4k
}
1280
1281
/* The public routine for compression with context. */
1282
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1283
                       size_t nbytes, const void* src, void* dest,
1284
                       size_t destsize, const char* compressor,
1285
                       size_t blocksize, int numinternalthreads)
1286
0
{
1287
0
  int error, result;
1288
0
  struct blosc_context context;
1289
1290
0
  context.threads_started = 0;
1291
0
  error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1292
0
           nbytes, src, dest, destsize,
1293
0
           blosc_compname_to_compcode(compressor),
1294
0
           blocksize, numinternalthreads, 0);
1295
0
  if (error <= 0) { return error; }
1296
1297
0
  error = write_compression_header(&context, clevel, doshuffle);
1298
0
  if (error <= 0) { return error; }
1299
1300
0
  result = blosc_compress_context(&context);
1301
1302
0
  if (numinternalthreads > 1)
1303
0
  {
1304
0
    blosc_release_threadpool(&context);
1305
0
  }
1306
1307
0
  return result;
1308
0
}
1309
1310
/* The public routine for compression.  See blosc.h for docstrings. */
1311
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1312
                   const void *src, void *dest, size_t destsize)
1313
14.4k
{
1314
14.4k
  int result;
1315
14.4k
  char* envvar;
1316
1317
  /* Check if should initialize */
1318
14.4k
  if (!g_initlib) blosc_init();
1319
1320
  /* Check for environment variables */
1321
14.4k
  envvar = getenv("BLOSC_CLEVEL");
1322
14.4k
  if (envvar != NULL) {
1323
0
    long value;
1324
0
    value = strtol(envvar, NULL, 10);
1325
0
    if ((value != EINVAL) && (value >= 0)) {
1326
0
      clevel = (int)value;
1327
0
    }
1328
0
  }
1329
1330
14.4k
  envvar = getenv("BLOSC_SHUFFLE");
1331
14.4k
  if (envvar != NULL) {
1332
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
1333
0
      doshuffle = BLOSC_NOSHUFFLE;
1334
0
    }
1335
0
    if (strcmp(envvar, "SHUFFLE") == 0) {
1336
0
      doshuffle = BLOSC_SHUFFLE;
1337
0
    }
1338
0
    if (strcmp(envvar, "BITSHUFFLE") == 0) {
1339
0
      doshuffle = BLOSC_BITSHUFFLE;
1340
0
    }
1341
0
  }
1342
1343
14.4k
  envvar = getenv("BLOSC_TYPESIZE");
1344
14.4k
  if (envvar != NULL) {
1345
0
    long value;
1346
0
    value = strtol(envvar, NULL, 10);
1347
0
    if ((value != EINVAL) && (value > 0)) {
1348
0
      typesize = (int)value;
1349
0
    }
1350
0
  }
1351
1352
14.4k
  envvar = getenv("BLOSC_COMPRESSOR");
1353
14.4k
  if (envvar != NULL) {
1354
0
    result = blosc_set_compressor(envvar);
1355
0
    if (result < 0) { return result; }
1356
0
  }
1357
1358
14.4k
  envvar = getenv("BLOSC_BLOCKSIZE");
1359
14.4k
  if (envvar != NULL) {
1360
0
    long blocksize;
1361
0
    blocksize = strtol(envvar, NULL, 10);
1362
0
    if ((blocksize != EINVAL) && (blocksize > 0)) {
1363
0
      blosc_set_blocksize((size_t)blocksize);
1364
0
    }
1365
0
  }
1366
1367
14.4k
  envvar = getenv("BLOSC_NTHREADS");
1368
14.4k
  if (envvar != NULL) {
1369
0
    long nthreads;
1370
0
    nthreads = strtol(envvar, NULL, 10);
1371
0
    if ((nthreads != EINVAL) && (nthreads > 0)) {
1372
0
      result = blosc_set_nthreads((int)nthreads);
1373
0
      if (result < 0) { return result; }
1374
0
    }
1375
0
  }
1376
1377
14.4k
  envvar = getenv("BLOSC_SPLITMODE");
1378
14.4k
  if (envvar != NULL) {
1379
0
    if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1380
0
      blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1381
0
    }
1382
0
    else if (strcmp(envvar, "AUTO") == 0) {
1383
0
      blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1384
0
    }
1385
0
    else if (strcmp(envvar, "ALWAYS") == 0) {
1386
0
      blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1387
0
    }
1388
0
    else if (strcmp(envvar, "NEVER") == 0) {
1389
0
      blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1390
0
    }
1391
0
    else {
1392
0
      fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1393
0
      return -1;
1394
0
    }
1395
0
  }
1396
1397
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1398
     that this should be the last env var so that it can take the
1399
     previous ones into account */
1400
14.4k
  envvar = getenv("BLOSC_NOLOCK");
1401
14.4k
  if (envvar != NULL) {
1402
0
    const char *compname;
1403
0
    blosc_compcode_to_compname(g_compressor, &compname);
1404
0
    result = blosc_compress_ctx(clevel, doshuffle, typesize,
1405
0
        nbytes, src, dest, destsize,
1406
0
        compname, g_force_blocksize, g_threads);
1407
0
    return result;
1408
0
  }
1409
1410
14.4k
  pthread_mutex_lock(global_comp_mutex);
1411
1412
14.4k
  do {
1413
14.4k
    int warnlvl = 0;
1414
14.4k
    envvar = getenv("BLOSC_WARN");
1415
14.4k
    if (envvar != NULL) {
1416
0
      warnlvl = strtol(envvar, NULL, 10);
1417
0
    }
1418
14.4k
    result = initialize_context_compression(g_global_context, clevel, doshuffle,
1419
14.4k
                                           typesize, nbytes, src, dest, destsize,
1420
14.4k
                                           g_compressor, g_force_blocksize,
1421
14.4k
                                           g_threads, warnlvl);
1422
14.4k
    if (result <= 0) { break; }
1423
1424
14.4k
    result = write_compression_header(g_global_context, clevel, doshuffle);
1425
14.4k
    if (result <= 0) { break; }
1426
1427
14.4k
    result = blosc_compress_context(g_global_context);
1428
14.4k
  } while (0);
1429
1430
14.4k
  pthread_mutex_unlock(global_comp_mutex);
1431
1432
14.4k
  return result;
1433
14.4k
}
1434
1435
static int blosc_run_decompression_with_context(struct blosc_context* context,
1436
                                                const void* src,
1437
                                                void* dest,
1438
                                                size_t destsize,
1439
                                                int numinternalthreads)
1440
20.6k
{
1441
20.6k
  uint8_t version;
1442
20.6k
  int32_t ntbytes;
1443
1444
20.6k
  context->compress = 0;
1445
20.6k
  context->src = (const uint8_t*)src;
1446
20.6k
  context->dest = (uint8_t*)dest;
1447
20.6k
  context->destsize = destsize;
1448
20.6k
  context->num_output_bytes = 0;
1449
20.6k
  context->numthreads = numinternalthreads;
1450
20.6k
  context->end_threads = 0;
1451
1452
  /* Read the header block */
1453
20.6k
  version = context->src[0];                        /* blosc format version */
1454
20.6k
  context->compversion = context->src[1];
1455
1456
20.6k
  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
1457
20.6k
  context->typesize = (int32_t)context->src[3];      /* typesize */
1458
20.6k
  context->sourcesize = sw32_(context->src + 4);     /* buffer size */
1459
20.6k
  context->blocksize = sw32_(context->src + 8);      /* block size */
1460
20.6k
  context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1461
20.6k
  context->bstarts = (uint8_t*)(context->src + 16);
1462
1463
20.6k
  if (context->sourcesize == 0) {
1464
    /* Source buffer was empty, so we are done */
1465
0
    return 0;
1466
0
  }
1467
1468
20.6k
  if (context->blocksize <= 0 || context->blocksize > destsize ||
1469
11.8k
      context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1470
11.8k
      context->typesize > BLOSC_MAX_TYPESIZE) {
1471
8.81k
    return -1;
1472
8.81k
  }
1473
1474
11.8k
  if (version != BLOSC_VERSION_FORMAT) {
1475
    /* Version from future */
1476
0
    return -1;
1477
0
  }
1478
11.8k
  if (*context->header_flags & 0x08) {
1479
    /* compressor flags from the future */
1480
1
    return -1;
1481
1
  }
1482
1483
  /* Compute some params */
1484
  /* Total blocks */
1485
11.8k
  context->nblocks = context->sourcesize / context->blocksize;
1486
11.8k
  context->leftover = context->sourcesize % context->blocksize;
1487
11.8k
  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1488
1489
  /* Check that we have enough space to decompress */
1490
11.8k
  if (context->sourcesize > (int32_t)destsize) {
1491
2.78k
    return -1;
1492
2.78k
  }
1493
1494
9.06k
  if (*(context->header_flags) & BLOSC_MEMCPYED) {
1495
    /* Validate that compressed size is equal to decompressed size + header
1496
       size. */
1497
46
    if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1498
9
      return -1;
1499
9
    }
1500
9.01k
  } else {
1501
9.01k
    ntbytes = initialize_decompress_func(context);
1502
9.01k
    if (ntbytes != 0) return ntbytes;
1503
1504
    /* Validate that compressed size is large enough to hold the bstarts array */
1505
8.98k
    if (context->nblocks > (context->compressedsize - 16) / 4) {
1506
27
      return -1;
1507
27
    }
1508
8.98k
  }
1509
1510
  /* Do the actual decompression */
1511
8.99k
  ntbytes = do_job(context);
1512
8.99k
  if (ntbytes < 0) {
1513
7.20k
    return -1;
1514
7.20k
  }
1515
1516
8.99k
  assert(ntbytes <= (int32_t)destsize);
1517
1.79k
  return ntbytes;
1518
8.99k
}
1519
1520
int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
1521
0
                         int numinternalthreads) {
1522
0
  int result;
1523
0
  struct blosc_context context;
1524
1525
0
  context.threads_started = 0;
1526
0
  result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1527
0
                                                numinternalthreads);
1528
1529
0
  if (numinternalthreads > 1)
1530
0
  {
1531
0
    blosc_release_threadpool(&context);
1532
0
  }
1533
1534
0
  return result;
1535
0
}
1536
1537
20.6k
int blosc_decompress(const void* src, void* dest, size_t destsize) {
1538
20.6k
  int result;
1539
20.6k
  char* envvar;
1540
20.6k
  long nthreads;
1541
1542
  /* Check if should initialize */
1543
20.6k
  if (!g_initlib) blosc_init();
1544
1545
  /* Check for a BLOSC_NTHREADS environment variable */
1546
20.6k
  envvar = getenv("BLOSC_NTHREADS");
1547
20.6k
  if (envvar != NULL) {
1548
0
    nthreads = strtol(envvar, NULL, 10);
1549
0
    if ((nthreads != EINVAL) && (nthreads > 0)) {
1550
0
      result = blosc_set_nthreads((int)nthreads);
1551
0
      if (result < 0) { return result; }
1552
0
    }
1553
0
  }
1554
1555
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1556
     that this should be the last env var so that it can take the
1557
     previous ones into account */
1558
20.6k
  envvar = getenv("BLOSC_NOLOCK");
1559
20.6k
  if (envvar != NULL) {
1560
0
    result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1561
0
    return result;
1562
0
  }
1563
1564
20.6k
  pthread_mutex_lock(global_comp_mutex);
1565
1566
20.6k
  result = blosc_run_decompression_with_context(g_global_context, src, dest,
1567
20.6k
                                                destsize, g_threads);
1568
1569
20.6k
  pthread_mutex_unlock(global_comp_mutex);
1570
1571
20.6k
  return result;
1572
20.6k
}
1573
1574
0
int blosc_getitem(const void* src, int start, int nitems, void* dest) {
1575
0
  uint8_t *_src=NULL;               /* current pos for source buffer */
1576
0
  uint8_t version, compversion;     /* versions for compressed header */
1577
0
  uint8_t flags;                    /* flags for header */
1578
0
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
1579
0
  int32_t nblocks;                  /* number of total blocks in buffer */
1580
0
  int32_t leftover;                 /* extra bytes at end of buffer */
1581
0
  uint8_t *bstarts;                 /* start pointers for each block */
1582
0
  int32_t typesize, blocksize, nbytes, compressedsize;
1583
0
  int32_t j, bsize, bsize2, leftoverblock;
1584
0
  int32_t cbytes, startb, stopb;
1585
0
  int stop = start + nitems;
1586
0
  uint8_t *tmp;
1587
0
  uint8_t *tmp2;
1588
0
  uint8_t *tmp3;
1589
0
  int32_t ebsize;
1590
0
  struct blosc_context context = {0};
1591
1592
0
  _src = (uint8_t *)(src);
1593
1594
  /* Read the header block */
1595
0
  version = _src[0];                        /* blosc format version */
1596
0
  compversion = _src[1];
1597
0
  flags = _src[2];                          /* flags */
1598
0
  typesize = (int32_t)_src[3];              /* typesize */
1599
0
  nbytes = sw32_(_src + 4);                 /* buffer size */
1600
0
  blocksize = sw32_(_src + 8);              /* block size */
1601
0
  compressedsize = sw32_(_src + 12); /* compressed buffer size */
1602
1603
0
  if (version != BLOSC_VERSION_FORMAT)
1604
0
    return -9;
1605
1606
0
  if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1607
0
      typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1608
0
    return -1;
1609
0
  }
1610
1611
  /* Compute some params */
1612
  /* Total blocks */
1613
0
  nblocks = nbytes / blocksize;
1614
0
  leftover = nbytes % blocksize;
1615
0
  nblocks = (leftover>0)? nblocks+1: nblocks;
1616
1617
  /* Only initialize the fields blosc_d uses */
1618
0
  context.typesize = typesize;
1619
0
  context.header_flags = &flags;
1620
0
  context.compversion = compversion;
1621
0
  context.compressedsize = compressedsize;
1622
0
  if (flags & BLOSC_MEMCPYED) {
1623
0
    if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1624
0
      return -1;
1625
0
    }
1626
0
  } else {
1627
0
    ntbytes = initialize_decompress_func(&context);
1628
0
    if (ntbytes != 0) return ntbytes;
1629
1630
0
    if (nblocks >= (compressedsize - 16) / 4) {
1631
0
      return -1;
1632
0
    }
1633
0
  }
1634
1635
0
  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1636
0
  tmp = my_malloc(blocksize + ebsize + blocksize);
1637
0
  tmp2 = tmp + blocksize;
1638
0
  tmp3 = tmp + blocksize + ebsize;
1639
1640
0
  _src += 16;
1641
0
  bstarts = _src;
1642
0
  _src += sizeof(int32_t)*nblocks;
1643
1644
  /* Check region boundaries */
1645
0
  if ((start < 0) || (start*typesize > nbytes)) {
1646
0
    fprintf(stderr, "`start` out of bounds");
1647
0
    return -1;
1648
0
  }
1649
1650
0
  if ((stop < 0) || (stop*typesize > nbytes)) {
1651
0
    fprintf(stderr, "`start`+`nitems` out of bounds");
1652
0
    return -1;
1653
0
  }
1654
1655
0
  for (j = 0; j < nblocks; j++) {
1656
0
    bsize = blocksize;
1657
0
    leftoverblock = 0;
1658
0
    if ((j == nblocks - 1) && (leftover > 0)) {
1659
0
      bsize = leftover;
1660
0
      leftoverblock = 1;
1661
0
    }
1662
1663
    /* Compute start & stop for each block */
1664
0
    startb = start * typesize - j * blocksize;
1665
0
    stopb = stop * typesize - j * blocksize;
1666
0
    if ((startb >= (int)blocksize) || (stopb <= 0)) {
1667
0
      continue;
1668
0
    }
1669
0
    if (startb < 0) {
1670
0
      startb = 0;
1671
0
    }
1672
0
    if (stopb > (int)blocksize) {
1673
0
      stopb = blocksize;
1674
0
    }
1675
0
    bsize2 = stopb - startb;
1676
1677
    /* Do the actual data copy */
1678
0
    if (flags & BLOSC_MEMCPYED) {
1679
      /* We want to memcpy only */
1680
0
      fastcopy((uint8_t *) dest + ntbytes,
1681
0
               (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1682
0
      cbytes = bsize2;
1683
0
    }
1684
0
    else {
1685
      /* Regular decompression.  Put results in tmp2. */
1686
0
      cbytes = blosc_d(&context, bsize, leftoverblock,
1687
0
                       (uint8_t *)src, sw32_(bstarts + j * 4),
1688
0
                       tmp2, tmp, tmp3);
1689
0
      if (cbytes < 0) {
1690
0
        ntbytes = cbytes;
1691
0
        break;
1692
0
      }
1693
      /* Copy to destination */
1694
0
      fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1695
0
      cbytes = bsize2;
1696
0
    }
1697
0
    ntbytes += cbytes;
1698
0
  }
1699
1700
0
  my_free(tmp);
1701
1702
0
  return ntbytes;
1703
0
}
1704
1705
/* Decompress & unshuffle several blocks in a single thread */
1706
static void *t_blosc(void *ctxt)
1707
0
{
1708
0
  struct thread_context* context = (struct thread_context*)ctxt;
1709
0
  int32_t cbytes, ntdest;
1710
0
  int32_t tblocks;              /* number of blocks per thread */
1711
0
  int32_t leftover2;
1712
0
  int32_t tblock;               /* limit block on a thread */
1713
0
  int32_t nblock_;              /* private copy of nblock */
1714
0
  int32_t bsize, leftoverblock;
1715
  /* Parameters for threads */
1716
0
  int32_t blocksize;
1717
0
  int32_t ebsize;
1718
0
  int32_t compress;
1719
0
  int32_t maxbytes;
1720
0
  int32_t ntbytes;
1721
0
  int32_t flags;
1722
0
  int32_t nblocks;
1723
0
  int32_t leftover;
1724
0
  uint8_t *bstarts;
1725
0
  const uint8_t *src;
1726
0
  uint8_t *dest;
1727
0
  uint8_t *tmp;
1728
0
  uint8_t *tmp2;
1729
0
  uint8_t *tmp3;
1730
0
  int rc;
1731
0
  (void)rc;  // just to avoid 'unused-variable' warning
1732
1733
0
  while(1)
1734
0
  {
1735
    /* Synchronization point for all threads (wait for initialization) */
1736
0
    WAIT_INIT(NULL, context->parent_context);
1737
1738
0
    if(context->parent_context->end_threads)
1739
0
    {
1740
0
      break;
1741
0
    }
1742
1743
    /* Get parameters for this thread before entering the main loop */
1744
0
    blocksize = context->parent_context->blocksize;
1745
0
    ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1746
0
    compress = context->parent_context->compress;
1747
0
    flags = *(context->parent_context->header_flags);
1748
0
    maxbytes = context->parent_context->destsize;
1749
0
    nblocks = context->parent_context->nblocks;
1750
0
    leftover = context->parent_context->leftover;
1751
0
    bstarts = context->parent_context->bstarts;
1752
0
    src = context->parent_context->src;
1753
0
    dest = context->parent_context->dest;
1754
1755
0
    if (blocksize > context->tmpblocksize)
1756
0
    {
1757
0
      my_free(context->tmp);
1758
0
      context->tmp = my_malloc(blocksize + ebsize + blocksize);
1759
0
      context->tmp2 = context->tmp + blocksize;
1760
0
      context->tmp3 = context->tmp + blocksize + ebsize;
1761
0
    }
1762
1763
0
    tmp = context->tmp;
1764
0
    tmp2 = context->tmp2;
1765
0
    tmp3 = context->tmp3;
1766
1767
0
    ntbytes = 0;                /* only useful for decompression */
1768
1769
0
    if (compress && !(flags & BLOSC_MEMCPYED)) {
1770
      /* Compression always has to follow the block order */
1771
0
      pthread_mutex_lock(&context->parent_context->count_mutex);
1772
0
      context->parent_context->thread_nblock++;
1773
0
      nblock_ = context->parent_context->thread_nblock;
1774
0
      pthread_mutex_unlock(&context->parent_context->count_mutex);
1775
0
      tblock = nblocks;
1776
0
    }
1777
0
    else {
1778
      /* Decompression can happen using any order.  We choose
1779
       sequential block order on each thread */
1780
1781
      /* Blocks per thread */
1782
0
      tblocks = nblocks / context->parent_context->numthreads;
1783
0
      leftover2 = nblocks % context->parent_context->numthreads;
1784
0
      tblocks = (leftover2>0)? tblocks+1: tblocks;
1785
1786
0
      nblock_ = context->tid*tblocks;
1787
0
      tblock = nblock_ + tblocks;
1788
0
      if (tblock > nblocks) {
1789
0
        tblock = nblocks;
1790
0
      }
1791
0
    }
1792
1793
    /* Loop over blocks */
1794
0
    leftoverblock = 0;
1795
0
    while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1796
0
      bsize = blocksize;
1797
0
      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1798
0
        bsize = leftover;
1799
0
        leftoverblock = 1;
1800
0
      }
1801
0
      if (compress) {
1802
0
        if (flags & BLOSC_MEMCPYED) {
1803
          /* We want to memcpy only */
1804
0
          fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1805
0
                   src + nblock_ * blocksize, bsize);
1806
0
          cbytes = bsize;
1807
0
        }
1808
0
        else {
1809
          /* Regular compression */
1810
0
          cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1811
0
                           src+nblock_*blocksize, tmp2, tmp, tmp3);
1812
0
        }
1813
0
      }
1814
0
      else {
1815
0
        if (flags & BLOSC_MEMCPYED) {
1816
          /* We want to memcpy only */
1817
0
          fastcopy(dest + nblock_ * blocksize,
1818
0
                   src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
1819
0
          cbytes = bsize;
1820
0
        }
1821
0
        else {
1822
0
          cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1823
0
                           src, sw32_(bstarts + nblock_ * 4),
1824
0
                           dest+nblock_*blocksize,
1825
0
                           tmp, tmp2);
1826
0
        }
1827
0
      }
1828
1829
      /* Check whether current thread has to giveup */
1830
0
      if (context->parent_context->thread_giveup_code <= 0) {
1831
0
        break;
1832
0
      }
1833
1834
      /* Check results for the compressed/decompressed block */
1835
0
      if (cbytes < 0) {            /* compr/decompr failure */
1836
        /* Set giveup_code error */
1837
0
        pthread_mutex_lock(&context->parent_context->count_mutex);
1838
0
        context->parent_context->thread_giveup_code = cbytes;
1839
0
        pthread_mutex_unlock(&context->parent_context->count_mutex);
1840
0
        break;
1841
0
      }
1842
1843
0
      if (compress && !(flags & BLOSC_MEMCPYED)) {
1844
        /* Start critical section */
1845
0
        pthread_mutex_lock(&context->parent_context->count_mutex);
1846
0
        ntdest = context->parent_context->num_output_bytes;
1847
0
        _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1848
0
        if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1849
0
          context->parent_context->thread_giveup_code = 0;  /* incompressible buffer */
1850
0
          pthread_mutex_unlock(&context->parent_context->count_mutex);
1851
0
          break;
1852
0
        }
1853
0
        context->parent_context->thread_nblock++;
1854
0
        nblock_ = context->parent_context->thread_nblock;
1855
0
        context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
1856
0
        pthread_mutex_unlock(&context->parent_context->count_mutex);
1857
        /* End of critical section */
1858
1859
        /* Copy the compressed buffer to destination */
1860
0
        fastcopy(dest + ntdest, tmp2, cbytes);
1861
0
      }
1862
0
      else {
1863
0
        nblock_++;
1864
        /* Update counter for this thread */
1865
0
        ntbytes += cbytes;
1866
0
      }
1867
1868
0
    } /* closes while (nblock_) */
1869
1870
    /* Sum up all the bytes decompressed */
1871
0
    if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1872
      /* Update global counter for all threads (decompression only) */
1873
0
      pthread_mutex_lock(&context->parent_context->count_mutex);
1874
0
      context->parent_context->num_output_bytes += ntbytes;
1875
0
      pthread_mutex_unlock(&context->parent_context->count_mutex);
1876
0
    }
1877
1878
    /* Meeting point for all threads (wait for finalization) */
1879
0
    WAIT_FINISH(NULL, context->parent_context);
1880
0
  }
1881
1882
  /* Cleanup our working space and context */
1883
0
  my_free(context->tmp);
1884
0
  my_free(context);
1885
1886
0
  return(NULL);
1887
0
}
1888
1889
1890
static int init_threads(struct blosc_context* context)
1891
0
{
1892
0
  int32_t tid;
1893
0
  int rc2;
1894
0
  int32_t ebsize;
1895
0
  struct thread_context* thread_context;
1896
1897
  /* Initialize mutex and condition variable objects */
1898
0
  pthread_mutex_init(&context->count_mutex, NULL);
1899
1900
  /* Set context thread sentinels */
1901
0
  context->thread_giveup_code = 1;
1902
0
  context->thread_nblock = -1;
1903
1904
  /* Barrier initialization */
1905
0
#ifdef _POSIX_BARRIERS_MINE
1906
0
  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1907
0
  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1908
#else
1909
  pthread_mutex_init(&context->count_threads_mutex, NULL);
1910
  pthread_cond_init(&context->count_threads_cv, NULL);
1911
  context->count_threads = 0;      /* Reset threads counter */
1912
#endif
1913
1914
0
#if !defined(_WIN32)
1915
  /* Initialize and set thread detached attribute */
1916
0
  pthread_attr_init(&context->ct_attr);
1917
0
  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1918
0
#endif
1919
1920
  /* Finally, create the threads in detached state */
1921
0
  for (tid = 0; tid < context->numthreads; tid++) {
1922
0
    context->tids[tid] = tid;
1923
1924
    /* Create a thread context thread owns context (will destroy when finished) */
1925
0
    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1926
0
    thread_context->parent_context = context;
1927
0
    thread_context->tid = tid;
1928
1929
0
    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1930
0
    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1931
0
    thread_context->tmp2 = thread_context->tmp + context->blocksize;
1932
0
    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1933
0
    thread_context->tmpblocksize = context->blocksize;
1934
1935
0
#if !defined(_WIN32)
1936
0
    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1937
#else
1938
    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1939
#endif
1940
0
    if (rc2) {
1941
0
      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1942
0
      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1943
0
      return(-1);
1944
0
    }
1945
0
  }
1946
1947
1948
0
  return(0);
1949
0
}
1950
1951
int blosc_get_nthreads(void)
1952
0
{
1953
0
  int ret = g_threads;
1954
1955
0
  return ret;
1956
0
}
1957
1958
int blosc_set_nthreads(int nthreads_new)
1959
14.4k
{
1960
14.4k
  int ret = g_threads;
1961
1962
  /* Check if should initialize */
1963
14.4k
  if (!g_initlib) blosc_init();
1964
1965
14.4k
  if (nthreads_new != ret){
1966
    /* Re-initialize Blosc */
1967
0
    blosc_destroy();
1968
0
    blosc_init();
1969
0
    g_threads = nthreads_new;
1970
0
  }
1971
1972
14.4k
  return ret;
1973
14.4k
}
1974
1975
int blosc_set_nthreads_(struct blosc_context* context)
1976
0
{
1977
0
  if (context->numthreads > BLOSC_MAX_THREADS) {
1978
0
    fprintf(stderr,
1979
0
            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1980
0
            BLOSC_MAX_THREADS);
1981
0
    return -1;
1982
0
  }
1983
0
  else if (context->numthreads <= 0) {
1984
0
    fprintf(stderr, "Error.  nthreads must be a positive integer");
1985
0
    return -1;
1986
0
  }
1987
1988
  /* Launch a new pool of threads */
1989
0
  if (context->numthreads > 1 && context->numthreads != context->threads_started) {
1990
0
    blosc_release_threadpool(context);
1991
0
    if (init_threads(context) < 0) {
1992
0
      return -1;
1993
0
    }
1994
0
  }
1995
1996
  /* We have now started the threads */
1997
0
  context->threads_started = context->numthreads;
1998
1999
0
  return context->numthreads;
2000
0
}
2001
2002
const char* blosc_get_compressor(void)
2003
0
{
2004
0
  const char* compname;
2005
0
  blosc_compcode_to_compname(g_compressor, &compname);
2006
2007
0
  return compname;
2008
0
}
2009
2010
int blosc_set_compressor(const char *compname)
2011
15.5k
{
2012
15.5k
  int code = blosc_compname_to_compcode(compname);
2013
2014
15.5k
  g_compressor = code;
2015
2016
  /* Check if should initialize */
2017
15.5k
  if (!g_initlib) blosc_init();
2018
2019
15.5k
  return code;
2020
15.5k
}
2021
2022
const char* blosc_list_compressors(void)
2023
0
{
2024
0
  static int compressors_list_done = 0;
2025
0
  static char ret[256];
2026
2027
0
  if (compressors_list_done) return ret;
2028
0
  ret[0] = '\0';
2029
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2030
0
#if defined(HAVE_LZ4)
2031
0
  strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2032
0
  strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2033
0
#endif /* HAVE_LZ4 */
2034
#if defined(HAVE_SNAPPY)
2035
  strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2036
#endif /* HAVE_SNAPPY */
2037
0
#if defined(HAVE_ZLIB)
2038
0
  strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2039
0
#endif /* HAVE_ZLIB */
2040
0
#if defined(HAVE_ZSTD)
2041
0
  strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2042
0
#endif /* HAVE_ZSTD */
2043
0
  compressors_list_done = 1;
2044
0
  return ret;
2045
0
}
2046
2047
const char* blosc_get_version_string(void)
2048
0
{
2049
0
  return BLOSC_VERSION_STRING;
2050
0
}
2051
2052
int blosc_get_complib_info(const char *compname, char **complib, char **version)
2053
0
{
2054
0
  int clibcode;
2055
0
  const char *clibname;
2056
0
  const char *clibversion = "unknown";
2057
2058
0
#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
2059
0
  char sbuffer[256];
2060
0
#endif
2061
2062
0
  clibcode = compname_to_clibcode(compname);
2063
0
  clibname = clibcode_to_clibname(clibcode);
2064
2065
  /* complib version */
2066
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
2067
0
    clibversion = BLOSCLZ_VERSION_STRING;
2068
0
  }
2069
0
#if defined(HAVE_LZ4)
2070
0
  else if (clibcode == BLOSC_LZ4_LIB) {
2071
0
#if defined(LZ4_VERSION_MAJOR)
2072
0
    sprintf(sbuffer, "%d.%d.%d",
2073
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
2074
0
    clibversion = sbuffer;
2075
0
#endif /* LZ4_VERSION_MAJOR */
2076
0
  }
2077
0
#endif /* HAVE_LZ4 */
2078
#if defined(HAVE_SNAPPY)
2079
  else if (clibcode == BLOSC_SNAPPY_LIB) {
2080
#if defined(SNAPPY_VERSION)
2081
    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
2082
    clibversion = sbuffer;
2083
#endif /* SNAPPY_VERSION */
2084
  }
2085
#endif /* HAVE_SNAPPY */
2086
0
#if defined(HAVE_ZLIB)
2087
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
2088
0
    clibversion = ZLIB_VERSION;
2089
0
  }
2090
0
#endif /* HAVE_ZLIB */
2091
0
#if defined(HAVE_ZSTD)
2092
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
2093
0
    sprintf(sbuffer, "%d.%d.%d",
2094
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
2095
0
    clibversion = sbuffer;
2096
0
  }
2097
0
#endif /* HAVE_ZSTD */
2098
0
  else {
2099
    /* Unsupported library */
2100
0
    if (complib != NULL) *complib = NULL;
2101
0
    if (version != NULL) *version = NULL;
2102
0
    return -1;
2103
0
  }
2104
2105
0
  if (complib != NULL) *complib = strdup(clibname);
2106
0
  if (version != NULL) *version = strdup(clibversion);
2107
2108
0
  return clibcode;
2109
0
}
2110
2111
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
2112
void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2113
                         size_t *cbytes, size_t *blocksize)
2114
28.5k
{
2115
28.5k
  uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
2116
28.5k
  uint8_t version = _src[0];               /* version of header */
2117
2118
28.5k
  if (version != BLOSC_VERSION_FORMAT) {
2119
10
    *nbytes = *blocksize = *cbytes = 0;
2120
10
    return;
2121
10
  }
2122
2123
  /* Read the interesting values */
2124
28.5k
  *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
2125
28.5k
  *blocksize = (size_t)sw32_(_src + 8);    /* block size */
2126
28.5k
  *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
2127
28.5k
}
2128
2129
7.75k
int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2130
7.75k
  size_t header_cbytes, header_blocksize;
2131
7.75k
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2132
7.75k
  blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2133
7.75k
  if (header_cbytes != cbytes) return -1;
2134
7.75k
  if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2135
7.71k
  return 0;
2136
7.75k
}
2137
2138
/* Return `typesize` and `flags` from a compressed buffer. */
2139
void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2140
                            int *flags)
2141
0
{
2142
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2143
2144
0
  uint8_t version = _src[0];               /* version of header */
2145
2146
0
  if (version != BLOSC_VERSION_FORMAT) {
2147
0
    *flags = *typesize = 0;
2148
0
    return;
2149
0
  }
2150
2151
  /* Read the interesting values */
2152
0
  *flags = (int)_src[2] & 7;             /* first three flags */
2153
0
  *typesize = (size_t)_src[3];           /* typesize */
2154
0
}
2155
2156
2157
/* Return version information from a compressed buffer. */
2158
void blosc_cbuffer_versions(const void *cbuffer, int *version,
2159
                            int *versionlz)
2160
0
{
2161
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2162
2163
  /* Read the version info */
2164
0
  *version = (int)_src[0];         /* blosc format version */
2165
0
  *versionlz = (int)_src[1];       /* Lempel-Ziv compressor format version */
2166
0
}
2167
2168
2169
/* Return the compressor library/format used in a compressed buffer. */
2170
const char *blosc_cbuffer_complib(const void *cbuffer)
2171
0
{
2172
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2173
0
  int clibcode;
2174
0
  const char *complib;
2175
2176
  /* Read the compressor format/library info */
2177
0
  clibcode = (_src[2] & 0xe0) >> 5;
2178
0
  complib = clibcode_to_clibname(clibcode);
2179
0
  return complib;
2180
0
}
2181
2182
/* Get the internal blocksize to be used during compression.  0 means
2183
   that an automatic blocksize is computed internally. */
2184
int blosc_get_blocksize(void)
2185
0
{
2186
0
  return (int)g_force_blocksize;
2187
0
}
2188
2189
/* Force the use of a specific blocksize.  If 0, an automatic
2190
   blocksize will be used (the default). */
2191
void blosc_set_blocksize(size_t size)
2192
5.36k
{
2193
5.36k
  g_force_blocksize = (int32_t)size;
2194
5.36k
}
2195
2196
/* Force the use of a specific split mode. */
2197
void blosc_set_splitmode(int mode)
2198
14.4k
{
2199
14.4k
  g_splitmode = mode;
2200
14.4k
}
2201
2202
/* Child global context is invalid and pool threads no longer exist post-fork.
2203
 * Discard the old, inconsistent global context and global context mutex and
2204
 * mark as uninitialized.  Subsequent calls through `blosc_*` interfaces will
2205
 * trigger re-init of the global context.
2206
 *
2207
 * All pthread interfaces have undefined behavior in child handler in current
2208
 * posix standards: https://pubs.opengroup.org/onlinepubs/9699919799/
2209
 */
2210
0
void blosc_atfork_child(void) {
2211
0
  if (!g_initlib) return;
2212
2213
0
  g_initlib = 0;
2214
2215
0
  my_free(global_comp_mutex);
2216
0
  global_comp_mutex = NULL;
2217
2218
0
  my_free(g_global_context);
2219
0
  g_global_context = NULL;
2220
2221
0
}
2222
2223
void blosc_init(void)
2224
2
{
2225
  /* Return if we are already initialized */
2226
2
  if (g_initlib) return;
2227
2228
2
  global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2229
2
  pthread_mutex_init(global_comp_mutex, NULL);
2230
2231
2
  g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2232
2
  g_global_context->threads_started = 0;
2233
2234
2
  #if !defined(_WIN32)
2235
  /* atfork handlers are only be registered once, though multiple re-inits may
2236
   * occur via blosc_destroy/blosc_init.  */
2237
2
  if (!g_atfork_registered) {
2238
2
    g_atfork_registered = 1;
2239
2
    pthread_atfork(NULL, NULL, &blosc_atfork_child);
2240
2
  }
2241
2
  #endif
2242
2243
2
  g_initlib = 1;
2244
2
}
2245
2246
void blosc_destroy(void)
2247
0
{
2248
  /* Return if Blosc is not initialized */
2249
0
  if (!g_initlib) return;
2250
2251
0
  g_initlib = 0;
2252
2253
0
  blosc_release_threadpool(g_global_context);
2254
0
  my_free(g_global_context);
2255
0
  g_global_context = NULL;
2256
2257
0
  pthread_mutex_destroy(global_comp_mutex);
2258
0
  my_free(global_comp_mutex);
2259
0
  global_comp_mutex = NULL;
2260
0
}
2261
2262
int blosc_release_threadpool(struct blosc_context* context)
2263
0
{
2264
0
  int32_t t;
2265
0
  void* status;
2266
0
  int rc;
2267
0
  int rc2;
2268
0
  (void)rc;  // just to avoid 'unused-variable' warning
2269
2270
0
  if (context->threads_started > 0)
2271
0
  {
2272
    /* Tell all existing threads to finish */
2273
0
    context->end_threads = 1;
2274
2275
    /* Sync threads */
2276
0
    WAIT_INIT(-1, context);
2277
2278
    /* Join exiting threads */
2279
0
    for (t=0; t<context->threads_started; t++) {
2280
0
      rc2 = pthread_join(context->threads[t], &status);
2281
0
      if (rc2) {
2282
0
        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
2283
0
        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
2284
0
      }
2285
0
    }
2286
2287
    /* Release mutex and condition variable objects */
2288
0
    pthread_mutex_destroy(&context->count_mutex);
2289
2290
    /* Barriers */
2291
0
  #ifdef _POSIX_BARRIERS_MINE
2292
0
      pthread_barrier_destroy(&context->barr_init);
2293
0
      pthread_barrier_destroy(&context->barr_finish);
2294
  #else
2295
      pthread_mutex_destroy(&context->count_threads_mutex);
2296
      pthread_cond_destroy(&context->count_threads_cv);
2297
  #endif
2298
2299
      /* Thread attributes */
2300
0
  #if !defined(_WIN32)
2301
0
      pthread_attr_destroy(&context->ct_attr);
2302
0
  #endif
2303
2304
0
  }
2305
2306
0
  context->threads_started = 0;
2307
2308
0
  return 0;
2309
0
}
2310
2311
int blosc_free_resources(void)
2312
0
{
2313
  /* Return if Blosc is not initialized */
2314
0
  if (!g_initlib) return -1;
2315
2316
0
  return blosc_release_threadpool(g_global_context);
2317
0
}