Coverage Report

Created: 2024-05-21 06:22

/src/c-blosc2/blosc/blosc2.c
Line
Count
Source (jump to first uncovered line)
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Copyright (c) 2021  Blosc Development Team <blosc@blosc.org>
5
  https://blosc.org
6
  License: BSD 3-Clause (see LICENSE.txt)
7
8
  See LICENSE.txt for details about copyright and rights to use.
9
**********************************************************************/
10
11
12
#include "blosc2.h"
13
#include "blosc-private.h"
14
#include "../plugins/codecs/zfp/blosc2-zfp.h"
15
#include "frame.h"
16
#include "b2nd-private.h"
17
#include "schunk-private.h"
18
19
#if defined(USING_CMAKE)
20
  #include "config.h"
21
#endif /*  USING_CMAKE */
22
#include "context.h"
23
24
#include "shuffle.h"
25
#include "delta.h"
26
#include "trunc-prec.h"
27
#include "blosclz.h"
28
#include "stune.h"
29
#include "blosc2/codecs-registry.h"
30
#include "blosc2/filters-registry.h"
31
#include "blosc2/tuners-registry.h"
32
33
#include "lz4.h"
34
#include "lz4hc.h"
35
#ifdef HAVE_IPP
36
  #include <ipps.h>
37
  #include <ippdc.h>
38
#endif
39
#if defined(HAVE_ZLIB_NG)
40
#ifdef ZLIB_COMPAT
41
  #include "zlib.h"
42
#else
43
  #include "zlib-ng.h"
44
#endif
45
#elif defined(HAVE_ZLIB)
46
  #include "zlib.h"
47
#endif /*  HAVE_MINIZ */
48
#if defined(HAVE_ZSTD)
49
  #include "zstd.h"
50
  #include "zstd_errors.h"
51
  // #include "cover.h"  // for experimenting with fast cover training for building dicts
52
  #include "zdict.h"
53
#endif /*  HAVE_ZSTD */
54
55
#if defined(_WIN32) && !defined(__MINGW32__)
56
  #include <windows.h>
57
  #include <malloc.h>
58
  #include <process.h>
59
  #define getpid _getpid
60
#endif  /* _WIN32 */
61
62
#if defined(_WIN32) && !defined(__GNUC__)
63
  #include "win32/pthread.c"
64
#endif
65
66
#include <stdio.h>
67
#include <stdlib.h>
68
#include <errno.h>
69
#include <string.h>
70
#include <sys/types.h>
71
#include <assert.h>
72
#include <math.h>
73
74
75
/* Synchronization variables */
76
77
/* Global context for non-contextual API */
78
static blosc2_context* g_global_context;
79
static pthread_mutex_t global_comp_mutex;
80
static int g_compressor = BLOSC_BLOSCLZ;
81
static int g_delta = 0;
82
/* The default splitmode */
83
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
84
/* the compressor to use by default */
85
static int16_t g_nthreads = 1;
86
static int32_t g_force_blocksize = 0;
87
static int g_initlib = 0;
88
static blosc2_schunk* g_schunk = NULL;   /* the pointer to super-chunk */
89
90
blosc2_codec g_codecs[256] = {0};
91
uint8_t g_ncodecs = 0;
92
93
static blosc2_filter g_filters[256] = {0};
94
static uint64_t g_nfilters = 0;
95
96
static blosc2_io_cb g_ios[256] = {0};
97
static uint64_t g_nio = 0;
98
99
blosc2_tuner g_tuners[256] = {0};
100
int g_ntuners = 0;
101
102
static int g_tuner = BLOSC_STUNE;
103
104
// Forward declarations
105
int init_threadpool(blosc2_context *context);
106
int release_threadpool(blosc2_context *context);
107
108
/* Macros for synchronization */

/*
 * WAIT_INIT(RET, CTX): wait until all threads are initialized.
 *
 * Barrier flavour: waits on CTX->barr_init.  The result is stored in a
 * variable named `rc` that the *caller* must declare.  On failure the error
 * is traced and the enclosing function returns RET.
 *
 * Fallback flavour (no POSIX barriers): emulates a barrier with
 * count_threads_mutex / count_threads_cv.  A caller that finds
 * count_threads below nthreads bumps the counter and sleeps.  A caller that
 * finds it already at (or above) nthreads broadcasts to wake the sleepers.
 * NOTE(review): pthread_cond_wait is not re-checked in a predicate loop, so a
 * spurious wakeup would release a waiter early.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_INIT(RET, CTX)                                            \
  do {                                                                 \
    rc = pthread_barrier_wait(&(CTX)->barr_init);                      \
    if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc);   \
      return ((RET));                                                  \
    }                                                                  \
  } while (0)
#else
#define WAIT_INIT(RET, CTX)                                            \
  do {                                                                 \
    pthread_mutex_lock(&(CTX)->count_threads_mutex);                   \
    if ((CTX)->count_threads >= (CTX)->nthreads) {                     \
      pthread_cond_broadcast(&(CTX)->count_threads_cv);                \
    } else {                                                           \
      (CTX)->count_threads++;                                          \
      pthread_cond_wait(&(CTX)->count_threads_cv,                      \
                        &(CTX)->count_threads_mutex);                  \
    }                                                                  \
    pthread_mutex_unlock(&(CTX)->count_threads_mutex);                 \
  } while (0)
#endif
135
136
/*
 * WAIT_FINISH(RET, CTX): wait for all threads to finish.
 *
 * Barrier flavour: waits on CTX->barr_finish.  The result is stored in a
 * variable named `rc` that the *caller* must declare.  On failure the error
 * is traced and the enclosing function returns RET.
 *
 * Fallback flavour (no POSIX barriers): a caller that finds count_threads
 * above zero decrements it and sleeps on count_threads_cv.  A caller that
 * finds it at zero (or below) broadcasts to wake the sleepers.
 * NOTE(review): pthread_cond_wait is not re-checked in a predicate loop, so a
 * spurious wakeup would release a waiter early.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_FINISH(RET, CTX)                                          \
  do {                                                                 \
    rc = pthread_barrier_wait(&(CTX)->barr_finish);                    \
    if (rc != PTHREAD_BARRIER_SERIAL_THREAD && rc != 0) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (finish)");         \
      return ((RET));                                                  \
    }                                                                  \
  } while (0)
#else
#define WAIT_FINISH(RET, CTX)                                          \
  do {                                                                 \
    pthread_mutex_lock(&(CTX)->count_threads_mutex);                   \
    if ((CTX)->count_threads <= 0) {                                   \
      pthread_cond_broadcast(&(CTX)->count_threads_cv);                \
    } else {                                                           \
      (CTX)->count_threads--;                                          \
      pthread_cond_wait(&(CTX)->count_threads_cv,                      \
                        &(CTX)->count_threads_mutex);                  \
    }                                                                  \
    pthread_mutex_unlock(&(CTX)->count_threads_mutex);                 \
  } while (0)
#endif
161
162
163
/* global variable to change threading backend from Blosc-managed to caller-managed */
164
static blosc_threads_callback threads_callback = 0;
165
static void *threads_callback_data = 0;
166
167
168
/* non-threadsafe function should be called before any other Blosc function in
169
   order to change how threads are managed */
170
void blosc2_set_threads_callback(blosc_threads_callback callback, void *callback_data)
171
0
{
172
0
  threads_callback = callback;
173
0
  threads_callback_data = callback_data;
174
0
}
175
176
177
/* Portable aligned allocation.
 *
 * Returns `size` bytes aligned to 32 bytes (enough for AVX2) when built on
 * Windows (_aligned_malloc) or on POSIX platforms with posix_memalign.
 * Elsewhere it falls back to plain malloc, which gives no 32-byte guarantee.
 * On failure an error is traced and NULL is returned.  Release with my_free().
 */
static uint8_t* my_malloc(size_t size) {
  void* mem = NULL;
  int status = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  mem = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  status = posix_memalign(&mem, 32, size);
#else
  mem = malloc(size);
#endif  /* _WIN32 */

  if (status != 0 || mem == NULL) {
    BLOSC_TRACE_ERROR("Error allocating memory!");
    return NULL;
  }
  return (uint8_t*)mem;
}
200
201
202
/* Release memory obtained from my_malloc().
 *
 * The deallocator must match the allocator: _aligned_free on Windows,
 * free() everywhere else.  Passing NULL is a no-op on both. */
static void my_free(void* block) {
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* _WIN32 */
}
210
211
212
/*
213
 * Conversion routines between compressor and compression libraries
214
 */
215
216
/* Return the library code associated with the compressor name */
217
0
static int compname_to_clibcode(const char* compname) {
218
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
219
0
    return BLOSC_BLOSCLZ_LIB;
220
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
221
0
    return BLOSC_LZ4_LIB;
222
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
223
0
    return BLOSC_LZ4_LIB;
224
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
225
0
    return BLOSC_ZLIB_LIB;
226
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
227
0
    return BLOSC_ZSTD_LIB;
228
0
  for (int i = 0; i < g_ncodecs; ++i) {
229
0
    if (strcmp(compname, g_codecs[i].compname) == 0)
230
0
      return g_codecs[i].complib;
231
0
  }
232
0
  return BLOSC2_ERROR_NOT_FOUND;
233
0
}
234
235
/* Return the library name associated with the compressor code */
236
17
static const char* clibcode_to_clibname(int clibcode) {
237
17
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
238
17
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
239
17
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
240
17
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
241
119
  for (int i = 0; i < g_ncodecs; ++i) {
242
102
    if (clibcode == g_codecs[i].complib)
243
0
      return g_codecs[i].compname;
244
102
  }
245
17
  return NULL;                  /* should never happen */
246
17
}
247
248
249
/*
250
 * Conversion routines between compressor names and compressor codes
251
 */
252
253
/* Get the compressor name associated with the compressor code */
254
0
int blosc2_compcode_to_compname(int compcode, const char** compname) {
255
0
  int code = -1;    /* -1 means non-existent compressor code */
256
0
  const char* name = NULL;
257
258
  /* Map the compressor code */
259
0
  if (compcode == BLOSC_BLOSCLZ)
260
0
    name = BLOSC_BLOSCLZ_COMPNAME;
261
0
  else if (compcode == BLOSC_LZ4)
262
0
    name = BLOSC_LZ4_COMPNAME;
263
0
  else if (compcode == BLOSC_LZ4HC)
264
0
    name = BLOSC_LZ4HC_COMPNAME;
265
0
  else if (compcode == BLOSC_ZLIB)
266
0
    name = BLOSC_ZLIB_COMPNAME;
267
0
  else if (compcode == BLOSC_ZSTD)
268
0
    name = BLOSC_ZSTD_COMPNAME;
269
0
  else {
270
0
    for (int i = 0; i < g_ncodecs; ++i) {
271
0
      if (compcode == g_codecs[i].compcode) {
272
0
        name = g_codecs[i].compname;
273
0
        break;
274
0
      }
275
0
    }
276
0
  }
277
278
0
  *compname = name;
279
280
  /* Guess if there is support for this code */
281
0
  if (compcode == BLOSC_BLOSCLZ)
282
0
    code = BLOSC_BLOSCLZ;
283
0
  else if (compcode == BLOSC_LZ4)
284
0
    code = BLOSC_LZ4;
285
0
  else if (compcode == BLOSC_LZ4HC)
286
0
    code = BLOSC_LZ4HC;
287
0
#if defined(HAVE_ZLIB)
288
0
  else if (compcode == BLOSC_ZLIB)
289
0
    code = BLOSC_ZLIB;
290
0
#endif /* HAVE_ZLIB */
291
0
#if defined(HAVE_ZSTD)
292
0
  else if (compcode == BLOSC_ZSTD)
293
0
    code = BLOSC_ZSTD;
294
0
#endif /* HAVE_ZSTD */
295
0
  else if (compcode >= BLOSC_LAST_CODEC)
296
0
    code = compcode;
297
0
  return code;
298
0
}
299
300
/* Get the compressor code for the compressor name. -1 if it is not available */
301
15.6k
int blosc2_compname_to_compcode(const char* compname) {
302
15.6k
  int code = -1;  /* -1 means non-existent compressor code */
303
304
15.6k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
305
2.77k
    code = BLOSC_BLOSCLZ;
306
2.77k
  }
307
12.8k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
308
632
    code = BLOSC_LZ4;
309
632
  }
310
12.2k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
311
1.97k
    code = BLOSC_LZ4HC;
312
1.97k
  }
313
10.2k
#if defined(HAVE_ZLIB)
314
10.2k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
315
2.19k
    code = BLOSC_ZLIB;
316
2.19k
  }
317
8.09k
#endif /*  HAVE_ZLIB */
318
8.09k
#if defined(HAVE_ZSTD)
319
8.09k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
320
8.09k
    code = BLOSC_ZSTD;
321
8.09k
  }
322
0
#endif /*  HAVE_ZSTD */
323
0
  else{
324
0
    for (int i = 0; i < g_ncodecs; ++i) {
325
0
      if (strcmp(compname, g_codecs[i].compname) == 0) {
326
0
        code = g_codecs[i].compcode;
327
0
        break;
328
0
      }
329
0
    }
330
0
  }
331
15.6k
  return code;
332
15.6k
}
333
334
335
/* Convert compressor code to blosc compressor format code */
336
60.2k
static int compcode_to_compformat(int compcode) {
337
60.2k
  switch (compcode) {
338
47.4k
    case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
339
628
    case BLOSC_LZ4:     return BLOSC_LZ4_FORMAT;
340
1.96k
    case BLOSC_LZ4HC:   return BLOSC_LZ4HC_FORMAT;
341
342
0
#if defined(HAVE_ZLIB)
343
2.18k
    case BLOSC_ZLIB:    return BLOSC_ZLIB_FORMAT;
344
0
#endif /*  HAVE_ZLIB */
345
346
0
#if defined(HAVE_ZSTD)
347
8.07k
    case BLOSC_ZSTD:    return BLOSC_ZSTD_FORMAT;
348
0
      break;
349
0
#endif /*  HAVE_ZSTD */
350
0
    default:
351
0
      return BLOSC_UDCODEC_FORMAT;
352
60.2k
  }
353
0
  BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
354
0
}
355
356
357
/* Convert compressor code to blosc compressor format version */
358
60.7k
static int compcode_to_compversion(int compcode) {
359
  /* Write compressor format */
360
60.7k
  switch (compcode) {
361
47.9k
    case BLOSC_BLOSCLZ:
362
47.9k
      return BLOSC_BLOSCLZ_VERSION_FORMAT;
363
629
    case BLOSC_LZ4:
364
629
      return BLOSC_LZ4_VERSION_FORMAT;
365
1.96k
    case BLOSC_LZ4HC:
366
1.96k
      return BLOSC_LZ4HC_VERSION_FORMAT;
367
368
0
#if defined(HAVE_ZLIB)
369
2.19k
    case BLOSC_ZLIB:
370
2.19k
      return BLOSC_ZLIB_VERSION_FORMAT;
371
0
      break;
372
0
#endif /*  HAVE_ZLIB */
373
374
0
#if defined(HAVE_ZSTD)
375
8.08k
    case BLOSC_ZSTD:
376
8.08k
      return BLOSC_ZSTD_VERSION_FORMAT;
377
0
      break;
378
0
#endif /*  HAVE_ZSTD */
379
0
    default:
380
0
      for (int i = 0; i < g_ncodecs; ++i) {
381
0
        if (compcode == g_codecs[i].compcode) {
382
0
          return g_codecs[i].version;
383
0
        }
384
0
      }
385
60.7k
  }
386
0
  return BLOSC2_ERROR_FAILURE;
387
60.7k
}
388
389
390
/* Compress `input_length` bytes of `input` with LZ4 into `output`, whose
 * capacity is `maxout`.
 *
 * Returns the compressed size, 0 when the result does not fit in `maxout`,
 * or (IPP build only) a negative Blosc error code.
 * With Intel IPP, `hash_table` must be a preallocated IPP LZ4 hash table;
 * without IPP it is ignored.  `accel` is ignored in both builds: the IPP
 * path never reads it and the plain LZ4 path forces acceleration 1. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel, void* hash_table) {
  BLOSC_UNUSED_PARAM(accel);
#ifdef HAVE_IPP
  if (hash_table == NULL) {
    return BLOSC2_ERROR_INVALID_PARAM;  // the hash table should always be initialized
  }
  int inlen = (int)input_length;
  int outlen = (int)maxout;
  // I have not found any function that uses `accel` like in `LZ4_compress_fast`, but
  // the IPP LZ4Safe call does a pretty good job on compressing well, so let's use it
  IppStatus status = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &inlen,
                                           (Ipp8u*)output, &outlen, (Ipp8u*)hash_table);
  switch (status) {
    case ippStsNoErr:
      return outlen;
    case ippStsDstSizeLessExpected:
      return 0;  // we cannot compress in required outlen
    default:
      return BLOSC2_ERROR_FAILURE;  // an unexpected error happened
  }
#else
  BLOSC_UNUSED_PARAM(hash_table);
  const int no_accel = 1;  // deactivate acceleration to match IPP behaviour
  return LZ4_compress_fast(input, output, (int)input_length, (int)maxout, no_accel);
#endif
}
418
419
420
static int lz4hc_wrap_compress(const char* input, size_t input_length,
421
18.0k
                               char* output, size_t maxout, int clevel) {
422
18.0k
  int cbytes;
423
18.0k
  if (input_length > (size_t)(UINT32_C(2) << 30))
424
0
    return BLOSC2_ERROR_2GB_LIMIT;
425
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
426
   * but levels larger than 9 do not buy much compression. */
427
18.0k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
428
18.0k
                           clevel);
429
18.0k
  return cbytes;
430
18.0k
}
431
432
433
/* Decompress an LZ4 block of `compressed_length` bytes into `output`.
 *
 * Blosc always knows the exact decompressed size, so anything other than
 * exactly `maxout` bytes counts as failure.  Returns `maxout` on success and
 * 0 on failure. */
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
                               char* output, size_t maxout) {
  const int expected = (int)maxout;
  int produced;
#ifdef HAVE_IPP
  int outlen = expected;
  int inlen = (int)compressed_length;
  IppStatus status = ippsDecodeLZ4_8u((const Ipp8u*)input, inlen, (Ipp8u*)output, &outlen);
  produced = (status == ippStsNoErr) ? outlen : -outlen;
#else
  produced = LZ4_decompress_safe(input, output, (int)compressed_length, expected);
#endif
  return (produced == expected) ? expected : 0;
}
451
452
#if defined(HAVE_ZLIB)
/* zlib is not very respectful with sharing name space with others.
 Fortunately, its names do not collide with those already in blosc. */

/* Compress with zlib (or zlib-ng's native API) at level `clevel`.
 * Returns the compressed size, or 0 on any zlib error, such as the output
 * not fitting in `maxout`. */
static int zlib_wrap_compress(const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
  size_t dest_len = maxout;
  const int zrc = zng_compress2(
      (uint8_t*)output, &dest_len, (uint8_t*)input, input_length, clevel);
#else
  uLongf dest_len = (uLongf)maxout;
  const int zrc = compress2(
      (Bytef*)output, &dest_len, (Bytef*)input, (uLong)input_length, clevel);
#endif
  return (zrc == Z_OK) ? (int)dest_len : 0;
}

/* Decompress a zlib stream into `output` (capacity `maxout`).
 * Returns the decompressed size, or 0 on any zlib error. */
static int zlib_wrap_decompress(const char* input, size_t compressed_length,
                                char* output, size_t maxout) {
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
  size_t dest_len = maxout;
  const int zrc = zng_uncompress(
      (uint8_t*)output, &dest_len, (uint8_t*)input, compressed_length);
#else
  uLongf dest_len = (uLongf)maxout;
  const int zrc = uncompress(
      (Bytef*)output, &dest_len, (Bytef*)input, (uLong)compressed_length);
#endif
  return (zrc == Z_OK) ? (int)dest_len : 0;
}
#endif /*  HAVE_ZLIB */
491
492
493
#if defined(HAVE_ZSTD)
/* Compress with zstd, reusing the per-thread compression context (created
 * lazily on first use).
 *
 * Blosc clevel 1..8 maps to zstd levels 1, 3, ..., 15, and clevel >= 9 maps
 * to ZSTD_maxCLevel().  When the parent context uses a dictionary, its CDict
 * is used instead.  Per the zstd API, the level fixed when that CDict was
 * created then applies.
 *
 * Returns:
 *  - the compressed size on success;
 *  - 0 when zstd fails, e.g. the output does not fit in `maxout`;
 *  - BLOSC2_ERROR_MEMORY_ALLOC if the zstd context cannot be created. */
static int zstd_wrap_compress(struct thread_context* thread_context,
                              const char* input, size_t input_length,
                              char* output, size_t maxout, int clevel) {
  size_t code;
  blosc2_context* context = thread_context->parent_context;

  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
  /* NOTE(review): an `if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;` special
   * case used to follow here.  It was meant to bring Blosc level 8 "close
   * enough to maxCLevel", but it tested the already-remapped value (always
   * odd, or the max level), so it never fired.  It has been dropped as dead
   * code, and Blosc level 8 keeps mapping to zstd level 15.  Restoring the
   * original intent would change speed/ratio at that level. */

  if (thread_context->zstd_cctx == NULL) {
    thread_context->zstd_cctx = ZSTD_createCCtx();
    if (thread_context->zstd_cctx == NULL) {
      BLOSC_TRACE_ERROR("Cannot create a ZSTD compression context.");
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
  }

  if (context->use_dict) {
    assert(context->dict_cdict != NULL);
    code = ZSTD_compress_usingCDict(
            thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
            input_length, context->dict_cdict);
  } else {
    code = ZSTD_compressCCtx(thread_context->zstd_cctx,
        (void*)output, maxout, (void*)input, input_length, clevel);
  }
  if (ZSTD_isError(code)) {
    // Blosc will just memcpy this buffer
    return 0;
  }
  return (int)code;
}

/* Decompress a zstd frame, reusing the per-thread decompression context
 * (created lazily on first use) and the parent context's DDict when it uses
 * a dictionary.
 *
 * Returns:
 *  - the decompressed size on success;
 *  - 0 on a zstd error (traced);
 *  - BLOSC2_ERROR_MEMORY_ALLOC if the zstd context cannot be created. */
static int zstd_wrap_decompress(struct thread_context* thread_context,
                                const char* input, size_t compressed_length,
                                char* output, size_t maxout) {
  size_t code;
  blosc2_context* context = thread_context->parent_context;

  if (thread_context->zstd_dctx == NULL) {
    thread_context->zstd_dctx = ZSTD_createDCtx();
    if (thread_context->zstd_dctx == NULL) {
      BLOSC_TRACE_ERROR("Cannot create a ZSTD decompression context.");
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
  }

  if (context->use_dict) {
    assert(context->dict_ddict != NULL);
    code = ZSTD_decompress_usingDDict(
            thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
            compressed_length, context->dict_ddict);
  } else {
    code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
        (void*)output, maxout, (void*)input, compressed_length);
  }
  if (ZSTD_isError(code)) {
    BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'.  Giving up.",
                      ZSTD_getErrorName(code));
    return 0;
  }
  return (int)code;
}
#endif /*  HAVE_ZSTD */
551
552
/* Compute acceleration for blosclz */
553
509k
static int get_accel(const blosc2_context* context) {
554
509k
  int clevel = context->clevel;
555
556
509k
  if (context->compcode == BLOSC_LZ4) {
557
    /* This acceleration setting based on discussions held in:
558
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
559
     */
560
24.3k
    return (10 - clevel);
561
24.3k
  }
562
484k
  return 1;
563
509k
}
564
565
566
3.43M
int do_nothing(uint8_t filter, char cmode) {
567
3.43M
  if (cmode == 'c') {
568
3.05M
    return (filter == BLOSC_NOFILTER);
569
3.05M
  } else {
570
    // TRUNC_PREC do not have to be applied during decompression
571
384k
    return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
572
384k
  }
573
3.43M
}
574
575
576
71.3k
int next_filter(const uint8_t* filters, int current_filter, char cmode) {
577
104k
  for (int i = current_filter - 1; i >= 0; i--) {
578
104k
    if (!do_nothing(filters[i], cmode)) {
579
71.3k
      return filters[i];
580
71.3k
    }
581
104k
  }
582
0
  return BLOSC_NOFILTER;
583
71.3k
}
584
585
586
555k
int last_filter(const uint8_t* filters, char cmode) {
587
555k
  int last_index = -1;
588
3.88M
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
589
3.33M
    if (!do_nothing(filters[i], cmode))  {
590
454k
      last_index = i;
591
454k
    }
592
3.33M
  }
593
555k
  return last_index;
594
555k
}
595
596
597
/* Convert filter pipeline to filter flags */
598
82.3k
static uint8_t filters_to_flags(const uint8_t* filters) {
599
82.3k
  uint8_t flags = 0;
600
601
576k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
602
494k
    switch (filters[i]) {
603
41.5k
      case BLOSC_SHUFFLE:
604
41.5k
        flags |= BLOSC_DOSHUFFLE;
605
41.5k
        break;
606
14.1k
      case BLOSC_BITSHUFFLE:
607
14.1k
        flags |= BLOSC_DOBITSHUFFLE;
608
14.1k
        break;
609
2.79k
      case BLOSC_DELTA:
610
2.79k
        flags |= BLOSC_DODELTA;
611
2.79k
        break;
612
435k
      default :
613
435k
        break;
614
494k
    }
615
494k
  }
616
82.3k
  return flags;
617
82.3k
}
618
619
620
/* Convert filter flags to filter pipeline */
621
311k
static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
622
  /* Initialize the filter pipeline */
623
311k
  memset(filters, 0, BLOSC2_MAX_FILTERS);
624
  /* Fill the filter pipeline */
625
311k
  if (flags & BLOSC_DOSHUFFLE)
626
290k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
627
311k
  if (flags & BLOSC_DOBITSHUFFLE)
628
293k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
629
311k
  if (flags & BLOSC_DODELTA)
630
45.9k
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
631
311k
}
632
633
634
/* Get filter flags from header flags */
635
static uint8_t get_filter_flags(const uint8_t header_flags,
636
5.41k
                                const int32_t typesize) {
637
5.41k
  uint8_t flags = 0;
638
639
5.41k
  if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
640
800
    flags |= BLOSC_DOSHUFFLE;
641
800
  }
642
5.41k
  if (header_flags & BLOSC_DOBITSHUFFLE) {
643
1.97k
    flags |= BLOSC_DOBITSHUFFLE;
644
1.97k
  }
645
5.41k
  if (header_flags & BLOSC_DODELTA) {
646
2.41k
    flags |= BLOSC_DODELTA;
647
2.41k
  }
648
5.41k
  if (header_flags & BLOSC_MEMCPYED) {
649
43
    flags |= BLOSC_MEMCPYED;
650
43
  }
651
5.41k
  return flags;
652
5.41k
}
653
654
typedef struct blosc_header_s {
655
  uint8_t version;
656
  uint8_t versionlz;
657
  uint8_t flags;
658
  uint8_t typesize;
659
  int32_t nbytes;
660
  int32_t blocksize;
661
  int32_t cbytes;
662
  // Extended Blosc2 header
663
  uint8_t filters[BLOSC2_MAX_FILTERS];
664
  uint8_t udcompcode;
665
  uint8_t compcode_meta;
666
  uint8_t filters_meta[BLOSC2_MAX_FILTERS];
667
  uint8_t reserved2;
668
  uint8_t blosc2_flags;
669
} blosc_header;
670
671
672
/* Parse and validate the chunk header at `src` into `*header`.
 *
 * src, srcsize:    the raw chunk; only its header bytes are read.
 * extended_header: when true and the flags mark an extended Blosc2 header
 *                  (DOSHUFFLE and DOBITSHUFFLE both set), the extended part
 *                  is read as well.  Otherwise the filter pipeline is derived
 *                  from the legacy flag bits.
 *
 * Multi-byte fields are converted to host byte order.
 * Returns:
 *  - 0 on success;
 *  - BLOSC2_ERROR_READ_BUFFER when `srcsize` is too small to hold the header,
 *    or `cbytes` is too small for a run-length special value;
 *  - BLOSC2_ERROR_VERSION_SUPPORT for a newer header format;
 *  - BLOSC2_ERROR_INVALID_HEADER for inconsistent fields. */
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
{
  memset(header, 0, sizeof(blosc_header));

  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);

  bool little_endian = is_little_endian();

  if (!little_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    header->cbytes = bswap32_(header->cbytes);
  }

  if (header->version > BLOSC2_VERSION_FORMAT) {
    /* Version from future */
    return BLOSC2_ERROR_VERSION_SUPPORT;
  }
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  /* Headers may come from untrusted data.  A negative uncompressed size is
   * never valid, and it would otherwise slip past the blocksize check below,
   * which only compares against a positive nbytes. */
  if (header->nbytes < 0) {
    BLOSC_TRACE_ERROR("`nbytes` is negative.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->typesize == 0) {
    BLOSC_TRACE_ERROR("`typesize` is zero.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  /* Read extended header if it is wanted */
  if ((extended_header) && (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE)) {
    if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
      return BLOSC2_ERROR_INVALID_HEADER;
    }
    if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
      return BLOSC2_ERROR_READ_BUFFER;
    }

    memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
      BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);

    int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    if (special_type != 0) {
      if (header->nbytes % header->typesize != 0) {
        BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
        return BLOSC2_ERROR_INVALID_HEADER;
      }
      if (special_type == BLOSC2_SPECIAL_VALUE) {
        /* The repeated value (typesize bytes) follows the extended header */
        if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH + header->typesize) {
          BLOSC_TRACE_ERROR("`cbytes` is too small for run length encoding");
          return BLOSC2_ERROR_READ_BUFFER;
        }
      }
    }
    // The number of filters depends on the version of the header. Blosc2 alpha series
    // did not initialize filters to zero beyond the max supported.
    if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
      header->filters[5] = 0;
      header->filters_meta[5] = 0;
    }
  }
  else {
    flags_to_filters(header->flags, header->filters);
  }
  return 0;
}
751
752
87.7k
/* Derive the block layout of the current chunk from sourcesize and blocksize.
 * `leftover` holds the size of a trailing partial block (0 if none).
 * `nblocks` counts the full blocks plus one for that partial block.
 * blocksize must be non-zero. */
static inline void blosc2_calculate_blocks(blosc2_context* context) {
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = context->sourcesize / context->blocksize;
  if (context->leftover > 0) {
    context->nblocks++;
  }
}
759
760
26.9k
/* Populate `context` from a parsed chunk header: sizes, codec, block layout,
 * header overhead and filter pipeline.
 *
 * Returns 0, or BLOSC2_ERROR_INVALID_HEADER when the header claims more
 * compressed bytes than `context->srcsize`.  That check is skipped for lazy
 * chunks, which are flagged by bit 0x08 of blosc2_flags in an extended
 * header. */
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
  context->header_flags = header->flags;
  context->typesize = header->typesize;
  context->sourcesize = header->nbytes;
  context->blocksize = header->blocksize;
  context->blosc2_flags = header->blosc2_flags;
  /* The codec format sits in the top 3 bits of the flags.
   * BLOSC_UDCODEC_FORMAT means the actual code is in udcompcode. */
  const int compformat = header->flags >> 5;
  context->compcode = (compformat == BLOSC_UDCODEC_FORMAT) ? header->udcompcode : compformat;
  blosc2_calculate_blocks(context);

  const bool has_extended_header = (context->header_flags & BLOSC_DOSHUFFLE) &&
                                   (context->header_flags & BLOSC_DOBITSHUFFLE);
  bool is_lazy = false;
  if (!has_extended_header) {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    context->filter_flags = get_filter_flags(context->header_flags, context->typesize);
    flags_to_filters(context->header_flags, context->filters);
  } else {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    memcpy(context->filters, header->filters, BLOSC2_MAX_FILTERS);
    memcpy(context->filters_meta, header->filters_meta, BLOSC2_MAX_FILTERS);
    context->compcode_meta = header->compcode_meta;
    context->filter_flags = filters_to_flags(header->filters);
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    is_lazy = (context->blosc2_flags & 0x08u) != 0;
  }

  // Some checks for malformed headers
  if (!is_lazy && header->cbytes > context->srcsize) {
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  return 0;
}
800
801
802
0
/* Load the plugin library named after `filter->name` and resolve its
 * forward/backward functions through the plugin's exported `info` symbol.
 *
 * Returns BLOSC2_ERROR_SUCCESS, or BLOSC2_ERROR_FAILURE if the library
 * cannot be loaded or lacks the expected symbols.  The library is closed
 * again on symbol failures.  On success it stays open, because the resolved
 * function pointers point into it. */
int fill_filter(blosc2_filter *filter) {
  char libpath[PATH_MAX] = {0};
  void *lib = load_lib(filter->name, libpath);
  if (lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  filter_info *info = dlsym(lib, "info");
  if (info == NULL) {
    /* A plugin without `info` would otherwise be dereferenced below */
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", filter->name);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }
  filter->forward = dlsym(lib, info->forward);
  filter->backward = dlsym(lib, info->backward);

  if (filter->forward == NULL || filter->backward == NULL) {
    BLOSC_TRACE_ERROR("Wrong library loaded");
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
822
823
824
1
/* Load the plugin library named after `codec->compname` and resolve its
 * encoder/decoder through the plugin's exported `info` symbol.
 *
 * Returns BLOSC2_ERROR_SUCCESS or BLOSC2_ERROR_FAILURE.  On symbol-resolution
 * failures the library is closed again.  On success it stays open, because
 * the resolved functions live in it. */
int fill_codec(blosc2_codec *codec) {
  char libpath[PATH_MAX];
  void *handle = load_lib(codec->compname, libpath);
  if (handle == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library for codec `%s`", codec->compname);
    return BLOSC2_ERROR_FAILURE;
  }

  codec_info *info = dlsym(handle, "info");
  if (info == NULL) {
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", codec->compname);
    goto fail;
  }

  codec->encoder = dlsym(handle, info->encoder);
  codec->decoder = dlsym(handle, info->decoder);
  if (codec->encoder == NULL || codec->decoder == NULL) {
    BLOSC_TRACE_ERROR("encoder or decoder cannot be loaded from plugin `%s`", codec->compname);
    goto fail;
  }
  return BLOSC2_ERROR_SUCCESS;

fail:
  dlclose(handle);
  return BLOSC2_ERROR_FAILURE;
}
849
850
851
0
/*
 * Resolve the callbacks of a user-defined tuner plugin:
 * init, update, next_blocksize, free and next_cparams.
 *
 * The plugin library is opened through load_lib(); its exported `info`
 * symbol names every entry point, each of which is looked up with dlsym().
 *
 * Returns BLOSC2_ERROR_SUCCESS on success and BLOSC2_ERROR_FAILURE otherwise.
 * The `info` pointer is checked before use, as fill_codec does. On failure
 * every callback is reset to NULL so no pointer into the closed library
 * survives.
 */
int fill_tuner(blosc2_tuner *tuner) {
  char libpath[PATH_MAX] = {0};
  void *lib = load_lib(tuner->name, libpath);
  if (lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  tuner_info *info = dlsym(lib, "info");
  if (info == NULL) {
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", tuner->name);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  tuner->init = dlsym(lib, info->init);
  tuner->update = dlsym(lib, info->update);
  tuner->next_blocksize = dlsym(lib, info->next_blocksize);
  tuner->free = dlsym(lib, info->free);
  tuner->next_cparams = dlsym(lib, info->next_cparams);

  if (tuner->init == NULL || tuner->update == NULL || tuner->next_blocksize == NULL || tuner->free == NULL
      || tuner->next_cparams == NULL){
    BLOSC_TRACE_ERROR("Wrong library loaded");
    // Drop any half-resolved pointers before unloading the library they point into.
    tuner->init = NULL;
    tuner->update = NULL;
    tuner->next_blocksize = NULL;
    tuner->free = NULL;
    tuner->next_cparams = NULL;
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  // The library stays open on success: the resolved callbacks live inside it.
  return BLOSC2_ERROR_SUCCESS;
}
875
876
877
60.7k
/*
 * Fill `header` from the compression parameters held in `context`.
 *
 * The header is zeroed first. nbytes and blocksize are stored byte-swapped
 * on big-endian hosts. cbytes is left at zero here and is written after
 * compression. When `extended_header` is true, the following are stored as
 * well:
 *   - the filter pipeline and its metadata;
 *   - the codec id and its metadata;
 *   - the blosc2 flag bits for big-endian, dictionary use and codec
 *     instrumentation.
 * Always returns 0.
 */
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
  const int host_is_le = is_little_endian();

  memset(header, 0, sizeof(blosc_header));

  /* Fields shared by the regular and the extended header */
  header->version = BLOSC2_VERSION_FORMAT;
  header->versionlz = compcode_to_compversion(context->compcode);
  header->flags = context->header_flags;
  header->typesize = (uint8_t)context->typesize;
  header->nbytes = host_is_le ? (int32_t)context->sourcesize
                              : bswap32_((int32_t)context->sourcesize);
  header->blocksize = host_is_le ? (int32_t)context->blocksize
                                 : bswap32_((int32_t)context->blocksize);

  if (!extended_header) {
    return 0;
  }

  /* Filter pipeline info lives at the end of the extended header */
  for (int slot = 0; slot < BLOSC2_MAX_FILTERS; ++slot) {
    header->filters[slot] = context->filters[slot];
    header->filters_meta[slot] = context->filters_meta[slot];
  }
  header->udcompcode = context->compcode;
  header->compcode_meta = context->compcode_meta;

  /* blosc2_flags starts at zero (memset above); set the applicable bits */
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    header->blosc2_flags |= BLOSC2_INSTR_CODEC;
  }
  if (context->use_dict) {
    header->blosc2_flags |= BLOSC2_USEDICT;
  }
  if (!host_is_le) {
    header->blosc2_flags |= BLOSC2_BIGENDIAN;
  }

  return 0;
}
916
917
428k
/*
 * Rotate three buffer pointers one position "to the left":
 * the new *src is the old *dest, the new *dest is the old *tmp,
 * and the new *tmp is the old *src.
 */
void _cycle_buffers(uint8_t **src, uint8_t **dest, uint8_t **tmp) {
  uint8_t *rotated[3] = { *dest, *tmp, *src };
  *src = rotated[0];
  *dest = rotated[1];
  *tmp = rotated[2];
}
923
924
uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
925
                          const uint8_t* src, const int32_t offset,
926
379k
                          uint8_t* dest, uint8_t* tmp) {
927
379k
  blosc2_context* context = thread_context->parent_context;
928
379k
  uint8_t* _src = (uint8_t*)src + offset;
929
379k
  uint8_t* _tmp = tmp;
930
379k
  uint8_t* _dest = dest;
931
379k
  int32_t typesize = context->typesize;
932
379k
  uint8_t* filters = context->filters;
933
379k
  uint8_t* filters_meta = context->filters_meta;
934
379k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
935
936
  /* Prefilter function */
937
379k
  if (context->prefilter != NULL) {
938
    /* Set unwritten values to zero */
939
0
    memset(_dest, 0, bsize);
940
    // Create new prefilter parameters for this block (must be private for each thread)
941
0
    blosc2_prefilter_params preparams;
942
0
    memcpy(&preparams, context->preparams, sizeof(preparams));
943
0
    preparams.input = _src;
944
0
    preparams.output = _dest;
945
0
    preparams.output_size = bsize;
946
0
    preparams.output_typesize = typesize;
947
0
    preparams.output_offset = offset;
948
0
    preparams.nblock = offset / context->blocksize;
949
0
    preparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
950
0
    preparams.tid = thread_context->tid;
951
0
    preparams.ttmp = thread_context->tmp;
952
0
    preparams.ttmp_nbytes = thread_context->tmp_nbytes;
953
0
    preparams.ctx = context;
954
955
0
    if (context->prefilter(&preparams) != 0) {
956
0
      BLOSC_TRACE_ERROR("Execution of prefilter function failed");
957
0
      return NULL;
958
0
    }
959
960
0
    if (memcpyed) {
961
      // No more filters are required
962
0
      return _dest;
963
0
    }
964
0
    _cycle_buffers(&_src, &_dest, &_tmp);
965
0
  }
966
967
  /* Process the filter pipeline */
968
2.65M
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
969
2.27M
    int rc = BLOSC2_ERROR_SUCCESS;
970
2.27M
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
971
2.27M
      switch (filters[i]) {
972
30.9k
        case BLOSC_SHUFFLE:
973
30.9k
          shuffle(typesize, bsize, _src, _dest);
974
30.9k
          break;
975
348k
        case BLOSC_BITSHUFFLE:
976
348k
          if (bitshuffle(typesize, bsize, _src, _dest) < 0) {
977
0
            return NULL;
978
0
          }
979
348k
          break;
980
348k
        case BLOSC_DELTA:
981
0
          delta_encoder(src, offset, bsize, typesize, _src, _dest);
982
0
          break;
983
0
        case BLOSC_TRUNC_PREC:
984
0
          if (truncate_precision(filters_meta[i], typesize, bsize, _src, _dest) < 0) {
985
0
            return NULL;
986
0
          }
987
0
          break;
988
1.89M
        default:
989
1.89M
          if (filters[i] != BLOSC_NOFILTER) {
990
0
            BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
991
0
            return NULL;
992
0
          }
993
2.27M
      }
994
2.27M
    }
995
0
    else {
996
      // Look for the filters_meta in user filters and run it
997
0
      for (uint64_t j = 0; j < g_nfilters; ++j) {
998
0
        if (g_filters[j].id == filters[i]) {
999
0
          if (g_filters[j].forward == NULL) {
1000
            // Dynamically load library
1001
0
            if (fill_filter(&g_filters[j]) < 0) {
1002
0
              BLOSC_TRACE_ERROR("Could not load filter %d\n", g_filters[j].id);
1003
0
              return NULL;
1004
0
            }
1005
0
          }
1006
0
          if (g_filters[j].forward != NULL) {
1007
0
            blosc2_cparams cparams;
1008
0
            blosc2_ctx_get_cparams(context, &cparams);
1009
0
            rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams, g_filters[j].id);
1010
0
          } else {
1011
0
            BLOSC_TRACE_ERROR("Forward function is NULL");
1012
0
            return NULL;
1013
0
          }
1014
0
          if (rc != BLOSC2_ERROR_SUCCESS) {
1015
0
            BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
1016
0
            return NULL;
1017
0
          }
1018
0
          goto urfiltersuccess;
1019
0
        }
1020
0
      }
1021
0
      BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
1022
0
      return NULL;
1023
1024
0
      urfiltersuccess:;
1025
1026
0
    }
1027
1028
    // Cycle buffers when required
1029
2.27M
    if (filters[i] != BLOSC_NOFILTER) {
1030
379k
      _cycle_buffers(&_src, &_dest, &_tmp);
1031
379k
    }
1032
2.27M
  }
1033
379k
  return _src;
1034
379k
}
1035
1036
1037
/*
 * Return true when every byte in [ip, ip_bound) equals *ip, i.e. the range
 * is a run of a single byte value. An empty range counts as a run; *ip is
 * not read in that case.
 *
 * Bytes are compared 8 at a time against the first byte broadcast into a
 * 64-bit word.
 *
 * The 8-byte loads go through memcpy, so unaligned input is handled without
 * alignment or strict-aliasing undefined behaviour. Compilers lower this to
 * a single load.
 *
 * The loop bound is expressed as a remaining length (`ip_bound - ip > 8`).
 * The form `ip < ip_bound - 8` builds a pointer before the buffer when the
 * range is shorter than 8 bytes.
 */
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
  if (ip == ip_bound) {
    return true;
  }
  const uint8_t x = *ip;
  int64_t value, value2;
  /* Broadcast the value for every byte in a 64-bit register */
  memset(&value, x, sizeof(value));
  /* Compare whole 8-byte words while more than 8 bytes remain */
  while (ip_bound - ip > 8) {
    memcpy(&value2, ip, sizeof(value2));
    if (value != value2) {
      // Values differ.  We don't have a run.
      return false;
    }
    ip += 8;
  }
  /* Look into the remainder */
  while (ip < ip_bound && *ip == x) {
    ip++;
  }
  return ip == ip_bound;
}
1061
1062
1063
/* Shuffle & compress a single block */
/*
 * Runs the filter pipeline on one block and then compresses it with the
 * configured codec. The result is written as one or more "streams" at
 * `dest`.
 *
 * Parameters:
 *   thread_context  per-thread state; parent_context holds the cparams.
 *   bsize           number of bytes in this block.
 *   leftoverblock   non-zero forces a single stream for this block.
 *   ntbytes         running byte count that is checked against destsize
 *                   (presumably the bytes already written for the chunk;
 *                   confirm against caller).
 *   destsize        upper bound that ntbytes may not exceed.
 *   src, offset     the block is read from src + offset (after filtering).
 *   dest            write position for this block's streams.
 *   tmp, tmp2       scratch buffers passed to pipeline_forward.
 *
 * Each stream is preceded by a 4-byte little-endian int32 slot, written
 * through _sw32(dest - 4, ...). The slot holds one of:
 *   - the stream's compressed size;
 *   - the negated byte value for a run;
 *   - sizeof(blosc2_instr) in codec-instrumentation mode.
 * During dictionary training the block is copied verbatim and no size
 * slot is written.
 *
 * Returns:
 *   - the number of bytes produced for the block (ctbytes);
 *   - bsize when only the prefilter output is needed (memcpyed chunks);
 *   - 0 when the output would not fit in destsize (treated as
 *     non-compressible);
 *   - a negative BLOSC2_ERROR_* code on failure.
 */
static int blosc_c(struct thread_context* thread_context, int32_t bsize,
                   int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
                   const uint8_t* src, const int32_t offset, uint8_t* dest,
                   uint8_t* tmp, uint8_t* tmp2) {
  blosc2_context* context = thread_context->parent_context;
  // Bit 4 of the header flags disables splitting the block into typesize streams.
  int dont_split = (context->header_flags & 0x10) >> 4;
  // A dictionary is requested but not built yet: blocks are only collected.
  int dict_training = context->use_dict && context->dict_cdict == NULL;
  int32_t j, neblock, nstreams;
  int32_t cbytes;                   /* number of compressed bytes in split */
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
  int32_t maxout;
  int32_t typesize = context->typesize;
  const char* compname;
  int accel;
  const uint8_t* _src;
  uint8_t *_tmp = tmp, *_tmp2 = tmp2;
  int last_filter_index = last_filter(context->filters, 'c');
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
  blosc_timestamp_t last, current;
  float filter_time = 0.f;

  if (instr_codec) {
    blosc_set_timestamp(&last);
  }

  // Run the filter pipeline (if any filter or a prefilter is active) so that
  // _src points at the bytes that will be checked for runs and compressed.
  if (last_filter_index >= 0 || context->prefilter != NULL) {
    /* Apply the filter pipeline just for the prefilter */
    if (memcpyed && context->prefilter != NULL) {
      // We only need the prefilter output
      _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2);
      if (_src == NULL) {
        return BLOSC2_ERROR_FILTER_PIPELINE;
      }
      return bsize;
    }
    /* Apply regular filter pipeline */
    _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2);
    if (_src == NULL) {
      return BLOSC2_ERROR_FILTER_PIPELINE;
    }
  } else {
    _src = src + offset;
  }

  if (instr_codec) {
    blosc_set_timestamp(&current);
    filter_time = (float) blosc_elapsed_secs(last, current);
    last = current;
  }

  assert(context->clevel > 0);

  /* Calculate acceleration for different compressors */
  accel = get_accel(context);

  /* The number of compressed data streams for this block */
  if (!dont_split && !leftoverblock && !dict_training) {
    nstreams = (int32_t)typesize;
  }
  else {
    nstreams = 1;
  }
  neblock = bsize / nstreams;
  for (j = 0; j < nstreams; j++) {
    if (instr_codec) {
      blosc_set_timestamp(&last);
    }
    if (!dict_training) {
      // Reserve the 4-byte size slot that precedes this stream.
      dest += sizeof(int32_t);
      ntbytes += sizeof(int32_t);
      ctbytes += sizeof(int32_t);

      const uint8_t *ip = (uint8_t *) _src + j * neblock;
      const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;

      // Runs are only encoded with the extended (blosc2) header.
      if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
        // A run
        int32_t value = _src[j * neblock];
        if (ntbytes > destsize) {
          return 0;    /* Non-compressible data */
        }

        if (instr_codec) {
          // Instrumentation mode: store a blosc2_instr record instead of the run.
          blosc_set_timestamp(&current);
          int32_t instr_size = sizeof(blosc2_instr);
          ntbytes += instr_size;
          ctbytes += instr_size;
          if (ntbytes > destsize) {
            return 0;    /* Non-compressible data */
          }
          _sw32(dest - 4, instr_size);
          blosc2_instr *desti = (blosc2_instr *)dest;
          memset(desti, 0, sizeof(blosc2_instr));
          // Special values have an overhead of about 1 int32
          int32_t ssize = value == 0 ? sizeof(int32_t) : sizeof(int32_t) + 1;
          desti->cratio = (float) neblock / (float) ssize;
          float ctime = (float) blosc_elapsed_secs(last, current);
          desti->cspeed = (float) neblock / ctime;
          desti->filter_speed = (float) neblock / filter_time;
          desti->flags[0] = 1;    // mark a runlen
          dest += instr_size;
          continue;
        }

        // Encode the repeated byte in the first (LSB) byte of the length of the split.
        _sw32(dest - 4, -value);    // write the value in two's complement
        if (value > 0) {
          // Mark encoding as a run-length (== 0 is always a 0's run)
          ntbytes += 1;
          ctbytes += 1;
          if (ntbytes > destsize) {
            return 0;    /* Non-compressible data */
          }
          // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
          // dest[-1] |= 0x80;
          dest[0] = 0x1;   // set run-length bit (0) in token
          dest += 1;
        }
        continue;
      }
    }

    maxout = neblock;
    if (ntbytes + maxout > destsize && !instr_codec) {
      /* avoid buffer overrun: cap the codec output at the remaining room */
      maxout = destsize - ntbytes;
      if (maxout <= 0) {
        return 0;                  /* non-compressible block */
      }
    }
    if (dict_training) {
      // We are in the build dict state, so don't compress
      // TODO: copy only a percentage for sampling
      memcpy(dest, _src + j * neblock, (unsigned int)neblock);
      cbytes = (int32_t)neblock;
    }
    else if (context->compcode == BLOSC_BLOSCLZ) {
      cbytes = blosclz_compress(context->clevel, _src + j * neblock,
                                (int)neblock, dest, maxout, context);
    }
    else if (context->compcode == BLOSC_LZ4) {
      void *hash_table = NULL;
    #ifdef HAVE_IPP
      hash_table = (void*)thread_context->lz4_hash_table;
    #endif
      cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                 (char*)dest, (size_t)maxout, accel, hash_table);
    }
    else if (context->compcode == BLOSC_LZ4HC) {
      cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                   (char*)dest, (size_t)maxout, context->clevel);
    }
  #if defined(HAVE_ZLIB)
    else if (context->compcode == BLOSC_ZLIB) {
      cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
  #endif /* HAVE_ZLIB */
  #if defined(HAVE_ZSTD)
    else if (context->compcode == BLOSC_ZSTD) {
      cbytes = zstd_wrap_compress(thread_context,
                                  (char*)_src + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
  #endif /* HAVE_ZSTD */
    else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
      // User-registered codec: find it in g_codecs, loading the plugin on first use.
      for (int i = 0; i < g_ncodecs; ++i) {
        if (g_codecs[i].compcode == context->compcode) {
          if (g_codecs[i].encoder == NULL) {
            // Dynamically load codec plugin
            if (fill_codec(&g_codecs[i]) < 0) {
              BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
              return BLOSC2_ERROR_CODEC_SUPPORT;
            }
          }
          blosc2_cparams cparams;
          blosc2_ctx_get_cparams(context, &cparams);
          cbytes = g_codecs[i].encoder(_src + j * neblock,
                                        neblock,
                                        dest,
                                        maxout,
                                        context->compcode_meta,
                                        &cparams,
                                        context->src);
          goto urcodecsuccess;
        }
      }
      BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
      return BLOSC2_ERROR_CODEC_SUPPORT;
    urcodecsuccess:
      ;
    } else {
      blosc2_compcode_to_compname(context->compcode, &compname);
      BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
                        "Please use one having it.", compname);
      return BLOSC2_ERROR_CODEC_SUPPORT;
    }

    if (cbytes > maxout) {
      /* Buffer overrun caused by compression (should never happen) */
      return BLOSC2_ERROR_WRITE_BUFFER;
    }
    if (cbytes < 0) {
      /* cbytes should never be negative */
      return BLOSC2_ERROR_DATA;
    }
    if (cbytes == 0) {
      // When cbytes is 0, the compressor has not been able to compress anything
      cbytes = neblock;
    }

    if (instr_codec) {
      // Instrumentation mode: replace the stream by a blosc2_instr record.
      blosc_set_timestamp(&current);
      int32_t instr_size = sizeof(blosc2_instr);
      ntbytes += instr_size;
      ctbytes += instr_size;
      if (ntbytes > destsize) {
        return 0;    /* Non-compressible data */
      }
      _sw32(dest - 4, instr_size);
      float ctime = (float)blosc_elapsed_secs(last, current);
      blosc2_instr *desti = (blosc2_instr *)dest;
      memset(desti, 0, sizeof(blosc2_instr));
      // cratio is computed having into account 1 additional int (csize)
      desti->cratio = (float)neblock / (float)(cbytes + sizeof(int32_t));
      desti->cspeed = (float)neblock / ctime;
      desti->filter_speed = (float) neblock / filter_time;
      dest += instr_size;
      continue;
    }

    if (!dict_training) {
      if (cbytes == neblock) {
        /* The compressor has been unable to compress data at all. */
        /* Before doing the copy, check that we are not running into a
           buffer overflow. */
        if ((ntbytes + neblock) > destsize) {
          return 0;    /* Non-compressible data */
        }
        memcpy(dest, _src + j * neblock, (unsigned int)neblock);
        cbytes = neblock;
      }
      // Fill the size slot reserved before this stream.
      _sw32(dest - 4, cbytes);
    }
    dest += cbytes;
    ntbytes += cbytes;
    ctbytes += cbytes;
  }  /* Closes j < nstreams */

  return ctbytes;
}
1317
1318
1319
/* Process the filter pipeline (decompression mode) */
1320
int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
1321
                      const int32_t offset, uint8_t* src, uint8_t* tmp,
1322
27.4k
                      uint8_t* tmp2, int last_filter_index, int32_t nblock) {
1323
27.4k
  blosc2_context* context = thread_context->parent_context;
1324
27.4k
  int32_t typesize = context->typesize;
1325
27.4k
  uint8_t* filters = context->filters;
1326
27.4k
  uint8_t* filters_meta = context->filters_meta;
1327
27.4k
  uint8_t* _src = src;
1328
27.4k
  uint8_t* _dest = tmp;
1329
27.4k
  uint8_t* _tmp = tmp2;
1330
27.4k
  int errcode = 0;
1331
1332
64.9k
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
1333
    // Delta filter requires the whole chunk ready
1334
64.9k
    int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
1335
64.9k
    if (last_copy_filter && context->postfilter == NULL) {
1336
42.8k
      _dest = dest + offset;
1337
42.8k
    }
1338
64.9k
    int rc = BLOSC2_ERROR_SUCCESS;
1339
64.9k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1340
59.7k
      switch (filters[i]) {
1341
11.4k
        case BLOSC_SHUFFLE:
1342
11.4k
          unshuffle(typesize, bsize, _src, _dest);
1343
11.4k
          break;
1344
18.4k
        case BLOSC_BITSHUFFLE:
1345
18.4k
          if (bitunshuffle(typesize, bsize, _src, _dest, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
1346
0
            return BLOSC2_ERROR_FILTER_PIPELINE;
1347
0
          }
1348
18.4k
          break;
1349
18.4k
        case BLOSC_DELTA:
1350
14.2k
          if (context->nthreads == 1) {
1351
            /* Serial mode */
1352
14.2k
            delta_decoder(dest, offset, bsize, typesize, _dest);
1353
14.2k
          } else {
1354
            /* Force the thread in charge of the block 0 to go first */
1355
0
            pthread_mutex_lock(&context->delta_mutex);
1356
0
            if (context->dref_not_init) {
1357
0
              if (offset != 0) {
1358
0
                pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
1359
0
              } else {
1360
0
                delta_decoder(dest, offset, bsize, typesize, _dest);
1361
0
                context->dref_not_init = 0;
1362
0
                pthread_cond_broadcast(&context->delta_cv);
1363
0
              }
1364
0
            }
1365
0
            pthread_mutex_unlock(&context->delta_mutex);
1366
0
            if (offset != 0) {
1367
0
              delta_decoder(dest, offset, bsize, typesize, _dest);
1368
0
            }
1369
0
          }
1370
14.2k
          break;
1371
3.47k
        case BLOSC_TRUNC_PREC:
1372
          // TRUNC_PREC filter does not need to be undone
1373
3.47k
          break;
1374
12.2k
        default:
1375
12.2k
          if (filters[i] != BLOSC_NOFILTER) {
1376
212
            BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
1377
0
                              filters[i]);
1378
0
            errcode = -1;
1379
212
          }
1380
59.7k
      }
1381
59.7k
    } else {
1382
        // Look for the filters_meta in user filters and run it
1383
20.7k
        for (uint64_t j = 0; j < g_nfilters; ++j) {
1384
20.5k
          if (g_filters[j].id == filters[i]) {
1385
4.95k
            if (g_filters[j].backward == NULL) {
1386
              // Dynamically load filter
1387
0
              if (fill_filter(&g_filters[j]) < 0) {
1388
0
                BLOSC_TRACE_ERROR("Could not load filter %d.", g_filters[j].id);
1389
0
                return BLOSC2_ERROR_FILTER_PIPELINE;
1390
0
              }
1391
0
            }
1392
4.95k
            if (g_filters[j].backward != NULL) {
1393
4.95k
              blosc2_dparams dparams;
1394
4.95k
              blosc2_ctx_get_dparams(context, &dparams);
1395
4.95k
              rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams, g_filters[j].id);
1396
4.95k
            } else {
1397
0
              BLOSC_TRACE_ERROR("Backward function is NULL");
1398
0
              return BLOSC2_ERROR_FILTER_PIPELINE;
1399
0
            }
1400
4.95k
            if (rc != BLOSC2_ERROR_SUCCESS) {
1401
32
              BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
1402
0
              return rc;
1403
32
            }
1404
4.91k
            goto urfiltersuccess;
1405
4.95k
          }
1406
20.5k
        }
1407
197
      BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
1408
0
      return BLOSC2_ERROR_FILTER_PIPELINE;
1409
4.91k
      urfiltersuccess:;
1410
4.91k
    }
1411
1412
    // Cycle buffers when required
1413
64.7k
    if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
1414
49.1k
      _cycle_buffers(&_src, &_dest, &_tmp);
1415
49.1k
    }
1416
64.7k
    if (last_filter_index == i) {
1417
27.2k
      break;
1418
27.2k
    }
1419
64.7k
  }
1420
1421
  /* Postfilter function */
1422
27.2k
  if (context->postfilter != NULL) {
1423
    // Create new postfilter parameters for this block (must be private for each thread)
1424
0
    blosc2_postfilter_params postparams;
1425
0
    memcpy(&postparams, context->postparams, sizeof(postparams));
1426
0
    postparams.input = _src;
1427
0
    postparams.output = dest + offset;
1428
0
    postparams.size = bsize;
1429
0
    postparams.typesize = typesize;
1430
0
    postparams.offset = nblock * context->blocksize;
1431
0
    postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1432
0
    postparams.nblock = nblock;
1433
0
    postparams.tid = thread_context->tid;
1434
0
    postparams.ttmp = thread_context->tmp;
1435
0
    postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1436
0
    postparams.ctx = context;
1437
1438
0
    if (context->postfilter(&postparams) != 0) {
1439
0
      BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1440
0
      return BLOSC2_ERROR_POSTFILTER;
1441
0
    }
1442
0
  }
1443
1444
27.2k
  return errcode;
1445
27.2k
}
1446
1447
1448
1.30k
static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1449
1.30k
  if (destsize % typesize != 0) {
1450
20
    BLOSC_TRACE_ERROR("destsize can only be a multiple of typesize");
1451
20
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1452
20
  }
1453
1.28k
  int32_t nitems = destsize / typesize;
1454
1.28k
  if (nitems == 0) {
1455
0
    return 0;
1456
0
  }
1457
1458
1.28k
  if (typesize == 4) {
1459
585
    float* dest_ = (float*)dest;
1460
585
    float val = nanf("");
1461
6.56k
    for (int i = 0; i < nitems; i++) {
1462
5.98k
      dest_[i] = val;
1463
5.98k
    }
1464
585
    return nitems;
1465
585
  }
1466
704
  else if (typesize == 8) {
1467
650
    double* dest_ = (double*)dest;
1468
650
    double val = nan("");
1469
3.21k
    for (int i = 0; i < nitems; i++) {
1470
2.56k
      dest_[i] = val;
1471
2.56k
    }
1472
650
    return nitems;
1473
650
  }
1474
1475
54
  BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1476
0
  return BLOSC2_ERROR_DATA;
1477
1.28k
}
1478
1479
1480
3.66k
static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1481
#if defined(BLOSC_STRICT_ALIGN)
1482
  if (destsize % typesize != 0) {
1483
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1484
  }
1485
  int32_t nitems = destsize / typesize;
1486
  if (nitems == 0) {
1487
    return 0;
1488
  }
1489
  for (int i = 0; i < nitems; i++) {
1490
    memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1491
  }
1492
#else
1493
  // destsize can only be a multiple of typesize
1494
3.66k
  int64_t val8;
1495
3.66k
  int64_t* dest8;
1496
3.66k
  int32_t val4;
1497
3.66k
  int32_t* dest4;
1498
3.66k
  int16_t val2;
1499
3.66k
  int16_t* dest2;
1500
3.66k
  int8_t val1;
1501
3.66k
  int8_t* dest1;
1502
1503
3.66k
  if (destsize % typesize != 0) {
1504
30
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1505
30
  }
1506
3.63k
  int32_t nitems = destsize / typesize;
1507
3.63k
  if (nitems == 0) {
1508
0
    return 0;
1509
0
  }
1510
1511
3.63k
  switch (typesize) {
1512
538
    case 8:
1513
538
      val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1514
538
      dest8 = (int64_t*)dest;
1515
3.44k
      for (int i = 0; i < nitems; i++) {
1516
2.90k
        dest8[i] = val8;
1517
2.90k
      }
1518
538
      break;
1519
755
    case 4:
1520
755
      val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1521
755
      dest4 = (int32_t*)dest;
1522
7.71k
      for (int i = 0; i < nitems; i++) {
1523
6.95k
        dest4[i] = val4;
1524
6.95k
      }
1525
755
      break;
1526
576
    case 2:
1527
576
      val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1528
576
      dest2 = (int16_t*)dest;
1529
76.3k
      for (int i = 0; i < nitems; i++) {
1530
75.7k
        dest2[i] = val2;
1531
75.7k
      }
1532
576
      break;
1533
1.15k
    case 1:
1534
1.15k
      val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1535
1.15k
      dest1 = (int8_t*)dest;
1536
800k
      for (int i = 0; i < nitems; i++) {
1537
799k
        dest1[i] = val1;
1538
799k
      }
1539
1.15k
      break;
1540
613
    default:
1541
3.52k
      for (int i = 0; i < nitems; i++) {
1542
2.91k
        memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1543
2.91k
      }
1544
3.63k
  }
1545
3.63k
#endif
1546
1547
3.63k
  return nitems;
1548
3.63k
}
1549
1550
1551
/* Decompress & unshuffle a single block */
1552
static int blosc_d(
1553
    struct thread_context* thread_context, int32_t bsize,
1554
    int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1555
206k
    int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1556
206k
  blosc2_context* context = thread_context->parent_context;
1557
206k
  uint8_t* filters = context->filters;
1558
206k
  uint8_t *tmp3 = thread_context->tmp4;
1559
206k
  int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1560
206k
  int dont_split = (context->header_flags & 0x10) >> 4;
1561
206k
  int32_t chunk_nbytes;
1562
206k
  int32_t chunk_cbytes;
1563
206k
  int nstreams;
1564
206k
  int32_t neblock;
1565
206k
  int32_t nbytes;                /* number of decompressed bytes in split */
1566
206k
  int32_t cbytes;                /* number of compressed bytes in split */
1567
  // int32_t ctbytes = 0;           /* number of compressed bytes in block */
1568
206k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
1569
206k
  uint8_t* _dest;
1570
206k
  int32_t typesize = context->typesize;
1571
206k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1572
206k
  const char* compname;
1573
206k
  int rc;
1574
1575
206k
  if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1576
    // Do not decompress, but act as if we successfully decompressed everything
1577
0
    return bsize;
1578
0
  }
1579
1580
206k
  rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1581
206k
  if (rc < 0) {
1582
0
    return rc;
1583
0
  }
1584
1585
  // In some situations (lazychunks) the context can arrive uninitialized
1586
  // (but BITSHUFFLE needs it for accessing the format of the chunk)
1587
206k
  if (context->src == NULL) {
1588
0
    context->src = src;
1589
0
  }
1590
1591
  // Chunks with special values cannot be lazy
1592
206k
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1593
206k
          (context->blosc2_flags & 0x08u) && !context->special_type);
1594
206k
  if (is_lazy) {
1595
    // The chunk is on disk, so just lazily load the block
1596
2
    if (context->schunk == NULL) {
1597
2
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1598
0
      return BLOSC2_ERROR_INVALID_PARAM;
1599
2
    }
1600
0
    if (context->schunk->frame == NULL) {
1601
0
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1602
0
      return BLOSC2_ERROR_INVALID_PARAM;
1603
0
    }
1604
0
    blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1605
0
    char* urlpath = frame->urlpath;
1606
0
    size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1607
0
    int32_t nchunk;
1608
0
    int64_t chunk_offset;
1609
    // The nchunk and the offset of the current chunk are in the trailer
1610
0
    nchunk = *(int32_t*)(src + trailer_offset);
1611
0
    chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1612
    // Get the csize of the nblock
1613
0
    int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1614
0
    int32_t block_csize = block_csizes[nblock];
1615
    // Read the lazy block on disk
1616
0
    void* fp = NULL;
1617
0
    blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1618
0
    if (io_cb == NULL) {
1619
0
      BLOSC_TRACE_ERROR("Error getting the input/output API");
1620
0
      return BLOSC2_ERROR_PLUGIN_IO;
1621
0
    }
1622
1623
0
    int64_t io_pos = 0;
1624
0
    if (frame->sframe) {
1625
      // The chunk is not in the frame
1626
0
      char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1627
0
      BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1628
0
      sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1629
0
      fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1630
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1631
0
      free(chunkpath);
1632
      // The offset of the block is src_offset
1633
0
      io_pos = src_offset;
1634
0
    }
1635
0
    else {
1636
0
      fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1637
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1638
      // The offset of the block is src_offset
1639
0
      io_pos = frame->file_offset + chunk_offset + src_offset;
1640
0
    }
1641
    // We can make use of tmp3 because it will be used after src is not needed anymore
1642
0
    int64_t rbytes = io_cb->read((void**)&tmp3, 1, block_csize, io_pos, fp);
1643
0
    io_cb->close(fp);
1644
0
    if ((int32_t)rbytes != block_csize) {
1645
0
      BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1646
0
      return BLOSC2_ERROR_READ_BUFFER;
1647
0
    }
1648
0
    src = tmp3;
1649
0
    src_offset = 0;
1650
0
    srcsize = block_csize;
1651
0
  }
1652
1653
  // If the chunk is memcpyed, we just have to copy the block to dest and return
1654
206k
  if (memcpyed) {
1655
159k
    int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1656
159k
    if (!context->special_type) {
1657
19.7k
      if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1658
0
        return BLOSC2_ERROR_WRITE_BUFFER;
1659
0
      }
1660
19.7k
      if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1661
        /* Not enough input to copy block */
1662
0
        return BLOSC2_ERROR_READ_BUFFER;
1663
0
      }
1664
19.7k
    }
1665
159k
    if (!is_lazy) {
1666
159k
      src += context->header_overhead + nblock * context->blocksize;
1667
159k
    }
1668
159k
    _dest = dest + dest_offset;
1669
159k
    if (context->postfilter != NULL) {
1670
      // We are making use of a postfilter, so use a temp for destination
1671
0
      _dest = tmp;
1672
0
    }
1673
159k
    rc = 0;
1674
159k
    switch (context->special_type) {
1675
3.66k
      case BLOSC2_SPECIAL_VALUE:
1676
        // All repeated values
1677
3.66k
        rc = set_values(context->typesize, context->src, _dest, bsize_);
1678
3.66k
        if (rc < 0) {
1679
30
          BLOSC_TRACE_ERROR("set_values failed");
1680
0
          return BLOSC2_ERROR_DATA;
1681
30
        }
1682
3.63k
        break;
1683
3.63k
      case BLOSC2_SPECIAL_NAN:
1684
1.30k
        rc = set_nans(context->typesize, _dest, bsize_);
1685
1.30k
        if (rc < 0) {
1686
74
          BLOSC_TRACE_ERROR("set_nans failed");
1687
0
          return BLOSC2_ERROR_DATA;
1688
74
        }
1689
1.23k
        break;
1690
133k
      case BLOSC2_SPECIAL_ZERO:
1691
133k
        memset(_dest, 0, bsize_);
1692
133k
        break;
1693
1.33k
      case BLOSC2_SPECIAL_UNINIT:
1694
        // We do nothing here
1695
1.33k
        break;
1696
19.7k
      default:
1697
19.7k
        memcpy(_dest, src, bsize_);
1698
159k
    }
1699
159k
    if (context->postfilter != NULL) {
1700
      // Create new postfilter parameters for this block (must be private for each thread)
1701
0
      blosc2_postfilter_params postparams;
1702
0
      memcpy(&postparams, context->postparams, sizeof(postparams));
1703
0
      postparams.input = tmp;
1704
0
      postparams.output = dest + dest_offset;
1705
0
      postparams.size = bsize;
1706
0
      postparams.typesize = typesize;
1707
0
      postparams.offset = nblock * context->blocksize;
1708
0
      postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1709
0
      postparams.nblock = nblock;
1710
0
      postparams.tid = thread_context->tid;
1711
0
      postparams.ttmp = thread_context->tmp;
1712
0
      postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1713
0
      postparams.ctx = context;
1714
1715
      // Execute the postfilter (the processed block will be copied to dest)
1716
0
      if (context->postfilter(&postparams) != 0) {
1717
0
        BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1718
0
        return BLOSC2_ERROR_POSTFILTER;
1719
0
      }
1720
0
    }
1721
159k
    thread_context->zfp_cell_nitems = 0;
1722
1723
159k
    return bsize_;
1724
159k
  }
1725
1726
47.2k
  if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1727
    /* Invalid block src offset encountered */
1728
605
    return BLOSC2_ERROR_DATA;
1729
605
  }
1730
1731
46.6k
  src += src_offset;
1732
46.6k
  srcsize -= src_offset;
1733
1734
46.6k
  int last_filter_index = last_filter(filters, 'd');
1735
46.6k
  if (instr_codec) {
1736
    // If instrumented, we don't want to run the filters
1737
2.73k
    _dest = dest + dest_offset;
1738
2.73k
  }
1739
43.9k
  else if (((last_filter_index >= 0) &&
1740
43.9k
       (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1741
43.9k
    context->postfilter != NULL) {
1742
    // We are making use of some filter, so use a temp for destination
1743
29.4k
    _dest = tmp;
1744
29.4k
  }
1745
14.4k
  else {
1746
    // If no filters, or only DELTA in pipeline
1747
14.4k
    _dest = dest + dest_offset;
1748
14.4k
  }
1749
1750
  /* The number of compressed data streams for this block */
1751
46.6k
  if (!dont_split && !leftoverblock) {
1752
8.50k
    nstreams = (int32_t)typesize;
1753
8.50k
  }
1754
38.1k
  else {
1755
38.1k
    nstreams = 1;
1756
38.1k
  }
1757
1758
46.6k
  neblock = bsize / nstreams;
1759
46.6k
  if (neblock == 0) {
1760
    /* Not enough space to output bytes */
1761
3
    BLOSC_ERROR(BLOSC2_ERROR_WRITE_BUFFER);
1762
3
  }
1763
87.0k
  for (int j = 0; j < nstreams; j++) {
1764
48.6k
    if (srcsize < (signed)sizeof(int32_t)) {
1765
      /* Not enough input to read compressed size */
1766
28
      return BLOSC2_ERROR_READ_BUFFER;
1767
28
    }
1768
48.6k
    srcsize -= sizeof(int32_t);
1769
48.6k
    cbytes = sw32_(src);      /* amount of compressed bytes */
1770
48.6k
    if (cbytes > 0) {
1771
42.1k
      if (srcsize < cbytes) {
1772
        /* Not enough input to read compressed bytes */
1773
210
        return BLOSC2_ERROR_READ_BUFFER;
1774
210
      }
1775
41.9k
      srcsize -= cbytes;
1776
41.9k
    }
1777
48.4k
    src += sizeof(int32_t);
1778
    // ctbytes += (signed)sizeof(int32_t);
1779
1780
    /* Uncompress */
1781
48.4k
    if (cbytes == 0) {
1782
      // A run of 0's
1783
3.79k
      memset(_dest, 0, (unsigned int)neblock);
1784
3.79k
      nbytes = neblock;
1785
3.79k
    }
1786
44.6k
    else if (cbytes < 0) {
1787
      // A negative number means some encoding depending on the token that comes next
1788
2.68k
      uint8_t token;
1789
1790
2.68k
      if (srcsize < (signed)sizeof(uint8_t)) {
1791
        // Not enough input to read token */
1792
84
        return BLOSC2_ERROR_READ_BUFFER;
1793
84
      }
1794
2.60k
      srcsize -= sizeof(uint8_t);
1795
1796
2.60k
      token = src[0];
1797
2.60k
      src += 1;
1798
      // ctbytes += 1;
1799
1800
2.60k
      if (token & 0x1) {
1801
        // A run of bytes that are different than 0
1802
2.56k
        if (cbytes < -255) {
1803
          // Runs can only encode a byte
1804
176
          return BLOSC2_ERROR_RUN_LENGTH;
1805
176
        }
1806
2.38k
        uint8_t value = -cbytes;
1807
2.38k
        memset(_dest, value, (unsigned int)neblock);
1808
2.38k
      } else {
1809
35
        BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1810
0
        return BLOSC2_ERROR_RUN_LENGTH;
1811
35
      }
1812
2.38k
      nbytes = neblock;
1813
2.38k
      cbytes = 0;  // everything is encoded in the cbytes token
1814
2.38k
    }
1815
41.9k
    else if (cbytes == neblock) {
1816
14.7k
      memcpy(_dest, src, (unsigned int)neblock);
1817
14.7k
      nbytes = (int32_t)neblock;
1818
14.7k
    }
1819
27.1k
    else {
1820
27.1k
      if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1821
11.1k
        nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1822
11.1k
      }
1823
16.0k
      else if (compformat == BLOSC_LZ4_FORMAT) {
1824
1.99k
        nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1825
1.99k
                                     (char*)_dest, (size_t)neblock);
1826
1.99k
      }
1827
14.0k
  #if defined(HAVE_ZLIB)
1828
14.0k
      else if (compformat == BLOSC_ZLIB_FORMAT) {
1829
3.78k
        nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1830
3.78k
                                      (char*)_dest, (size_t)neblock);
1831
3.78k
      }
1832
10.2k
  #endif /*  HAVE_ZLIB */
1833
10.2k
  #if defined(HAVE_ZSTD)
1834
10.2k
      else if (compformat == BLOSC_ZSTD_FORMAT) {
1835
8.53k
        nbytes = zstd_wrap_decompress(thread_context,
1836
8.53k
                                      (char*)src, (size_t)cbytes,
1837
8.53k
                                      (char*)_dest, (size_t)neblock);
1838
8.53k
      }
1839
1.68k
  #endif /*  HAVE_ZSTD */
1840
1.68k
      else if (compformat == BLOSC_UDCODEC_FORMAT) {
1841
1.66k
        bool getcell = false;
1842
1843
1.66k
#if defined(HAVE_PLUGINS)
1844
1.66k
        if ((context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) &&
1845
1.66k
            (thread_context->zfp_cell_nitems > 0)) {
1846
0
          nbytes = zfp_getcell(thread_context, src, cbytes, _dest, neblock);
1847
0
          if (nbytes < 0) {
1848
0
            return BLOSC2_ERROR_DATA;
1849
0
          }
1850
0
          if (nbytes == thread_context->zfp_cell_nitems * typesize) {
1851
0
            getcell = true;
1852
0
          }
1853
0
        }
1854
1.66k
#endif /* HAVE_PLUGINS */
1855
1.66k
        if (!getcell) {
1856
1.66k
          thread_context->zfp_cell_nitems = 0;
1857
1.75k
          for (int i = 0; i < g_ncodecs; ++i) {
1858
1.74k
            if (g_codecs[i].compcode == context->compcode) {
1859
1.65k
              if (g_codecs[i].decoder == NULL) {
1860
                // Dynamically load codec plugin
1861
1
                if (fill_codec(&g_codecs[i]) < 0) {
1862
1
                  BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1863
0
                  return BLOSC2_ERROR_CODEC_SUPPORT;
1864
1
                }
1865
1
              }
1866
1.65k
              blosc2_dparams dparams;
1867
1.65k
              blosc2_ctx_get_dparams(context, &dparams);
1868
1.65k
              nbytes = g_codecs[i].decoder(src,
1869
1.65k
                                           cbytes,
1870
1.65k
                                           _dest,
1871
1.65k
                                           neblock,
1872
1.65k
                                           context->compcode_meta,
1873
1.65k
                                           &dparams,
1874
1.65k
                                           context->src);
1875
1.65k
              goto urcodecsuccess;
1876
1.65k
            }
1877
1.74k
          }
1878
13
          BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1879
0
          return BLOSC2_ERROR_CODEC_SUPPORT;
1880
1.66k
        }
1881
1.65k
      urcodecsuccess:
1882
1.65k
        ;
1883
1.65k
      }
1884
17
      else {
1885
17
        compname = clibcode_to_clibname(compformat);
1886
17
        BLOSC_TRACE_ERROR(
1887
0
                "Blosc has not been compiled with decompression "
1888
0
                "support for '%s' format.  "
1889
0
                "Please recompile for adding this support.", compname);
1890
0
        return BLOSC2_ERROR_CODEC_SUPPORT;
1891
17
      }
1892
1893
      /* Check that decompressed bytes number is correct */
1894
27.1k
      if ((nbytes != neblock) && (thread_context->zfp_cell_nitems == 0)) {
1895
7.68k
        return BLOSC2_ERROR_DATA;
1896
7.68k
      }
1897
1898
27.1k
    }
1899
40.3k
    src += cbytes;
1900
    // ctbytes += cbytes;
1901
40.3k
    _dest += nbytes;
1902
40.3k
    ntbytes += nbytes;
1903
40.3k
  } /* Closes j < nstreams */
1904
1905
38.4k
  if (!instr_codec) {
1906
36.5k
    if (last_filter_index >= 0 || context->postfilter != NULL) {
1907
      /* Apply regular filter pipeline */
1908
27.4k
      int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1909
27.4k
                                      last_filter_index, nblock);
1910
27.4k
      if (errcode < 0)
1911
346
        return errcode;
1912
27.4k
    }
1913
36.5k
  }
1914
1915
  /* Return the number of uncompressed bytes */
1916
38.0k
  return (int)ntbytes;
1917
38.4k
}
1918
1919
1920
/* Serial version for compression/decompression */
1921
104k
static int serial_blosc(struct thread_context* thread_context) {
1922
104k
  blosc2_context* context = thread_context->parent_context;
1923
104k
  int32_t j, bsize, leftoverblock;
1924
104k
  int32_t cbytes;
1925
104k
  int32_t ntbytes = context->output_bytes;
1926
104k
  int32_t* bstarts = context->bstarts;
1927
104k
  uint8_t* tmp = thread_context->tmp;
1928
104k
  uint8_t* tmp2 = thread_context->tmp2;
1929
104k
  int dict_training = context->use_dict && (context->dict_cdict == NULL);
1930
104k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1931
104k
  if (!context->do_compress && context->special_type) {
1932
    // Fake a runlen as if it was a memcpyed chunk
1933
1.22k
    memcpyed = true;
1934
1.22k
  }
1935
1936
806k
  for (j = 0; j < context->nblocks; j++) {
1937
733k
    if (context->do_compress && !memcpyed && !dict_training) {
1938
509k
      _sw32(bstarts + j, ntbytes);
1939
509k
    }
1940
733k
    bsize = context->blocksize;
1941
733k
    leftoverblock = 0;
1942
733k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1943
6.77k
      bsize = context->leftover;
1944
6.77k
      leftoverblock = 1;
1945
6.77k
    }
1946
733k
    if (context->do_compress) {
1947
526k
      if (memcpyed && !context->prefilter) {
1948
        /* We want to memcpy only */
1949
17.8k
        memcpy(context->dest + context->header_overhead + j * context->blocksize,
1950
17.8k
               context->src + j * context->blocksize, (unsigned int)bsize);
1951
17.8k
        cbytes = (int32_t)bsize;
1952
17.8k
      }
1953
509k
      else {
1954
        /* Regular compression */
1955
509k
        cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
1956
509k
                         context->destsize, context->src, j * context->blocksize,
1957
509k
                         context->dest + ntbytes, tmp, tmp2);
1958
509k
        if (cbytes == 0) {
1959
21.2k
          ntbytes = 0;              /* incompressible data */
1960
21.2k
          break;
1961
21.2k
        }
1962
509k
      }
1963
526k
    }
1964
206k
    else {
1965
      /* Regular decompression */
1966
      // If memcpyed we don't have a bstarts section (because it is not needed)
1967
206k
      int32_t src_offset = memcpyed ?
1968
159k
          context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
1969
206k
      cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
1970
206k
                       context->src, context->srcsize, src_offset, j,
1971
206k
                       context->dest, j * context->blocksize, tmp, tmp2);
1972
206k
    }
1973
1974
712k
    if (cbytes < 0) {
1975
9.30k
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
1976
9.30k
      break;
1977
9.30k
    }
1978
702k
    ntbytes += cbytes;
1979
702k
  }
1980
1981
104k
  return ntbytes;
1982
104k
}
1983
1984
static void t_blosc_do_job(void *ctxt);
1985
1986
/* Threaded version for compression/decompression */
1987
0
static int parallel_blosc(blosc2_context* context) {
1988
0
#ifdef BLOSC_POSIX_BARRIERS
1989
0
  int rc;
1990
0
#endif
1991
  /* Set sentinels */
1992
0
  context->thread_giveup_code = 1;
1993
0
  context->thread_nblock = -1;
1994
1995
0
  if (threads_callback) {
1996
0
    threads_callback(threads_callback_data, t_blosc_do_job,
1997
0
                     context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
1998
0
  }
1999
0
  else {
2000
    /* Synchronization point for all threads (wait for initialization) */
2001
0
    WAIT_INIT(-1, context);
2002
2003
    /* Synchronization point for all threads (wait for finalization) */
2004
0
    WAIT_FINISH(-1, context);
2005
0
  }
2006
2007
0
  if (context->thread_giveup_code <= 0) {
2008
    /* Compression/decompression gave up.  Return error code. */
2009
0
    return context->thread_giveup_code;
2010
0
  }
2011
2012
  /* Return the total bytes (de-)compressed in threads */
2013
0
  return (int)context->output_bytes;
2014
0
}
2015
2016
/* initialize a thread_context that has already been allocated */
2017
static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
2018
17.0k
{
2019
17.0k
  int32_t ebsize;
2020
2021
17.0k
  thread_context->parent_context = context;
2022
17.0k
  thread_context->tid = tid;
2023
2024
17.0k
  ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
2025
17.0k
  thread_context->tmp_nbytes = (size_t)4 * ebsize;
2026
17.0k
  thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
2027
17.0k
  BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2028
17.0k
  thread_context->tmp2 = thread_context->tmp + ebsize;
2029
17.0k
  thread_context->tmp3 = thread_context->tmp2 + ebsize;
2030
17.0k
  thread_context->tmp4 = thread_context->tmp3 + ebsize;
2031
17.0k
  thread_context->tmp_blocksize = context->blocksize;
2032
17.0k
  thread_context->zfp_cell_nitems = 0;
2033
17.0k
  thread_context->zfp_cell_start = 0;
2034
17.0k
  #if defined(HAVE_ZSTD)
2035
17.0k
  thread_context->zstd_cctx = NULL;
2036
17.0k
  thread_context->zstd_dctx = NULL;
2037
17.0k
  #endif
2038
2039
  /* Create the hash table for LZ4 in case we are using IPP */
2040
#ifdef HAVE_IPP
2041
  IppStatus status;
2042
  int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
2043
  int hash_size = 0;
2044
  status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
2045
  if (status != ippStsNoErr) {
2046
      BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
2047
  }
2048
  Ipp8u *hash_table = ippsMalloc_8u(hash_size);
2049
  status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
2050
  if (status != ippStsNoErr) {
2051
    BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
2052
  }
2053
  thread_context->lz4_hash_table = hash_table;
2054
#endif
2055
17.0k
  return 0;
2056
17.0k
}
2057
2058
static struct thread_context*
2059
17.0k
create_thread_context(blosc2_context* context, int32_t tid) {
2060
17.0k
  struct thread_context* thread_context;
2061
17.0k
  thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
2062
17.0k
  BLOSC_ERROR_NULL(thread_context, NULL);
2063
17.0k
  int rc = init_thread_context(thread_context, context, tid);
2064
17.0k
  if (rc < 0) {
2065
0
    return NULL;
2066
0
  }
2067
17.0k
  return thread_context;
2068
17.0k
}
2069
2070
/* free members of thread_context, but not thread_context itself */
2071
17.0k
static void destroy_thread_context(struct thread_context* thread_context) {
2072
17.0k
  my_free(thread_context->tmp);
2073
17.0k
#if defined(HAVE_ZSTD)
2074
17.0k
  if (thread_context->zstd_cctx != NULL) {
2075
1.04k
    ZSTD_freeCCtx(thread_context->zstd_cctx);
2076
1.04k
  }
2077
17.0k
  if (thread_context->zstd_dctx != NULL) {
2078
4.85k
    ZSTD_freeDCtx(thread_context->zstd_dctx);
2079
4.85k
  }
2080
17.0k
#endif
2081
#ifdef HAVE_IPP
2082
  if (thread_context->lz4_hash_table != NULL) {
2083
    ippsFree(thread_context->lz4_hash_table);
2084
  }
2085
#endif
2086
17.0k
}
2087
2088
17.0k
/* Release a thread_context and everything it owns: its members first (via
   destroy_thread_context), then the struct itself.  Passing NULL is a
   no-op, mirroring free(). */
void free_thread_context(struct thread_context* thread_context) {
  if (thread_context == NULL) {
    return;
  }
  destroy_thread_context(thread_context);
  my_free(thread_context);
}
2092
2093
2094
104k
/* Validate the thread configuration of `context` and (re)start its pool.
 *
 * A pending context->new_nthreads value is adopted here; the old pool is
 * released first when more than one thread was configured.  A pool is
 * started only when more than one thread is requested and none is running.
 *
 * Returns the thread count now in effect, or BLOSC2_ERROR_INVALID_PARAM
 * when context->nthreads is below 1.
 */
int check_nthreads(blosc2_context* context) {
  if (context->nthreads < 1) {
    BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  const bool resize_requested = context->nthreads != context->new_nthreads;
  if (resize_requested) {
    if (context->nthreads >= 2) {
      release_threadpool(context);
    }
    context->nthreads = context->new_nthreads;
  }

  if (context->threads_started == 0 && context->new_nthreads >= 2) {
    init_threadpool(context);
  }

  return context->nthreads;
}
2112
2113
/* Do the compression or decompression of the buffer depending on the
2114
   global params. */
2115
104k
static int do_job(blosc2_context* context) {
2116
104k
  int32_t ntbytes;
2117
2118
  /* Set sentinels */
2119
104k
  context->dref_not_init = 1;
2120
2121
  /* Check whether we need to restart threads */
2122
104k
  check_nthreads(context);
2123
2124
  /* Run the serial version when nthreads is 1 or when the buffers are
2125
     not larger than blocksize */
2126
104k
  if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
2127
    /* The context for this 'thread' has no been initialized yet */
2128
104k
    if (context->serial_context == NULL) {
2129
15.1k
      context->serial_context = create_thread_context(context, 0);
2130
15.1k
    }
2131
88.9k
    else if (context->blocksize != context->serial_context->tmp_blocksize) {
2132
1.84k
      free_thread_context(context->serial_context);
2133
1.84k
      context->serial_context = create_thread_context(context, 0);
2134
1.84k
    }
2135
104k
    BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
2136
104k
    ntbytes = serial_blosc(context->serial_context);
2137
104k
  }
2138
0
  else {
2139
0
    ntbytes = parallel_blosc(context);
2140
0
  }
2141
2142
104k
  return ntbytes;
2143
104k
}
2144
2145
2146
static int initialize_context_compression(
2147
        blosc2_context* context, const void* src, int32_t srcsize, void* dest,
2148
        int32_t destsize, int clevel, uint8_t const *filters,
2149
        uint8_t const *filters_meta, int32_t typesize, int compressor,
2150
        int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
2151
        int32_t splitmode,
2152
        int tuner_id, void *tuner_params,
2153
60.8k
        blosc2_schunk* schunk) {
2154
2155
  /* Set parameters */
2156
60.8k
  context->do_compress = 1;
2157
60.8k
  context->src = (const uint8_t*)src;
2158
60.8k
  context->srcsize = srcsize;
2159
60.8k
  context->dest = (uint8_t*)dest;
2160
60.8k
  context->output_bytes = 0;
2161
60.8k
  context->destsize = destsize;
2162
60.8k
  context->sourcesize = srcsize;
2163
60.8k
  context->typesize = (int32_t)typesize;
2164
60.8k
  context->filter_flags = filters_to_flags(filters);
2165
425k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
2166
364k
    context->filters[i] = filters[i];
2167
364k
    context->filters_meta[i] = filters_meta[i];
2168
364k
  }
2169
60.8k
  context->compcode = compressor;
2170
60.8k
  context->nthreads = nthreads;
2171
60.8k
  context->new_nthreads = new_nthreads;
2172
60.8k
  context->end_threads = 0;
2173
60.8k
  context->clevel = clevel;
2174
60.8k
  context->schunk = schunk;
2175
60.8k
  context->tuner_params = tuner_params;
2176
60.8k
  context->tuner_id = tuner_id;
2177
60.8k
  context->splitmode = splitmode;
2178
  /* tuner some compression parameters */
2179
60.8k
  context->blocksize = (int32_t)blocksize;
2180
60.8k
  int rc = 0;
2181
60.8k
  if (context->tuner_params != NULL) {
2182
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2183
0
      if (blosc_stune_next_cparams(context) < 0) {
2184
0
        BLOSC_TRACE_ERROR("Error in stune next_cparams func\n");
2185
0
        return BLOSC2_ERROR_TUNER;
2186
0
      }
2187
0
    } else {
2188
0
      for (int i = 0; i < g_ntuners; ++i) {
2189
0
        if (g_tuners[i].id == context->tuner_id) {
2190
0
          if (g_tuners[i].next_cparams == NULL) {
2191
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2192
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2193
0
              return BLOSC2_ERROR_FAILURE;
2194
0
            }
2195
0
          }
2196
0
          if (g_tuners[i].next_cparams(context) < 0) {
2197
0
            BLOSC_TRACE_ERROR("Error in tuner %d next_cparams func\n", context->tuner_id);
2198
0
            return BLOSC2_ERROR_TUNER;
2199
0
          }
2200
0
          if (g_tuners[i].id == BLOSC_BTUNE && context->blocksize == 0) {
2201
            // Call stune for initializing blocksize
2202
0
            if (blosc_stune_next_blocksize(context) < 0) {
2203
0
              BLOSC_TRACE_ERROR("Error in stune next_blocksize func\n");
2204
0
              return BLOSC2_ERROR_TUNER;
2205
0
            }
2206
0
          }
2207
0
          goto urtunersuccess;
2208
0
        }
2209
0
      }
2210
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2211
0
      return BLOSC2_ERROR_INVALID_PARAM;
2212
0
    }
2213
60.8k
  } else {
2214
60.8k
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2215
60.8k
      rc = blosc_stune_next_blocksize(context);
2216
60.8k
    } else {
2217
0
      for (int i = 0; i < g_ntuners; ++i) {
2218
0
        if (g_tuners[i].id == context->tuner_id) {
2219
0
          if (g_tuners[i].next_blocksize == NULL) {
2220
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2221
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2222
0
              return BLOSC2_ERROR_FAILURE;
2223
0
            }
2224
0
          }
2225
0
          rc = g_tuners[i].next_blocksize(context);
2226
0
          goto urtunersuccess;
2227
0
        }
2228
0
      }
2229
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2230
0
      return BLOSC2_ERROR_INVALID_PARAM;
2231
0
    }
2232
60.8k
  }
2233
60.8k
  urtunersuccess:;
2234
60.8k
  if (rc < 0) {
2235
0
    BLOSC_TRACE_ERROR("Error in tuner next_blocksize func\n");
2236
0
    return BLOSC2_ERROR_TUNER;
2237
0
  }
2238
2239
2240
  /* Check buffer size limits */
2241
60.8k
  if (srcsize > BLOSC2_MAX_BUFFERSIZE) {
2242
0
    BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
2243
0
                      BLOSC2_MAX_BUFFERSIZE);
2244
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2245
0
  }
2246
2247
60.8k
  if (destsize < BLOSC2_MAX_OVERHEAD) {
2248
31
    BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
2249
0
                      BLOSC2_MAX_OVERHEAD);
2250
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2251
31
  }
2252
2253
  /* Compression level */
2254
60.7k
  if (clevel < 0 || clevel > 9) {
2255
    /* If clevel not in 0..9, print an error */
2256
0
    BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
2257
0
    return BLOSC2_ERROR_CODEC_PARAM;
2258
0
  }
2259
2260
  /* Check typesize limits */
2261
60.7k
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
2262
    /* If typesize is too large, treat buffer as an 1-byte stream. */
2263
0
    context->typesize = 1;
2264
0
  }
2265
2266
60.7k
  blosc2_calculate_blocks(context);
2267
2268
60.7k
  return 1;
2269
60.7k
}
2270
2271
2272
/*
 * Prepare `context` for decompressing the chunk in `src` into `dest`.
 *
 * Fills the context from the already-parsed `header`, validates that the
 * destination is large enough, locates the `bstarts` section and, when the
 * chunk carries a trained ZSTD dictionary, loads it into `context->dict_ddict`.
 *
 * Returns 0 on success (also for a header-only, zero-length chunk) or a
 * negative BLOSC2_ERROR_* code on failure.
 */
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
                                            int32_t srcsize, void* dest, int32_t destsize) {
  /* 64-bit so that header_overhead + nblocks * 4 cannot wrap around and
     slip past the source-buffer bounds check below. */
  int64_t bstarts_end;

  context->do_compress = 0;
  context->src = (const uint8_t*)src;
  context->srcsize = srcsize;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->output_bytes = 0;
  context->end_threads = 0;

  int rc = blosc2_initialize_context_from_header(context, header);
  if (rc < 0) {
    return rc;
  }

  /* Check that we have enough space to decompress */
  if (context->sourcesize > (int32_t)context->destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
    BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
                      " of blocks in chunk (%d).",
                      context->block_maskout_nitems, context->nblocks);
    return BLOSC2_ERROR_DATA;
  }

  context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
  if (context->special_type > BLOSC2_SPECIAL_LASTID) {
    BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
                      context->special_type);
    return BLOSC2_ERROR_DATA;
  }

  int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
  if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
    BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
    return BLOSC2_ERROR_DATA;
  }

  if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
      !context->special_type) {
    // A compressed buffer with only a header can only contain a zero-length buffer
    return 0;
  }

  context->bstarts = (int32_t *) (context->src + context->header_overhead);
  bstarts_end = context->header_overhead;
  if (!context->special_type && !memcpyed) {
    /* If chunk is not special or a memcpyed, we do have a bstarts section */
    bstarts_end += (int64_t)context->nblocks * (int64_t)sizeof(int32_t);
  }

  if (srcsize < bstarts_end) {
    BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
    return BLOSC2_ERROR_READ_BUFFER;
  }
  /* Safe narrowing: bstarts_end <= srcsize, which is an int32_t. */
  srcsize -= (int32_t)bstarts_end;

  /* Read optional dictionary if flag set */
  if (context->blosc2_flags & BLOSC2_USEDICT) {
#if defined(HAVE_ZSTD)
    context->use_dict = 1;
    if (context->dict_ddict != NULL) {
      // Free the existing dictionary (probably from another chunk).
      // Clear the pointer right away so that none of the early error returns
      // below leaves a dangling pointer that could be freed a second time.
      ZSTD_freeDDict(context->dict_ddict);
      context->dict_ddict = NULL;
    }
    // The trained dictionary is after the bstarts block
    if (srcsize < (signed)sizeof(int32_t)) {
      BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= (int32_t)sizeof(int32_t);
    // Read dictionary size
    context->dict_size = sw32_(context->src + bstarts_end);
    if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
      BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
    if (srcsize < (int32_t)context->dict_size) {
      BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= context->dict_size;
    // Read dictionary
    context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
    context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
    if (context->dict_ddict == NULL) {
      BLOSC_TRACE_ERROR("Cannot create the ZSTD decompression dictionary.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
#endif   // HAVE_ZSTD
  }

  return 0;
}
2366
2367
60.7k
/*
 * Compute the chunk header flags for the upcoming compression and write the
 * header (extended or Blosc1-sized, depending on `extended_header`) to the
 * start of `context->dest`.
 *
 * Side effects on the context: header_flags, header_overhead, bstarts and
 * output_bytes are (re)initialised. bstarts is left NULL when the chunk will
 * be stored verbatim (memcpyed) or, for extended headers, while a dictionary
 * still has to be trained. Always returns 1.
 */
static int write_compression_header(blosc2_context* context, bool extended_header) {
  blosc_header hdr;
  const int training_dict = context->use_dict && (context->dict_cdict == NULL);

  context->header_flags = 0;
  /* clevel 0 requests a plain copy; tiny buffers are not worth compressing. */
  if (context->clevel == 0 || context->sourcesize < BLOSC_MIN_BUFFERSIZE) {
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
  }
  const bool raw_copy = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  bool no_bstarts = raw_copy;
  if (extended_header) {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    /* Both shuffle bits set together mark an extended header */
    context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
    no_bstarts = no_bstarts || training_dict;
  } else {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
  }

  if (no_bstarts) {
    context->bstarts = NULL;
    context->output_bytes = context->header_overhead;
  } else {
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
  }

  /* Filter, split and codec bits are irrelevant for a verbatim copy. */
  if (!raw_copy) {
    context->header_flags |= (uint8_t)(context->filter_flags &
                                       (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE | BLOSC_DODELTA));

    const int no_split = !split_block(context, context->typesize, context->blocksize);
    uint8_t codec_format = compcode_to_compformat(context->compcode);
    context->header_flags |= no_split << 4;       /* bit 4: dont_split */
    context->header_flags |= codec_format << 5;   /* bits 5+: codec format */
  }

  blosc2_intialize_header_from_context(context, &hdr, extended_header);
  const size_t hdr_len = extended_header ? BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH;
  memcpy(context->dest, &hdr, hdr_len);

  return 1;
}
2443
2444
2445
60.7k
/*
 * Run the compression job described by `context` (header already written).
 *
 * The regular pipeline is tried first. If it produces 0 bytes, a verbatim
 * copy is attempted, provided it fits in destsize. A chunk made only of zero
 * runs is rewritten as a header-only BLOSC2_SPECIAL_ZERO chunk. The final
 * cbytes value is stored in the header, and context->destsize is set to it.
 * If tuner parameters are present, the configured tuner's update callback is
 * invoked with the elapsed time.
 *
 * Returns the number of compressed bytes (0 if it did not fit) or a negative
 * error code.
 */
static int blosc_compress_context(blosc2_context* context) {
  blosc_timestamp_t t_begin, t_end;
  int ntbytes = 0;
  bool store_raw = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  blosc_set_timestamp(&t_begin);

  if (!store_raw) {
    ntbytes = do_job(context);
    if (ntbytes < 0) {
      return ntbytes;
    }
    if (ntbytes == 0) {
      /* Did not fit: fall back to a plain copy below (last chance). */
      context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
      store_raw = true;
    }
  }

  /* Streams are per block, or per block and byte of typesize when split;
     a trailing leftover block contributes a single stream. */
  int dont_split = (context->header_flags & 0x10) >> 4;
  int nstreams;
  if (dont_split) {
    nstreams = context->nblocks;
  } else if (context->leftover) {
    nstreams = (context->nblocks - 1) * context->typesize + 1;
  } else {
    nstreams = context->nblocks * context->typesize;
  }

  if (store_raw) {
    if (context->sourcesize + context->header_overhead <= context->destsize) {
      context->output_bytes = context->header_overhead;
      ntbytes = do_job(context);
      if (ntbytes < 0) {
        return ntbytes;
      }
      /* Publish the memcpy bit in the header, then clear it in the context
         so that the context can be reused for the next chunk. */
      context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
      context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
    } else {
      /* Exceeds the maximum output size */
      ntbytes = 0;
    }
  } else {
    const int csizes_offset = context->header_overhead + 4 * context->nblocks;
    if (ntbytes == (int)(csizes_offset + nstreams * sizeof(int32_t))) {
      /* Every stream is a zero run: encode as a special zero chunk that
         consists of the header only. */
      context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
      ntbytes = context->header_overhead;
    }
  }

  _sw32(context->dest + BLOSC2_CHUNK_CBYTES, ntbytes);
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    dont_split = (context->header_flags & 0x10) >> 4;
    const int32_t instr_size = (int32_t)sizeof(blosc2_instr);
    const int32_t blocksize = dont_split ? instr_size : instr_size * context->typesize;
    _sw32(context->dest + BLOSC2_CHUNK_NBYTES, nstreams * instr_size);
    _sw32(context->dest + BLOSC2_CHUNK_BLOCKSIZE, blocksize);
  }

  /* Set the number of bytes in dest buffer (might be useful for tuner) */
  context->destsize = ntbytes;

  if (context->tuner_params != NULL) {
    blosc_set_timestamp(&t_end);
    double ctime = blosc_elapsed_secs(t_begin, t_end);
    int rc;
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
      rc = blosc_stune_update(context, ctime);
    } else {
      int slot = -1;
      for (int k = 0; k < g_ntuners; ++k) {
        if (g_tuners[k].id == context->tuner_id) {
          slot = k;
          break;
        }
      }
      if (slot < 0) {
        BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
        return BLOSC2_ERROR_INVALID_PARAM;
      }
      if (g_tuners[slot].update == NULL && fill_tuner(&g_tuners[slot]) < 0) {
        BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[slot].id);
        return BLOSC2_ERROR_FAILURE;
      }
      rc = g_tuners[slot].update(context, ctime);
    }
    if (rc < 0) {
      BLOSC_TRACE_ERROR("Error in tuner update func\n");
      return BLOSC2_ERROR_TUNER;
    }
  }

  return ntbytes;
}
2548
2549
2550
/* The public secure routine for compression with context. */
2551
/*
 * Compress `srcsize` bytes from `src` into `dest` (capacity `destsize`) using
 * the parameters stored in a compression `context`.
 *
 * When the context requests a dictionary that is not built yet, this works in
 * two passes. The first pass leaves the filtered data in `dest`. A ZSTD
 * dictionary is then trained from that data, stored after bstarts, and the
 * chunk is compressed again with it.
 *
 * Returns the compressed size, 0 if the output does not fit, or a negative
 * BLOSC2_ERROR_* code.
 */
int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
                        void* dest, int32_t destsize) {
  int error, cbytes;

  if (context->do_compress != 1) {
    BLOSC_TRACE_ERROR("Context is not meant for compression.  Giving up.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  error = initialize_context_compression(
          context, src, srcsize, dest, destsize,
          context->clevel, context->filters, context->filters_meta,
          context->typesize, context->compcode, context->blocksize,
          context->new_nthreads, context->nthreads, context->splitmode,
          context->tuner_id, context->tuner_params, context->schunk);
  if (error <= 0) {
    return error;
  }

  /* Write the extended header */
  error = write_compression_header(context, true);
  if (error < 0) {
    return error;
  }

  cbytes = blosc_compress_context(context);
  if (cbytes < 0) {
    return cbytes;
  }

  if (context->use_dict && context->dict_cdict == NULL) {

    if (context->compcode != BLOSC_ZSTD) {
      const char* compname;
      compname = clibcode_to_clibname(context->compcode);
      BLOSC_TRACE_ERROR("Codec %s does not support dicts.  Giving up.",
                        compname);
      return BLOSC2_ERROR_CODEC_DICT;
    }

#ifdef HAVE_ZSTD
    // Build the dictionary out of the filters outcome and compress with it
    int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
    // Do not make the dict more than 5% larger than uncompressed buffer
    if (dict_maxsize > srcsize / 20) {
      dict_maxsize = srcsize / 20;
    }
    void* samples_buffer = context->dest + context->header_overhead;
    unsigned nblocks = (unsigned)context->nblocks;
    int dont_split = (context->header_flags & 0x10) >> 4;
    if (!dont_split) {
      nblocks = nblocks * context->typesize;
    }
    if (nblocks < 8) {
      nblocks = 8;  // the minimum that accepts zstd as of 1.4.0
    }

    // 1 allows to use most of the chunk for training, but it is slower,
    // and it does not always seem to improve compression ratio.
    // Let's use 16, which is faster and still gives good results
    // on test_dict_schunk.c, but this seems very dependent on the data.
    unsigned sample_fraction = 16;
    size_t sample_size = context->sourcesize / nblocks / sample_fraction;

    // Populate the samples sizes for training the dictionary
    size_t* samples_sizes = malloc(nblocks * sizeof(*samples_sizes));
    BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
    for (size_t i = 0; i < nblocks; i++) {
      samples_sizes[i] = sample_size;
    }

    // Train from samples
    void* dict_buffer = malloc(dict_maxsize);
    if (dict_buffer == NULL) {
      free(samples_sizes);
      BLOSC_TRACE_ERROR("Cannot allocate memory for the dictionary.");
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
    // Keep the raw size_t result: ZDICT error codes do not fit in an int32_t.
    size_t train_rc = ZDICT_trainFromBuffer(
        dict_buffer, dict_maxsize,
        samples_buffer, samples_sizes, nblocks);
    free(samples_sizes);

    if (ZDICT_isError(train_rc)) {
      BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
                        "  Giving up.", ZDICT_getErrorName(train_rc));
      free(dict_buffer);
      return BLOSC2_ERROR_CODEC_DICT;
    }
    // Success means train_rc <= dict_maxsize, so the narrowing is safe.
    int32_t dict_actual_size = (int32_t)train_rc;
    assert(dict_actual_size > 0);

    // Update bytes counter and pointers to bstarts for the new compressed buffer
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;

    // The dict size and the dict itself are written into dest right after
    // bstarts; make sure they fit in the caller's buffer before writing.
    // Note: context->destsize was overwritten by blosc_compress_context(),
    // so the original `destsize` argument is used here.
    if ((int64_t)context->output_bytes + (int64_t)sizeof(int32_t) + dict_actual_size > (int64_t)destsize) {
      free(dict_buffer);
      BLOSC_TRACE_WARNING("Trained dictionary does not fit in the destination buffer.");
      return 0;
    }

    /* Write the size of trained dict at the end of bstarts */
    _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
    context->output_bytes += sizeof(int32_t);
    /* Write the trained dict afterwards */
    context->dict_buffer = context->dest + context->output_bytes;
    memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
    context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1);  // TODO: use get_accel()
    free(dict_buffer);      // the dictionary is copied in the header now
    if (context->dict_cdict == NULL) {
      context->dict_buffer = NULL;
      BLOSC_TRACE_ERROR("Cannot create the ZSTD compression dictionary.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
    context->output_bytes += (int32_t)dict_actual_size;
    context->dict_size = dict_actual_size;

    /* Compress with dict */
    cbytes = blosc_compress_context(context);

    // Invalidate the dictionary for compressing other chunks using the same context
    context->dict_buffer = NULL;
    ZSTD_freeCDict(context->dict_cdict);
    context->dict_cdict = NULL;
#endif  // HAVE_ZSTD
  }

  return cbytes;
}
2674
2675
2676
void build_filters(const int doshuffle, const int delta,
2677
15.6k
                   const int32_t typesize, uint8_t* filters) {
2678
2679
  /* Fill the end part of the filter pipeline */
2680
15.6k
  if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2681
0
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2682
15.6k
  if (doshuffle == BLOSC_BITSHUFFLE)
2683
5.62k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2684
15.6k
  if (doshuffle == BLOSC_NOSHUFFLE)
2685
4.44k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_NOSHUFFLE;
2686
15.6k
  if (delta)
2687
0
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2688
15.6k
}
2689
2690
/* The public secure routine for compression. */
2691
int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2692
13.4k
                    const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2693
13.4k
  int error;
2694
13.4k
  int result;
2695
13.4k
  char* envvar;
2696
2697
  /* Check whether the library should be initialized */
2698
13.4k
  if (!g_initlib) blosc2_init();
2699
2700
  /* Check for a BLOSC_CLEVEL environment variable */
2701
13.4k
  envvar = getenv("BLOSC_CLEVEL");
2702
13.4k
  if (envvar != NULL) {
2703
0
    long value;
2704
0
    value = strtol(envvar, NULL, 10);
2705
0
    if ((errno != EINVAL) && (value >= 0)) {
2706
0
      clevel = (int)value;
2707
0
    }
2708
0
    else {
2709
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
2710
0
    }
2711
0
  }
2712
2713
  /* Check for a BLOSC_SHUFFLE environment variable */
2714
0
  envvar = getenv("BLOSC_SHUFFLE");
2715
13.4k
  if (envvar != NULL) {
2716
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
2717
0
      doshuffle = BLOSC_NOSHUFFLE;
2718
0
    }
2719
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
2720
0
      doshuffle = BLOSC_SHUFFLE;
2721
0
    }
2722
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
2723
0
      doshuffle = BLOSC_BITSHUFFLE;
2724
0
    }
2725
0
    else {
2726
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
2727
0
    }
2728
0
  }
2729
2730
  /* Check for a BLOSC_DELTA environment variable */
2731
0
  envvar = getenv("BLOSC_DELTA");
2732
13.4k
  if (envvar != NULL) {
2733
0
    if (strcmp(envvar, "1") == 0) {
2734
0
      blosc2_set_delta(1);
2735
0
    } else if (strcmp(envvar, "0") == 0) {
2736
0
      blosc2_set_delta(0);
2737
0
    }
2738
0
    else {
2739
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
2740
0
    }
2741
0
  }
2742
2743
  /* Check for a BLOSC_TYPESIZE environment variable */
2744
0
  envvar = getenv("BLOSC_TYPESIZE");
2745
13.4k
  if (envvar != NULL) {
2746
0
    long value;
2747
0
    value = strtol(envvar, NULL, 10);
2748
0
    if ((errno != EINVAL) && (value > 0)) {
2749
0
      typesize = (int32_t)value;
2750
0
    }
2751
0
    else {
2752
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
2753
0
    }
2754
0
  }
2755
2756
  /* Check for a BLOSC_COMPRESSOR environment variable */
2757
0
  envvar = getenv("BLOSC_COMPRESSOR");
2758
13.4k
  if (envvar != NULL) {
2759
0
    result = blosc1_set_compressor(envvar);
2760
0
    if (result < 0) {
2761
0
      BLOSC_TRACE_WARNING("BLOSC_COMPRESSOR environment variable '%s' not recognized\n", envvar);
2762
0
    }
2763
0
  }
2764
2765
  /* Check for a BLOSC_BLOCKSIZE environment variable */
2766
0
  envvar = getenv("BLOSC_BLOCKSIZE");
2767
13.4k
  if (envvar != NULL) {
2768
0
    long blocksize;
2769
0
    blocksize = strtol(envvar, NULL, 10);
2770
0
    if ((errno != EINVAL) && (blocksize > 0)) {
2771
0
      blosc1_set_blocksize((size_t) blocksize);
2772
0
    }
2773
0
    else {
2774
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
2775
0
    }
2776
0
  }
2777
2778
  /* Check for a BLOSC_NTHREADS environment variable */
2779
0
  envvar = getenv("BLOSC_NTHREADS");
2780
13.4k
  if (envvar != NULL) {
2781
0
    long nthreads;
2782
0
    nthreads = strtol(envvar, NULL, 10);
2783
0
    if ((errno != EINVAL) && (nthreads > 0)) {
2784
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2785
0
      if (result < 0) {
2786
0
        BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
2787
0
      }
2788
0
    }
2789
0
  }
2790
2791
  /* Check for a BLOSC_SPLITMODE environment variable */
2792
0
  envvar = getenv("BLOSC_SPLITMODE");
2793
13.4k
  if (envvar != NULL) {
2794
0
    int32_t splitmode = -1;
2795
0
    if (strcmp(envvar, "ALWAYS") == 0) {
2796
0
      splitmode = BLOSC_ALWAYS_SPLIT;
2797
0
    }
2798
0
    else if (strcmp(envvar, "NEVER") == 0) {
2799
0
      splitmode = BLOSC_NEVER_SPLIT;
2800
0
    }
2801
0
    else if (strcmp(envvar, "AUTO") == 0) {
2802
0
      splitmode = BLOSC_AUTO_SPLIT;
2803
0
    }
2804
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
2805
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
2806
0
    }
2807
0
    else {
2808
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
2809
0
    }
2810
2811
0
    if (splitmode >= 0) {
2812
0
      blosc1_set_splitmode(splitmode);
2813
0
    }
2814
0
  }
2815
2816
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2817
     that this should be the last env var so that it can take the
2818
     previous ones into account */
2819
0
  envvar = getenv("BLOSC_NOLOCK");
2820
13.4k
  if (envvar != NULL) {
2821
    // TODO: here is the only place that returns an extended header from
2822
    //  a blosc1_compress() call.  This should probably be fixed.
2823
0
    const char *compname;
2824
0
    blosc2_context *cctx;
2825
0
    blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2826
2827
0
    blosc2_compcode_to_compname(g_compressor, &compname);
2828
    /* Create a context for compression */
2829
0
    build_filters(doshuffle, g_delta, typesize, cparams.filters);
2830
    // TODO: cparams can be shared in a multithreaded environment.  do a copy!
2831
0
    cparams.typesize = (uint8_t)typesize;
2832
0
    cparams.compcode = (uint8_t)g_compressor;
2833
0
    cparams.clevel = (uint8_t)clevel;
2834
0
    cparams.nthreads = g_nthreads;
2835
0
    cparams.splitmode = g_splitmode;
2836
0
    cctx = blosc2_create_cctx(cparams);
2837
0
    if (cctx == NULL) {
2838
0
      BLOSC_TRACE_ERROR("Error while creating the compression context");
2839
0
      return BLOSC2_ERROR_NULL_POINTER;
2840
0
    }
2841
    /* Do the actual compression */
2842
0
    result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2843
    /* Release context resources */
2844
0
    blosc2_free_ctx(cctx);
2845
0
    return result;
2846
0
  }
2847
2848
13.4k
  pthread_mutex_lock(&global_comp_mutex);
2849
2850
  /* Initialize a context compression */
2851
13.4k
  uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2852
13.4k
  BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2853
13.4k
  uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2854
13.4k
  BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2855
13.4k
  build_filters(doshuffle, g_delta, typesize, filters);
2856
13.4k
  error = initialize_context_compression(
2857
13.4k
          g_global_context, src, srcsize, dest, destsize, clevel, filters,
2858
13.4k
          filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2859
13.4k
          g_splitmode, g_tuner, NULL, g_schunk);
2860
13.4k
  free(filters);
2861
13.4k
  free(filters_meta);
2862
13.4k
  if (error <= 0) {
2863
31
    pthread_mutex_unlock(&global_comp_mutex);
2864
31
    return error;
2865
31
  }
2866
2867
13.3k
  envvar = getenv("BLOSC_BLOSC1_COMPAT");
2868
13.3k
  if (envvar != NULL) {
2869
    /* Write chunk header without extended header (Blosc1 compatibility mode) */
2870
0
    error = write_compression_header(g_global_context, false);
2871
0
  }
2872
13.3k
  else {
2873
13.3k
    error = write_compression_header(g_global_context, true);
2874
13.3k
  }
2875
13.3k
  if (error < 0) {
2876
0
    pthread_mutex_unlock(&global_comp_mutex);
2877
0
    return error;
2878
0
  }
2879
2880
13.3k
  result = blosc_compress_context(g_global_context);
2881
2882
13.3k
  pthread_mutex_unlock(&global_comp_mutex);
2883
2884
13.3k
  return result;
2885
13.3k
}
2886
2887
2888
/* The public routine for compression. */
2889
int blosc1_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
                    const void* src, void* dest, size_t destsize) {
  /* Blosc1 entry point: narrow the size_t arguments to the int32_t values
     expected by blosc2_compress() and forward the call unchanged. */
  const int32_t item_size = (int32_t)typesize;
  const int32_t src_len = (int32_t)nbytes;
  const int32_t dst_cap = (int32_t)destsize;
  return blosc2_compress(clevel, doshuffle, item_size, src, src_len, dest, dst_cap);
}
2893
2894
2895
2896
/*
 * Decompress the chunk in `src` into `dest` using `context`.
 *
 * Parses the (extended) chunk header, rejects destinations smaller than the
 * chunk's uncompressed size, prepares the context and runs the job.
 * Returns the number of decompressed bytes or a negative error code.
 */
static int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
                                                void* dest, int32_t destsize) {
  blosc_header chunk_hdr;

  int status = read_chunk_header(src, srcsize, true, &chunk_hdr);
  if (status < 0) {
    return status;
  }

  /* The whole uncompressed chunk must fit into dest */
  if (chunk_hdr.nbytes > destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  status = initialize_context_decompression(context, &chunk_hdr, src, srcsize, dest, destsize);
  if (status < 0) {
    return status;
  }

  /* Do the actual decompression */
  int32_t produced = do_job(context);
  if (produced >= 0) {
    assert(produced <= (int32_t)destsize);
  }
  return produced;
}
2926
2927
2928
/* The public secure routine for decompression with context. */
2929
/*
 * Decompress `src` into `dest` using a decompression `context`.
 *
 * Any block_maskout installed on the context applies to this call only: it is
 * released (and its item count reset) afterwards, whatever the outcome.
 * Returns the number of decompressed bytes or a negative error code.
 */
int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
                          void* dest, int32_t destsize) {
  if (context->do_compress != 0) {
    BLOSC_TRACE_ERROR("Context is not meant for decompression.  Giving up.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  const int outcome = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);

  /* Reset a possible block_maskout (free(NULL) is a no-op) */
  free(context->block_maskout);
  context->block_maskout = NULL;
  context->block_maskout_nitems = 0;

  return outcome;
}
2949
2950
2951
/* The public secure routine for decompression. */
2952
19.9k
int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2953
19.9k
  int result;
2954
19.9k
  char* envvar;
2955
19.9k
  long nthreads;
2956
19.9k
  blosc2_context *dctx;
2957
19.9k
  blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
2958
2959
  /* Check whether the library should be initialized */
2960
19.9k
  if (!g_initlib) blosc2_init();
2961
2962
  /* Check for a BLOSC_NTHREADS environment variable */
2963
19.9k
  envvar = getenv("BLOSC_NTHREADS");
2964
19.9k
  if (envvar != NULL) {
2965
0
    nthreads = strtol(envvar, NULL, 10);
2966
0
    if ((errno != EINVAL)) {
2967
0
      if ((nthreads <= 0) || (nthreads > INT16_MAX)) {
2968
0
        BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
2969
0
        return BLOSC2_ERROR_INVALID_PARAM;
2970
0
      }
2971
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2972
0
      if (result < 0) {
2973
0
        return result;
2974
0
      }
2975
0
    }
2976
0
  }
2977
2978
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2979
     that this should be the last env var so that it can take the
2980
     previous ones into account */
2981
19.9k
  envvar = getenv("BLOSC_NOLOCK");
2982
19.9k
  if (envvar != NULL) {
2983
0
    dparams.nthreads = g_nthreads;
2984
0
    dctx = blosc2_create_dctx(dparams);
2985
0
    if (dctx == NULL) {
2986
0
      BLOSC_TRACE_ERROR("Error while creating the decompression context");
2987
0
      return BLOSC2_ERROR_NULL_POINTER;
2988
0
    }
2989
0
    result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
2990
0
    blosc2_free_ctx(dctx);
2991
0
    return result;
2992
0
  }
2993
2994
19.9k
  pthread_mutex_lock(&global_comp_mutex);
2995
2996
19.9k
  result = blosc_run_decompression_with_context(
2997
19.9k
          g_global_context, src, srcsize, dest, destsize);
2998
2999
19.9k
  pthread_mutex_unlock(&global_comp_mutex);
3000
3001
19.9k
  return result;
3002
19.9k
}
3003
3004
3005
/* The public routine for decompression. */
3006
8.68k
int blosc1_decompress(const void* src, void* dest, size_t destsize) {
  /* The Blosc1 API carries no source length, so the largest int32_t is
     passed as srcsize; destsize is narrowed to int32_t for the v2 call. */
  const int32_t dest_capacity = (int32_t)destsize;
  return blosc2_decompress(src, INT32_MAX, dest, dest_capacity);
}
3009
3010
3011
/* Specific routine optimized for decompressing a small number of
   items out of a compressed chunk.  It does not use threads because
   doing so would hurt performance. */
3014
int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
3015
45
                   int start, int nitems, void* dest, int32_t destsize) {
3016
45
  uint8_t* _src = (uint8_t*)(src);  /* current pos for source buffer */
3017
45
  uint8_t* _dest = (uint8_t*)(dest);
3018
45
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
3019
45
  int32_t bsize, bsize2, ebsize, leftoverblock;
3020
45
  int32_t startb, stopb;
3021
45
  int32_t stop = start + nitems;
3022
45
  int j, rc;
3023
3024
45
  if (nitems == 0) {
3025
    // We have nothing to do
3026
0
    return 0;
3027
0
  }
3028
45
  if (nitems * header->typesize > destsize) {
3029
0
    BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
3030
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3031
0
  }
3032
3033
45
  context->bstarts = (int32_t*)(_src + context->header_overhead);
3034
3035
  /* Check region boundaries */
3036
45
  if ((start < 0) || (start * header->typesize > header->nbytes)) {
3037
0
    BLOSC_TRACE_ERROR("`start` out of bounds.");
3038
0
    return BLOSC2_ERROR_INVALID_PARAM;
3039
0
  }
3040
3041
45
  if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
3042
0
    BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
3043
0
    return BLOSC2_ERROR_INVALID_PARAM;
3044
0
  }
3045
3046
45
  int chunk_memcpy = header->flags & 0x1;
3047
45
  if (!context->special_type && !chunk_memcpy &&
3048
45
      ((uint8_t *)(_src + srcsize) < (uint8_t *)(context->bstarts + context->nblocks))) {
3049
0
    BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
3050
0
    return BLOSC2_ERROR_READ_BUFFER;
3051
0
  }
3052
3053
45
  bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
3054
45
  if (context->special_type) {
3055
    // Fake a runlen as if its a memcpyed chunk
3056
9
    memcpyed = true;
3057
9
  }
3058
3059
45
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
3060
45
                  (context->blosc2_flags & 0x08u) && !context->special_type);
3061
45
  if (memcpyed && !is_lazy && !context->postfilter) {
3062
    // Short-circuit for (non-lazy) memcpyed or special values
3063
45
    ntbytes = nitems * header->typesize;
3064
45
    switch (context->special_type) {
3065
0
      case BLOSC2_SPECIAL_VALUE:
3066
        // All repeated values
3067
0
        rc = set_values(context->typesize, _src, _dest, ntbytes);
3068
0
        if (rc < 0) {
3069
0
          BLOSC_TRACE_ERROR("set_values failed");
3070
0
          return BLOSC2_ERROR_DATA;
3071
0
        }
3072
0
        break;
3073
0
      case BLOSC2_SPECIAL_NAN:
3074
0
        rc = set_nans(context->typesize, _dest, ntbytes);
3075
0
        if (rc < 0) {
3076
0
          BLOSC_TRACE_ERROR("set_nans failed");
3077
0
          return BLOSC2_ERROR_DATA;
3078
0
        }
3079
0
        break;
3080
9
      case BLOSC2_SPECIAL_ZERO:
3081
9
        memset(_dest, 0, ntbytes);
3082
9
        break;
3083
0
      case BLOSC2_SPECIAL_UNINIT:
3084
        // We do nothing here
3085
0
        break;
3086
36
      case BLOSC2_NO_SPECIAL:
3087
36
        _src += context->header_overhead + start * context->typesize;
3088
36
        memcpy(_dest, _src, ntbytes);
3089
36
        break;
3090
0
      default:
3091
0
        BLOSC_TRACE_ERROR("Unhandled special value case");
3092
0
        BLOSC_ERROR(BLOSC2_ERROR_SCHUNK_SPECIAL);
3093
45
    }
3094
45
    return ntbytes;
3095
45
  }
3096
3097
0
  ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
3098
0
  struct thread_context* scontext = context->serial_context;
3099
  /* Resize the temporaries in serial context if needed */
3100
0
  if (header->blocksize > scontext->tmp_blocksize) {
3101
0
    my_free(scontext->tmp);
3102
0
    scontext->tmp_nbytes = (size_t)4 * ebsize;
3103
0
    scontext->tmp = my_malloc(scontext->tmp_nbytes);
3104
0
    BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
3105
0
    scontext->tmp2 = scontext->tmp + ebsize;
3106
0
    scontext->tmp3 = scontext->tmp2 + ebsize;
3107
0
    scontext->tmp4 = scontext->tmp3 + ebsize;
3108
0
    scontext->tmp_blocksize = (int32_t)header->blocksize;
3109
0
  }
3110
3111
0
  for (j = 0; j < context->nblocks; j++) {
3112
0
    bsize = header->blocksize;
3113
0
    leftoverblock = 0;
3114
0
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
3115
0
      bsize = context->leftover;
3116
0
      leftoverblock = 1;
3117
0
    }
3118
3119
    /* Compute start & stop for each block */
3120
0
    startb = start * header->typesize - j * header->blocksize;
3121
0
    stopb = stop * header->typesize - j * header->blocksize;
3122
0
    if (stopb <= 0) {
3123
      // We can exit as soon as this block is beyond stop
3124
0
      break;
3125
0
    }
3126
0
    if (startb >= header->blocksize) {
3127
0
      continue;
3128
0
    }
3129
0
    if (startb < 0) {
3130
0
      startb = 0;
3131
0
    }
3132
0
    if (stopb > header->blocksize) {
3133
0
      stopb = header->blocksize;
3134
0
    }
3135
0
    bsize2 = stopb - startb;
3136
3137
0
#if defined(HAVE_PLUGINS)
3138
0
    if (context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) {
3139
0
      scontext->zfp_cell_start = startb / context->typesize;
3140
0
      scontext->zfp_cell_nitems = nitems;
3141
0
    }
3142
0
#endif /* HAVE_PLUGINS */
3143
3144
    /* Do the actual data copy */
3145
    // Regular decompression.  Put results in tmp2.
3146
    // If the block is aligned and the worst case fits in destination, let's avoid a copy
3147
0
    bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
3148
0
    uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
3149
3150
    // If memcpyed we don't have a bstarts section (because it is not needed)
3151
0
    int32_t src_offset = memcpyed ?
3152
0
      context->header_overhead + j * header->blocksize : sw32_(context->bstarts + j);
3153
3154
0
    int32_t cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
3155
0
                             src, srcsize, src_offset, j,
3156
0
                             tmp2, 0, scontext->tmp, scontext->tmp3);
3157
0
    if (cbytes < 0) {
3158
0
      ntbytes = cbytes;
3159
0
      break;
3160
0
    }
3161
0
    if (scontext->zfp_cell_nitems > 0) {
3162
0
      if (cbytes == bsize2) {
3163
0
        memcpy((uint8_t *) dest, tmp2, (unsigned int) bsize2);
3164
0
      } else if (cbytes == context->blocksize) {
3165
0
        memcpy((uint8_t *) dest, tmp2 + scontext->zfp_cell_start * context->typesize, (unsigned int) bsize2);
3166
0
        cbytes = bsize2;
3167
0
      }
3168
0
    } else if (!get_single_block) {
3169
      /* Copy to destination */
3170
0
      memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
3171
0
    }
3172
0
    ntbytes += bsize2;
3173
0
  }
3174
3175
0
  scontext->zfp_cell_nitems = 0;
3176
3177
0
  return ntbytes;
3178
0
}
3179
3180
45
int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
3181
45
  blosc2_context context;
3182
45
  int result;
3183
3184
  /* Minimally populate the context */
3185
45
  memset(&context, 0, sizeof(blosc2_context));
3186
3187
45
  context.schunk = g_schunk;
3188
45
  context.nthreads = 1;  // force a serial decompression; fixes #95
3189
3190
  /* Call the actual getitem function */
3191
45
  result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
3192
3193
  /* Release resources */
3194
45
  if (context.serial_context != NULL) {
3195
45
    free_thread_context(context.serial_context);
3196
45
  }
3197
45
  return result;
3198
45
}
3199
3200
/* Blosc1-compatible, non-contextual API for extracting a small number of
   items out of a compressed chunk.  Neither the source size nor the
   destination capacity is known here, so both are passed as INT32_MAX. */
int blosc1_getitem(const void* src, int start, int nitems, void* dest) {
  const int32_t unknown_size = INT32_MAX;
  return blosc2_getitem(src, unknown_size, start, nitems, dest, unknown_size);
}
3205
3206
/* Contextual getitem.  Reads the chunk header from `src`, configures
   `context` from it (creating its serial thread context on first use) and
   delegates the extraction of items [start, start + nitems) to
   _blosc_getitem().  Returns the number of bytes written to `dest` or a
   negative error code. */
int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
    int start, int nitems, void* dest, int32_t destsize) {
  blosc_header hdr;

  int rc = read_chunk_header((uint8_t *) src, srcsize, true, &hdr);
  if (rc < 0) {
    return rc;
  }

  context->src = src;
  context->srcsize = srcsize;
  context->dest = dest;
  context->destsize = destsize;

  rc = blosc2_initialize_context_from_header(context, &hdr);
  if (rc < 0) {
    return rc;
  }

  if (context->serial_context == NULL) {
    context->serial_context = create_thread_context(context, 0);
  }
  BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);

  return _blosc_getitem(context, &hdr, src, srcsize, start, nitems, dest, destsize);
}
3236
3237
/* execute single compression/decompression job for a single thread_context */
3238
static void t_blosc_do_job(void *ctxt)
3239
0
{
3240
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3241
0
  blosc2_context* context = thcontext->parent_context;
3242
0
  int32_t cbytes;
3243
0
  int32_t ntdest;
3244
0
  int32_t tblocks;               /* number of blocks per thread */
3245
0
  int32_t tblock;                /* limit block on a thread */
3246
0
  int32_t nblock_;              /* private copy of nblock */
3247
0
  int32_t bsize;
3248
0
  int32_t leftoverblock;
3249
  /* Parameters for threads */
3250
0
  int32_t blocksize;
3251
0
  int32_t ebsize;
3252
0
  int32_t srcsize;
3253
0
  bool compress = context->do_compress != 0;
3254
0
  int32_t maxbytes;
3255
0
  int32_t nblocks;
3256
0
  int32_t leftover;
3257
0
  int32_t leftover2;
3258
0
  int32_t* bstarts;
3259
0
  const uint8_t* src;
3260
0
  uint8_t* dest;
3261
0
  uint8_t* tmp;
3262
0
  uint8_t* tmp2;
3263
0
  uint8_t* tmp3;
3264
3265
  /* Get parameters for this thread before entering the main loop */
3266
0
  blocksize = context->blocksize;
3267
0
  ebsize = blocksize + context->typesize * (int32_t)sizeof(int32_t);
3268
0
  maxbytes = context->destsize;
3269
0
  nblocks = context->nblocks;
3270
0
  leftover = context->leftover;
3271
0
  bstarts = context->bstarts;
3272
0
  src = context->src;
3273
0
  srcsize = context->srcsize;
3274
0
  dest = context->dest;
3275
3276
  /* Resize the temporaries if needed */
3277
0
  if (blocksize > thcontext->tmp_blocksize) {
3278
0
    my_free(thcontext->tmp);
3279
0
    thcontext->tmp_nbytes = (size_t) 4 * ebsize;
3280
0
    thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
3281
0
    thcontext->tmp2 = thcontext->tmp + ebsize;
3282
0
    thcontext->tmp3 = thcontext->tmp2 + ebsize;
3283
0
    thcontext->tmp4 = thcontext->tmp3 + ebsize;
3284
0
    thcontext->tmp_blocksize = blocksize;
3285
0
  }
3286
3287
0
  tmp = thcontext->tmp;
3288
0
  tmp2 = thcontext->tmp2;
3289
0
  tmp3 = thcontext->tmp3;
3290
3291
  // Determine whether we can do a static distribution of workload among different threads
3292
0
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
3293
0
  if (!context->do_compress && context->special_type) {
3294
    // Fake a runlen as if its a memcpyed chunk
3295
0
    memcpyed = true;
3296
0
  }
3297
3298
0
  bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
3299
0
  if (static_schedule) {
3300
      /* Blocks per thread */
3301
0
      tblocks = nblocks / context->nthreads;
3302
0
      leftover2 = nblocks % context->nthreads;
3303
0
      tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
3304
0
      nblock_ = thcontext->tid * tblocks;
3305
0
      tblock = nblock_ + tblocks;
3306
0
      if (tblock > nblocks) {
3307
0
          tblock = nblocks;
3308
0
      }
3309
0
  }
3310
0
  else {
3311
    // Use dynamic schedule via a queue.  Get the next block.
3312
0
    pthread_mutex_lock(&context->count_mutex);
3313
0
    context->thread_nblock++;
3314
0
    nblock_ = context->thread_nblock;
3315
0
    pthread_mutex_unlock(&context->count_mutex);
3316
0
    tblock = nblocks;
3317
0
  }
3318
3319
  /* Loop over blocks */
3320
0
  leftoverblock = 0;
3321
0
  while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
3322
0
    bsize = blocksize;
3323
0
    if (nblock_ == (nblocks - 1) && (leftover > 0)) {
3324
0
      bsize = leftover;
3325
0
      leftoverblock = 1;
3326
0
    }
3327
0
    if (compress) {
3328
0
      if (memcpyed) {
3329
0
        if (!context->prefilter) {
3330
          /* We want to memcpy only */
3331
0
          memcpy(dest + context->header_overhead + nblock_ * blocksize,
3332
0
                 src + nblock_ * blocksize, (unsigned int) bsize);
3333
0
          cbytes = (int32_t) bsize;
3334
0
        }
3335
0
        else {
3336
          /* Only the prefilter has to be executed, and this is done in blosc_c().
3337
           * However, no further actions are needed, so we can put the result
3338
           * directly in dest. */
3339
0
          cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3340
0
                           ebsize, src, nblock_ * blocksize,
3341
0
                           dest + context->header_overhead + nblock_ * blocksize,
3342
0
                           tmp, tmp3);
3343
0
        }
3344
0
      }
3345
0
      else {
3346
        /* Regular compression */
3347
0
        cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3348
0
                          ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
3349
0
      }
3350
0
    }
3351
0
    else {
3352
      /* Regular decompression */
3353
0
      if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
3354
0
          (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
3355
        /* Not enough input to read all `bstarts` */
3356
0
        cbytes = -1;
3357
0
      }
3358
0
      else {
3359
        // If memcpyed we don't have a bstarts section (because it is not needed)
3360
0
        int32_t src_offset = memcpyed ?
3361
0
            context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
3362
0
        cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
3363
0
                          src, srcsize, src_offset, nblock_,
3364
0
                          dest, nblock_ * blocksize, tmp, tmp2);
3365
0
      }
3366
0
    }
3367
3368
    /* Check whether current thread has to giveup */
3369
0
    if (context->thread_giveup_code <= 0) {
3370
0
      break;
3371
0
    }
3372
3373
    /* Check results for the compressed/decompressed block */
3374
0
    if (cbytes < 0) {            /* compr/decompr failure */
3375
      /* Set giveup_code error */
3376
0
      pthread_mutex_lock(&context->count_mutex);
3377
0
      context->thread_giveup_code = cbytes;
3378
0
      pthread_mutex_unlock(&context->count_mutex);
3379
0
      break;
3380
0
    }
3381
3382
0
    if (compress && !memcpyed) {
3383
      /* Start critical section */
3384
0
      pthread_mutex_lock(&context->count_mutex);
3385
0
      ntdest = context->output_bytes;
3386
      // Note: do not use a typical local dict_training variable here
3387
      // because it is probably cached from previous calls if the number of
3388
      // threads does not change (the usual thing).
3389
0
      if (!(context->use_dict && context->dict_cdict == NULL)) {
3390
0
        _sw32(bstarts + nblock_, (int32_t) ntdest);
3391
0
      }
3392
3393
0
      if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
3394
0
        context->thread_giveup_code = 0;  /* incompressible buf */
3395
0
        pthread_mutex_unlock(&context->count_mutex);
3396
0
        break;
3397
0
      }
3398
0
      context->thread_nblock++;
3399
0
      nblock_ = context->thread_nblock;
3400
0
      context->output_bytes += cbytes;
3401
0
      pthread_mutex_unlock(&context->count_mutex);
3402
      /* End of critical section */
3403
3404
      /* Copy the compressed buffer to destination */
3405
0
      memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
3406
0
    }
3407
0
    else if (static_schedule) {
3408
0
      nblock_++;
3409
0
    }
3410
0
    else {
3411
0
      pthread_mutex_lock(&context->count_mutex);
3412
0
      context->thread_nblock++;
3413
0
      nblock_ = context->thread_nblock;
3414
0
      context->output_bytes += cbytes;
3415
0
      pthread_mutex_unlock(&context->count_mutex);
3416
0
    }
3417
3418
0
  } /* closes while (nblock_) */
3419
3420
0
  if (static_schedule) {
3421
0
    pthread_mutex_lock(&context->count_mutex);
3422
0
    context->output_bytes = context->sourcesize;
3423
0
    if (compress) {
3424
0
      context->output_bytes += context->header_overhead;
3425
0
    }
3426
0
    pthread_mutex_unlock(&context->count_mutex);
3427
0
  }
3428
3429
0
}
3430
3431
/* Decompress & unshuffle several blocks in a single thread */
3432
0
static void* t_blosc(void* ctxt) {
3433
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3434
0
  blosc2_context* context = thcontext->parent_context;
3435
0
#ifdef BLOSC_POSIX_BARRIERS
3436
0
  int rc;
3437
0
#endif
3438
3439
0
  while (1) {
3440
    /* Synchronization point for all threads (wait for initialization) */
3441
0
    WAIT_INIT(NULL, context);
3442
3443
0
    if (context->end_threads) {
3444
0
      break;
3445
0
    }
3446
3447
0
    t_blosc_do_job(ctxt);
3448
3449
    /* Meeting point for all threads (wait for finalization) */
3450
0
    WAIT_FINISH(NULL, context);
3451
0
  }
3452
3453
  /* Cleanup our working space and context */
3454
0
  free_thread_context(thcontext);
3455
3456
0
  return (NULL);
3457
0
}
3458
3459
3460
0
/* Initialize the synchronization objects of `context` and start its pool of
   `context->nthreads` workers.  When a user threads callback is registered,
   only per-thread contexts are allocated for it and no threads are created.
   Returns 0 on success or a negative BLOSC2_ERROR_* code. */
int init_threadpool(blosc2_context *context) {
  int32_t tid;
  int rc2;

  /* Initialize mutex and condition variable objects */
  pthread_mutex_init(&context->count_mutex, NULL);
  pthread_mutex_init(&context->delta_mutex, NULL);
  pthread_mutex_init(&context->nchunk_mutex, NULL);
  pthread_cond_init(&context->delta_cv, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization */
#ifdef BLOSC_POSIX_BARRIERS
  pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
  pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
#else
  pthread_mutex_init(&context->count_threads_mutex, NULL);
  pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

  if (threads_callback) {
    /* Create thread contexts to store data for callback threads */
    context->thread_contexts = (struct thread_context *)my_malloc(
            context->nthreads * sizeof(struct thread_context));
    BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
    for (tid = 0; tid < context->nthreads; tid++)
      init_thread_context(context->thread_contexts + tid, context, tid);
  }
  else {
    #if !defined(_WIN32)
      /* Initialize the thread attributes; threads are created joinable so
         that they can be pthread_join()ed when the pool is released */
      pthread_attr_init(&context->ct_attr);
      pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
    #endif

    /* Make space for thread handlers */
    context->threads = (pthread_t*)my_malloc(
            context->nthreads * sizeof(pthread_t));
    BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);
    /* Finally, create the threads */
    for (tid = 0; tid < context->nthreads; tid++) {
      /* Create a thread context (the worker frees it when it finishes) */
      struct thread_context *thread_context = create_thread_context(context, tid);
      BLOSC_ERROR_NULL(thread_context, BLOSC2_ERROR_THREAD_CREATE);
      #if !defined(_WIN32)
        rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
                            (void*)thread_context);
      #else
        rc2 = pthread_create(&context->threads[tid], NULL, t_blosc,
                            (void *)thread_context);
      #endif
      if (rc2) {
        BLOSC_TRACE_ERROR("Return code from pthread_create() is %d.\n"
                          "\tError detail: %s\n", rc2, strerror(rc2));
        /* No worker took ownership of this context, so release it here */
        free_thread_context(thread_context);
        return BLOSC2_ERROR_THREAD_CREATE;
      }
    }
  }

  /* We have now started/initialized the threads */
  context->threads_started = context->nthreads;
  context->new_nthreads = context->nthreads;

  return 0;
}
3529
3530
int16_t blosc2_get_nthreads(void)
3531
20
{
3532
20
  return g_nthreads;
3533
20
}
3534
3535
24.9k
int16_t blosc2_set_nthreads(int16_t nthreads) {
3536
24.9k
  int16_t ret = g_nthreads;          /* the previous number of threads */
3537
3538
  /* Check whether the library should be initialized */
3539
24.9k
  if (!g_initlib) blosc2_init();
3540
3541
24.9k
 if (nthreads != ret) {
3542
0
   g_nthreads = nthreads;
3543
0
   g_global_context->new_nthreads = nthreads;
3544
0
   int16_t ret2 = check_nthreads(g_global_context);
3545
0
   if (ret2 < 0) {
3546
0
     return ret2;
3547
0
   }
3548
0
 }
3549
3550
24.9k
  return ret;
3551
24.9k
}
3552
3553
3554
const char* blosc1_get_compressor(void)
3555
0
{
3556
0
  const char* compname;
3557
0
  blosc2_compcode_to_compname(g_compressor, &compname);
3558
3559
0
  return compname;
3560
0
}
3561
3562
15.6k
int blosc1_set_compressor(const char* compname) {
3563
15.6k
  int code = blosc2_compname_to_compcode(compname);
3564
15.6k
  if (code >= BLOSC_LAST_CODEC) {
3565
0
    BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3566
0
    BLOSC_ERROR(BLOSC2_ERROR_CODEC_SUPPORT);
3567
0
  }
3568
15.6k
  g_compressor = code;
3569
3570
  /* Check whether the library should be initialized */
3571
15.6k
  if (!g_initlib) blosc2_init();
3572
3573
15.6k
  return code;
3574
15.6k
}
3575
3576
0
void blosc2_set_delta(int dodelta) {
3577
3578
0
  g_delta = dodelta;
3579
3580
  /* Check whether the library should be initialized */
3581
0
  if (!g_initlib) blosc2_init();
3582
3583
0
}
3584
3585
0
const char* blosc2_list_compressors(void) {
3586
0
  static int compressors_list_done = 0;
3587
0
  static char ret[256];
3588
3589
0
  if (compressors_list_done) return ret;
3590
0
  ret[0] = '\0';
3591
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3592
0
  strcat(ret, ",");
3593
0
  strcat(ret, BLOSC_LZ4_COMPNAME);
3594
0
  strcat(ret, ",");
3595
0
  strcat(ret, BLOSC_LZ4HC_COMPNAME);
3596
0
#if defined(HAVE_ZLIB)
3597
0
  strcat(ret, ",");
3598
0
  strcat(ret, BLOSC_ZLIB_COMPNAME);
3599
0
#endif /* HAVE_ZLIB */
3600
0
#if defined(HAVE_ZSTD)
3601
0
  strcat(ret, ",");
3602
0
  strcat(ret, BLOSC_ZSTD_COMPNAME);
3603
0
#endif /* HAVE_ZSTD */
3604
0
  compressors_list_done = 1;
3605
0
  return ret;
3606
0
}
3607
3608
3609
0
const char* blosc2_get_version_string(void) {
3610
0
  return BLOSC2_VERSION_STRING;
3611
0
}
3612
3613
3614
0
int blosc2_get_complib_info(const char* compname, char** complib, char** version) {
3615
0
  int clibcode;
3616
0
  const char* clibname;
3617
0
  const char* clibversion = "unknown";
3618
0
  char sbuffer[256];
3619
3620
0
  clibcode = compname_to_clibcode(compname);
3621
0
  clibname = clibcode_to_clibname(clibcode);
3622
3623
  /* complib version */
3624
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
3625
0
    clibversion = BLOSCLZ_VERSION_STRING;
3626
0
  }
3627
0
  else if (clibcode == BLOSC_LZ4_LIB) {
3628
0
    sprintf(sbuffer, "%d.%d.%d",
3629
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3630
0
    clibversion = sbuffer;
3631
0
  }
3632
0
#if defined(HAVE_ZLIB)
3633
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
3634
0
#ifdef ZLIB_COMPAT
3635
0
    clibversion = ZLIB_VERSION;
3636
#elif defined(HAVE_ZLIB_NG)
3637
    clibversion = ZLIBNG_VERSION;
3638
#else
3639
    clibversion = ZLIB_VERSION;
3640
#endif
3641
0
  }
3642
0
#endif /* HAVE_ZLIB */
3643
0
#if defined(HAVE_ZSTD)
3644
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
3645
0
    sprintf(sbuffer, "%d.%d.%d",
3646
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3647
0
    clibversion = sbuffer;
3648
0
  }
3649
0
#endif /* HAVE_ZSTD */
3650
3651
#ifdef _MSC_VER
3652
  *complib = _strdup(clibname);
3653
  *version = _strdup(clibversion);
3654
#else
3655
0
  *complib = strdup(clibname);
3656
0
  *version = strdup(clibversion);
3657
0
#endif
3658
0
  return clibcode;
3659
0
}
3660
3661
/* Blosc1 wrapper around blosc2_cbuffer_sizes(): report the `nbytes`,
   `cbytes` and `blocksize` stored in a compressed buffer as size_t values
   (zeros when the header cannot be read). */
void blosc1_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
  int32_t n32;
  int32_t c32;
  int32_t b32;

  blosc2_cbuffer_sizes(cbuffer, &n32, &c32, &b32);
  *nbytes = (size_t)n32;
  *cbytes = (size_t)c32;
  *blocksize = (size_t)b32;
}
3669
3670
301k
int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3671
301k
  blosc_header header;
3672
301k
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3673
301k
  if (rc < 0) {
3674
    /* Return zeros if error reading header */
3675
136
    memset(&header, 0, sizeof(header));
3676
136
  }
3677
3678
  /* Read the interesting values */
3679
301k
  if (nbytes != NULL)
3680
300k
    *nbytes = header.nbytes;
3681
301k
  if (cbytes != NULL)
3682
300k
    *cbytes = header.cbytes;
3683
301k
  if (blocksize != NULL)
3684
20.2k
    *blocksize = header.blocksize;
3685
301k
  return rc;
3686
301k
}
3687
3688
11.3k
int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3689
11.3k
  int32_t header_cbytes;
3690
11.3k
  int32_t header_nbytes;
3691
11.3k
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3692
    /* Compressed data should contain enough space for header */
3693
0
    *nbytes = 0;
3694
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3695
0
  }
3696
11.3k
  int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3697
11.3k
  if (rc < 0) {
3698
0
    *nbytes = 0;
3699
0
    return rc;
3700
0
  }
3701
11.3k
  *nbytes = header_nbytes;
3702
11.3k
  if (header_cbytes != (int32_t)cbytes) {
3703
    /* Compressed size from header does not match `cbytes` */
3704
0
    *nbytes = 0;
3705
0
    return BLOSC2_ERROR_INVALID_HEADER;
3706
0
  }
3707
11.3k
  if (*nbytes > BLOSC2_MAX_BUFFERSIZE) {
3708
    /* Uncompressed size is larger than allowed */
3709
51
    *nbytes = 0;
3710
51
    return BLOSC2_ERROR_MEMORY_ALLOC;
3711
51
  }
3712
11.2k
  return 0;
3713
11.3k
}
3714
3715
/* Return `typesize` and `flags` from a compressed buffer. */
3716
0
void blosc1_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3717
0
  blosc_header header;
3718
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3719
0
  if (rc < 0) {
3720
0
    *typesize = *flags = 0;
3721
0
    return;
3722
0
  }
3723
3724
  /* Read the interesting values */
3725
0
  *flags = header.flags;
3726
0
  *typesize = header.typesize;
3727
0
}
3728
3729
3730
/* Return version information from a compressed buffer. */
3731
0
void blosc2_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3732
0
  blosc_header header;
3733
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3734
0
  if (rc < 0) {
3735
0
    *version = *versionlz = 0;
3736
0
    return;
3737
0
  }
3738
3739
  /* Read the version info */
3740
0
  *version = header.version;
3741
0
  *versionlz = header.versionlz;
3742
0
}
3743
3744
3745
/* Return the compressor library/format used in a compressed buffer. */
3746
0
const char* blosc2_cbuffer_complib(const void* cbuffer) {
3747
0
  blosc_header header;
3748
0
  int clibcode;
3749
0
  const char* complib;
3750
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3751
0
  if (rc < 0) {
3752
0
    return NULL;
3753
0
  }
3754
3755
  /* Read the compressor format/library info */
3756
0
  clibcode = (header.flags & 0xe0) >> 5;
3757
0
  complib = clibcode_to_clibname(clibcode);
3758
0
  return complib;
3759
0
}
3760
3761
3762
/* Get the internal blocksize to be used during compression.  0 means
3763
   that an automatic blocksize is computed internally. */
3764
int blosc1_get_blocksize(void)
3765
0
{
3766
0
  return (int)g_force_blocksize;
3767
0
}
3768
3769
3770
/* Force the use of a specific blocksize.  If 0, an automatic
3771
   blocksize will be used (the default). */
3772
2.75k
void blosc1_set_blocksize(size_t blocksize) {
3773
2.75k
  g_force_blocksize = (int32_t)blocksize;
3774
2.75k
}
3775
3776
3777
/* Force the use of a specific split mode. */
3778
void blosc1_set_splitmode(int mode)
3779
0
{
3780
0
  g_splitmode = mode;
3781
0
}
3782
3783
3784
/* Set pointer to super-chunk.  If NULL, no super-chunk will be
3785
   reachable (the default). */
3786
0
void blosc_set_schunk(blosc2_schunk* schunk) {
3787
0
  g_schunk = schunk;
3788
0
  g_global_context->schunk = schunk;
3789
0
}
3790
3791
blosc2_io *blosc2_io_global = NULL;
3792
blosc2_io_cb BLOSC2_IO_CB_DEFAULTS;
3793
blosc2_io_cb BLOSC2_IO_CB_MMAP;
3794
3795
13.8k
void blosc2_init(void) {
3796
  /* Return if Blosc is already initialized */
3797
13.8k
  if (g_initlib) return;
3798
3799
13.8k
  BLOSC2_IO_CB_DEFAULTS.id = BLOSC2_IO_FILESYSTEM;
3800
13.8k
  BLOSC2_IO_CB_DEFAULTS.name = "filesystem";
3801
13.8k
  BLOSC2_IO_CB_DEFAULTS.is_allocation_necessary = true;
3802
13.8k
  BLOSC2_IO_CB_DEFAULTS.open = (blosc2_open_cb) blosc2_stdio_open;
3803
13.8k
  BLOSC2_IO_CB_DEFAULTS.close = (blosc2_close_cb) blosc2_stdio_close;
3804
13.8k
  BLOSC2_IO_CB_DEFAULTS.size = (blosc2_size_cb) blosc2_stdio_size;
3805
13.8k
  BLOSC2_IO_CB_DEFAULTS.write = (blosc2_write_cb) blosc2_stdio_write;
3806
13.8k
  BLOSC2_IO_CB_DEFAULTS.read = (blosc2_read_cb) blosc2_stdio_read;
3807
13.8k
  BLOSC2_IO_CB_DEFAULTS.truncate = (blosc2_truncate_cb) blosc2_stdio_truncate;
3808
13.8k
  BLOSC2_IO_CB_DEFAULTS.destroy = NULL;
3809
3810
13.8k
  BLOSC2_IO_CB_MMAP.id = BLOSC2_IO_FILESYSTEM_MMAP;
3811
13.8k
  BLOSC2_IO_CB_MMAP.name = "filesystem_mmap";
3812
13.8k
  BLOSC2_IO_CB_MMAP.is_allocation_necessary = false;
3813
13.8k
  BLOSC2_IO_CB_MMAP.open = (blosc2_open_cb) blosc2_stdio_mmap_open;
3814
13.8k
  BLOSC2_IO_CB_MMAP.close = (blosc2_close_cb) blosc2_stdio_mmap_close;
3815
13.8k
  BLOSC2_IO_CB_MMAP.read = (blosc2_read_cb) blosc2_stdio_mmap_read;
3816
13.8k
  BLOSC2_IO_CB_MMAP.size = (blosc2_size_cb) blosc2_stdio_mmap_size;
3817
13.8k
  BLOSC2_IO_CB_MMAP.write = (blosc2_write_cb) blosc2_stdio_mmap_write;
3818
13.8k
  BLOSC2_IO_CB_MMAP.truncate = (blosc2_truncate_cb) blosc2_stdio_mmap_truncate;
3819
13.8k
  BLOSC2_IO_CB_MMAP.destroy = (blosc2_destroy_cb) blosc2_stdio_mmap_destroy;
3820
3821
13.8k
  g_ncodecs = 0;
3822
13.8k
  g_nfilters = 0;
3823
13.8k
  g_ntuners = 0;
3824
3825
13.8k
#if defined(HAVE_PLUGINS)
3826
13.8k
  #include "blosc2/blosc2-common.h"
3827
13.8k
  #include "blosc2/blosc2-stdio.h"
3828
13.8k
  register_codecs();
3829
13.8k
  register_filters();
3830
13.8k
  register_tuners();
3831
13.8k
#endif
3832
13.8k
  pthread_mutex_init(&global_comp_mutex, NULL);
3833
  /* Create a global context */
3834
13.8k
  g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3835
13.8k
  memset(g_global_context, 0, sizeof(blosc2_context));
3836
13.8k
  g_global_context->nthreads = g_nthreads;
3837
13.8k
  g_global_context->new_nthreads = g_nthreads;
3838
13.8k
  g_initlib = 1;
3839
13.8k
}
3840
3841
3842
13.8k
int blosc2_free_resources(void) {
3843
  /* Return if Blosc is not initialized */
3844
13.8k
  if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3845
3846
13.8k
  return release_threadpool(g_global_context);
3847
13.8k
}
3848
3849
3850
13.8k
void blosc2_destroy(void) {
3851
  /* Return if Blosc is not initialized */
3852
13.8k
  if (!g_initlib) return;
3853
3854
13.8k
  blosc2_free_resources();
3855
13.8k
  g_initlib = 0;
3856
13.8k
  blosc2_free_ctx(g_global_context);
3857
3858
13.8k
  pthread_mutex_destroy(&global_comp_mutex);
3859
3860
13.8k
}
3861
3862
3863
32.1k
/* Stop and release the worker threads attached to `context`, if any.
 *
 * Only acts when context->threads_started > 0:
 *  - with a user thread callback (threads_callback), the per-thread contexts
 *    are destroyed and the thread_contexts array is freed;
 *  - otherwise the internal pthreads are asked to finish (end_threads = 1),
 *    joined, and the thread handle array is freed.
 * Then the context's mutexes, condition variables and barriers are destroyed
 * and end_threads/threads_started are reset to 0, so a second call is a
 * no-op. Always returns 0; pthread_join() failures are only traced.
 */
int release_threadpool(blosc2_context *context) {
  int32_t t;
  void* status;
  int rc;

  if (context->threads_started > 0) {
    if (threads_callback) {
      /* free context data for user-managed threads */
      for (t=0; t<context->threads_started; t++)
        destroy_thread_context(context->thread_contexts + t);
      my_free(context->thread_contexts);
    }
    else {
      /* Tell all existing threads to finish */
      context->end_threads = 1;
      /* NOTE(review): presumably releases the workers waiting at the init
         synchronization point so they observe end_threads -- confirm against
         the WAIT_INIT definition. */
      WAIT_INIT(-1, context);

      /* Join exiting threads */
      for (t = 0; t < context->threads_started; t++) {
        rc = pthread_join(context->threads[t], &status);
        if (rc) {
          BLOSC_TRACE_ERROR("Return code from pthread_join() is %d\n"
                            "\tError detail: %s.", rc, strerror(rc));
        }
      }

      /* Thread attributes */
      #if !defined(_WIN32)
        pthread_attr_destroy(&context->ct_attr);
      #endif

      /* Release thread handlers */
      my_free(context->threads);
    }

    /* Release mutex and condition variable objects */
    pthread_mutex_destroy(&context->count_mutex);
    pthread_mutex_destroy(&context->delta_mutex);
    pthread_mutex_destroy(&context->nchunk_mutex);
    pthread_cond_destroy(&context->delta_cv);

    /* Barriers: native POSIX barriers when available, otherwise the
       mutex/condvar emulation and its counter. */
  #ifdef BLOSC_POSIX_BARRIERS
    pthread_barrier_destroy(&context->barr_init);
    pthread_barrier_destroy(&context->barr_finish);
  #else
    pthread_mutex_destroy(&context->count_threads_mutex);
    pthread_cond_destroy(&context->count_threads_cv);
    context->count_threads = 0;      /* Reset threads counter */
  #endif

    /* Reset flags and counters */
    context->end_threads = 0;
    context->threads_started = 0;
  }


  return 0;
}
3922
3923
3924
/* Contexts */
3925
3926
/* Create a context for compression */
3927
2.27k
blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3928
2.27k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3929
2.27k
  BLOSC_ERROR_NULL(context, NULL);
3930
3931
  /* Populate the context, using zeros as default values */
3932
2.27k
  memset(context, 0, sizeof(blosc2_context));
3933
2.27k
  context->do_compress = 1;   /* meant for compression */
3934
2.27k
  context->use_dict = cparams.use_dict;
3935
2.27k
  if (cparams.instr_codec) {
3936
0
    context->blosc2_flags = BLOSC2_INSTR_CODEC;
3937
0
  }
3938
3939
15.9k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
3940
13.6k
    context->filters[i] = cparams.filters[i];
3941
13.6k
    context->filters_meta[i] = cparams.filters_meta[i];
3942
3943
13.6k
    if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
3944
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3945
0
                        context->filters[i]);
3946
0
      free(context);
3947
0
      return NULL;
3948
0
    }
3949
13.6k
    if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
3950
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3951
0
                        context->filters[i]);
3952
0
      free(context);
3953
0
      return NULL;
3954
0
    }
3955
13.6k
  }
3956
3957
2.27k
#if defined(HAVE_PLUGINS)
3958
2.27k
#include "blosc2/codecs-registry.h"
3959
2.27k
  if ((context->compcode >= BLOSC_CODEC_ZFP_FIXED_ACCURACY) && (context->compcode <= BLOSC_CODEC_ZFP_FIXED_RATE)) {
3960
0
    for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
3961
0
      if ((context->filters[i] == BLOSC_SHUFFLE) || (context->filters[i] == BLOSC_BITSHUFFLE)) {
3962
0
        BLOSC_TRACE_ERROR("ZFP cannot be run in presence of SHUFFLE / BITSHUFFLE");
3963
0
        return NULL;
3964
0
      }
3965
0
    }
3966
0
  }
3967
2.27k
#endif /* HAVE_PLUGINS */
3968
3969
  /* Check for a BLOSC_SHUFFLE environment variable */
3970
2.27k
  int doshuffle = -1;
3971
2.27k
  char* envvar = getenv("BLOSC_SHUFFLE");
3972
2.27k
  if (envvar != NULL) {
3973
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
3974
0
      doshuffle = BLOSC_NOSHUFFLE;
3975
0
    }
3976
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
3977
0
      doshuffle = BLOSC_SHUFFLE;
3978
0
    }
3979
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
3980
0
      doshuffle = BLOSC_BITSHUFFLE;
3981
0
    }
3982
0
    else {
3983
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
3984
0
    }
3985
0
  }
3986
  /* Check for a BLOSC_DELTA environment variable */
3987
0
  int dodelta = BLOSC_NOFILTER;
3988
2.27k
  envvar = getenv("BLOSC_DELTA");
3989
2.27k
  if (envvar != NULL) {
3990
0
    if (strcmp(envvar, "1") == 0) {
3991
0
      dodelta = BLOSC_DELTA;
3992
0
    } else if (strcmp(envvar, "0") == 0){
3993
0
      dodelta = BLOSC_NOFILTER;
3994
0
    }
3995
0
    else {
3996
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
3997
0
    }
3998
0
  }
3999
  /* Check for a BLOSC_TYPESIZE environment variable */
4000
0
  context->typesize = cparams.typesize;
4001
2.27k
  envvar = getenv("BLOSC_TYPESIZE");
4002
2.27k
  if (envvar != NULL) {
4003
0
    int32_t value;
4004
0
    value = (int32_t) strtol(envvar, NULL, 10);
4005
0
    if ((errno != EINVAL) && (value > 0)) {
4006
0
      context->typesize = value;
4007
0
    }
4008
0
    else {
4009
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
4010
0
    }
4011
0
  }
4012
0
  build_filters(doshuffle, dodelta, context->typesize, context->filters);
4013
4014
2.27k
  context->clevel = cparams.clevel;
4015
  /* Check for a BLOSC_CLEVEL environment variable */
4016
2.27k
  envvar = getenv("BLOSC_CLEVEL");
4017
2.27k
  if (envvar != NULL) {
4018
0
    int value;
4019
0
    value = (int)strtol(envvar, NULL, 10);
4020
0
    if ((errno != EINVAL) && (value >= 0)) {
4021
0
      context->clevel = value;
4022
0
    }
4023
0
    else {
4024
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
4025
0
    }
4026
0
  }
4027
4028
0
  context->compcode = cparams.compcode;
4029
  /* Check for a BLOSC_COMPRESSOR environment variable */
4030
2.27k
  envvar = getenv("BLOSC_COMPRESSOR");
4031
2.27k
  if (envvar != NULL) {
4032
0
    int codec = blosc2_compname_to_compcode(envvar);
4033
0
    if (codec >= BLOSC_LAST_CODEC) {
4034
0
      BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
4035
0
      return NULL;
4036
0
    }
4037
0
    context->compcode = codec;
4038
0
  }
4039
2.27k
  context->compcode_meta = cparams.compcode_meta;
4040
4041
2.27k
  context->blocksize = cparams.blocksize;
4042
  /* Check for a BLOSC_BLOCKSIZE environment variable */
4043
2.27k
  envvar = getenv("BLOSC_BLOCKSIZE");
4044
2.27k
  if (envvar != NULL) {
4045
0
    int32_t blocksize;
4046
0
    blocksize = (int32_t) strtol(envvar, NULL, 10);
4047
0
    if ((errno != EINVAL) && (blocksize > 0)) {
4048
0
      context->blocksize = blocksize;
4049
0
    }
4050
0
    else {
4051
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
4052
0
    }
4053
0
  }
4054
4055
0
  context->nthreads = cparams.nthreads;
4056
  /* Check for a BLOSC_NTHREADS environment variable */
4057
2.27k
  envvar = getenv("BLOSC_NTHREADS");
4058
2.27k
  if (envvar != NULL) {
4059
0
    int16_t nthreads = (int16_t) strtol(envvar, NULL, 10);
4060
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4061
0
      context->nthreads = nthreads;
4062
0
    }
4063
0
    else {
4064
0
      BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
4065
0
    }
4066
0
  }
4067
0
  context->new_nthreads = context->nthreads;
4068
4069
2.27k
  context->splitmode = cparams.splitmode;
4070
  /* Check for a BLOSC_SPLITMODE environment variable */
4071
2.27k
  envvar = getenv("BLOSC_SPLITMODE");
4072
2.27k
  if (envvar != NULL) {
4073
0
    int32_t splitmode = -1;
4074
0
    if (strcmp(envvar, "ALWAYS") == 0) {
4075
0
      splitmode = BLOSC_ALWAYS_SPLIT;
4076
0
    }
4077
0
    else if (strcmp(envvar, "NEVER") == 0) {
4078
0
      splitmode = BLOSC_NEVER_SPLIT;
4079
0
    }
4080
0
    else if (strcmp(envvar, "AUTO") == 0) {
4081
0
      splitmode = BLOSC_AUTO_SPLIT;
4082
0
    }
4083
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
4084
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
4085
0
    }
4086
0
    else {
4087
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
4088
0
    }
4089
0
    if (splitmode >= 0) {
4090
0
      context->splitmode = splitmode;
4091
0
    }
4092
0
  }
4093
4094
0
  context->threads_started = 0;
4095
2.27k
  context->schunk = cparams.schunk;
4096
4097
2.27k
  if (cparams.prefilter != NULL) {
4098
0
    context->prefilter = cparams.prefilter;
4099
0
    context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
4100
0
    BLOSC_ERROR_NULL(context->preparams, NULL);
4101
0
    memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
4102
0
  }
4103
4104
2.27k
  if (cparams.tuner_id <= 0) {
4105
2.27k
    cparams.tuner_id = g_tuner;
4106
2.27k
  } else {
4107
0
    for (int i = 0; i < g_ntuners; ++i) {
4108
0
      if (g_tuners[i].id == cparams.tuner_id) {
4109
0
        if (g_tuners[i].init == NULL) {
4110
0
          if (fill_tuner(&g_tuners[i]) < 0) {
4111
0
            BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4112
0
            return NULL;
4113
0
          }
4114
0
        }
4115
0
        if (g_tuners[i].init(cparams.tuner_params, context, NULL) < 0) {
4116
0
          BLOSC_TRACE_ERROR("Error in user-defined tuner %d init function\n", cparams.tuner_id);
4117
0
          return NULL;
4118
0
        }
4119
0
        goto urtunersuccess;
4120
0
      }
4121
0
    }
4122
0
    BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", cparams.tuner_id);
4123
0
    return NULL;
4124
0
  }
4125
2.27k
  urtunersuccess:;
4126
4127
2.27k
  context->tuner_id = cparams.tuner_id;
4128
4129
2.27k
  context->codec_params = cparams.codec_params;
4130
2.27k
  memcpy(context->filter_params, cparams.filter_params, BLOSC2_MAX_FILTERS * sizeof(void*));
4131
4132
2.27k
  return context;
4133
2.27k
}
4134
4135
/* Create a context for decompression */
4136
2.26k
blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
4137
2.26k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
4138
2.26k
  BLOSC_ERROR_NULL(context, NULL);
4139
4140
  /* Populate the context, using zeros as default values */
4141
2.26k
  memset(context, 0, sizeof(blosc2_context));
4142
2.26k
  context->do_compress = 0;   /* Meant for decompression */
4143
4144
2.26k
  context->nthreads = dparams.nthreads;
4145
2.26k
  char* envvar = getenv("BLOSC_NTHREADS");
4146
2.26k
  if (envvar != NULL) {
4147
0
    long nthreads = strtol(envvar, NULL, 10);
4148
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4149
0
      context->nthreads = (int16_t) nthreads;
4150
0
    }
4151
0
  }
4152
2.26k
  context->new_nthreads = context->nthreads;
4153
4154
2.26k
  context->threads_started = 0;
4155
2.26k
  context->block_maskout = NULL;
4156
2.26k
  context->block_maskout_nitems = 0;
4157
2.26k
  context->schunk = dparams.schunk;
4158
4159
2.26k
  if (dparams.postfilter != NULL) {
4160
0
    context->postfilter = dparams.postfilter;
4161
0
    context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
4162
0
    BLOSC_ERROR_NULL(context->postparams, NULL);
4163
0
    memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
4164
0
  }
4165
4166
2.26k
  return context;
4167
2.26k
}
4168
4169
4170
18.3k
/* Release a context created by blosc2_create_cctx() or blosc2_create_dctx().
 *
 * Stops any worker threads and frees the serial thread context, the ZSTD
 * dictionaries (when built with ZSTD), the tuner state, the pre/postfilter
 * parameters, the block maskout and finally the context itself. A NULL
 * context is ignored. Problems while releasing the tuner state are traced
 * but no longer abort the function, so the remaining resources are always
 * freed.
 */
void blosc2_free_ctx(blosc2_context* context) {
  if (context == NULL) {
    return;
  }

  release_threadpool(context);
  if (context->serial_context != NULL) {
    free_thread_context(context->serial_context);
  }
  if (context->dict_cdict != NULL) {
#ifdef HAVE_ZSTD
    ZSTD_freeCDict(context->dict_cdict);
#endif
  }
  if (context->dict_ddict != NULL) {
#ifdef HAVE_ZSTD
    ZSTD_freeDDict(context->dict_ddict);
#endif
  }

  if (context->tuner_params != NULL) {
    int rc = 0;
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
      rc = blosc_stune_free(context);
    } else {
      bool found = false;
      for (int i = 0; i < g_ntuners; ++i) {
        if (g_tuners[i].id == context->tuner_id) {
          found = true;
          if (g_tuners[i].free == NULL && fill_tuner(&g_tuners[i]) < 0) {
            BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
          }
          else {
            rc = g_tuners[i].free(context);
          }
          break;
        }
      }
      if (!found) {
        BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
      }
    }
    if (rc < 0) {
      BLOSC_TRACE_ERROR("Error in user-defined tuner free function\n");
    }
  }

  if (context->prefilter != NULL) {
    my_free(context->preparams);
  }
  if (context->postfilter != NULL) {
    my_free(context->postparams);
  }

  /* block_maskout comes from plain malloc() (see blosc2_set_maskout);
     free(NULL) is a no-op. */
  free(context->block_maskout);
  my_free(context);
}
4223
4224
4225
0
/* Fill `cparams` with the compression parameters stored in `ctx`.
 *
 * The per-filter params are copied as well, mirroring what
 * blosc2_create_cctx() copies into the context. Returns
 * BLOSC2_ERROR_SUCCESS, or BLOSC2_ERROR_NULL_POINTER when either argument
 * is NULL.
 */
int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
  BLOSC_ERROR_NULL(ctx, BLOSC2_ERROR_NULL_POINTER);
  BLOSC_ERROR_NULL(cparams, BLOSC2_ERROR_NULL_POINTER);

  cparams->compcode = ctx->compcode;
  cparams->compcode_meta = ctx->compcode_meta;
  cparams->clevel = ctx->clevel;
  cparams->use_dict = ctx->use_dict;
  /* Report the flag as 0/1 instead of the raw BLOSC2_INSTR_CODEC bit. */
  cparams->instr_codec = (ctx->blosc2_flags & BLOSC2_INSTR_CODEC) != 0;
  cparams->typesize = ctx->typesize;
  cparams->nthreads = ctx->nthreads;
  cparams->blocksize = ctx->blocksize;
  cparams->splitmode = ctx->splitmode;
  cparams->schunk = ctx->schunk;
  for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
    cparams->filters[i] = ctx->filters[i];
    cparams->filters_meta[i] = ctx->filters_meta[i];
    cparams->filter_params[i] = ctx->filter_params[i];
  }
  cparams->prefilter = ctx->prefilter;
  cparams->preparams = ctx->preparams;
  cparams->tuner_id = ctx->tuner_id;
  cparams->codec_params = ctx->codec_params;

  return BLOSC2_ERROR_SUCCESS;
}
4247
4248
4249
6.60k
/* Report the decompression parameters held by `ctx` through `dparams`:
 * thread count, super-chunk, and postfilter with its params. Always
 * returns BLOSC2_ERROR_SUCCESS. */
int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
  dparams->postfilter = ctx->postfilter;
  dparams->postparams = ctx->postparams;
  dparams->schunk = ctx->schunk;
  dparams->nthreads = ctx->nthreads;
  return BLOSC2_ERROR_SUCCESS;
}
4257
4258
4259
/* Set a maskout in decompression context */
4260
0
int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
4261
4262
0
  if (ctx->block_maskout != NULL) {
4263
    // Get rid of a possible mask here
4264
0
    free(ctx->block_maskout);
4265
0
  }
4266
4267
0
  bool *maskout_ = malloc(nblocks);
4268
0
  BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
4269
0
  memcpy(maskout_, maskout, nblocks);
4270
0
  ctx->block_maskout = maskout_;
4271
0
  ctx->block_maskout_nitems = nblocks;
4272
4273
0
  return 0;
4274
0
}
4275
4276
4277
/* Create a chunk made of zeros */
4278
9
int blosc2_chunk_zeros(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4279
9
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4280
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4281
0
    return BLOSC2_ERROR_DATA;
4282
0
  }
4283
4284
9
  if (nbytes % cparams.typesize) {
4285
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4286
0
    return BLOSC2_ERROR_DATA;
4287
0
  }
4288
4289
9
  blosc_header header;
4290
9
  blosc2_context* context = blosc2_create_cctx(cparams);
4291
9
  if (context == NULL) {
4292
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4293
0
    return BLOSC2_ERROR_NULL_POINTER;
4294
0
  }
4295
4296
9
  int error = initialize_context_compression(
4297
9
          context, NULL, nbytes, dest, destsize,
4298
9
          context->clevel, context->filters, context->filters_meta,
4299
9
          context->typesize, context->compcode, context->blocksize,
4300
9
          context->new_nthreads, context->nthreads, context->splitmode,
4301
9
          context->tuner_id, context->tuner_params, context->schunk);
4302
9
  if (error <= 0) {
4303
0
    blosc2_free_ctx(context);
4304
0
    return error;
4305
0
  }
4306
4307
9
  memset(&header, 0, sizeof(header));
4308
9
  header.version = BLOSC2_VERSION_FORMAT;
4309
9
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4310
9
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4311
9
  header.typesize = context->typesize;
4312
9
  header.nbytes = (int32_t)nbytes;
4313
9
  header.blocksize = context->blocksize;
4314
9
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4315
9
  header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4;  // mark chunk as all zeros
4316
9
  memcpy((uint8_t *)dest, &header, sizeof(header));
4317
4318
9
  blosc2_free_ctx(context);
4319
4320
9
  return BLOSC_EXTENDED_HEADER_LENGTH;
4321
9
}
4322
4323
4324
/* Create a chunk made of uninitialized values */
4325
0
int blosc2_chunk_uninit(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4326
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4327
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4328
0
    return BLOSC2_ERROR_DATA;
4329
0
  }
4330
4331
0
  if (nbytes % cparams.typesize) {
4332
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4333
0
    return BLOSC2_ERROR_DATA;
4334
0
  }
4335
4336
0
  blosc_header header;
4337
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4338
0
  if (context == NULL) {
4339
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4340
0
    return BLOSC2_ERROR_NULL_POINTER;
4341
0
  }
4342
0
  int error = initialize_context_compression(
4343
0
          context, NULL, nbytes, dest, destsize,
4344
0
          context->clevel, context->filters, context->filters_meta,
4345
0
          context->typesize, context->compcode, context->blocksize,
4346
0
          context->new_nthreads, context->nthreads, context->splitmode,
4347
0
          context->tuner_id, context->tuner_params, context->schunk);
4348
0
  if (error <= 0) {
4349
0
    blosc2_free_ctx(context);
4350
0
    return error;
4351
0
  }
4352
4353
0
  memset(&header, 0, sizeof(header));
4354
0
  header.version = BLOSC2_VERSION_FORMAT;
4355
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4356
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4357
0
  header.typesize = context->typesize;
4358
0
  header.nbytes = (int32_t)nbytes;
4359
0
  header.blocksize = context->blocksize;
4360
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4361
0
  header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4;  // mark chunk as uninitialized
4362
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4363
4364
0
  blosc2_free_ctx(context);
4365
4366
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4367
0
}
4368
4369
4370
/* Create a chunk made of nans */
4371
0
int blosc2_chunk_nans(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4372
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4373
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4374
0
    return BLOSC2_ERROR_DATA;
4375
0
  }
4376
4377
0
  if (nbytes % cparams.typesize) {
4378
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4379
0
    return BLOSC2_ERROR_DATA;
4380
0
  }
4381
4382
0
  blosc_header header;
4383
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4384
0
  if (context == NULL) {
4385
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4386
0
    return BLOSC2_ERROR_NULL_POINTER;
4387
0
  }
4388
4389
0
  int error = initialize_context_compression(
4390
0
          context, NULL, nbytes, dest, destsize,
4391
0
          context->clevel, context->filters, context->filters_meta,
4392
0
          context->typesize, context->compcode, context->blocksize,
4393
0
          context->new_nthreads, context->nthreads, context->splitmode,
4394
0
          context->tuner_id, context->tuner_params, context->schunk);
4395
0
  if (error <= 0) {
4396
0
    blosc2_free_ctx(context);
4397
0
    return error;
4398
0
  }
4399
4400
0
  memset(&header, 0, sizeof(header));
4401
0
  header.version = BLOSC2_VERSION_FORMAT;
4402
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4403
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4404
0
  header.typesize = context->typesize;
4405
0
  header.nbytes = (int32_t)nbytes;
4406
0
  header.blocksize = context->blocksize;
4407
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4408
0
  header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4;  // mark chunk as all NaNs
4409
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4410
4411
0
  blosc2_free_ctx(context);
4412
4413
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4414
0
}
4415
4416
4417
/* Create a chunk made of repeated values */
4418
int blosc2_chunk_repeatval(blosc2_cparams cparams, const int32_t nbytes,
4419
0
                           void* dest, int32_t destsize, const void* repeatval) {
4420
0
  uint8_t typesize = cparams.typesize;
4421
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + typesize) {
4422
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4423
0
    return BLOSC2_ERROR_DATA;
4424
0
  }
4425
4426
0
  if (nbytes % cparams.typesize) {
4427
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4428
0
    return BLOSC2_ERROR_DATA;
4429
0
  }
4430
4431
0
  blosc_header header;
4432
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4433
0
  if (context == NULL) {
4434
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4435
0
    return BLOSC2_ERROR_NULL_POINTER;
4436
0
  }
4437
4438
0
  int error = initialize_context_compression(
4439
0
          context, NULL, nbytes, dest, destsize,
4440
0
          context->clevel, context->filters, context->filters_meta,
4441
0
          context->typesize, context->compcode, context->blocksize,
4442
0
          context->new_nthreads, context->nthreads, context->splitmode,
4443
0
          context->tuner_id, context->tuner_params, context->schunk);
4444
0
  if (error <= 0) {
4445
0
    blosc2_free_ctx(context);
4446
0
    return error;
4447
0
  }
4448
4449
0
  memset(&header, 0, sizeof(header));
4450
0
  header.version = BLOSC2_VERSION_FORMAT;
4451
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4452
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4453
0
  header.typesize = (uint8_t)typesize;
4454
0
  header.nbytes = (int32_t)nbytes;
4455
0
  header.blocksize = context->blocksize;
4456
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + (int32_t)typesize;
4457
0
  header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4;  // mark chunk as all repeated value
4458
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4459
0
  memcpy((uint8_t *)dest + sizeof(header), repeatval, typesize);
4460
4461
0
  blosc2_free_ctx(context);
4462
4463
0
  return BLOSC_EXTENDED_HEADER_LENGTH + (uint8_t)typesize;
4464
0
}
4465
4466
4467
/* Register filters */
4468
4469
69.1k
/* Add `filter` to the global filter table without the user-range check.
 *
 * Returns BLOSC2_ERROR_SUCCESS when the filter is appended, or when the same
 * id is already present under the same name. Returns BLOSC2_ERROR_FAILURE
 * for an id below BLOSC2_GLOBAL_REGISTERED_FILTERS_START or an id taken
 * under a different name, BLOSC2_ERROR_CODEC_SUPPORT when the table is
 * full, and BLOSC2_ERROR_INVALID_PARAM for NULL.
 */
int register_filter_private(blosc2_filter *filter) {
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
  if (g_nfilters == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more filters");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  /* No upper-bound test: an id above BLOSC2_USER_REGISTERED_FILTERS_STOP
     cannot occur. */

  const blosc2_filter *last = g_filters + g_nfilters;
  for (const blosc2_filter *known = g_filters; known != last; ++known) {
    if (known->id != filter->id) {
      continue;
    }
    if (strcmp(known->name, filter->name) == 0) {
      /* Same plugin registered twice: nothing more to do. */
      return BLOSC2_ERROR_SUCCESS;
    }
    BLOSC_TRACE_ERROR("The filter (ID: %d) plugin is already registered with name: %s."
                      "  Choose another one !", filter->id, known->name);
    return BLOSC2_ERROR_FAILURE;
  }

  memcpy(&g_filters[g_nfilters], filter, sizeof(blosc2_filter));
  g_nfilters++;
  return BLOSC2_ERROR_SUCCESS;
}
4505
4506
4507
0
/* Register a user-defined filter.
 *
 * The id must be at least BLOSC2_USER_REGISTERED_FILTERS_START; otherwise
 * BLOSC2_ERROR_FAILURE is returned. A NULL `filter` yields
 * BLOSC2_ERROR_INVALID_PARAM. The actual registration is delegated to
 * register_filter_private().
 */
int blosc2_register_filter(blosc2_filter *filter) {
  /* Check before dereferencing: filter->id is read just below. */
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
  if (filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_filter_private(filter);
}
4515
4516
4517
/* Register codecs */
4518
4519
82.9k
/* Add `codec` to the global codec table without the user-range check.
 *
 * Returns BLOSC2_ERROR_SUCCESS when the codec is appended, or when the same
 * compcode is already present under the same name. Returns
 * BLOSC2_ERROR_CODEC_PARAM when the compcode is taken under another name,
 * BLOSC2_ERROR_FAILURE for a compcode below
 * BLOSC2_GLOBAL_REGISTERED_CODECS_START, BLOSC2_ERROR_CODEC_SUPPORT when the
 * table is full, and BLOSC2_ERROR_INVALID_PARAM for NULL.
 */
int register_codec_private(blosc2_codec *codec) {
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
  if (g_ncodecs == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more codecs");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_FAILURE;
  }
  /* No upper-bound test: a compcode above BLOSC2_USER_REGISTERED_CODECS_STOP
     cannot occur. */

  for (int i = 0; i < g_ncodecs; ++i) {
    if (g_codecs[i].compcode == codec->compcode) {
      if (strcmp(g_codecs[i].compname, codec->compname) != 0) {
        /* Name the codec that already owns this compcode, as the filter and
           tuner registries do; the incoming name is not useful here. */
        BLOSC_TRACE_ERROR("The codec (ID: %d) plugin is already registered with name: %s."
                          "  Choose another one !", codec->compcode, g_codecs[i].compname);
        return BLOSC2_ERROR_CODEC_PARAM;
      }
      // Already registered, so no more actions needed
      return BLOSC2_ERROR_SUCCESS;
    }
  }

  blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
  memcpy(codec_new, codec, sizeof(blosc2_codec));

  return BLOSC2_ERROR_SUCCESS;
}
4555
4556
4557
0
/* Register a user-defined codec.
 *
 * The compcode must be at least BLOSC2_USER_REGISTERED_CODECS_START;
 * otherwise BLOSC2_ERROR_CODEC_PARAM is returned. A NULL `codec` yields
 * BLOSC2_ERROR_INVALID_PARAM. The actual registration is delegated to
 * register_codec_private().
 */
int blosc2_register_codec(blosc2_codec *codec) {
  /* Check before dereferencing: codec->compcode is read just below. */
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
  if (codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_CODEC_PARAM;
  }

  return register_codec_private(codec);
}
4565
4566
4567
/* Register tuners */
4568
4569
13.8k
/* Add `tuner` to the global tuner table without the user-range check.
 *
 * Returns BLOSC2_ERROR_SUCCESS when the tuner is appended, or when the same
 * id is already present under the same name. Returns BLOSC2_ERROR_FAILURE
 * for an id below BLOSC2_GLOBAL_REGISTERED_TUNER_START or an id taken under
 * another name, BLOSC2_ERROR_CODEC_SUPPORT when the table is full, and
 * BLOSC2_ERROR_INVALID_PARAM for NULL.
 */
int register_tuner_private(blosc2_tuner *tuner) {
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);
  if (g_ntuners == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more tuners");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }
  if (tuner->id < BLOSC2_GLOBAL_REGISTERED_TUNER_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_TUNER_START);
    return BLOSC2_ERROR_FAILURE;
  }

  /* Look for an existing entry with the same id. */
  int slot = 0;
  while (slot < g_ntuners && g_tuners[slot].id != tuner->id) {
    slot++;
  }

  if (slot < g_ntuners) {
    /* The id is taken: fine if it is the very same tuner, an error otherwise. */
    if (strcmp(g_tuners[slot].name, tuner->name) == 0) {
      return BLOSC2_ERROR_SUCCESS;
    }
    BLOSC_TRACE_ERROR("The tuner (ID: %d) plugin is already registered with name: %s."
                      "  Choose another one !", tuner->id, g_tuners[slot].name);
    return BLOSC2_ERROR_FAILURE;
  }

  memcpy(&g_tuners[g_ntuners], tuner, sizeof(blosc2_tuner));
  g_ntuners++;
  return BLOSC2_ERROR_SUCCESS;
}
4599
4600
4601
0
/* Register a user-defined tuner.
 *
 * The id must be at least BLOSC2_USER_REGISTERED_TUNER_START; otherwise
 * BLOSC2_ERROR_FAILURE is returned. A NULL `tuner` yields
 * BLOSC2_ERROR_INVALID_PARAM. The actual registration is delegated to
 * register_tuner_private().
 */
int blosc2_register_tuner(blosc2_tuner *tuner) {
  /* Check before dereferencing: tuner->id is read just below. */
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);
  if (tuner->id < BLOSC2_USER_REGISTERED_TUNER_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_TUNER_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_tuner_private(tuner);
}
4609
4610
4611
2
int _blosc2_register_io_cb(const blosc2_io_cb *io) {
4612
4613
2
  for (uint64_t i = 0; i < g_nio; ++i) {
4614
0
    if (g_ios[i].id == io->id) {
4615
0
      if (strcmp(g_ios[i].name, io->name) != 0) {
4616
0
        BLOSC_TRACE_ERROR("The IO (ID: %d) plugin is already registered with name: %s."
4617
0
                          "  Choose another one !", io->id, g_ios[i].name);
4618
0
        return BLOSC2_ERROR_PLUGIN_IO;
4619
0
      }
4620
0
      else {
4621
        // Already registered, so no more actions needed
4622
0
        return BLOSC2_ERROR_SUCCESS;
4623
0
      }
4624
0
    }
4625
0
  }
4626
4627
2
  blosc2_io_cb *io_new = &g_ios[g_nio++];
4628
2
  memcpy(io_new, io, sizeof(blosc2_io_cb));
4629
4630
2
  return BLOSC2_ERROR_SUCCESS;
4631
2
}
4632
4633
0
int blosc2_register_io_cb(const blosc2_io_cb *io) {
4634
0
  BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
4635
0
  if (g_nio == UINT8_MAX) {
4636
0
    BLOSC_TRACE_ERROR("Can not register more codecs");
4637
0
    return BLOSC2_ERROR_PLUGIN_IO;
4638
0
  }
4639
4640
0
  if (io->id < BLOSC2_IO_REGISTERED) {
4641
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
4642
0
    return BLOSC2_ERROR_PLUGIN_IO;
4643
0
  }
4644
4645
0
  return _blosc2_register_io_cb(io);
4646
0
}
4647
4648
2.37k
blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
4649
2.37k
  for (uint64_t i = 0; i < g_nio; ++i) {
4650
2.37k
    if (g_ios[i].id == id) {
4651
2.37k
      return &g_ios[i];
4652
2.37k
    }
4653
2.37k
  }
4654
2
  if (id == BLOSC2_IO_FILESYSTEM) {
4655
2
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
4656
0
      BLOSC_TRACE_ERROR("Error registering the default IO API");
4657
0
      return NULL;
4658
0
    }
4659
2
    return blosc2_get_io_cb(id);
4660
2
  }
4661
0
  else if (id == BLOSC2_IO_FILESYSTEM_MMAP) {
4662
0
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_MMAP) < 0) {
4663
0
      BLOSC_TRACE_ERROR("Error registering the mmap IO API");
4664
0
      return NULL;
4665
0
    }
4666
0
    return blosc2_get_io_cb(id);
4667
0
  }
4668
0
  return NULL;
4669
2
}
4670
4671
0
/*
 * Convert a flat offset `i` into per-dimension coordinates of a row-major
 * (C-order, last dimension fastest) array with `ndim` dimensions whose
 * extents are `shape[0..ndim-1]`.  The coordinates are written to
 * `index[0..ndim-1]`.
 *
 * - Does nothing when `ndim` is 0.
 * - `shape[1..ndim-1]` must be non-zero.  `shape[0]` is never read.
 * - `index[0]` is not reduced modulo `shape[0]`, so an offset past the end
 *   of the array yields an out-of-range leading coordinate.
 *
 * Coordinates are peeled off from the innermost dimension outwards.  This is
 * equivalent to dividing by the row-major strides, but it needs no
 * heap-allocated strides table (whose malloc was previously unchecked).  It
 * also never forms the full product of the extents, which could overflow
 * int64_t.
 */
void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) {
  if (ndim == 0) {
    return;
  }

  int64_t rem = i;
  for (int j = ndim - 1; j > 0; --j) {
    index[j] = rem % shape[j];
    rem /= shape[j];
  }
  index[0] = rem;
}
4687
4688
0
/*
 * Fold per-dimension coordinates back into a flat offset:
 *   *i = index[0]*strides[0] + ... + index[ndim-1]*strides[ndim-1]
 * A non-positive `ndim` stores 0 in `*i`.
 */
void blosc2_multidim_to_unidim(const int64_t *index, int8_t ndim, const int64_t *strides, int64_t *i) {
  const int64_t *coord = index;
  const int64_t *step = strides;

  *i = 0;
  for (int8_t remaining = ndim; remaining > 0; remaining--) {
    *i += *coord++ * *step++;
  }
}
4694
4695
0
/*
 * Determine the chunks of `schunk` touched by the slice delimited by
 * `start` and `stop`, and report their indices through `chunks_idx`.
 *
 * - If the super-chunk carries neither a "b2nd" nor a legacy "caterva"
 *   metalayer, only `*start` and `*stop` (the first elements) are used, and
 *   the work is delegated to schunk_get_slice_nchunks().
 * - Otherwise the super-chunk is temporarily wrapped in a b2nd array and
 *   `start`/`stop` are forwarded to b2nd_get_slice_nchunks().  Presumably
 *   they are then per-dimension bounds -- confirm in b2nd.
 *
 * Returns the callee's result (presumably the number of chunks on success --
 * verify), or a negative error code.  Returns BLOSC2_ERROR_NULL_POINTER if
 * `schunk`, `start` or `stop` is NULL.  Ownership of `*chunks_idx` follows
 * the callee's contract (likely heap-allocated for the caller to free --
 * NOTE(review): confirm).
 */
int blosc2_get_slice_nchunks(blosc2_schunk* schunk, int64_t *start, int64_t *stop, int64_t **chunks_idx) {
  BLOSC_ERROR_NULL(schunk, BLOSC2_ERROR_NULL_POINTER);
  // The bounds are dereferenced below (plain-schunk path) or forwarded to b2nd,
  // so reject NULL up front instead of crashing.
  BLOSC_ERROR_NULL(start, BLOSC2_ERROR_NULL_POINTER);
  BLOSC_ERROR_NULL(stop, BLOSC2_ERROR_NULL_POINTER);

  if (blosc2_meta_exists(schunk, "b2nd") < 0) {
    // Try with a caterva metalayer; we are meant to be backward compatible with it
    if (blosc2_meta_exists(schunk, "caterva") < 0) {
      return schunk_get_slice_nchunks(schunk, *start, *stop, chunks_idx);
    }
  }

  b2nd_array_t *array = NULL;
  int rc = b2nd_from_schunk(schunk, &array);
  if (rc < 0) {
    BLOSC_TRACE_ERROR("Could not get b2nd array from schunk.");
    return rc;
  }
  rc = b2nd_get_slice_nchunks(array, start, stop, chunks_idx);
  // The super-chunk belongs to the caller: detach it so b2nd_free()
  // releases only the wrapper struct.
  array->sc = NULL;
  b2nd_free(array);

  return rc;
}
4716
4717
0
blosc2_cparams blosc2_get_blosc2_cparams_defaults(void) {
4718
0
  return BLOSC2_CPARAMS_DEFAULTS;
4719
0
};
4720
4721
0
blosc2_dparams blosc2_get_blosc2_dparams_defaults(void) {
4722
0
  return BLOSC2_DPARAMS_DEFAULTS;
4723
0
};
4724
4725
0
blosc2_storage blosc2_get_blosc2_storage_defaults(void) {
4726
0
  return BLOSC2_STORAGE_DEFAULTS;
4727
0
};
4728
4729
0
blosc2_io blosc2_get_blosc2_io_defaults(void) {
4730
0
  return BLOSC2_IO_DEFAULTS;
4731
0
};
4732
4733
0
blosc2_stdio_mmap blosc2_get_blosc2_stdio_mmap_defaults(void) {
4734
0
  return BLOSC2_STDIO_MMAP_DEFAULTS;
4735
0
};