Coverage Report

Created: 2024-05-21 06:09

/src/c-blosc/blosc/blosc.c
Line
Count
Source (jump to first uncovered line)
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Author: Francesc Alted <francesc@blosc.org>
5
  Creation date: 2009-05-20
6
7
  See LICENSE.txt for details about copyright and rights to use.
8
**********************************************************************/
9
10
11
#include <stdio.h>
12
#include <stdlib.h>
13
#include <errno.h>
14
#include <string.h>
15
#include <sys/types.h>
16
#include <assert.h>
17
18
#include "fastcopy.h"
19
20
#if defined(USING_CMAKE)
21
  #include "config.h"
22
#endif /*  USING_CMAKE */
23
#include "blosc.h"
24
#include "shuffle.h"
25
#include "blosclz.h"
26
#if defined(HAVE_LZ4)
27
  #include "lz4.h"
28
  #include "lz4hc.h"
29
#endif /*  HAVE_LZ4 */
30
#if defined(HAVE_SNAPPY)
31
  #include "snappy-c.h"
32
#endif /*  HAVE_SNAPPY */
33
#if defined(HAVE_ZLIB)
34
  #include "zlib.h"
35
#endif /*  HAVE_ZLIB */
36
#if defined(HAVE_ZSTD)
37
  #include "zstd.h"
38
#endif /*  HAVE_ZSTD */
39
40
#if defined(_WIN32) && !defined(__MINGW32__)
41
  #include <windows.h>
42
  #include <malloc.h>
43
44
  /* stdint.h only available in VS2010 (VC++ 16.0) and newer */
45
  #if defined(_MSC_VER) && _MSC_VER < 1600
46
    #include "win32/stdint-windows.h"
47
  #else
48
    #include <stdint.h>
49
  #endif
50
51
  #include <process.h>
52
  #define getpid _getpid
53
#else
54
  #include <stdint.h>
55
  #include <unistd.h>
56
  #include <inttypes.h>
57
#endif  /* _WIN32 */
58
59
/* Include the win32/pthread.h library for all the Windows builds. See #224. */
60
#if defined(_WIN32)
61
  #include "win32/pthread.h"
62
  #include "win32/pthread.c"
63
#else
64
  #include <pthread.h>
65
#endif
66
67
68
/* Some useful units */
69
0
#define KB 1024
70
#define MB (1024 * (KB))
71
72
/* Minimum buffer size to be compressed */
73
38.0k
#define MIN_BUFFERSIZE 128       /* Cannot be smaller than 66 */
74
75
/* The maximum number of splits in a block for compression */
76
11.5k
#define MAX_SPLITS 16            /* Cannot be larger than 128 */
77
78
/* The size of L1 cache.  32 KB is quite common nowadays. */
79
0
#define L1 (32 * (KB))
80
81
/* Have problems using posix barriers when symbol value is 200112L */
82
/* This requires more investigation, but will work for the moment */
83
#if defined(_POSIX_BARRIERS) && ( (_POSIX_BARRIERS - 20012L) >= 0 && _POSIX_BARRIERS != 200112L)
84
#define _POSIX_BARRIERS_MINE
85
#endif
86
/* Synchronization variables */
87
88
89
struct blosc_context {
90
  int32_t compress;               /* 1 if we are doing compression 0 if decompress */
91
92
  const uint8_t* src;
93
  uint8_t* dest;                  /* The current pos in the destination buffer */
94
  uint8_t* header_flags;          /* Flags for header */
95
  int compversion;                /* Compressor version byte, only used during decompression */
96
  int32_t sourcesize;             /* Number of bytes in source buffer (or uncompressed bytes in compressed file) */
97
  int32_t compressedsize;         /* Number of bytes of compressed data (only used when decompressing) */
98
  int32_t nblocks;                /* Number of total blocks in buffer */
99
  int32_t leftover;               /* Extra bytes at end of buffer */
100
  int32_t blocksize;              /* Length of the block in bytes */
101
  int32_t typesize;               /* Type size */
102
  int32_t num_output_bytes;       /* Counter for the number of output bytes */
103
  int32_t destsize;               /* Maximum size for destination buffer */
104
  uint8_t* bstarts;               /* Start of the buffer past header info */
105
  int32_t compcode;               /* Compressor code to use */
106
  int clevel;                     /* Compression level (1-9) */
107
  /* Function to use for decompression.  Only used when decompression */
108
  int (*decompress_func)(const void* input, int compressed_length, void* output,
109
                         int maxout);
110
111
  /* Threading */
112
  int32_t numthreads;
113
  int32_t threads_started;
114
  int32_t end_threads;
115
  pthread_t threads[BLOSC_MAX_THREADS];
116
  int32_t tids[BLOSC_MAX_THREADS];
117
  pthread_mutex_t count_mutex;
118
  #ifdef _POSIX_BARRIERS_MINE
119
  pthread_barrier_t barr_init;
120
  pthread_barrier_t barr_finish;
121
  #else
122
  int32_t count_threads;
123
  pthread_mutex_t count_threads_mutex;
124
  pthread_cond_t count_threads_cv;
125
  #endif
126
  #if !defined(_WIN32)
127
  pthread_attr_t ct_attr;            /* creation time attrs for threads */
128
  #endif
129
  int32_t thread_giveup_code;               /* error code when give up */
130
  int32_t thread_nblock;                    /* block counter */
131
};
132
133
struct thread_context {
134
  struct blosc_context* parent_context;
135
  int32_t tid;
136
  uint8_t* tmp;
137
  uint8_t* tmp2;
138
  uint8_t* tmp3;
139
  int32_t tmpblocksize; /* Used to keep track of how big the temporary buffers are */
140
};
141
142
/* Global context for non-contextual API */
143
static struct blosc_context* g_global_context;
144
static pthread_mutex_t* global_comp_mutex;
145
static int32_t g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
146
static int32_t g_threads = 1;
147
static int32_t g_force_blocksize = 0;
148
static int32_t g_initlib = 0;
149
static int32_t g_atfork_registered = 0;
150
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
151
152
153
154
/* Wrapped function to adjust the number of threads used by blosc */
155
int blosc_set_nthreads_(struct blosc_context*);
156
157
/* Releases the global threadpool */
158
int blosc_release_threadpool(struct blosc_context* context);
159
160
/* Macros for synchronization */
161
162
/* Wait until all threads are initialized */
163
#ifdef _POSIX_BARRIERS_MINE
164
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)  \
165
0
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_init); \
166
0
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
167
0
    printf("Could not wait on barrier (init): %d\n", rc); \
168
0
    return((RET_VAL));                            \
169
0
  }
170
#else
171
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)   \
172
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
173
  if (CONTEXT_PTR->count_threads < CONTEXT_PTR->numthreads) { \
174
    CONTEXT_PTR->count_threads++;  \
175
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
176
  } \
177
  else { \
178
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
179
  } \
180
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
181
#endif
182
183
/* Wait for all threads to finish */
184
#ifdef _POSIX_BARRIERS_MINE
185
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)   \
186
0
  rc = pthread_barrier_wait(&CONTEXT_PTR->barr_finish); \
187
0
  if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) { \
188
0
    printf("Could not wait on barrier (finish)\n"); \
189
0
    return((RET_VAL));                              \
190
0
  }
191
#else
192
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                           \
193
  pthread_mutex_lock(&CONTEXT_PTR->count_threads_mutex); \
194
  if (CONTEXT_PTR->count_threads > 0) { \
195
    CONTEXT_PTR->count_threads--; \
196
    pthread_cond_wait(&CONTEXT_PTR->count_threads_cv, &CONTEXT_PTR->count_threads_mutex); \
197
  } \
198
  else { \
199
    pthread_cond_broadcast(&CONTEXT_PTR->count_threads_cv); \
200
  } \
201
  pthread_mutex_unlock(&CONTEXT_PTR->count_threads_mutex);
202
#endif
203
204
205
/* A function for aligned malloc that is portable */
206
static uint8_t *my_malloc(size_t size)
207
23.8k
{
208
23.8k
  void *block = NULL;
209
23.8k
  int res = 0;
210
211
/* Do an alignment to 32 bytes because AVX2 is supported */
212
#if defined(_WIN32)
213
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
214
  block = (void *)_aligned_malloc(size, 32);
215
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
216
  /* Platform does have an implementation of posix_memalign */
217
23.8k
  res = posix_memalign(&block, 32, size);
218
#else
219
  block = malloc(size);
220
#endif  /* _WIN32 */
221
222
23.8k
  if (block == NULL || res != 0) {
223
0
    printf("Error allocating memory!");
224
0
    return NULL;
225
0
  }
226
227
23.8k
  return (uint8_t *)block;
228
23.8k
}
229
230
231
/* Release memory booked by my_malloc */
232
static void my_free(void *block)
233
23.8k
{
234
#if defined(_WIN32)
235
    _aligned_free(block);
236
#else
237
23.8k
    free(block);
238
23.8k
#endif  /* _WIN32 */
239
23.8k
}
240
241
242
/* Copy 4 bytes from `*pa` to int32_t, changing endianness if necessary. */
243
static int32_t sw32_(const uint8_t *pa)
244
212k
{
245
212k
  int32_t idest;
246
212k
  uint8_t *dest = (uint8_t *)&idest;
247
212k
  int i = 1;                    /* for big/little endian detection */
248
212k
  char *p = (char *)&i;
249
250
212k
  if (p[0] != 1) {
251
    /* big endian */
252
0
    dest[0] = pa[3];
253
0
    dest[1] = pa[2];
254
0
    dest[2] = pa[1];
255
0
    dest[3] = pa[0];
256
0
  }
257
212k
  else {
258
    /* little endian */
259
212k
    dest[0] = pa[0];
260
212k
    dest[1] = pa[1];
261
212k
    dest[2] = pa[2];
262
212k
    dest[3] = pa[3];
263
212k
  }
264
212k
  return idest;
265
212k
}
266
267
268
/* Copy 4 bytes from `*pa` to `*dest`, changing endianness if necessary. */
269
static void _sw32(uint8_t* dest, int32_t a)
270
405k
{
271
405k
  uint8_t *pa = (uint8_t *)&a;
272
405k
  int i = 1;                    /* for big/little endian detection */
273
405k
  char *p = (char *)&i;
274
275
405k
  if (p[0] != 1) {
276
    /* big endian */
277
0
    dest[0] = pa[3];
278
0
    dest[1] = pa[2];
279
0
    dest[2] = pa[1];
280
0
    dest[3] = pa[0];
281
0
  }
282
405k
  else {
283
    /* little endian */
284
405k
    dest[0] = pa[0];
285
405k
    dest[1] = pa[1];
286
405k
    dest[2] = pa[2];
287
405k
    dest[3] = pa[3];
288
405k
  }
289
405k
}
290
291
/*
292
 * Conversion routines between compressor and compression libraries
293
 */
294
295
/* Return the library code associated with the compressor name */
296
static int compname_to_clibcode(const char *compname)
297
0
{
298
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
299
0
    return BLOSC_BLOSCLZ_LIB;
300
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
301
0
    return BLOSC_LZ4_LIB;
302
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
303
0
    return BLOSC_LZ4_LIB;
304
0
  if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0)
305
0
    return BLOSC_SNAPPY_LIB;
306
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
307
0
    return BLOSC_ZLIB_LIB;
308
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
309
0
    return BLOSC_ZSTD_LIB;
310
0
  return -1;
311
0
}
312
313
/* Return the library name associated with the compressor code */
314
static const char *clibcode_to_clibname(int clibcode)
315
0
{
316
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
317
0
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
318
0
  if (clibcode == BLOSC_SNAPPY_LIB) return BLOSC_SNAPPY_LIBNAME;
319
0
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
320
0
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
321
0
  return NULL;                  /* should never happen */
322
0
}
323
324
325
/*
326
 * Conversion routines between compressor names and compressor codes
327
 */
328
329
/* Get the compressor name associated with the compressor code */
330
int blosc_compcode_to_compname(int compcode, const char **compname)
331
0
{
332
0
  int code = -1;    /* -1 means non-existent compressor code */
333
0
  const char *name = NULL;
334
335
  /* Map the compressor code */
336
0
  if (compcode == BLOSC_BLOSCLZ)
337
0
    name = BLOSC_BLOSCLZ_COMPNAME;
338
0
  else if (compcode == BLOSC_LZ4)
339
0
    name = BLOSC_LZ4_COMPNAME;
340
0
  else if (compcode == BLOSC_LZ4HC)
341
0
    name = BLOSC_LZ4HC_COMPNAME;
342
0
  else if (compcode == BLOSC_SNAPPY)
343
0
    name = BLOSC_SNAPPY_COMPNAME;
344
0
  else if (compcode == BLOSC_ZLIB)
345
0
    name = BLOSC_ZLIB_COMPNAME;
346
0
  else if (compcode == BLOSC_ZSTD)
347
0
    name = BLOSC_ZSTD_COMPNAME;
348
349
0
  *compname = name;
350
351
  /* Guess if there is support for this code */
352
0
  if (compcode == BLOSC_BLOSCLZ)
353
0
    code = BLOSC_BLOSCLZ;
354
0
#if defined(HAVE_LZ4)
355
0
  else if (compcode == BLOSC_LZ4)
356
0
    code = BLOSC_LZ4;
357
0
  else if (compcode == BLOSC_LZ4HC)
358
0
    code = BLOSC_LZ4HC;
359
0
#endif /*  HAVE_LZ4 */
360
#if defined(HAVE_SNAPPY)
361
  else if (compcode == BLOSC_SNAPPY)
362
    code = BLOSC_SNAPPY;
363
#endif /*  HAVE_SNAPPY */
364
0
#if defined(HAVE_ZLIB)
365
0
  else if (compcode == BLOSC_ZLIB)
366
0
    code = BLOSC_ZLIB;
367
0
#endif /*  HAVE_ZLIB */
368
0
#if defined(HAVE_ZSTD)
369
0
  else if (compcode == BLOSC_ZSTD)
370
0
    code = BLOSC_ZSTD;
371
0
#endif /*  HAVE_ZSTD */
372
373
0
  return code;
374
0
}
375
376
/* Get the compressor code for the compressor name. -1 if it is not available */
377
int blosc_compname_to_compcode(const char *compname)
378
16.8k
{
379
16.8k
  int code = -1;  /* -1 means non-existent compressor code */
380
381
16.8k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
382
665
    code = BLOSC_BLOSCLZ;
383
665
  }
384
16.2k
#if defined(HAVE_LZ4)
385
16.2k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
386
551
    code = BLOSC_LZ4;
387
551
  }
388
15.6k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
389
1.82k
    code = BLOSC_LZ4HC;
390
1.82k
  }
391
13.8k
#endif /*  HAVE_LZ4 */
392
#if defined(HAVE_SNAPPY)
393
  else if (strcmp(compname, BLOSC_SNAPPY_COMPNAME) == 0) {
394
    code = BLOSC_SNAPPY;
395
  }
396
#endif /*  HAVE_SNAPPY */
397
13.8k
#if defined(HAVE_ZLIB)
398
13.8k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
399
1.96k
    code = BLOSC_ZLIB;
400
1.96k
  }
401
11.8k
#endif /*  HAVE_ZLIB */
402
11.8k
#if defined(HAVE_ZSTD)
403
11.8k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
404
10.4k
    code = BLOSC_ZSTD;
405
10.4k
  }
406
16.8k
#endif /*  HAVE_ZSTD */
407
408
16.8k
return code;
409
16.8k
}
410
411
412
#if defined(HAVE_LZ4)
413
static int lz4_wrap_compress(const char* input, size_t input_length,
414
                             char* output, size_t maxout, int accel)
415
9.88k
{
416
9.88k
  int cbytes;
417
9.88k
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout,
418
9.88k
                             accel);
419
9.88k
  return cbytes;
420
9.88k
}
421
422
static int lz4hc_wrap_compress(const char* input, size_t input_length,
423
                               char* output, size_t maxout, int clevel)
424
14.5k
{
425
14.5k
  int cbytes;
426
14.5k
  if (input_length > (size_t)(UINT32_C(2)<<30))
427
0
    return -1;   /* input larger than 2 GB is not supported */
428
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
429
   * but levels larger than 9 do not buy much compression. */
430
14.5k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
431
14.5k
                           clevel);
432
14.5k
  return cbytes;
433
14.5k
}
434
435
static int lz4_wrap_decompress(const void* input, int compressed_length,
436
                               void* output, int maxout)
437
3.02k
{
438
3.02k
  return LZ4_decompress_safe(input, output, compressed_length, maxout);
439
3.02k
}
440
441
#endif /* HAVE_LZ4 */
442
443
#if defined(HAVE_SNAPPY)
444
static int snappy_wrap_compress(const char* input, size_t input_length,
445
                                char* output, size_t maxout)
446
{
447
  snappy_status status;
448
  size_t cl = maxout;
449
  status = snappy_compress(input, input_length, output, &cl);
450
  if (status != SNAPPY_OK){
451
    return 0;
452
  }
453
  return (int)cl;
454
}
455
456
static int snappy_wrap_decompress(const void* input, int compressed_length,
457
                                  void* output, int maxout)
458
{
459
  snappy_status status;
460
  size_t ul = maxout;
461
  status = snappy_uncompress(input, compressed_length, output, &ul);
462
  if (status != SNAPPY_OK){
463
    return 0;
464
  }
465
  return (int)ul;
466
}
467
#endif /* HAVE_SNAPPY */
468
469
#if defined(HAVE_ZLIB)
470
/* zlib is not very respectful with sharing name space with others.
471
 Fortunately, its names do not collide with those already in blosc. */
472
static int zlib_wrap_compress(const char* input, size_t input_length,
473
                              char* output, size_t maxout, int clevel)
474
23.1k
{
475
23.1k
  int status;
476
23.1k
  uLongf cl = maxout;
477
23.1k
  status = compress2(
478
23.1k
             (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
479
23.1k
  if (status != Z_OK){
480
3.60k
    return 0;
481
3.60k
  }
482
19.5k
  return (int)cl;
483
23.1k
}
484
485
static int zlib_wrap_decompress(const void* input, int compressed_length,
486
3.52k
                                void* output, int maxout) {
487
3.52k
  int status;
488
3.52k
  uLongf ul = maxout;
489
3.52k
  status = uncompress(
490
3.52k
             (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
491
3.52k
  if (status != Z_OK){
492
1.42k
    return 0;
493
1.42k
  }
494
2.10k
  return (int)ul;
495
3.52k
}
496
#endif /*  HAVE_ZLIB */
497
498
#if defined(HAVE_ZSTD)
499
static int zstd_wrap_compress(const char* input, size_t input_length,
500
126k
                              char* output, size_t maxout, int clevel) {
501
126k
  size_t code;
502
126k
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
503
  /* Make the level 8 close enough to maxCLevel */
504
126k
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
505
126k
  code = ZSTD_compress(
506
126k
      (void*)output, maxout, (void*)input, input_length, clevel);
507
126k
  if (ZSTD_isError(code)) {
508
18.1k
    return 0;
509
18.1k
  }
510
108k
  return (int)code;
511
126k
}
512
513
static int zstd_wrap_decompress(const void* input, int compressed_length,
514
7.03k
                                void* output, int maxout) {
515
7.03k
  size_t code;
516
7.03k
  code = ZSTD_decompress(
517
7.03k
      (void*)output, maxout, (void*)input, compressed_length);
518
7.03k
  if (ZSTD_isError(code)) {
519
3.88k
    return 0;
520
3.88k
  }
521
3.14k
  return (int)code;
522
7.03k
}
523
#endif /*  HAVE_ZSTD */
524
525
8.42k
static int initialize_decompress_func(struct blosc_context* context) {
526
8.42k
  int8_t header_flags = *(context->header_flags);
527
8.42k
  int32_t compformat = (header_flags & 0xe0) >> 5;
528
8.42k
  int compversion = context->compversion;
529
530
8.42k
  if (compformat == BLOSC_BLOSCLZ_FORMAT) {
531
677
    if (compversion != BLOSC_BLOSCLZ_VERSION_FORMAT) {
532
8
      return -9;
533
8
    }
534
669
    context->decompress_func = &blosclz_decompress;
535
669
    return 0;
536
677
  }
537
7.74k
#if defined(HAVE_LZ4)
538
7.74k
  if (compformat == BLOSC_LZ4_FORMAT) {
539
1.15k
    if (compversion != BLOSC_LZ4_VERSION_FORMAT) {
540
8
      return -9;
541
8
    }
542
1.15k
    context->decompress_func = &lz4_wrap_decompress;
543
1.15k
    return 0;
544
1.15k
  }
545
6.58k
#endif /*  HAVE_LZ4 */
546
#if defined(HAVE_SNAPPY)
547
  if (compformat == BLOSC_SNAPPY_FORMAT) {
548
    if (compversion != BLOSC_SNAPPY_VERSION_FORMAT) {
549
      return -9;
550
    }
551
    context->decompress_func = &snappy_wrap_decompress;
552
    return 0;
553
  }
554
#endif /*  HAVE_SNAPPY */
555
6.58k
#if defined(HAVE_ZLIB)
556
6.58k
  if (compformat == BLOSC_ZLIB_FORMAT) {
557
2.07k
    if (compversion != BLOSC_ZLIB_VERSION_FORMAT) {
558
8
      return -9;
559
8
    }
560
2.06k
    context->decompress_func = &zlib_wrap_decompress;
561
2.06k
    return 0;
562
2.07k
  }
563
4.51k
#endif /*  HAVE_ZLIB */
564
4.51k
#if defined(HAVE_ZSTD)
565
4.51k
  if (compformat == BLOSC_ZSTD_FORMAT) {
566
4.50k
    if (compversion != BLOSC_ZSTD_VERSION_FORMAT) {
567
8
      return -9;
568
8
    }
569
4.50k
    context->decompress_func = &zstd_wrap_decompress;
570
4.50k
    return 0;
571
4.50k
  }
572
3
#endif /*  HAVE_ZSTD */
573
3
  return -5; /* signals no decompression support */
574
4.51k
}
575
576
/* Compute acceleration for blosclz */
577
180k
static int get_accel(const struct blosc_context* context) {
578
180k
  int32_t clevel = context->clevel;
579
580
180k
  if (context->compcode == BLOSC_LZ4) {
581
    /* This acceleration setting based on discussions held in:
582
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
583
     */
584
9.88k
    return (10 - clevel);
585
9.88k
  }
586
170k
  return 1;
587
180k
}
588
589
590
/* Shuffle & compress a single block */
591
static int blosc_c(const struct blosc_context* context, int32_t blocksize,
592
                   int32_t leftoverblock, int32_t ntbytes, int32_t maxbytes,
593
                   const uint8_t *src, uint8_t *dest, uint8_t *tmp,
594
                   uint8_t *tmp2)
595
180k
{
596
180k
  int8_t header_flags = *(context->header_flags);
597
180k
  int dont_split = (header_flags & 0x10) >> 4;
598
180k
  int32_t j, neblock, nsplits;
599
180k
  int32_t cbytes;                   /* number of compressed bytes in split */
600
180k
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
601
180k
  int32_t maxout;
602
180k
  int32_t typesize = context->typesize;
603
180k
  const uint8_t *_tmp = src;
604
180k
  const char *compname;
605
180k
  int accel;
606
180k
  int bscount;
607
180k
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
608
180k
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
609
180k
                      (blocksize >= typesize));
610
611
180k
  if (doshuffle) {
612
    /* Byte shuffling only makes sense if typesize > 1 */
613
0
    blosc_internal_shuffle(typesize, blocksize, src, tmp);
614
0
    _tmp = tmp;
615
0
  }
616
  /* We don't allow more than 1 filter at the same time (yet) */
617
180k
  else if (dobitshuffle) {
618
82.4k
    bscount = blosc_internal_bitshuffle(typesize, blocksize, src, tmp, tmp2);
619
82.4k
    if (bscount < 0)
620
0
      return bscount;
621
82.4k
    _tmp = tmp;
622
82.4k
  }
623
624
  /* Calculate acceleration for different compressors */
625
180k
  accel = get_accel(context);
626
627
  /* The number of splits for this block */
628
180k
  if (!dont_split && !leftoverblock) {
629
11.5k
    nsplits = typesize;
630
11.5k
  }
631
168k
  else {
632
168k
    nsplits = 1;
633
168k
  }
634
180k
  neblock = blocksize / nsplits;
635
359k
  for (j = 0; j < nsplits; j++) {
636
180k
    dest += sizeof(int32_t);
637
180k
    ntbytes += (int32_t)sizeof(int32_t);
638
180k
    ctbytes += (int32_t)sizeof(int32_t);
639
180k
    maxout = neblock;
640
    #if defined(HAVE_SNAPPY)
641
    if (context->compcode == BLOSC_SNAPPY) {
642
      /* TODO perhaps refactor this to keep the value stashed somewhere */
643
      maxout = snappy_max_compressed_length(neblock);
644
    }
645
    #endif /*  HAVE_SNAPPY */
646
180k
    if (ntbytes+maxout > maxbytes) {
647
9.99k
      maxout = maxbytes - ntbytes;   /* avoid buffer overrun */
648
9.99k
      if (maxout <= 0) {
649
8
        return 0;                  /* non-compressible block */
650
8
      }
651
9.99k
    }
652
180k
    if (context->compcode == BLOSC_BLOSCLZ) {
653
6.08k
      cbytes = blosclz_compress(context->clevel, _tmp+j*neblock, neblock,
654
6.08k
                                dest, maxout, !dont_split);
655
6.08k
    }
656
174k
    #if defined(HAVE_LZ4)
657
174k
    else if (context->compcode == BLOSC_LZ4) {
658
9.88k
      cbytes = lz4_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
659
9.88k
                                 (char *)dest, (size_t)maxout, accel);
660
9.88k
    }
661
164k
    else if (context->compcode == BLOSC_LZ4HC) {
662
14.5k
      cbytes = lz4hc_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
663
14.5k
                                   (char *)dest, (size_t)maxout,
664
14.5k
                                   context->clevel);
665
14.5k
    }
666
149k
    #endif /* HAVE_LZ4 */
667
    #if defined(HAVE_SNAPPY)
668
    else if (context->compcode == BLOSC_SNAPPY) {
669
      cbytes = snappy_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
670
                                    (char *)dest, (size_t)maxout);
671
    }
672
    #endif /* HAVE_SNAPPY */
673
149k
    #if defined(HAVE_ZLIB)
674
149k
    else if (context->compcode == BLOSC_ZLIB) {
675
23.1k
      cbytes = zlib_wrap_compress((char *)_tmp+j*neblock, (size_t)neblock,
676
23.1k
                                  (char *)dest, (size_t)maxout,
677
23.1k
                                  context->clevel);
678
23.1k
    }
679
126k
    #endif /* HAVE_ZLIB */
680
126k
    #if defined(HAVE_ZSTD)
681
126k
    else if (context->compcode == BLOSC_ZSTD) {
682
126k
      cbytes = zstd_wrap_compress((char*)_tmp + j * neblock, (size_t)neblock,
683
126k
                                  (char*)dest, (size_t)maxout, context->clevel);
684
126k
    }
685
0
    #endif /* HAVE_ZSTD */
686
687
0
    else {
688
0
      blosc_compcode_to_compname(context->compcode, &compname);
689
0
      if (compname == NULL) {
690
0
          compname = "(null)";
691
0
      }
692
0
      fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
693
0
      fprintf(stderr, "compression support.  Please use one having it.");
694
0
      return -5;    /* signals no compression support */
695
0
    }
696
697
180k
    if (cbytes > maxout) {
698
      /* Buffer overrun caused by compression (should never happen) */
699
0
      return -1;
700
0
    }
701
180k
    else if (cbytes < 0) {
702
      /* cbytes should never be negative */
703
0
      return -2;
704
0
    }
705
180k
    else if (cbytes == 0 || cbytes == neblock) {
706
      /* The compressor has been unable to compress data at all. */
707
      /* Before doing the copy, check that we are not running into a
708
         buffer overflow. */
709
29.2k
      if ((ntbytes+neblock) > maxbytes) {
710
1.46k
        return 0;    /* Non-compressible data */
711
1.46k
      }
712
27.7k
      fastcopy(dest, _tmp + j * neblock, neblock);
713
27.7k
      cbytes = neblock;
714
27.7k
    }
715
179k
    _sw32(dest - 4, cbytes);
716
179k
    dest += cbytes;
717
179k
    ntbytes += cbytes;
718
179k
    ctbytes += cbytes;
719
179k
  }  /* Closes j < nsplits */
720
721
179k
  return ctbytes;
722
180k
}
723
724
/* Decompress & unshuffle a single block */
725
static int blosc_d(struct blosc_context* context, int32_t blocksize,
726
                   int32_t leftoverblock, const uint8_t* base_src,
727
                   int32_t src_offset, uint8_t* dest, uint8_t* tmp,
728
30.5k
                   uint8_t* tmp2) {
729
30.5k
  int8_t header_flags = *(context->header_flags);
730
30.5k
  int dont_split = (header_flags & 0x10) >> 4;
731
30.5k
  int32_t j, neblock, nsplits;
732
30.5k
  int32_t nbytes;                /* number of decompressed bytes in split */
733
30.5k
  const int32_t compressedsize = context->compressedsize;
734
30.5k
  int32_t cbytes;                /* number of compressed bytes in split */
735
30.5k
  int32_t ctbytes = 0;           /* number of compressed bytes in block */
736
30.5k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
737
30.5k
  uint8_t *_tmp = dest;
738
30.5k
  int32_t typesize = context->typesize;
739
30.5k
  int bscount;
740
30.5k
  int doshuffle = (header_flags & BLOSC_DOSHUFFLE) && (typesize > 1);
741
30.5k
  int dobitshuffle = ((header_flags & BLOSC_DOBITSHUFFLE) &&
742
30.5k
                      (blocksize >= typesize));
743
30.5k
  const uint8_t* src;
744
745
30.5k
  if (doshuffle || dobitshuffle) {
746
22.5k
    _tmp = tmp;
747
22.5k
  }
748
749
  /* The number of splits for this block */
750
30.5k
  if (!dont_split &&
751
      /* For compatibility with before the introduction of the split flag */
752
30.5k
      ((typesize <= MAX_SPLITS) && (blocksize/typesize) >= MIN_BUFFERSIZE) &&
753
30.5k
      !leftoverblock) {
754
891
    nsplits = typesize;
755
891
  }
756
29.6k
  else {
757
29.6k
    nsplits = 1;
758
29.6k
  }
759
760
30.5k
  neblock = blocksize / nsplits;
761
54.2k
  for (j = 0; j < nsplits; j++) {
762
    /* Validate src_offset */
763
30.5k
    if (src_offset < 0 || src_offset > compressedsize - sizeof(int32_t)) {
764
361
      return -1;
765
361
    }
766
30.2k
    cbytes = sw32_(base_src + src_offset); /* amount of compressed bytes */
767
30.2k
    src_offset += sizeof(int32_t);
768
    /* Validate cbytes */
769
30.2k
    if (cbytes < 0 || cbytes > context->compressedsize - src_offset) {
770
131
      return -1;
771
131
    }
772
30.1k
    ctbytes += (int32_t)sizeof(int32_t);
773
30.1k
    src = base_src + src_offset;
774
    /* Uncompress */
775
30.1k
    if (cbytes == neblock) {
776
15.3k
      fastcopy(_tmp, src, neblock);
777
15.3k
      nbytes = neblock;
778
15.3k
    }
779
14.7k
    else {
780
14.7k
      nbytes = context->decompress_func(src, cbytes, _tmp, neblock);
781
      /* Check that decompressed bytes number is correct */
782
14.7k
      if (nbytes != neblock) {
783
6.42k
        return -2;
784
6.42k
      }
785
14.7k
    }
786
23.6k
    src_offset += cbytes;
787
23.6k
    ctbytes += cbytes;
788
23.6k
    _tmp += nbytes;
789
23.6k
    ntbytes += nbytes;
790
23.6k
  } /* Closes j < nsplits */
791
792
23.6k
  if (doshuffle) {
793
3.36k
    blosc_internal_unshuffle(typesize, blocksize, tmp, dest);
794
3.36k
  }
795
20.2k
  else if (dobitshuffle) {
796
14.5k
    bscount = blosc_internal_bitunshuffle(typesize, blocksize, tmp, dest, tmp2);
797
14.5k
    if (bscount < 0)
798
0
      return bscount;
799
14.5k
  }
800
801
  /* Return the number of uncompressed bytes */
802
23.6k
  return ntbytes;
803
23.6k
}
804
805
/* Serial version for compression/decompression */
806
static int serial_blosc(struct blosc_context* context)
807
23.8k
{
808
23.8k
  int32_t j, bsize, leftoverblock;
809
23.8k
  int32_t cbytes;
810
811
23.8k
  int32_t ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
812
23.8k
  int32_t ntbytes = context->num_output_bytes;
813
814
23.8k
  uint8_t *tmp = my_malloc(context->blocksize + ebsize);
815
23.8k
  uint8_t *tmp2 = tmp + context->blocksize;
816
817
229k
  for (j = 0; j < context->nblocks; j++) {
818
214k
    if (context->compress && !(*(context->header_flags) & BLOSC_MEMCPYED)) {
819
180k
      _sw32(context->bstarts + j * 4, ntbytes);
820
180k
    }
821
214k
    bsize = context->blocksize;
822
214k
    leftoverblock = 0;
823
214k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
824
10.1k
      bsize = context->leftover;
825
10.1k
      leftoverblock = 1;
826
10.1k
    }
827
214k
    if (context->compress) {
828
180k
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
829
        /* We want to memcpy only */
830
0
        fastcopy(context->dest + BLOSC_MAX_OVERHEAD + j * context->blocksize,
831
0
                 context->src + j * context->blocksize, bsize);
832
0
        cbytes = bsize;
833
0
      }
834
180k
      else {
835
        /* Regular compression */
836
180k
        cbytes = blosc_c(context, bsize, leftoverblock, ntbytes,
837
180k
                         context->destsize, context->src+j*context->blocksize,
838
180k
                         context->dest+ntbytes, tmp, tmp2);
839
180k
        if (cbytes == 0) {
840
1.47k
          ntbytes = 0;              /* incompressible data */
841
1.47k
          break;
842
1.47k
        }
843
180k
      }
844
180k
    }
845
33.6k
    else {
846
33.6k
      if (*(context->header_flags) & BLOSC_MEMCPYED) {
847
        /* We want to memcpy only */
848
3.04k
        fastcopy(context->dest + j * context->blocksize,
849
3.04k
                 context->src + BLOSC_MAX_OVERHEAD + j * context->blocksize, bsize);
850
3.04k
        cbytes = bsize;
851
3.04k
      }
852
30.5k
      else {
853
        /* Regular decompression */
854
30.5k
        cbytes = blosc_d(context, bsize, leftoverblock, context->src,
855
30.5k
                         sw32_(context->bstarts + j * 4),
856
30.5k
                         context->dest + j * context->blocksize, tmp, tmp2);
857
30.5k
      }
858
33.6k
    }
859
212k
    if (cbytes < 0) {
860
6.91k
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
861
6.91k
      break;
862
6.91k
    }
863
205k
    ntbytes += cbytes;
864
205k
  }
865
866
  /* Free temporaries */
867
23.8k
  my_free(tmp);
868
869
23.8k
  return ntbytes;
870
23.8k
}
871
872
873
/* Threaded version for compression/decompression */
874
static int parallel_blosc(struct blosc_context* context)
875
0
{
876
0
  int rc;
877
0
  (void)rc;  // just to avoid 'unused-variable' warning
878
879
  /* Check whether we need to restart threads */
880
0
  if (blosc_set_nthreads_(context) < 0) {
881
0
    return -1;
882
0
  }
883
884
  /* Set sentinels */
885
0
  context->thread_giveup_code = 1;
886
0
  context->thread_nblock = -1;
887
888
  /* Synchronization point for all threads (wait for initialization) */
889
0
  WAIT_INIT(-1, context);
890
891
  /* Synchronization point for all threads (wait for finalization) */
892
0
  WAIT_FINISH(-1, context);
893
894
0
  if (context->thread_giveup_code > 0) {
895
    /* Return the total bytes (de-)compressed in threads */
896
0
    return context->num_output_bytes;
897
0
  }
898
0
  else {
899
    /* Compression/decompression gave up.  Return error code. */
900
0
    return context->thread_giveup_code;
901
0
  }
902
0
}
903
904
905
/* Do the compression or decompression of the buffer depending on the
906
   global params. */
907
static int do_job(struct blosc_context* context)
908
23.8k
{
909
23.8k
  int32_t ntbytes;
910
911
  /* Run the serial version when nthreads is 1 or when the buffers are
912
     not much larger than blocksize */
913
23.8k
  if (context->numthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
914
23.8k
    ntbytes = serial_blosc(context);
915
23.8k
  }
916
0
  else {
917
0
    ntbytes = parallel_blosc(context);
918
0
  }
919
920
23.8k
  return ntbytes;
921
23.8k
}
922
923
924
/* Whether a codec is meant for High Compression Ratios */
925
0
#define HCR(codec) (  \
926
0
             ((codec) == BLOSC_LZ4HC) ||                  \
927
0
             ((codec) == BLOSC_ZLIB) ||                   \
928
0
             ((codec) == BLOSC_ZSTD) ? 1 : 0 )
929
930
931
/* Conditions for splitting a block before compressing with a codec. */
932
30.9k
static int split_block(int compcode, int typesize, int blocksize) {
933
30.9k
  int splitblock = -1;
934
935
30.9k
  switch (g_splitmode) {
936
12.0k
    case BLOSC_ALWAYS_SPLIT:
937
12.0k
      splitblock = 1;
938
12.0k
      break;
939
7.43k
    case BLOSC_NEVER_SPLIT:
940
7.43k
      splitblock = 0;
941
7.43k
      break;
942
4.28k
    case BLOSC_AUTO_SPLIT:
943
      /* Normally all the compressors designed for speed benefit from a
944
         split.  However, in conducted benchmarks LZ4 seems that it runs
945
         faster if we don't split, which is quite surprising. */
946
4.28k
      splitblock= (((compcode == BLOSC_BLOSCLZ) ||
947
4.28k
                    (compcode == BLOSC_SNAPPY)) &&
948
4.28k
                   (typesize <= MAX_SPLITS) &&
949
4.28k
                   (blocksize / typesize) >= MIN_BUFFERSIZE);
950
4.28k
      break;
951
7.14k
    case BLOSC_FORWARD_COMPAT_SPLIT:
952
      /* The zstd support was introduced at the same time than the split flag, so
953
       * there should be not a problem with not splitting bloscks with it */
954
7.14k
      splitblock = ((compcode != BLOSC_ZSTD) &&
955
7.14k
                    (typesize <= MAX_SPLITS) &&
956
7.14k
                    (blocksize / typesize) >= MIN_BUFFERSIZE);
957
7.14k
      break;
958
0
    default:
959
0
      fprintf(stderr, "Split mode %d not supported", g_splitmode);
960
30.9k
  }
961
30.9k
  return splitblock;
962
30.9k
}
963
964
965
static int32_t compute_blocksize(struct blosc_context* context, int32_t clevel,
966
                                 int32_t typesize, int32_t nbytes,
967
                                 int32_t forced_blocksize)
968
15.4k
{
969
15.4k
  int32_t blocksize;
970
971
  /* Protection against very small buffers */
972
15.4k
  if (nbytes < (int32_t)typesize) {
973
0
    return 1;
974
0
  }
975
976
15.4k
  blocksize = nbytes;           /* Start by a whole buffer as blocksize */
977
978
15.4k
  if (forced_blocksize) {
979
15.4k
    blocksize = forced_blocksize;
980
    /* Check that forced blocksize is not too small */
981
15.4k
    if (blocksize < MIN_BUFFERSIZE) {
982
0
      blocksize = MIN_BUFFERSIZE;
983
0
    }
984
    /* Check that forced blocksize is not too large */
985
15.4k
    if (blocksize > BLOSC_MAX_BLOCKSIZE) {
986
0
      blocksize = BLOSC_MAX_BLOCKSIZE;
987
0
    }
988
15.4k
  }
989
0
  else if (nbytes >= L1) {
990
0
    blocksize = L1;
991
992
    /* For HCR codecs, increase the block sizes by a factor of 2 because they
993
       are meant for compressing large blocks (i.e. they show a big overhead
994
       when compressing small ones). */
995
0
    if (HCR(context->compcode)) {
996
0
      blocksize *= 2;
997
0
    }
998
999
0
    switch (clevel) {
1000
0
      case 0:
1001
        /* Case of plain copy */
1002
0
        blocksize /= 4;
1003
0
        break;
1004
0
      case 1:
1005
0
        blocksize /= 2;
1006
0
        break;
1007
0
      case 2:
1008
0
        blocksize *= 1;
1009
0
        break;
1010
0
      case 3:
1011
0
        blocksize *= 2;
1012
0
        break;
1013
0
      case 4:
1014
0
      case 5:
1015
0
        blocksize *= 4;
1016
0
        break;
1017
0
      case 6:
1018
0
      case 7:
1019
0
      case 8:
1020
0
        blocksize *= 8;
1021
0
        break;
1022
0
      case 9:
1023
0
        blocksize *= 8;
1024
0
        if (HCR(context->compcode)) {
1025
0
          blocksize *= 2;
1026
0
        }
1027
0
        break;
1028
0
      default:
1029
0
        assert(0);
1030
0
        break;
1031
0
    }
1032
0
  }
1033
1034
  /* Enlarge the blocksize for splittable codecs */
1035
15.4k
  if (clevel > 0 && split_block(context->compcode, typesize, blocksize)) {
1036
7.19k
    if (blocksize > (1 << 18)) {
1037
      /* Do not use a too large split buffer (> 256 KB) for splitting codecs */
1038
0
      blocksize = (1 << 18);
1039
0
    }
1040
7.19k
    blocksize *= typesize;
1041
7.19k
    if (blocksize < (1 << 16)) {
1042
      /* Do not use a too small blocksize (< 64 KB) when typesize is small */
1043
7.19k
      blocksize = (1 << 16);
1044
7.19k
    }
1045
7.19k
    if (blocksize > 1024 * 1024) {
1046
      /* But do not exceed 1 MB per thread (having this capacity in L3 is normal in modern CPUs) */
1047
0
      blocksize = 1024 * 1024;
1048
0
    }
1049
1050
7.19k
  }
1051
1052
  /* Check that blocksize is not too large */
1053
15.4k
  if (blocksize > (int32_t)nbytes) {
1054
9.27k
    blocksize = nbytes;
1055
9.27k
  }
1056
1057
  /* blocksize *must absolutely* be a multiple of the typesize */
1058
15.4k
  if (blocksize > typesize) {
1059
15.4k
    blocksize = blocksize / typesize * typesize;
1060
15.4k
  }
1061
1062
15.4k
  return blocksize;
1063
15.4k
}
1064
1065
static int initialize_context_compression(struct blosc_context* context,
1066
                          int clevel,
1067
                          int doshuffle,
1068
                          size_t typesize,
1069
                          size_t sourcesize,
1070
                          const void* src,
1071
                          void* dest,
1072
                          size_t destsize,
1073
                          int32_t compressor,
1074
                          int32_t blocksize,
1075
                          int32_t numthreads)
1076
15.4k
{
1077
15.4k
  char *envvar = NULL;
1078
15.4k
  int warnlvl = 0;
1079
  /* Set parameters */
1080
15.4k
  context->compress = 1;
1081
15.4k
  context->src = (const uint8_t*)src;
1082
15.4k
  context->dest = (uint8_t *)(dest);
1083
15.4k
  context->num_output_bytes = 0;
1084
15.4k
  context->destsize = (int32_t)destsize;
1085
15.4k
  context->sourcesize = sourcesize;
1086
15.4k
  context->typesize = typesize;
1087
15.4k
  context->compcode = compressor;
1088
15.4k
  context->numthreads = numthreads;
1089
15.4k
  context->end_threads = 0;
1090
15.4k
  context->clevel = clevel;
1091
1092
15.4k
  envvar = getenv("BLOSC_WARN");
1093
15.4k
  if (envvar != NULL) {
1094
0
    warnlvl = strtol(envvar, NULL, 10);
1095
0
  }
1096
1097
  /* Check buffer size limits */
1098
15.4k
  if (sourcesize > BLOSC_MAX_BUFFERSIZE) {
1099
0
    if (warnlvl > 0) {
1100
0
      fprintf(stderr, "Input buffer size cannot exceed %d bytes\n",
1101
0
              BLOSC_MAX_BUFFERSIZE);
1102
0
    }
1103
0
    return 0;
1104
0
  }
1105
15.4k
  if (destsize < BLOSC_MAX_OVERHEAD) {
1106
19
    if (warnlvl > 0) {
1107
0
      fprintf(stderr, "Output buffer size should be larger than %d bytes\n",
1108
0
              BLOSC_MAX_OVERHEAD);
1109
0
    }
1110
19
    return 0;
1111
19
  }
1112
1113
  /* Compression level */
1114
15.4k
  if (clevel < 0 || clevel > 9) {
1115
0
    fprintf(stderr, "`clevel` parameter must be between 0 and 9!\n");
1116
0
    return -10;
1117
0
  }
1118
1119
  /* Shuffle */
1120
15.4k
  if (doshuffle != 0 && doshuffle != 1 && doshuffle != 2) {
1121
0
    fprintf(stderr, "`shuffle` parameter must be either 0, 1 or 2!\n");
1122
0
    return -10;
1123
0
  }
1124
1125
  /* Check typesize limits */
1126
15.4k
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
1127
    /* If typesize is too large, treat buffer as an 1-byte stream. */
1128
0
    context->typesize = 1;
1129
0
  }
1130
1131
  /* Get the blocksize */
1132
15.4k
  context->blocksize = compute_blocksize(context, clevel, (int32_t)context->typesize, context->sourcesize, blocksize);
1133
1134
  /* Compute number of blocks in buffer */
1135
15.4k
  context->nblocks = context->sourcesize / context->blocksize;
1136
15.4k
  context->leftover = context->sourcesize % context->blocksize;
1137
15.4k
  context->nblocks = (context->leftover > 0) ? (context->nblocks + 1) : context->nblocks;
1138
1139
15.4k
  return 1;
1140
15.4k
}
1141
1142
1143
static int write_compression_header(struct blosc_context* context, int clevel, int doshuffle)
1144
15.4k
{
1145
15.4k
  int32_t compformat;
1146
15.4k
  int dont_split;
1147
1148
  /* Write version header for this block */
1149
15.4k
  context->dest[0] = BLOSC_VERSION_FORMAT;           /* blosc format version */
1150
1151
  /* Write compressor format */
1152
15.4k
  compformat = -1;
1153
15.4k
  switch (context->compcode)
1154
15.4k
  {
1155
658
  case BLOSC_BLOSCLZ:
1156
658
    compformat = BLOSC_BLOSCLZ_FORMAT;
1157
658
    context->dest[1] = BLOSC_BLOSCLZ_VERSION_FORMAT; /* blosclz format version */
1158
658
    break;
1159
1160
0
#if defined(HAVE_LZ4)
1161
548
  case BLOSC_LZ4:
1162
548
    compformat = BLOSC_LZ4_FORMAT;
1163
548
    context->dest[1] = BLOSC_LZ4_VERSION_FORMAT;  /* lz4 format version */
1164
548
    break;
1165
1.82k
  case BLOSC_LZ4HC:
1166
1.82k
    compformat = BLOSC_LZ4HC_FORMAT;
1167
1.82k
    context->dest[1] = BLOSC_LZ4HC_VERSION_FORMAT; /* lz4hc is the same as lz4 */
1168
1.82k
    break;
1169
0
#endif /* HAVE_LZ4 */
1170
1171
#if defined(HAVE_SNAPPY)
1172
  case BLOSC_SNAPPY:
1173
    compformat = BLOSC_SNAPPY_FORMAT;
1174
    context->dest[1] = BLOSC_SNAPPY_VERSION_FORMAT;    /* snappy format version */
1175
    break;
1176
#endif /* HAVE_SNAPPY */
1177
1178
0
#if defined(HAVE_ZLIB)
1179
1.96k
  case BLOSC_ZLIB:
1180
1.96k
    compformat = BLOSC_ZLIB_FORMAT;
1181
1.96k
    context->dest[1] = BLOSC_ZLIB_VERSION_FORMAT;      /* zlib format version */
1182
1.96k
    break;
1183
0
#endif /* HAVE_ZLIB */
1184
1185
0
#if defined(HAVE_ZSTD)
1186
10.4k
  case BLOSC_ZSTD:
1187
10.4k
    compformat = BLOSC_ZSTD_FORMAT;
1188
10.4k
    context->dest[1] = BLOSC_ZSTD_VERSION_FORMAT;      /* zstd format version */
1189
10.4k
    break;
1190
0
#endif /* HAVE_ZSTD */
1191
1192
0
  default:
1193
0
  {
1194
0
    const char *compname;
1195
0
    compname = clibcode_to_clibname(compformat);
1196
0
    if (compname == NULL) {
1197
0
        compname = "(null)";
1198
0
    }
1199
0
    fprintf(stderr, "Blosc has not been compiled with '%s' ", compname);
1200
0
    fprintf(stderr, "compression support.  Please use one having it.");
1201
0
    return -5;    /* signals no compression support */
1202
0
    break;
1203
0
  }
1204
15.4k
  }
1205
1206
15.4k
  context->header_flags = context->dest+2;  /* flags */
1207
15.4k
  context->dest[2] = 0;  /* zeroes flags */
1208
15.4k
  context->dest[3] = (uint8_t)context->typesize;  /* type size */
1209
15.4k
  _sw32(context->dest + 4, context->sourcesize);  /* size of the buffer */
1210
15.4k
  _sw32(context->dest + 8, context->blocksize);  /* block size */
1211
15.4k
  context->bstarts = context->dest + 16;  /* starts for every block */
1212
15.4k
  context->num_output_bytes = 16 + sizeof(int32_t)*context->nblocks;  /* space for header and pointers */
1213
1214
15.4k
  if (context->clevel == 0) {
1215
    /* Compression level 0 means buffer to be memcpy'ed */
1216
11
    *(context->header_flags) |= BLOSC_MEMCPYED;
1217
11
    context->num_output_bytes = 16;      /* space just for header */
1218
11
  }
1219
1220
15.4k
  if (context->sourcesize < MIN_BUFFERSIZE) {
1221
    /* Buffer is too small.  Try memcpy'ing. */
1222
47
    *(context->header_flags) |= BLOSC_MEMCPYED;
1223
47
    context->num_output_bytes = 16;      /* space just for header */
1224
47
  }
1225
1226
15.4k
  if (doshuffle == BLOSC_SHUFFLE) {
1227
    /* Byte-shuffle is active */
1228
3.60k
    *(context->header_flags) |= BLOSC_DOSHUFFLE;     /* bit 0 set to one in flags */
1229
3.60k
  }
1230
1231
15.4k
  if (doshuffle == BLOSC_BITSHUFFLE) {
1232
    /* Bit-shuffle is active */
1233
6.58k
    *(context->header_flags) |= BLOSC_DOBITSHUFFLE;  /* bit 2 set to one in flags */
1234
6.58k
  }
1235
1236
15.4k
  dont_split = !split_block(context->compcode, context->typesize,
1237
15.4k
                            context->blocksize);
1238
15.4k
  *(context->header_flags) |= dont_split << 4;  /* dont_split is in bit 4 */
1239
15.4k
  *(context->header_flags) |= compformat << 5;  /* compressor format starts at bit 5 */
1240
1241
15.4k
  return 1;
1242
15.4k
}
1243
1244
1245
int blosc_compress_context(struct blosc_context* context)
1246
15.4k
{
1247
15.4k
  int32_t ntbytes = 0;
1248
1249
15.4k
  if ((*(context->header_flags) & BLOSC_MEMCPYED) &&
1250
15.4k
      (context->sourcesize + BLOSC_MAX_OVERHEAD > context->destsize)) {
1251
55
    return 0;   /* data cannot be copied without overrun destination */
1252
55
  }
1253
1254
  /* Do the actual compression */
1255
15.4k
  ntbytes = do_job(context);
1256
15.4k
  if (ntbytes < 0) {
1257
0
    return -1;
1258
0
  }
1259
15.4k
  if ((ntbytes == 0) && (context->sourcesize + BLOSC_MAX_OVERHEAD <= context->destsize)) {
1260
    /* Last chance for fitting `src` buffer in `dest`.  Update flags and force a copy. */
1261
0
    *(context->header_flags) |= BLOSC_MEMCPYED;
1262
0
    context->num_output_bytes = BLOSC_MAX_OVERHEAD;  /* reset the output bytes in previous step */
1263
0
    ntbytes = do_job(context);
1264
0
    if (ntbytes < 0) {
1265
0
      return -1;
1266
0
    }
1267
0
  }
1268
1269
  /* Set the number of compressed bytes in header */
1270
15.4k
  _sw32(context->dest + 12, ntbytes);
1271
1272
15.4k
  assert(ntbytes <= context->destsize);
1273
15.4k
  return ntbytes;
1274
15.4k
}
1275
1276
/* The public routine for compression with context. */
1277
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
1278
                       size_t nbytes, const void* src, void* dest,
1279
                       size_t destsize, const char* compressor,
1280
                       size_t blocksize, int numinternalthreads)
1281
0
{
1282
0
  int error, result;
1283
0
  struct blosc_context context;
1284
1285
0
  context.threads_started = 0;
1286
0
  error = initialize_context_compression(&context, clevel, doshuffle, typesize,
1287
0
           nbytes, src, dest, destsize,
1288
0
           blosc_compname_to_compcode(compressor),
1289
0
           blocksize, numinternalthreads);
1290
0
  if (error <= 0) { return error; }
1291
1292
0
  error = write_compression_header(&context, clevel, doshuffle);
1293
0
  if (error <= 0) { return error; }
1294
1295
0
  result = blosc_compress_context(&context);
1296
1297
0
  if (numinternalthreads > 1)
1298
0
  {
1299
0
    blosc_release_threadpool(&context);
1300
0
  }
1301
1302
0
  return result;
1303
0
}
1304
1305
/* The public routine for compression.  See blosc.h for docstrings. */
1306
int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
1307
                   const void *src, void *dest, size_t destsize)
1308
15.4k
{
1309
15.4k
  int result;
1310
15.4k
  char* envvar;
1311
1312
  /* Check if should initialize */
1313
15.4k
  if (!g_initlib) blosc_init();
1314
1315
  /* Check for environment variables */
1316
15.4k
  envvar = getenv("BLOSC_CLEVEL");
1317
15.4k
  if (envvar != NULL) {
1318
0
    long value;
1319
0
    value = strtol(envvar, NULL, 10);
1320
0
    if ((value != EINVAL) && (value >= 0)) {
1321
0
      clevel = (int)value;
1322
0
    }
1323
0
  }
1324
1325
15.4k
  envvar = getenv("BLOSC_SHUFFLE");
1326
15.4k
  if (envvar != NULL) {
1327
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
1328
0
      doshuffle = BLOSC_NOSHUFFLE;
1329
0
    }
1330
0
    if (strcmp(envvar, "SHUFFLE") == 0) {
1331
0
      doshuffle = BLOSC_SHUFFLE;
1332
0
    }
1333
0
    if (strcmp(envvar, "BITSHUFFLE") == 0) {
1334
0
      doshuffle = BLOSC_BITSHUFFLE;
1335
0
    }
1336
0
  }
1337
1338
15.4k
  envvar = getenv("BLOSC_TYPESIZE");
1339
15.4k
  if (envvar != NULL) {
1340
0
    long value;
1341
0
    value = strtol(envvar, NULL, 10);
1342
0
    if ((value != EINVAL) && (value > 0)) {
1343
0
      typesize = (int)value;
1344
0
    }
1345
0
  }
1346
1347
15.4k
  envvar = getenv("BLOSC_COMPRESSOR");
1348
15.4k
  if (envvar != NULL) {
1349
0
    result = blosc_set_compressor(envvar);
1350
0
    if (result < 0) { return result; }
1351
0
  }
1352
1353
15.4k
  envvar = getenv("BLOSC_BLOCKSIZE");
1354
15.4k
  if (envvar != NULL) {
1355
0
    long blocksize;
1356
0
    blocksize = strtol(envvar, NULL, 10);
1357
0
    if ((blocksize != EINVAL) && (blocksize > 0)) {
1358
0
      blosc_set_blocksize((size_t)blocksize);
1359
0
    }
1360
0
  }
1361
1362
15.4k
  envvar = getenv("BLOSC_NTHREADS");
1363
15.4k
  if (envvar != NULL) {
1364
0
    long nthreads;
1365
0
    nthreads = strtol(envvar, NULL, 10);
1366
0
    if ((nthreads != EINVAL) && (nthreads > 0)) {
1367
0
      result = blosc_set_nthreads((int)nthreads);
1368
0
      if (result < 0) { return result; }
1369
0
    }
1370
0
  }
1371
1372
15.4k
  envvar = getenv("BLOSC_SPLITMODE");
1373
15.4k
  if (envvar != NULL) {
1374
0
    if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
1375
0
      blosc_set_splitmode(BLOSC_FORWARD_COMPAT_SPLIT);
1376
0
    }
1377
0
    else if (strcmp(envvar, "AUTO") == 0) {
1378
0
      blosc_set_splitmode(BLOSC_AUTO_SPLIT);
1379
0
    }
1380
0
    else if (strcmp(envvar, "ALWAYS") == 0) {
1381
0
      blosc_set_splitmode(BLOSC_ALWAYS_SPLIT);
1382
0
    }
1383
0
    else if (strcmp(envvar, "NEVER") == 0) {
1384
0
      blosc_set_splitmode(BLOSC_NEVER_SPLIT);
1385
0
    }
1386
0
    else {
1387
0
      fprintf(stderr, "BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
1388
0
      return -1;
1389
0
    }
1390
0
  }
1391
1392
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1393
     that this should be the last env var so that it can take the
1394
     previous ones into account */
1395
15.4k
  envvar = getenv("BLOSC_NOLOCK");
1396
15.4k
  if (envvar != NULL) {
1397
0
    const char *compname;
1398
0
    blosc_compcode_to_compname(g_compressor, &compname);
1399
0
    result = blosc_compress_ctx(clevel, doshuffle, typesize,
1400
0
        nbytes, src, dest, destsize,
1401
0
        compname, g_force_blocksize, g_threads);
1402
0
    return result;
1403
0
  }
1404
1405
15.4k
  pthread_mutex_lock(global_comp_mutex);
1406
1407
15.4k
  do {
1408
15.4k
    result = initialize_context_compression(g_global_context, clevel, doshuffle,
1409
15.4k
                                           typesize, nbytes, src, dest, destsize,
1410
15.4k
                                           g_compressor, g_force_blocksize,
1411
15.4k
                                           g_threads);
1412
15.4k
    if (result <= 0) { break; }
1413
1414
15.4k
    result = write_compression_header(g_global_context, clevel, doshuffle);
1415
15.4k
    if (result <= 0) { break; }
1416
1417
15.4k
    result = blosc_compress_context(g_global_context);
1418
15.4k
  } while (0);
1419
1420
0
  pthread_mutex_unlock(global_comp_mutex);
1421
1422
15.4k
  return result;
1423
15.4k
}
1424
1425
static int blosc_run_decompression_with_context(struct blosc_context* context,
1426
                                                const void* src,
1427
                                                void* dest,
1428
                                                size_t destsize,
1429
                                                int numinternalthreads)
1430
21.4k
{
1431
21.4k
  uint8_t version;
1432
21.4k
  int32_t ntbytes;
1433
1434
21.4k
  context->compress = 0;
1435
21.4k
  context->src = (const uint8_t*)src;
1436
21.4k
  context->dest = (uint8_t*)dest;
1437
21.4k
  context->destsize = destsize;
1438
21.4k
  context->num_output_bytes = 0;
1439
21.4k
  context->numthreads = numinternalthreads;
1440
21.4k
  context->end_threads = 0;
1441
1442
  /* Read the header block */
1443
21.4k
  version = context->src[0];                        /* blosc format version */
1444
21.4k
  context->compversion = context->src[1];
1445
1446
21.4k
  context->header_flags = (uint8_t*)(context->src + 2);           /* flags */
1447
21.4k
  context->typesize = (int32_t)context->src[3];      /* typesize */
1448
21.4k
  context->sourcesize = sw32_(context->src + 4);     /* buffer size */
1449
21.4k
  context->blocksize = sw32_(context->src + 8);      /* block size */
1450
21.4k
  context->compressedsize = sw32_(context->src + 12); /* compressed buffer size */
1451
21.4k
  context->bstarts = (uint8_t*)(context->src + 16);
1452
1453
21.4k
  if (context->sourcesize == 0) {
1454
    /* Source buffer was empty, so we are done */
1455
0
    return 0;
1456
0
  }
1457
1458
21.4k
  if (context->blocksize <= 0 || context->blocksize > destsize ||
1459
21.4k
      context->blocksize > BLOSC_MAX_BLOCKSIZE || context->typesize <= 0 ||
1460
21.4k
      context->typesize > BLOSC_MAX_TYPESIZE) {
1461
9.99k
    return -1;
1462
9.99k
  }
1463
1464
11.4k
  if (version != BLOSC_VERSION_FORMAT) {
1465
    /* Version from future */
1466
0
    return -1;
1467
0
  }
1468
11.4k
  if (*context->header_flags & 0x08) {
1469
    /* compressor flags from the future */
1470
2
    return -1;
1471
2
  }
1472
1473
  /* Compute some params */
1474
  /* Total blocks */
1475
11.4k
  context->nblocks = context->sourcesize / context->blocksize;
1476
11.4k
  context->leftover = context->sourcesize % context->blocksize;
1477
11.4k
  context->nblocks = (context->leftover>0)? context->nblocks+1: context->nblocks;
1478
1479
  /* Check that we have enough space to decompress */
1480
11.4k
  if (context->sourcesize > (int32_t)destsize) {
1481
2.91k
    return -1;
1482
2.91k
  }
1483
1484
8.48k
  if (*(context->header_flags) & BLOSC_MEMCPYED) {
1485
    /* Validate that compressed size is equal to decompressed size + header
1486
       size. */
1487
63
    if (context->sourcesize + BLOSC_MAX_OVERHEAD != context->compressedsize) {
1488
12
      return -1;
1489
12
    }
1490
8.42k
  } else {
1491
8.42k
    ntbytes = initialize_decompress_func(context);
1492
8.42k
    if (ntbytes != 0) return ntbytes;
1493
1494
    /* Validate that compressed size is large enough to hold the bstarts array */
1495
8.38k
    if (context->nblocks > (context->compressedsize - 16) / 4) {
1496
23
      return -1;
1497
23
    }
1498
8.38k
  }
1499
1500
  /* Do the actual decompression */
1501
8.41k
  ntbytes = do_job(context);
1502
8.41k
  if (ntbytes < 0) {
1503
6.91k
    return -1;
1504
6.91k
  }
1505
1506
1.49k
  assert(ntbytes <= (int32_t)destsize);
1507
1.49k
  return ntbytes;
1508
8.41k
}
1509
1510
int blosc_decompress_ctx(const void* src, void* dest, size_t destsize,
1511
0
                         int numinternalthreads) {
1512
0
  int result;
1513
0
  struct blosc_context context;
1514
1515
0
  context.threads_started = 0;
1516
0
  result = blosc_run_decompression_with_context(&context, src, dest, destsize,
1517
0
                                                numinternalthreads);
1518
1519
0
  if (numinternalthreads > 1)
1520
0
  {
1521
0
    blosc_release_threadpool(&context);
1522
0
  }
1523
1524
0
  return result;
1525
0
}
1526
1527
21.4k
int blosc_decompress(const void* src, void* dest, size_t destsize) {
1528
21.4k
  int result;
1529
21.4k
  char* envvar;
1530
21.4k
  long nthreads;
1531
1532
  /* Check if should initialize */
1533
21.4k
  if (!g_initlib) blosc_init();
1534
1535
  /* Check for a BLOSC_NTHREADS environment variable */
1536
21.4k
  envvar = getenv("BLOSC_NTHREADS");
1537
21.4k
  if (envvar != NULL) {
1538
0
    nthreads = strtol(envvar, NULL, 10);
1539
0
    if ((nthreads != EINVAL) && (nthreads > 0)) {
1540
0
      result = blosc_set_nthreads((int)nthreads);
1541
0
      if (result < 0) { return result; }
1542
0
    }
1543
0
  }
1544
1545
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
1546
     that this should be the last env var so that it can take the
1547
     previous ones into account */
1548
21.4k
  envvar = getenv("BLOSC_NOLOCK");
1549
21.4k
  if (envvar != NULL) {
1550
0
    result = blosc_decompress_ctx(src, dest, destsize, g_threads);
1551
0
    return result;
1552
0
  }
1553
1554
21.4k
  pthread_mutex_lock(global_comp_mutex);
1555
1556
21.4k
  result = blosc_run_decompression_with_context(g_global_context, src, dest,
1557
21.4k
                                                destsize, g_threads);
1558
1559
21.4k
  pthread_mutex_unlock(global_comp_mutex);
1560
1561
21.4k
  return result;
1562
21.4k
}
1563
1564
0
int blosc_getitem(const void* src, int start, int nitems, void* dest) {
1565
0
  uint8_t *_src=NULL;               /* current pos for source buffer */
1566
0
  uint8_t version, compversion;     /* versions for compressed header */
1567
0
  uint8_t flags;                    /* flags for header */
1568
0
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
1569
0
  int32_t nblocks;                  /* number of total blocks in buffer */
1570
0
  int32_t leftover;                 /* extra bytes at end of buffer */
1571
0
  uint8_t *bstarts;                 /* start pointers for each block */
1572
0
  int32_t typesize, blocksize, nbytes, compressedsize;
1573
0
  int32_t j, bsize, bsize2, leftoverblock;
1574
0
  int32_t cbytes, startb, stopb;
1575
0
  int stop = start + nitems;
1576
0
  uint8_t *tmp;
1577
0
  uint8_t *tmp2;
1578
0
  uint8_t *tmp3;
1579
0
  int32_t ebsize;
1580
0
  struct blosc_context context = {0};
1581
1582
0
  _src = (uint8_t *)(src);
1583
1584
  /* Read the header block */
1585
0
  version = _src[0];                        /* blosc format version */
1586
0
  compversion = _src[1];
1587
0
  flags = _src[2];                          /* flags */
1588
0
  typesize = (int32_t)_src[3];              /* typesize */
1589
0
  nbytes = sw32_(_src + 4);                 /* buffer size */
1590
0
  blocksize = sw32_(_src + 8);              /* block size */
1591
0
  compressedsize = sw32_(_src + 12); /* compressed buffer size */
1592
1593
0
  if (version != BLOSC_VERSION_FORMAT)
1594
0
    return -9;
1595
1596
0
  if (blocksize <= 0 || blocksize > nbytes || blocksize > BLOSC_MAX_BLOCKSIZE ||
1597
0
      typesize <= 0 || typesize > BLOSC_MAX_TYPESIZE) {
1598
0
    return -1;
1599
0
  }
1600
1601
  /* Compute some params */
1602
  /* Total blocks */
1603
0
  nblocks = nbytes / blocksize;
1604
0
  leftover = nbytes % blocksize;
1605
0
  nblocks = (leftover>0)? nblocks+1: nblocks;
1606
1607
  /* Only initialize the fields blosc_d uses */
1608
0
  context.typesize = typesize;
1609
0
  context.header_flags = &flags;
1610
0
  context.compversion = compversion;
1611
0
  context.compressedsize = compressedsize;
1612
0
  if (flags & BLOSC_MEMCPYED) {
1613
0
    if (nbytes + BLOSC_MAX_OVERHEAD != compressedsize) {
1614
0
      return -1;
1615
0
    }
1616
0
  } else {
1617
0
    ntbytes = initialize_decompress_func(&context);
1618
0
    if (ntbytes != 0) return ntbytes;
1619
1620
0
    if (nblocks >= (compressedsize - 16) / 4) {
1621
0
      return -1;
1622
0
    }
1623
0
  }
1624
1625
0
  ebsize = blocksize + typesize * (int32_t)sizeof(int32_t);
1626
0
  tmp = my_malloc(blocksize + ebsize + blocksize);
1627
0
  tmp2 = tmp + blocksize;
1628
0
  tmp3 = tmp + blocksize + ebsize;
1629
1630
0
  _src += 16;
1631
0
  bstarts = _src;
1632
0
  _src += sizeof(int32_t)*nblocks;
1633
1634
  /* Check region boundaries */
1635
0
  if ((start < 0) || (start*typesize > nbytes)) {
1636
0
    fprintf(stderr, "`start` out of bounds");
1637
0
    return -1;
1638
0
  }
1639
1640
0
  if ((stop < 0) || (stop*typesize > nbytes)) {
1641
0
    fprintf(stderr, "`start`+`nitems` out of bounds");
1642
0
    return -1;
1643
0
  }
1644
1645
0
  for (j = 0; j < nblocks; j++) {
1646
0
    bsize = blocksize;
1647
0
    leftoverblock = 0;
1648
0
    if ((j == nblocks - 1) && (leftover > 0)) {
1649
0
      bsize = leftover;
1650
0
      leftoverblock = 1;
1651
0
    }
1652
1653
    /* Compute start & stop for each block */
1654
0
    startb = start * typesize - j * blocksize;
1655
0
    stopb = stop * typesize - j * blocksize;
1656
0
    if ((startb >= (int)blocksize) || (stopb <= 0)) {
1657
0
      continue;
1658
0
    }
1659
0
    if (startb < 0) {
1660
0
      startb = 0;
1661
0
    }
1662
0
    if (stopb > (int)blocksize) {
1663
0
      stopb = blocksize;
1664
0
    }
1665
0
    bsize2 = stopb - startb;
1666
1667
    /* Do the actual data copy */
1668
0
    if (flags & BLOSC_MEMCPYED) {
1669
      /* We want to memcpy only */
1670
0
      fastcopy((uint8_t *) dest + ntbytes,
1671
0
               (uint8_t *) src + BLOSC_MAX_OVERHEAD + j * blocksize + startb, bsize2);
1672
0
      cbytes = bsize2;
1673
0
    }
1674
0
    else {
1675
      /* Regular decompression.  Put results in tmp2. */
1676
0
      cbytes = blosc_d(&context, bsize, leftoverblock,
1677
0
                       (uint8_t *)src, sw32_(bstarts + j * 4),
1678
0
                       tmp2, tmp, tmp3);
1679
0
      if (cbytes < 0) {
1680
0
        ntbytes = cbytes;
1681
0
        break;
1682
0
      }
1683
      /* Copy to destination */
1684
0
      fastcopy((uint8_t *) dest + ntbytes, tmp2 + startb, bsize2);
1685
0
      cbytes = bsize2;
1686
0
    }
1687
0
    ntbytes += cbytes;
1688
0
  }
1689
1690
0
  my_free(tmp);
1691
1692
0
  return ntbytes;
1693
0
}
1694
1695
/* Decompress & unshuffle several blocks in a single thread */
1696
static void *t_blosc(void *ctxt)
1697
0
{
1698
0
  struct thread_context* context = (struct thread_context*)ctxt;
1699
0
  int32_t cbytes, ntdest;
1700
0
  int32_t tblocks;              /* number of blocks per thread */
1701
0
  int32_t leftover2;
1702
0
  int32_t tblock;               /* limit block on a thread */
1703
0
  int32_t nblock_;              /* private copy of nblock */
1704
0
  int32_t bsize, leftoverblock;
1705
  /* Parameters for threads */
1706
0
  int32_t blocksize;
1707
0
  int32_t ebsize;
1708
0
  int32_t compress;
1709
0
  int32_t maxbytes;
1710
0
  int32_t ntbytes;
1711
0
  int32_t flags;
1712
0
  int32_t nblocks;
1713
0
  int32_t leftover;
1714
0
  uint8_t *bstarts;
1715
0
  const uint8_t *src;
1716
0
  uint8_t *dest;
1717
0
  uint8_t *tmp;
1718
0
  uint8_t *tmp2;
1719
0
  uint8_t *tmp3;
1720
0
  int rc;
1721
0
  (void)rc;  // just to avoid 'unused-variable' warning
1722
1723
0
  while(1)
1724
0
  {
1725
    /* Synchronization point for all threads (wait for initialization) */
1726
0
    WAIT_INIT(NULL, context->parent_context);
1727
1728
0
    if(context->parent_context->end_threads)
1729
0
    {
1730
0
      break;
1731
0
    }
1732
1733
    /* Get parameters for this thread before entering the main loop */
1734
0
    blocksize = context->parent_context->blocksize;
1735
0
    ebsize = blocksize + context->parent_context->typesize * (int32_t)sizeof(int32_t);
1736
0
    compress = context->parent_context->compress;
1737
0
    flags = *(context->parent_context->header_flags);
1738
0
    maxbytes = context->parent_context->destsize;
1739
0
    nblocks = context->parent_context->nblocks;
1740
0
    leftover = context->parent_context->leftover;
1741
0
    bstarts = context->parent_context->bstarts;
1742
0
    src = context->parent_context->src;
1743
0
    dest = context->parent_context->dest;
1744
1745
0
    if (blocksize > context->tmpblocksize)
1746
0
    {
1747
0
      my_free(context->tmp);
1748
0
      context->tmp = my_malloc(blocksize + ebsize + blocksize);
1749
0
      context->tmp2 = context->tmp + blocksize;
1750
0
      context->tmp3 = context->tmp + blocksize + ebsize;
1751
0
    }
1752
1753
0
    tmp = context->tmp;
1754
0
    tmp2 = context->tmp2;
1755
0
    tmp3 = context->tmp3;
1756
1757
0
    ntbytes = 0;                /* only useful for decompression */
1758
1759
0
    if (compress && !(flags & BLOSC_MEMCPYED)) {
1760
      /* Compression always has to follow the block order */
1761
0
      pthread_mutex_lock(&context->parent_context->count_mutex);
1762
0
      context->parent_context->thread_nblock++;
1763
0
      nblock_ = context->parent_context->thread_nblock;
1764
0
      pthread_mutex_unlock(&context->parent_context->count_mutex);
1765
0
      tblock = nblocks;
1766
0
    }
1767
0
    else {
1768
      /* Decompression can happen using any order.  We choose
1769
       sequential block order on each thread */
1770
1771
      /* Blocks per thread */
1772
0
      tblocks = nblocks / context->parent_context->numthreads;
1773
0
      leftover2 = nblocks % context->parent_context->numthreads;
1774
0
      tblocks = (leftover2>0)? tblocks+1: tblocks;
1775
1776
0
      nblock_ = context->tid*tblocks;
1777
0
      tblock = nblock_ + tblocks;
1778
0
      if (tblock > nblocks) {
1779
0
        tblock = nblocks;
1780
0
      }
1781
0
    }
1782
1783
    /* Loop over blocks */
1784
0
    leftoverblock = 0;
1785
0
    while ((nblock_ < tblock) && context->parent_context->thread_giveup_code > 0) {
1786
0
      bsize = blocksize;
1787
0
      if (nblock_ == (nblocks - 1) && (leftover > 0)) {
1788
0
        bsize = leftover;
1789
0
        leftoverblock = 1;
1790
0
      }
1791
0
      if (compress) {
1792
0
        if (flags & BLOSC_MEMCPYED) {
1793
          /* We want to memcpy only */
1794
0
          fastcopy(dest + BLOSC_MAX_OVERHEAD + nblock_ * blocksize,
1795
0
                   src + nblock_ * blocksize, bsize);
1796
0
          cbytes = bsize;
1797
0
        }
1798
0
        else {
1799
          /* Regular compression */
1800
0
          cbytes = blosc_c(context->parent_context, bsize, leftoverblock, 0, ebsize,
1801
0
                           src+nblock_*blocksize, tmp2, tmp, tmp3);
1802
0
        }
1803
0
      }
1804
0
      else {
1805
0
        if (flags & BLOSC_MEMCPYED) {
1806
          /* We want to memcpy only */
1807
0
          fastcopy(dest + nblock_ * blocksize,
1808
0
                   src + BLOSC_MAX_OVERHEAD + nblock_ * blocksize, bsize);
1809
0
          cbytes = bsize;
1810
0
        }
1811
0
        else {
1812
0
          cbytes = blosc_d(context->parent_context, bsize, leftoverblock,
1813
0
                           src, sw32_(bstarts + nblock_ * 4),
1814
0
                           dest+nblock_*blocksize,
1815
0
                           tmp, tmp2);
1816
0
        }
1817
0
      }
1818
1819
      /* Check whether current thread has to giveup */
1820
0
      if (context->parent_context->thread_giveup_code <= 0) {
1821
0
        break;
1822
0
      }
1823
1824
      /* Check results for the compressed/decompressed block */
1825
0
      if (cbytes < 0) {            /* compr/decompr failure */
1826
        /* Set giveup_code error */
1827
0
        pthread_mutex_lock(&context->parent_context->count_mutex);
1828
0
        context->parent_context->thread_giveup_code = cbytes;
1829
0
        pthread_mutex_unlock(&context->parent_context->count_mutex);
1830
0
        break;
1831
0
      }
1832
1833
0
      if (compress && !(flags & BLOSC_MEMCPYED)) {
1834
        /* Start critical section */
1835
0
        pthread_mutex_lock(&context->parent_context->count_mutex);
1836
0
        ntdest = context->parent_context->num_output_bytes;
1837
0
        _sw32(bstarts + nblock_ * 4, ntdest); /* update block start counter */
1838
0
        if ( (cbytes == 0) || (ntdest+cbytes > maxbytes) ) {
1839
0
          context->parent_context->thread_giveup_code = 0;  /* incompressible buffer */
1840
0
          pthread_mutex_unlock(&context->parent_context->count_mutex);
1841
0
          break;
1842
0
        }
1843
0
        context->parent_context->thread_nblock++;
1844
0
        nblock_ = context->parent_context->thread_nblock;
1845
0
        context->parent_context->num_output_bytes += cbytes;           /* update return bytes counter */
1846
0
        pthread_mutex_unlock(&context->parent_context->count_mutex);
1847
        /* End of critical section */
1848
1849
        /* Copy the compressed buffer to destination */
1850
0
        fastcopy(dest + ntdest, tmp2, cbytes);
1851
0
      }
1852
0
      else {
1853
0
        nblock_++;
1854
        /* Update counter for this thread */
1855
0
        ntbytes += cbytes;
1856
0
      }
1857
1858
0
    } /* closes while (nblock_) */
1859
1860
    /* Sum up all the bytes decompressed */
1861
0
    if ((!compress || (flags & BLOSC_MEMCPYED)) && context->parent_context->thread_giveup_code > 0) {
1862
      /* Update global counter for all threads (decompression only) */
1863
0
      pthread_mutex_lock(&context->parent_context->count_mutex);
1864
0
      context->parent_context->num_output_bytes += ntbytes;
1865
0
      pthread_mutex_unlock(&context->parent_context->count_mutex);
1866
0
    }
1867
1868
    /* Meeting point for all threads (wait for finalization) */
1869
0
    WAIT_FINISH(NULL, context->parent_context);
1870
0
  }
1871
1872
  /* Cleanup our working space and context */
1873
0
  my_free(context->tmp);
1874
0
  my_free(context);
1875
1876
0
  return(NULL);
1877
0
}
1878
1879
1880
static int init_threads(struct blosc_context* context)
1881
0
{
1882
0
  int32_t tid;
1883
0
  int rc2;
1884
0
  int32_t ebsize;
1885
0
  struct thread_context* thread_context;
1886
1887
  /* Initialize mutex and condition variable objects */
1888
0
  pthread_mutex_init(&context->count_mutex, NULL);
1889
1890
  /* Set context thread sentinels */
1891
0
  context->thread_giveup_code = 1;
1892
0
  context->thread_nblock = -1;
1893
1894
  /* Barrier initialization */
1895
0
#ifdef _POSIX_BARRIERS_MINE
1896
0
  pthread_barrier_init(&context->barr_init, NULL, context->numthreads+1);
1897
0
  pthread_barrier_init(&context->barr_finish, NULL, context->numthreads+1);
1898
#else
1899
  pthread_mutex_init(&context->count_threads_mutex, NULL);
1900
  pthread_cond_init(&context->count_threads_cv, NULL);
1901
  context->count_threads = 0;      /* Reset threads counter */
1902
#endif
1903
1904
0
#if !defined(_WIN32)
1905
  /* Initialize and set thread detached attribute */
1906
0
  pthread_attr_init(&context->ct_attr);
1907
0
  pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
1908
0
#endif
1909
1910
  /* Finally, create the threads in detached state */
1911
0
  for (tid = 0; tid < context->numthreads; tid++) {
1912
0
    context->tids[tid] = tid;
1913
1914
    /* Create a thread context thread owns context (will destroy when finished) */
1915
0
    thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
1916
0
    thread_context->parent_context = context;
1917
0
    thread_context->tid = tid;
1918
1919
0
    ebsize = context->blocksize + context->typesize * (int32_t)sizeof(int32_t);
1920
0
    thread_context->tmp = my_malloc(context->blocksize + ebsize + context->blocksize);
1921
0
    thread_context->tmp2 = thread_context->tmp + context->blocksize;
1922
0
    thread_context->tmp3 = thread_context->tmp + context->blocksize + ebsize;
1923
0
    thread_context->tmpblocksize = context->blocksize;
1924
1925
0
#if !defined(_WIN32)
1926
0
    rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc, (void *)thread_context);
1927
#else
1928
    rc2 = pthread_create(&context->threads[tid], NULL, t_blosc, (void *)thread_context);
1929
#endif
1930
0
    if (rc2) {
1931
0
      fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", rc2);
1932
0
      fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
1933
0
      return(-1);
1934
0
    }
1935
0
  }
1936
1937
1938
0
  return(0);
1939
0
}
1940
1941
int blosc_get_nthreads(void)
1942
0
{
1943
0
  int ret = g_threads;
1944
1945
0
  return ret;
1946
0
}
1947
1948
int blosc_set_nthreads(int nthreads_new)
1949
15.4k
{
1950
15.4k
  int ret = g_threads;
1951
1952
  /* Check if should initialize */
1953
15.4k
  if (!g_initlib) blosc_init();
1954
1955
15.4k
  if (nthreads_new != ret){
1956
    /* Re-initialize Blosc */
1957
0
    blosc_destroy();
1958
0
    blosc_init();
1959
0
    g_threads = nthreads_new;
1960
0
  }
1961
1962
15.4k
  return ret;
1963
15.4k
}
1964
1965
int blosc_set_nthreads_(struct blosc_context* context)
1966
0
{
1967
0
  if (context->numthreads > BLOSC_MAX_THREADS) {
1968
0
    fprintf(stderr,
1969
0
            "Error.  nthreads cannot be larger than BLOSC_MAX_THREADS (%d)",
1970
0
            BLOSC_MAX_THREADS);
1971
0
    return -1;
1972
0
  }
1973
0
  else if (context->numthreads <= 0) {
1974
0
    fprintf(stderr, "Error.  nthreads must be a positive integer");
1975
0
    return -1;
1976
0
  }
1977
1978
  /* Launch a new pool of threads */
1979
0
  if (context->numthreads > 1 && context->numthreads != context->threads_started) {
1980
0
    blosc_release_threadpool(context);
1981
0
    if (init_threads(context) < 0) {
1982
0
      return -1;
1983
0
    }
1984
0
  }
1985
1986
  /* We have now started the threads */
1987
0
  context->threads_started = context->numthreads;
1988
1989
0
  return context->numthreads;
1990
0
}
1991
1992
const char* blosc_get_compressor(void)
1993
0
{
1994
0
  const char* compname;
1995
0
  blosc_compcode_to_compname(g_compressor, &compname);
1996
1997
0
  return compname;
1998
0
}
1999
2000
int blosc_set_compressor(const char *compname)
2001
16.8k
{
2002
16.8k
  int code = blosc_compname_to_compcode(compname);
2003
2004
16.8k
  g_compressor = code;
2005
2006
  /* Check if should initialize */
2007
16.8k
  if (!g_initlib) blosc_init();
2008
2009
16.8k
  return code;
2010
16.8k
}
2011
2012
const char* blosc_list_compressors(void)
2013
0
{
2014
0
  static int compressors_list_done = 0;
2015
0
  static char ret[256];
2016
2017
0
  if (compressors_list_done) return ret;
2018
0
  ret[0] = '\0';
2019
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
2020
0
#if defined(HAVE_LZ4)
2021
0
  strcat(ret, ","); strcat(ret, BLOSC_LZ4_COMPNAME);
2022
0
  strcat(ret, ","); strcat(ret, BLOSC_LZ4HC_COMPNAME);
2023
0
#endif /* HAVE_LZ4 */
2024
#if defined(HAVE_SNAPPY)
2025
  strcat(ret, ","); strcat(ret, BLOSC_SNAPPY_COMPNAME);
2026
#endif /* HAVE_SNAPPY */
2027
0
#if defined(HAVE_ZLIB)
2028
0
  strcat(ret, ","); strcat(ret, BLOSC_ZLIB_COMPNAME);
2029
0
#endif /* HAVE_ZLIB */
2030
0
#if defined(HAVE_ZSTD)
2031
0
  strcat(ret, ","); strcat(ret, BLOSC_ZSTD_COMPNAME);
2032
0
#endif /* HAVE_ZSTD */
2033
0
  compressors_list_done = 1;
2034
0
  return ret;
2035
0
}
2036
2037
const char* blosc_get_version_string(void)
2038
0
{
2039
0
  return BLOSC_VERSION_STRING;
2040
0
}
2041
2042
int blosc_get_complib_info(const char *compname, char **complib, char **version)
2043
0
{
2044
0
  int clibcode;
2045
0
  const char *clibname;
2046
0
  const char *clibversion = "unknown";
2047
2048
0
#if (defined(HAVE_LZ4) && defined(LZ4_VERSION_MAJOR)) || (defined(HAVE_SNAPPY) && defined(SNAPPY_VERSION)) || defined(ZSTD_VERSION_MAJOR)
2049
0
  char sbuffer[256];
2050
0
#endif
2051
2052
0
  clibcode = compname_to_clibcode(compname);
2053
0
  clibname = clibcode_to_clibname(clibcode);
2054
2055
  /* complib version */
2056
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
2057
0
    clibversion = BLOSCLZ_VERSION_STRING;
2058
0
  }
2059
0
#if defined(HAVE_LZ4)
2060
0
  else if (clibcode == BLOSC_LZ4_LIB) {
2061
0
#if defined(LZ4_VERSION_MAJOR)
2062
0
    sprintf(sbuffer, "%d.%d.%d",
2063
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
2064
0
    clibversion = sbuffer;
2065
0
#endif /* LZ4_VERSION_MAJOR */
2066
0
  }
2067
0
#endif /* HAVE_LZ4 */
2068
#if defined(HAVE_SNAPPY)
2069
  else if (clibcode == BLOSC_SNAPPY_LIB) {
2070
#if defined(SNAPPY_VERSION)
2071
    sprintf(sbuffer, "%d.%d.%d", SNAPPY_MAJOR, SNAPPY_MINOR, SNAPPY_PATCHLEVEL);
2072
    clibversion = sbuffer;
2073
#endif /* SNAPPY_VERSION */
2074
  }
2075
#endif /* HAVE_SNAPPY */
2076
0
#if defined(HAVE_ZLIB)
2077
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
2078
0
    clibversion = ZLIB_VERSION;
2079
0
  }
2080
0
#endif /* HAVE_ZLIB */
2081
0
#if defined(HAVE_ZSTD)
2082
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
2083
0
    sprintf(sbuffer, "%d.%d.%d",
2084
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
2085
0
    clibversion = sbuffer;
2086
0
  }
2087
0
#endif /* HAVE_ZSTD */
2088
0
  else {
2089
    /* Unsupported library */
2090
0
    if (complib != NULL) *complib = NULL;
2091
0
    if (version != NULL) *version = NULL;
2092
0
    return -1;
2093
0
  }
2094
2095
0
  if (complib != NULL) *complib = strdup(clibname);
2096
0
  if (version != NULL) *version = strdup(clibversion);
2097
2098
0
  return clibcode;
2099
0
}
2100
2101
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
2102
void blosc_cbuffer_sizes(const void *cbuffer, size_t *nbytes,
2103
                         size_t *cbytes, size_t *blocksize)
2104
29.0k
{
2105
29.0k
  uint8_t *_src = (uint8_t *)(cbuffer);    /* current pos for source buffer */
2106
29.0k
  uint8_t version = _src[0];               /* version of header */
2107
2108
29.0k
  if (version != BLOSC_VERSION_FORMAT) {
2109
11
    *nbytes = *blocksize = *cbytes = 0;
2110
11
    return;
2111
11
  }
2112
2113
  /* Read the interesting values */
2114
29.0k
  *nbytes = (size_t)sw32_(_src + 4);       /* uncompressed buffer size */
2115
29.0k
  *blocksize = (size_t)sw32_(_src + 8);    /* block size */
2116
29.0k
  *cbytes = (size_t)sw32_(_src + 12);      /* compressed buffer size */
2117
29.0k
}
2118
2119
7.51k
int blosc_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
2120
7.51k
  size_t header_cbytes, header_blocksize;
2121
7.51k
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) return -1;
2122
7.51k
  blosc_cbuffer_sizes(cbuffer, nbytes, &header_cbytes, &header_blocksize);
2123
7.51k
  if (header_cbytes != cbytes) return -1;
2124
7.51k
  if (*nbytes > BLOSC_MAX_BUFFERSIZE) return -1;
2125
7.46k
  return 0;
2126
7.51k
}
2127
2128
/* Return `typesize` and `flags` from a compressed buffer. */
2129
void blosc_cbuffer_metainfo(const void *cbuffer, size_t *typesize,
2130
                            int *flags)
2131
0
{
2132
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2133
2134
0
  uint8_t version = _src[0];               /* version of header */
2135
2136
0
  if (version != BLOSC_VERSION_FORMAT) {
2137
0
    *flags = *typesize = 0;
2138
0
    return;
2139
0
  }
2140
2141
  /* Read the interesting values */
2142
0
  *flags = (int)_src[2] & 7;             /* first three flags */
2143
0
  *typesize = (size_t)_src[3];           /* typesize */
2144
0
}
2145
2146
2147
/* Return version information from a compressed buffer. */
2148
void blosc_cbuffer_versions(const void *cbuffer, int *version,
2149
                            int *versionlz)
2150
0
{
2151
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2152
2153
  /* Read the version info */
2154
0
  *version = (int)_src[0];         /* blosc format version */
2155
0
  *versionlz = (int)_src[1];       /* Lempel-Ziv compressor format version */
2156
0
}
2157
2158
2159
/* Return the compressor library/format used in a compressed buffer. */
2160
const char *blosc_cbuffer_complib(const void *cbuffer)
2161
0
{
2162
0
  uint8_t *_src = (uint8_t *)(cbuffer);  /* current pos for source buffer */
2163
0
  int clibcode;
2164
0
  const char *complib;
2165
2166
  /* Read the compressor format/library info */
2167
0
  clibcode = (_src[2] & 0xe0) >> 5;
2168
0
  complib = clibcode_to_clibname(clibcode);
2169
0
  return complib;
2170
0
}
2171
2172
/* Get the internal blocksize to be used during compression.  0 means
2173
   that an automatic blocksize is computed internally. */
2174
int blosc_get_blocksize(void)
2175
0
{
2176
0
  return (int)g_force_blocksize;
2177
0
}
2178
2179
/* Force the use of a specific blocksize.  If 0, an automatic
2180
   blocksize will be used (the default). */
2181
void blosc_set_blocksize(size_t size)
2182
6.28k
{
2183
6.28k
  g_force_blocksize = (int32_t)size;
2184
6.28k
}
2185
2186
/* Force the use of a specific split mode. */
2187
void blosc_set_splitmode(int mode)
2188
15.4k
{
2189
15.4k
  g_splitmode = mode;
2190
15.4k
}
2191
2192
/* Child global context is invalid and pool threads no longer exist post-fork.
2193
 * Discard the old, inconsistent global context and global context mutex and
2194
 * mark as uninitialized.  Subsequent calls through `blosc_*` interfaces will
2195
 * trigger re-init of the global context.
2196
 *
2197
 * All pthread interfaces have undefined behavior in child handler in current
2198
 * posix standards: https://pubs.opengroup.org/onlinepubs/9699919799/
2199
 */
2200
0
void blosc_atfork_child(void) {
2201
0
  if (!g_initlib) return;
2202
2203
0
  g_initlib = 0;
2204
2205
0
  my_free(global_comp_mutex);
2206
0
  global_comp_mutex = NULL;
2207
2208
0
  my_free(g_global_context);
2209
0
  g_global_context = NULL;
2210
2211
0
}
2212
2213
void blosc_init(void)
2214
2
{
2215
  /* Return if we are already initialized */
2216
2
  if (g_initlib) return;
2217
2218
2
  global_comp_mutex = (pthread_mutex_t*)my_malloc(sizeof(pthread_mutex_t));
2219
2
  pthread_mutex_init(global_comp_mutex, NULL);
2220
2221
2
  g_global_context = (struct blosc_context*)my_malloc(sizeof(struct blosc_context));
2222
2
  g_global_context->threads_started = 0;
2223
2224
2
  #if !defined(_WIN32)
2225
  /* atfork handlers are only be registered once, though multiple re-inits may
2226
   * occur via blosc_destroy/blosc_init.  */
2227
2
  if (!g_atfork_registered) {
2228
2
    g_atfork_registered = 1;
2229
2
    pthread_atfork(NULL, NULL, &blosc_atfork_child);
2230
2
  }
2231
2
  #endif
2232
2233
2
  g_initlib = 1;
2234
2
}
2235
2236
void blosc_destroy(void)
2237
0
{
2238
  /* Return if Blosc is not initialized */
2239
0
  if (!g_initlib) return;
2240
2241
0
  g_initlib = 0;
2242
2243
0
  blosc_release_threadpool(g_global_context);
2244
0
  my_free(g_global_context);
2245
0
  g_global_context = NULL;
2246
2247
0
  pthread_mutex_destroy(global_comp_mutex);
2248
0
  my_free(global_comp_mutex);
2249
0
  global_comp_mutex = NULL;
2250
0
}
2251
2252
int blosc_release_threadpool(struct blosc_context* context)
2253
0
{
2254
0
  int32_t t;
2255
0
  void* status;
2256
0
  int rc;
2257
0
  int rc2;
2258
0
  (void)rc;  // just to avoid 'unused-variable' warning
2259
2260
0
  if (context->threads_started > 0)
2261
0
  {
2262
    /* Tell all existing threads to finish */
2263
0
    context->end_threads = 1;
2264
2265
    /* Sync threads */
2266
0
    WAIT_INIT(-1, context);
2267
2268
    /* Join exiting threads */
2269
0
    for (t=0; t<context->threads_started; t++) {
2270
0
      rc2 = pthread_join(context->threads[t], &status);
2271
0
      if (rc2) {
2272
0
        fprintf(stderr, "ERROR; return code from pthread_join() is %d\n", rc2);
2273
0
        fprintf(stderr, "\tError detail: %s\n", strerror(rc2));
2274
0
      }
2275
0
    }
2276
2277
    /* Release mutex and condition variable objects */
2278
0
    pthread_mutex_destroy(&context->count_mutex);
2279
2280
    /* Barriers */
2281
0
  #ifdef _POSIX_BARRIERS_MINE
2282
0
      pthread_barrier_destroy(&context->barr_init);
2283
0
      pthread_barrier_destroy(&context->barr_finish);
2284
  #else
2285
      pthread_mutex_destroy(&context->count_threads_mutex);
2286
      pthread_cond_destroy(&context->count_threads_cv);
2287
  #endif
2288
2289
      /* Thread attributes */
2290
0
  #if !defined(_WIN32)
2291
0
      pthread_attr_destroy(&context->ct_attr);
2292
0
  #endif
2293
2294
0
  }
2295
2296
0
  context->threads_started = 0;
2297
2298
0
  return 0;
2299
0
}
2300
2301
int blosc_free_resources(void)
2302
0
{
2303
  /* Return if Blosc is not initialized */
2304
0
  if (!g_initlib) return -1;
2305
2306
0
  return blosc_release_threadpool(g_global_context);
2307
0
}