Coverage Report

Created: 2023-12-08 06:32

/src/c-blosc2/blosc/blosc2.c
Line
Count
Source (jump to first uncovered line)
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Copyright (c) 2021  The Blosc Development Team <blosc@blosc.org>
5
  https://blosc.org
6
  License: BSD 3-Clause (see LICENSE.txt)
7
8
  See LICENSE.txt for details about copyright and rights to use.
9
**********************************************************************/
10
11
12
#include "blosc2.h"
13
#include "blosc-private.h"
14
#include "../plugins/codecs/zfp/blosc2-zfp.h"
15
#include "frame.h"
16
17
#if defined(USING_CMAKE)
18
  #include "config.h"
19
#endif /*  USING_CMAKE */
20
#include "context.h"
21
22
#include "shuffle.h"
23
#include "delta.h"
24
#include "trunc-prec.h"
25
#include "blosclz.h"
26
#include "stune.h"
27
#include "blosc2/codecs-registry.h"
28
#include "blosc2/filters-registry.h"
29
#include "blosc2/tuners-registry.h"
30
31
#include "lz4.h"
32
#include "lz4hc.h"
33
#ifdef HAVE_IPP
34
  #include <ipps.h>
35
  #include <ippdc.h>
36
#endif
37
#if defined(HAVE_ZLIB_NG)
38
#ifdef ZLIB_COMPAT
39
  #include "zlib.h"
40
#else
41
  #include "zlib-ng.h"
42
#endif
43
#elif defined(HAVE_ZLIB)
44
  #include "zlib.h"
45
#endif /*  HAVE_MINIZ */
46
#if defined(HAVE_ZSTD)
47
  #include "zstd.h"
48
  #include "zstd_errors.h"
49
  // #include "cover.h"  // for experimenting with fast cover training for building dicts
50
  #include "zdict.h"
51
#endif /*  HAVE_ZSTD */
52
53
#if defined(_WIN32) && !defined(__MINGW32__)
54
  #include <windows.h>
55
  #include <malloc.h>
56
  #include <process.h>
57
  #define getpid _getpid
58
#endif  /* _WIN32 */
59
60
#if defined(_WIN32) && !defined(__GNUC__)
61
  #include "win32/pthread.c"
62
#endif
63
64
#include <stdio.h>
65
#include <stdlib.h>
66
#include <errno.h>
67
#include <string.h>
68
#include <sys/types.h>
69
#include <assert.h>
70
#include <math.h>
71
72
73
/* Synchronization variables */
74
75
/* Global context for non-contextual API */
76
static blosc2_context* g_global_context;
77
static pthread_mutex_t global_comp_mutex;
78
static int g_compressor = BLOSC_BLOSCLZ;
79
static int g_delta = 0;
80
/* The default splitmode */
81
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
82
/* the compressor to use by default */
83
static int16_t g_nthreads = 1;
84
static int32_t g_force_blocksize = 0;
85
static int g_initlib = 0;
86
static blosc2_schunk* g_schunk = NULL;   /* the pointer to super-chunk */
87
88
blosc2_codec g_codecs[256] = {0};
89
uint8_t g_ncodecs = 0;
90
91
static blosc2_filter g_filters[256] = {0};
92
static uint64_t g_nfilters = 0;
93
94
static blosc2_io_cb g_ios[256] = {0};
95
static uint64_t g_nio = 0;
96
97
blosc2_tuner g_tuners[256] = {0};
98
int g_ntuners = 0;
99
100
static int g_tuner = BLOSC_STUNE;
101
102
// Forward declarations
103
int init_threadpool(blosc2_context *context);
104
int release_threadpool(blosc2_context *context);
105
106
/* Macros for synchronization */
107
108
/* Wait until all threads are initialized */
109
#ifdef BLOSC_POSIX_BARRIERS
110
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                \
111
0
  do {                                                                 \
112
0
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);              \
113
0
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
114
0
      BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc);   \
115
0
      return((RET_VAL));                                               \
116
0
    }                                                                  \
117
0
  } while (0)
118
#else
119
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                \
120
  do {                                                                 \
121
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);           \
122
    if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->nthreads) {      \
123
      (CONTEXT_PTR)->count_threads++;                                  \
124
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,              \
125
                        &(CONTEXT_PTR)->count_threads_mutex);          \
126
    }                                                                  \
127
    else {                                                             \
128
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);        \
129
    }                                                                  \
130
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);         \
131
  } while (0)
132
#endif
133
134
/* Wait for all threads to finish */
135
#ifdef BLOSC_POSIX_BARRIERS
136
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                              \
137
0
  do {                                                                 \
138
0
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);            \
139
0
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
140
0
      BLOSC_TRACE_ERROR("Could not wait on barrier (finish)");         \
141
0
      return((RET_VAL));                                               \
142
0
    }                                                                  \
143
0
  } while (0)
144
#else
145
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                              \
146
  do {                                                                 \
147
    pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);           \
148
    if ((CONTEXT_PTR)->count_threads > 0) {                            \
149
      (CONTEXT_PTR)->count_threads--;                                  \
150
      pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,              \
151
                        &(CONTEXT_PTR)->count_threads_mutex);          \
152
    }                                                                  \
153
    else {                                                             \
154
      pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);        \
155
    }                                                                  \
156
    pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);         \
157
  } while (0)
158
#endif
159
160
161
/* global variable to change threading backend from Blosc-managed to caller-managed */
162
static blosc_threads_callback threads_callback = 0;
163
static void *threads_callback_data = 0;
164
165
166
/* non-threadsafe function should be called before any other Blosc function in
167
   order to change how threads are managed */
168
void blosc2_set_threads_callback(blosc_threads_callback callback, void *callback_data)
169
0
{
170
0
  threads_callback = callback;
171
0
  threads_callback_data = callback_data;
172
0
}
173
174
175
/* A function for aligned malloc that is portable */
176
55.0k
static uint8_t* my_malloc(size_t size) {
177
55.0k
  void* block = NULL;
178
55.0k
  int res = 0;
179
180
/* Do an alignment to 32 bytes because AVX2 is supported */
181
#if defined(_WIN32)
182
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
183
  block = (void *)_aligned_malloc(size, 32);
184
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
185
  /* Platform does have an implementation of posix_memalign */
186
55.0k
  res = posix_memalign(&block, 32, size);
187
#else
188
  block = malloc(size);
189
#endif  /* _WIN32 */
190
191
55.0k
  if (block == NULL || res != 0) {
192
0
    BLOSC_TRACE_ERROR("Error allocating memory!");
193
0
    return NULL;
194
0
  }
195
196
55.0k
  return (uint8_t*)block;
197
55.0k
}
198
199
200
/* Release memory booked by my_malloc */
201
55.0k
static void my_free(void* block) {
202
#if defined(_WIN32)
203
  _aligned_free(block);
204
#else
205
55.0k
  free(block);
206
55.0k
#endif  /* _WIN32 */
207
55.0k
}
208
209
210
/*
211
 * Conversion routines between compressor and compression libraries
212
 */
213
214
/* Return the library code associated with the compressor name */
215
0
static int compname_to_clibcode(const char* compname) {
216
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
217
0
    return BLOSC_BLOSCLZ_LIB;
218
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
219
0
    return BLOSC_LZ4_LIB;
220
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
221
0
    return BLOSC_LZ4_LIB;
222
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
223
0
    return BLOSC_ZLIB_LIB;
224
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
225
0
    return BLOSC_ZSTD_LIB;
226
0
  for (int i = 0; i < g_ncodecs; ++i) {
227
0
    if (strcmp(compname, g_codecs[i].compname) == 0)
228
0
      return g_codecs[i].complib;
229
0
  }
230
0
  return BLOSC2_ERROR_NOT_FOUND;
231
0
}
232
233
/* Return the library name associated with the compressor code */
234
16
static const char* clibcode_to_clibname(int clibcode) {
235
16
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
236
16
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
237
16
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
238
16
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
239
96
  for (int i = 0; i < g_ncodecs; ++i) {
240
80
    if (clibcode == g_codecs[i].complib)
241
0
      return g_codecs[i].compname;
242
80
  }
243
16
  return NULL;                  /* should never happen */
244
16
}
245
246
247
/*
248
 * Conversion routines between compressor names and compressor codes
249
 */
250
251
/* Get the compressor name associated with the compressor code */
252
0
int blosc2_compcode_to_compname(int compcode, const char** compname) {
253
0
  int code = -1;    /* -1 means non-existent compressor code */
254
0
  const char* name = NULL;
255
256
  /* Map the compressor code */
257
0
  if (compcode == BLOSC_BLOSCLZ)
258
0
    name = BLOSC_BLOSCLZ_COMPNAME;
259
0
  else if (compcode == BLOSC_LZ4)
260
0
    name = BLOSC_LZ4_COMPNAME;
261
0
  else if (compcode == BLOSC_LZ4HC)
262
0
    name = BLOSC_LZ4HC_COMPNAME;
263
0
  else if (compcode == BLOSC_ZLIB)
264
0
    name = BLOSC_ZLIB_COMPNAME;
265
0
  else if (compcode == BLOSC_ZSTD)
266
0
    name = BLOSC_ZSTD_COMPNAME;
267
0
  else {
268
0
    for (int i = 0; i < g_ncodecs; ++i) {
269
0
      if (compcode == g_codecs[i].compcode) {
270
0
        name = g_codecs[i].compname;
271
0
        break;
272
0
      }
273
0
    }
274
0
  }
275
276
0
  *compname = name;
277
278
  /* Guess if there is support for this code */
279
0
  if (compcode == BLOSC_BLOSCLZ)
280
0
    code = BLOSC_BLOSCLZ;
281
0
  else if (compcode == BLOSC_LZ4)
282
0
    code = BLOSC_LZ4;
283
0
  else if (compcode == BLOSC_LZ4HC)
284
0
    code = BLOSC_LZ4HC;
285
0
#if defined(HAVE_ZLIB)
286
0
  else if (compcode == BLOSC_ZLIB)
287
0
    code = BLOSC_ZLIB;
288
0
#endif /* HAVE_ZLIB */
289
0
#if defined(HAVE_ZSTD)
290
0
  else if (compcode == BLOSC_ZSTD)
291
0
    code = BLOSC_ZSTD;
292
0
#endif /* HAVE_ZSTD */
293
0
  else if (compcode >= BLOSC_LAST_CODEC)
294
0
    code = compcode;
295
0
  return code;
296
0
}
297
298
/* Get the compressor code for the compressor name. -1 if it is not available */
299
18.3k
int blosc2_compname_to_compcode(const char* compname) {
300
18.3k
  int code = -1;  /* -1 means non-existent compressor code */
301
302
18.3k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
303
3.00k
    code = BLOSC_BLOSCLZ;
304
3.00k
  }
305
15.3k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
306
786
    code = BLOSC_LZ4;
307
786
  }
308
14.5k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
309
2.59k
    code = BLOSC_LZ4HC;
310
2.59k
  }
311
11.9k
#if defined(HAVE_ZLIB)
312
11.9k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
313
2.96k
    code = BLOSC_ZLIB;
314
2.96k
  }
315
9.01k
#endif /*  HAVE_ZLIB */
316
9.01k
#if defined(HAVE_ZSTD)
317
9.01k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
318
9.01k
    code = BLOSC_ZSTD;
319
9.01k
  }
320
0
#endif /*  HAVE_ZSTD */
321
0
  else{
322
0
    for (int i = 0; i < g_ncodecs; ++i) {
323
0
      if (strcmp(compname, g_codecs[i].compname) == 0) {
324
0
        code = g_codecs[i].compcode;
325
0
        break;
326
0
      }
327
0
    }
328
0
  }
329
18.3k
  return code;
330
18.3k
}
331
332
333
/* Convert compressor code to blosc compressor format code */
334
63.6k
static int compcode_to_compformat(int compcode) {
335
63.6k
  switch (compcode) {
336
48.2k
    case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
337
781
    case BLOSC_LZ4:     return BLOSC_LZ4_FORMAT;
338
2.59k
    case BLOSC_LZ4HC:   return BLOSC_LZ4HC_FORMAT;
339
340
0
#if defined(HAVE_ZLIB)
341
2.96k
    case BLOSC_ZLIB:    return BLOSC_ZLIB_FORMAT;
342
0
#endif /*  HAVE_ZLIB */
343
344
0
#if defined(HAVE_ZSTD)
345
9.00k
    case BLOSC_ZSTD:    return BLOSC_ZSTD_FORMAT;
346
0
      break;
347
0
#endif /*  HAVE_ZSTD */
348
0
    default:
349
0
      return BLOSC_UDCODEC_FORMAT;
350
63.6k
  }
351
0
  BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
352
0
}
353
354
355
/* Convert compressor code to blosc compressor format version */
356
64.1k
static int compcode_to_compversion(int compcode) {
357
  /* Write compressor format */
358
64.1k
  switch (compcode) {
359
48.7k
    case BLOSC_BLOSCLZ:
360
48.7k
      return BLOSC_BLOSCLZ_VERSION_FORMAT;
361
782
    case BLOSC_LZ4:
362
782
      return BLOSC_LZ4_VERSION_FORMAT;
363
2.59k
    case BLOSC_LZ4HC:
364
2.59k
      return BLOSC_LZ4HC_VERSION_FORMAT;
365
366
0
#if defined(HAVE_ZLIB)
367
2.96k
    case BLOSC_ZLIB:
368
2.96k
      return BLOSC_ZLIB_VERSION_FORMAT;
369
0
      break;
370
0
#endif /*  HAVE_ZLIB */
371
372
0
#if defined(HAVE_ZSTD)
373
9.01k
    case BLOSC_ZSTD:
374
9.01k
      return BLOSC_ZSTD_VERSION_FORMAT;
375
0
      break;
376
0
#endif /*  HAVE_ZSTD */
377
0
    default:
378
0
      for (int i = 0; i < g_ncodecs; ++i) {
379
0
        if (compcode == g_codecs[i].compcode) {
380
0
          return g_codecs[i].version;
381
0
        }
382
0
      }
383
64.1k
  }
384
0
  return BLOSC2_ERROR_FAILURE;
385
64.1k
}
386
387
388
static int lz4_wrap_compress(const char* input, size_t input_length,
389
10.7k
                             char* output, size_t maxout, int accel, void* hash_table) {
390
10.7k
  BLOSC_UNUSED_PARAM(accel);
391
10.7k
  int cbytes;
392
#ifdef HAVE_IPP
393
  if (hash_table == NULL) {
394
    return BLOSC2_ERROR_INVALID_PARAM;  // the hash table should always be initialized
395
  }
396
  int outlen = (int)maxout;
397
  int inlen = (int)input_length;
398
  // I have not found any function that uses `accel` like in `LZ4_compress_fast`, but
399
  // the IPP LZ4Safe call does a pretty good job on compressing well, so let's use it
400
  IppStatus status = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &inlen,
401
                                           (Ipp8u*)output, &outlen, (Ipp8u*)hash_table);
402
  if (status == ippStsDstSizeLessExpected) {
403
    return 0;  // we cannot compress in required outlen
404
  }
405
  else if (status != ippStsNoErr) {
406
    return BLOSC2_ERROR_FAILURE;  // an unexpected error happened
407
  }
408
  cbytes = outlen;
409
#else
410
10.7k
  BLOSC_UNUSED_PARAM(hash_table);
411
10.7k
  accel = 1;  // deactivate acceleration to match IPP behaviour
412
10.7k
  cbytes = LZ4_compress_fast(input, output, (int)input_length, (int)maxout, accel);
413
10.7k
#endif
414
10.7k
  return cbytes;
415
10.7k
}
416
417
418
static int lz4hc_wrap_compress(const char* input, size_t input_length,
419
15.8k
                               char* output, size_t maxout, int clevel) {
420
15.8k
  int cbytes;
421
15.8k
  if (input_length > (size_t)(UINT32_C(2) << 30))
422
0
    return BLOSC2_ERROR_2GB_LIMIT;
423
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
424
   * but levels larger than 9 do not buy much compression. */
425
15.8k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
426
15.8k
                           clevel);
427
15.8k
  return cbytes;
428
15.8k
}
429
430
431
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
432
6.45k
                               char* output, size_t maxout) {
433
6.45k
  int nbytes;
434
#ifdef HAVE_IPP
435
  int outlen = (int)maxout;
436
  int inlen = (int)compressed_length;
437
  IppStatus status;
438
  status = ippsDecodeLZ4_8u((const Ipp8u*)input, inlen, (Ipp8u*)output, &outlen);
439
  //status = ippsDecodeLZ4Dict_8u((const Ipp8u*)input, &inlen, (Ipp8u*)output, 0, &outlen, NULL, 1 << 16);
440
  nbytes = (status == ippStsNoErr) ? outlen : -outlen;
441
#else
442
6.45k
  nbytes = LZ4_decompress_safe(input, output, (int)compressed_length, (int)maxout);
443
6.45k
#endif
444
6.45k
  if (nbytes != (int)maxout) {
445
538
    return 0;
446
538
  }
447
5.92k
  return (int)maxout;
448
6.45k
}
449
450
#if defined(HAVE_ZLIB)
451
/* zlib is not very respectful with sharing name space with others.
452
 Fortunately, its names do not collide with those already in blosc. */
453
static int zlib_wrap_compress(const char* input, size_t input_length,
454
37.2k
                              char* output, size_t maxout, int clevel) {
455
37.2k
  int status;
456
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
457
  size_t cl = maxout;
458
  status = zng_compress2(
459
      (uint8_t*)output, &cl, (uint8_t*)input, input_length, clevel);
460
#else
461
37.2k
  uLongf cl = (uLongf)maxout;
462
37.2k
  status = compress2(
463
37.2k
      (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
464
37.2k
#endif
465
37.2k
  if (status != Z_OK) {
466
10.4k
    return 0;
467
10.4k
  }
468
26.8k
  return (int)cl;
469
37.2k
}
470
471
static int zlib_wrap_decompress(const char* input, size_t compressed_length,
472
9.08k
                                char* output, size_t maxout) {
473
9.08k
  int status;
474
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
475
  size_t ul = maxout;
476
  status = zng_uncompress(
477
      (uint8_t*)output, &ul, (uint8_t*)input, compressed_length);
478
#else
479
9.08k
  uLongf ul = (uLongf)maxout;
480
9.08k
  status = uncompress(
481
9.08k
      (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
482
9.08k
#endif
483
9.08k
  if (status != Z_OK) {
484
1.54k
    return 0;
485
1.54k
  }
486
7.53k
  return (int)ul;
487
9.08k
}
488
#endif /*  HAVE_ZLIB */
489
490
491
#if defined(HAVE_ZSTD)
492
static int zstd_wrap_compress(struct thread_context* thread_context,
493
                              const char* input, size_t input_length,
494
125k
                              char* output, size_t maxout, int clevel) {
495
125k
  size_t code;
496
125k
  blosc2_context* context = thread_context->parent_context;
497
498
125k
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
499
  /* Make the level 8 close enough to maxCLevel */
500
125k
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
501
502
125k
  if (thread_context->zstd_cctx == NULL) {
503
1.07k
    thread_context->zstd_cctx = ZSTD_createCCtx();
504
1.07k
  }
505
506
125k
  if (context->use_dict) {
507
0
    assert(context->dict_cdict != NULL);
508
0
    code = ZSTD_compress_usingCDict(
509
0
            thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
510
0
            input_length, context->dict_cdict);
511
125k
  } else {
512
125k
    code = ZSTD_compressCCtx(thread_context->zstd_cctx,
513
125k
        (void*)output, maxout, (void*)input, input_length, clevel);
514
125k
  }
515
125k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
516
    // Blosc will just memcpy this buffer
517
34.4k
    return 0;
518
34.4k
  }
519
91.0k
  return (int)code;
520
125k
}
521
522
static int zstd_wrap_decompress(struct thread_context* thread_context,
523
                                const char* input, size_t compressed_length,
524
17.0k
                                char* output, size_t maxout) {
525
17.0k
  size_t code;
526
17.0k
  blosc2_context* context = thread_context->parent_context;
527
528
17.0k
  if (thread_context->zstd_dctx == NULL) {
529
5.44k
    thread_context->zstd_dctx = ZSTD_createDCtx();
530
5.44k
  }
531
532
17.0k
  if (context->use_dict) {
533
2.92k
    assert(context->dict_ddict != NULL);
534
2.92k
    code = ZSTD_decompress_usingDDict(
535
2.92k
            thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
536
2.92k
            compressed_length, context->dict_ddict);
537
14.1k
  } else {
538
14.1k
    code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
539
14.1k
        (void*)output, maxout, (void*)input, compressed_length);
540
14.1k
  }
541
17.0k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
542
5.07k
    BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'.  Giving up.",
543
0
                      ZDICT_getErrorName(code));
544
0
    return 0;
545
5.07k
  }
546
12.0k
  return (int)code;
547
17.0k
}
548
#endif /*  HAVE_ZSTD */
549
550
/* Compute acceleration for blosclz */
551
245k
static int get_accel(const blosc2_context* context) {
552
245k
  int clevel = context->clevel;
553
554
245k
  if (context->compcode == BLOSC_LZ4) {
555
    /* This acceleration setting based on discussions held in:
556
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
557
     */
558
10.7k
    return (10 - clevel);
559
10.7k
  }
560
234k
  return 1;
561
245k
}
562
563
564
2.11M
int do_nothing(uint8_t filter, char cmode) {
565
2.11M
  if (cmode == 'c') {
566
1.47M
    return (filter == BLOSC_NOFILTER);
567
1.47M
  } else {
568
    // TRUNC_PREC do not have to be applied during decompression
569
642k
    return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
570
642k
  }
571
2.11M
}
572
573
574
91.3k
int next_filter(const uint8_t* filters, int current_filter, char cmode) {
575
131k
  for (int i = current_filter - 1; i >= 0; i--) {
576
131k
    if (!do_nothing(filters[i], cmode)) {
577
91.3k
      return filters[i];
578
91.3k
    }
579
131k
  }
580
0
  return BLOSC_NOFILTER;
581
91.3k
}
582
583
584
330k
int last_filter(const uint8_t* filters, char cmode) {
585
330k
  int last_index = -1;
586
2.31M
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
587
1.98M
    if (!do_nothing(filters[i], cmode))  {
588
235k
      last_index = i;
589
235k
    }
590
1.98M
  }
591
330k
  return last_index;
592
330k
}
593
594
595
/* Convert filter pipeline to filter flags */
596
87.6k
static uint8_t filters_to_flags(const uint8_t* filters) {
597
87.6k
  uint8_t flags = 0;
598
599
613k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
600
525k
    switch (filters[i]) {
601
28.8k
      case BLOSC_SHUFFLE:
602
28.8k
        flags |= BLOSC_DOSHUFFLE;
603
28.8k
        break;
604
14.9k
      case BLOSC_BITSHUFFLE:
605
14.9k
        flags |= BLOSC_DOBITSHUFFLE;
606
14.9k
        break;
607
3.22k
      case BLOSC_DELTA:
608
3.22k
        flags |= BLOSC_DODELTA;
609
3.22k
        break;
610
478k
      default :
611
478k
        break;
612
525k
    }
613
525k
  }
614
87.6k
  return flags;
615
87.6k
}
616
617
618
/* Convert filter flags to filter pipeline */
619
210k
static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
620
  /* Initialize the filter pipeline */
621
210k
  memset(filters, 0, BLOSC2_MAX_FILTERS);
622
  /* Fill the filter pipeline */
623
210k
  if (flags & BLOSC_DOSHUFFLE)
624
189k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
625
210k
  if (flags & BLOSC_DOBITSHUFFLE)
626
186k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
627
210k
  if (flags & BLOSC_DODELTA)
628
34.2k
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
629
210k
}
630
631
632
/* Get filter flags from header flags */
633
static uint8_t get_filter_flags(const uint8_t header_flags,
634
5.86k
                                const int32_t typesize) {
635
5.86k
  uint8_t flags = 0;
636
637
5.86k
  if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
638
1.15k
    flags |= BLOSC_DOSHUFFLE;
639
1.15k
  }
640
5.86k
  if (header_flags & BLOSC_DOBITSHUFFLE) {
641
1.52k
    flags |= BLOSC_DOBITSHUFFLE;
642
1.52k
  }
643
5.86k
  if (header_flags & BLOSC_DODELTA) {
644
2.89k
    flags |= BLOSC_DODELTA;
645
2.89k
  }
646
5.86k
  if (header_flags & BLOSC_MEMCPYED) {
647
39
    flags |= BLOSC_MEMCPYED;
648
39
  }
649
5.86k
  return flags;
650
5.86k
}
651
652
typedef struct blosc_header_s {
653
  uint8_t version;
654
  uint8_t versionlz;
655
  uint8_t flags;
656
  uint8_t typesize;
657
  int32_t nbytes;
658
  int32_t blocksize;
659
  int32_t cbytes;
660
  // Extended Blosc2 header
661
  uint8_t filter_codes[BLOSC2_MAX_FILTERS];
662
  uint8_t udcompcode;
663
  uint8_t compcode_meta;
664
  uint8_t filter_meta[BLOSC2_MAX_FILTERS];
665
  uint8_t reserved2;
666
  uint8_t blosc2_flags;
667
} blosc_header;
668
669
670
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
671
235k
{
672
235k
  memset(header, 0, sizeof(blosc_header));
673
674
235k
  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
675
0
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
676
0
    return BLOSC2_ERROR_READ_BUFFER;
677
0
  }
678
679
235k
  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);
680
681
235k
  bool little_endian = is_little_endian();
682
683
235k
  if (!little_endian) {
684
0
    header->nbytes = bswap32_(header->nbytes);
685
0
    header->blocksize = bswap32_(header->blocksize);
686
0
    header->cbytes = bswap32_(header->cbytes);
687
0
  }
688
689
235k
  if (header->version > BLOSC2_VERSION_FORMAT) {
690
    /* Version from future */
691
6
    return BLOSC2_ERROR_VERSION_SUPPORT;
692
6
  }
693
235k
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
694
34
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
695
0
    return BLOSC2_ERROR_INVALID_HEADER;
696
34
  }
697
235k
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
698
70
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
699
0
    return BLOSC2_ERROR_INVALID_HEADER;
700
70
  }
701
235k
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
702
10
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
703
0
    return BLOSC2_ERROR_INVALID_HEADER;
704
10
  }
705
235k
  if (header->typesize == 0) {
706
12
    BLOSC_TRACE_ERROR("`typesize` is zero.");
707
0
    return BLOSC2_ERROR_INVALID_HEADER;
708
12
  }
709
710
  /* Read extended header if it is wanted */
711
235k
  if ((extended_header) && (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE)) {
712
31.1k
    if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
713
17
      BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
714
0
      return BLOSC2_ERROR_INVALID_HEADER;
715
17
    }
716
31.1k
    if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
717
0
      BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
718
0
      return BLOSC2_ERROR_READ_BUFFER;
719
0
    }
720
721
31.1k
    memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
722
31.1k
      BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);
723
724
31.1k
    int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
725
31.1k
    if (special_type != 0) {
726
1.24k
      if (header->nbytes % header->typesize != 0) {
727
4
        BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
728
0
        return BLOSC2_ERROR_INVALID_HEADER;
729
4
      }
730
1.23k
      if (special_type == BLOSC2_SPECIAL_VALUE) {
731
500
        if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH + header->typesize) {
732
5
          BLOSC_TRACE_ERROR("`cbytes` is too small for run length encoding");
733
0
          return BLOSC2_ERROR_READ_BUFFER;
734
5
        }
735
500
      }
736
1.23k
    }
737
    // The number of filters depends on the version of the header. Blosc2 alpha series
738
    // did not initialize filters to zero beyond the max supported.
739
31.1k
    if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
740
931
      header->filter_codes[5] = 0;
741
931
      header->filter_meta[5] = 0;
742
931
    }
743
31.1k
  }
744
204k
  else {
745
204k
    flags_to_filters(header->flags, header->filter_codes);
746
204k
  }
747
235k
  return 0;
748
235k
}
749
750
93.4k
static inline void blosc2_calculate_blocks(blosc2_context* context) {
751
  /* Compute number of blocks in buffer */
752
93.4k
  context->nblocks = context->sourcesize / context->blocksize;
753
93.4k
  context->leftover = context->sourcesize % context->blocksize;
754
93.4k
  context->nblocks = (context->leftover > 0) ?
755
78.7k
                     (context->nblocks + 1) : context->nblocks;
756
93.4k
}
757
758
29.3k
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
759
29.3k
  context->header_flags = header->flags;
760
29.3k
  context->typesize = header->typesize;
761
29.3k
  context->sourcesize = header->nbytes;
762
29.3k
  context->blocksize = header->blocksize;
763
29.3k
  context->blosc2_flags = header->blosc2_flags;
764
29.3k
  context->compcode = header->flags >> 5;
765
29.3k
  if (context->compcode == BLOSC_UDCODEC_FORMAT) {
766
522
    context->compcode = header->udcompcode;
767
522
  }
768
29.3k
  blosc2_calculate_blocks(context);
769
770
29.3k
  bool is_lazy = false;
771
29.3k
  if ((context->header_flags & BLOSC_DOSHUFFLE) &&
772
29.3k
      (context->header_flags & BLOSC_DOBITSHUFFLE)) {
773
    /* Extended header */
774
23.4k
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
775
776
23.4k
    memcpy(context->filters, header->filter_codes, BLOSC2_MAX_FILTERS);
777
23.4k
    memcpy(context->filters_meta, header->filter_meta, BLOSC2_MAX_FILTERS);
778
23.4k
    context->compcode_meta = header->compcode_meta;
779
780
23.4k
    context->filter_flags = filters_to_flags(header->filter_codes);
781
23.4k
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
782
783
23.4k
    is_lazy = (context->blosc2_flags & 0x08u);
784
23.4k
  }
785
5.86k
  else {
786
5.86k
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
787
5.86k
    context->filter_flags = get_filter_flags(context->header_flags, context->typesize);
788
5.86k
    flags_to_filters(context->header_flags, context->filters);
789
5.86k
  }
790
791
  // Some checks for malformed headers
792
29.3k
  if (!is_lazy && header->cbytes > context->srcsize) {
793
0
    return BLOSC2_ERROR_INVALID_HEADER;
794
0
  }
795
796
29.3k
  return 0;
797
29.3k
}
798
799
800
0
int fill_filter(blosc2_filter *filter) {
801
0
  char libpath[PATH_MAX];
802
0
  void *lib = load_lib(filter->name, libpath);
803
0
  if(lib == NULL) {
804
0
    BLOSC_TRACE_ERROR("Error while loading the library");
805
0
    return BLOSC2_ERROR_FAILURE;
806
0
  }
807
808
0
  filter_info *info = dlsym(lib, "info");
809
0
  filter->forward = dlsym(lib, info->forward);
810
0
  filter->backward = dlsym(lib, info->backward);
811
812
0
  if (filter->forward == NULL || filter->backward == NULL){
813
0
    BLOSC_TRACE_ERROR("Wrong library loaded");
814
0
    dlclose(lib);
815
0
    return BLOSC2_ERROR_FAILURE;
816
0
  }
817
818
0
  return BLOSC2_ERROR_SUCCESS;
819
0
}
820
821
822
1
int fill_codec(blosc2_codec *codec) {
823
1
  char libpath[PATH_MAX];
824
1
  void *lib = load_lib(codec->compname, libpath);
825
1
  if(lib == NULL) {
826
1
    BLOSC_TRACE_ERROR("Error while loading the library for codec `%s`", codec->compname);
827
0
    return BLOSC2_ERROR_FAILURE;
828
1
  }
829
830
0
  codec_info *info = dlsym(lib, "info");
831
0
  if (info == NULL) {
832
0
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", codec->compname);
833
0
    dlclose(lib);
834
0
    return BLOSC2_ERROR_FAILURE;
835
0
  }
836
837
0
  codec->encoder = dlsym(lib, info->encoder);
838
0
  codec->decoder = dlsym(lib, info->decoder);
839
0
  if (codec->encoder == NULL || codec->decoder == NULL) {
840
0
    BLOSC_TRACE_ERROR("encoder or decoder cannot be loaded from plugin `%s`", codec->compname);
841
0
    dlclose(lib);
842
0
    return BLOSC2_ERROR_FAILURE;
843
0
  }
844
845
0
  return BLOSC2_ERROR_SUCCESS;
846
0
}
847
848
849
0
int fill_tuner(blosc2_tuner *tuner) {
850
0
  char libpath[PATH_MAX] = {0};
851
0
  void *lib = load_lib(tuner->name, libpath);
852
0
  if(lib == NULL) {
853
0
    BLOSC_TRACE_ERROR("Error while loading the library");
854
0
    return BLOSC2_ERROR_FAILURE;
855
0
  }
856
857
0
  tuner_info *info = dlsym(lib, "info");
858
0
  tuner->init = dlsym(lib, info->init);
859
0
  tuner->update = dlsym(lib, info->update);
860
0
  tuner->next_blocksize = dlsym(lib, info->next_blocksize);
861
0
  tuner->free = dlsym(lib, info->free);
862
0
  tuner->next_cparams = dlsym(lib, info->next_cparams);
863
864
0
  if (tuner->init == NULL || tuner->update == NULL || tuner->next_blocksize == NULL || tuner->free == NULL
865
0
      || tuner->next_cparams == NULL){
866
0
    BLOSC_TRACE_ERROR("Wrong library loaded");
867
0
    dlclose(lib);
868
0
    return BLOSC2_ERROR_FAILURE;
869
0
  }
870
871
0
  return BLOSC2_ERROR_SUCCESS;
872
0
}
873
874
875
64.1k
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
876
64.1k
  memset(header, 0, sizeof(blosc_header));
877
878
64.1k
  header->version = BLOSC2_VERSION_FORMAT;
879
64.1k
  header->versionlz = compcode_to_compversion(context->compcode);
880
64.1k
  header->flags = context->header_flags;
881
64.1k
  header->typesize = (uint8_t)context->typesize;
882
64.1k
  header->nbytes = (int32_t)context->sourcesize;
883
64.1k
  header->blocksize = (int32_t)context->blocksize;
884
885
64.1k
  int little_endian = is_little_endian();
886
64.1k
  if (!little_endian) {
887
0
    header->nbytes = bswap32_(header->nbytes);
888
0
    header->blocksize = bswap32_(header->blocksize);
889
    // cbytes written after compression
890
0
  }
891
892
64.1k
  if (extended_header) {
893
    /* Store filter pipeline info at the end of the header */
894
448k
    for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
895
384k
      header->filter_codes[i] = context->filters[i];
896
384k
      header->filter_meta[i] = context->filters_meta[i];
897
384k
    }
898
64.1k
    header->udcompcode = context->compcode;
899
64.1k
    header->compcode_meta = context->compcode_meta;
900
901
64.1k
    if (!little_endian) {
902
0
      header->blosc2_flags |= BLOSC2_BIGENDIAN;
903
0
    }
904
64.1k
    if (context->use_dict) {
905
0
      header->blosc2_flags |= BLOSC2_USEDICT;
906
0
    }
907
64.1k
    if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
908
0
      header->blosc2_flags |= BLOSC2_INSTR_CODEC;
909
0
    }
910
64.1k
  }
911
912
64.1k
  return 0;
913
64.1k
}
914
915
1.39M
void _cycle_buffers(uint8_t **src, uint8_t **dest, uint8_t **tmp) {
916
1.39M
  uint8_t *tmp2 = *src;
917
1.39M
  *src = *dest;
918
1.39M
  *dest = *tmp;
919
1.39M
  *tmp = tmp2;
920
1.39M
}
921
922
uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
923
                          const uint8_t* src, const int32_t offset,
924
147k
                          uint8_t* dest, uint8_t* tmp) {
925
147k
  blosc2_context* context = thread_context->parent_context;
926
147k
  uint8_t* _src = (uint8_t*)src + offset;
927
147k
  uint8_t* _tmp = tmp;
928
147k
  uint8_t* _dest = dest;
929
147k
  int32_t typesize = context->typesize;
930
147k
  uint8_t* filters = context->filters;
931
147k
  uint8_t* filters_meta = context->filters_meta;
932
147k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
933
934
  /* Prefilter function */
935
147k
  if (context->prefilter != NULL) {
936
    /* Set unwritten values to zero */
937
0
    memset(_dest, 0, bsize);
938
    // Create new prefilter parameters for this block (must be private for each thread)
939
0
    blosc2_prefilter_params preparams;
940
0
    memcpy(&preparams, context->preparams, sizeof(preparams));
941
0
    preparams.input = _src;
942
0
    preparams.output = _dest;
943
0
    preparams.output_size = bsize;
944
0
    preparams.output_typesize = typesize;
945
0
    preparams.output_offset = offset;
946
0
    preparams.nblock = offset / context->blocksize;
947
0
    preparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
948
0
    preparams.tid = thread_context->tid;
949
0
    preparams.ttmp = thread_context->tmp;
950
0
    preparams.ttmp_nbytes = thread_context->tmp_nbytes;
951
0
    preparams.ctx = context;
952
953
0
    if (context->prefilter(&preparams) != 0) {
954
0
      BLOSC_TRACE_ERROR("Execution of prefilter function failed");
955
0
      return NULL;
956
0
    }
957
958
0
    if (memcpyed) {
959
      // No more filters are required
960
0
      return _dest;
961
0
    }
962
0
    _cycle_buffers(&_src, &_dest, &_tmp);
963
0
  }
964
965
  /* Process the filter pipeline */
966
1.02M
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
967
882k
    int rc = BLOSC2_ERROR_SUCCESS;
968
882k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
969
882k
      switch (filters[i]) {
970
21.9k
        case BLOSC_SHUFFLE:
971
761k
          for (int j = 0; j <= filters_meta[i]; j++) {
972
739k
            shuffle(typesize, bsize, _src, _dest);
973
            // Cycle filters when required
974
739k
            if (j < filters_meta[i]) {
975
717k
              _cycle_buffers(&_src, &_dest, &_tmp);
976
717k
            }
977
739k
          }
978
21.9k
          break;
979
125k
        case BLOSC_BITSHUFFLE:
980
125k
          if (bitshuffle(typesize, bsize, _src, _dest) < 0) {
981
0
            return NULL;
982
0
          }
983
125k
          break;
984
125k
        case BLOSC_DELTA:
985
0
          delta_encoder(src, offset, bsize, typesize, _src, _dest);
986
0
          break;
987
0
        case BLOSC_TRUNC_PREC:
988
0
          if (truncate_precision(filters_meta[i], typesize, bsize, _src, _dest) < 0) {
989
0
            return NULL;
990
0
          }
991
0
          break;
992
735k
        default:
993
735k
          if (filters[i] != BLOSC_NOFILTER) {
994
0
            BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
995
0
            return NULL;
996
0
          }
997
882k
      }
998
882k
    }
999
0
    else {
1000
      // Look for the filters_meta in user filters and run it
1001
0
      for (uint64_t j = 0; j < g_nfilters; ++j) {
1002
0
        if (g_filters[j].id == filters[i]) {
1003
0
          if (g_filters[j].forward == NULL) {
1004
            // Dynamically load library
1005
0
            if (fill_filter(&g_filters[j]) < 0) {
1006
0
              BLOSC_TRACE_ERROR("Could not load filter %d\n", g_filters[j].id);
1007
0
              return NULL;
1008
0
            }
1009
0
          }
1010
0
          if (g_filters[j].forward != NULL) {
1011
0
            blosc2_cparams cparams;
1012
0
            blosc2_ctx_get_cparams(context, &cparams);
1013
0
            rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams, g_filters[j].id);
1014
0
          } else {
1015
0
            BLOSC_TRACE_ERROR("Forward function is NULL");
1016
0
            return NULL;
1017
0
          }
1018
0
          if (rc != BLOSC2_ERROR_SUCCESS) {
1019
0
            BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
1020
0
            return NULL;
1021
0
          }
1022
0
          goto urfiltersuccess;
1023
0
        }
1024
0
      }
1025
0
      BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
1026
0
      return NULL;
1027
1028
0
      urfiltersuccess:;
1029
1030
0
    }
1031
1032
    // Cycle buffers when required
1033
882k
    if (filters[i] != BLOSC_NOFILTER) {
1034
147k
      _cycle_buffers(&_src, &_dest, &_tmp);
1035
147k
    }
1036
882k
  }
1037
147k
  return _src;
1038
147k
}
1039
1040
1041
// Optimized version for detecting runs.  It compares 8 bytes values wherever possible.
1042
245k
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
1043
245k
  uint8_t x = *ip;
1044
245k
  int64_t value, value2;
1045
  /* Broadcast the value for every byte in a 64-bit register */
1046
245k
  memset(&value, x, 8);
1047
3.37M
  while (ip < (ip_bound - 8)) {
1048
#if defined(BLOSC_STRICT_ALIGN)
1049
    memcpy(&value2, ip, 8);
1050
#else
1051
3.36M
    value2 = *(int64_t*)ip;
1052
3.36M
#endif
1053
3.36M
    if (value != value2) {
1054
      // Values differ.  We don't have a run.
1055
236k
      return false;
1056
236k
    }
1057
3.12M
    else {
1058
3.12M
      ip += 8;
1059
3.12M
    }
1060
3.36M
  }
1061
  /* Look into the remainder */
1062
73.7k
  while ((ip < ip_bound) && (*ip == x)) ip++;
1063
8.97k
  return ip == ip_bound ? true : false;
1064
245k
}
1065
1066
1067
/* Shuffle & compress a single block */
1068
static int blosc_c(struct thread_context* thread_context, int32_t bsize,
1069
                   int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
1070
                   const uint8_t* src, const int32_t offset, uint8_t* dest,
1071
245k
                   uint8_t* tmp, uint8_t* tmp2) {
1072
245k
  blosc2_context* context = thread_context->parent_context;
1073
245k
  int dont_split = (context->header_flags & 0x10) >> 4;
1074
245k
  int dict_training = context->use_dict && context->dict_cdict == NULL;
1075
245k
  int32_t j, neblock, nstreams;
1076
245k
  int32_t cbytes;                   /* number of compressed bytes in split */
1077
245k
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
1078
245k
  int32_t maxout;
1079
245k
  int32_t typesize = context->typesize;
1080
245k
  const char* compname;
1081
245k
  int accel;
1082
245k
  const uint8_t* _src;
1083
245k
  uint8_t *_tmp = tmp, *_tmp2 = tmp2;
1084
245k
  int last_filter_index = last_filter(context->filters, 'c');
1085
245k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1086
245k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1087
245k
  blosc_timestamp_t last, current;
1088
245k
  float filter_time = 0.f;
1089
1090
245k
  if (instr_codec) {
1091
0
    blosc_set_timestamp(&last);
1092
0
  }
1093
1094
  // See whether we have a run here
1095
245k
  if (last_filter_index >= 0 || context->prefilter != NULL) {
1096
    /* Apply the filter pipeline just for the prefilter */
1097
147k
    if (memcpyed && context->prefilter != NULL) {
1098
      // We only need the prefilter output
1099
0
      _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2);
1100
0
      if (_src == NULL) {
1101
0
        return BLOSC2_ERROR_FILTER_PIPELINE;
1102
0
      }
1103
0
      return bsize;
1104
0
    }
1105
    /* Apply regular filter pipeline */
1106
147k
    _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2);
1107
147k
    if (_src == NULL) {
1108
0
      return BLOSC2_ERROR_FILTER_PIPELINE;
1109
0
    }
1110
147k
  } else {
1111
98.4k
    _src = src + offset;
1112
98.4k
  }
1113
1114
245k
  if (instr_codec) {
1115
0
    blosc_set_timestamp(&current);
1116
0
    filter_time = (float) blosc_elapsed_secs(last, current);
1117
0
    last = current;
1118
0
  }
1119
1120
245k
  assert(context->clevel > 0);
1121
1122
  /* Calculate acceleration for different compressors */
1123
245k
  accel = get_accel(context);
1124
1125
  /* The number of compressed data streams for this block */
1126
245k
  if (!dont_split && !leftoverblock && !dict_training) {
1127
21.8k
    nstreams = (int32_t)typesize;
1128
21.8k
  }
1129
223k
  else {
1130
223k
    nstreams = 1;
1131
223k
  }
1132
245k
  neblock = bsize / nstreams;
1133
469k
  for (j = 0; j < nstreams; j++) {
1134
245k
    if (instr_codec) {
1135
0
      blosc_set_timestamp(&last);
1136
0
    }
1137
245k
    if (!dict_training) {
1138
245k
      dest += sizeof(int32_t);
1139
245k
      ntbytes += sizeof(int32_t);
1140
245k
      ctbytes += sizeof(int32_t);
1141
1142
245k
      const uint8_t *ip = (uint8_t *) _src + j * neblock;
1143
245k
      const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;
1144
1145
245k
      if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
1146
        // A run
1147
7.95k
        int32_t value = _src[j * neblock];
1148
7.95k
        if (ntbytes > destsize) {
1149
7
          return 0;    /* Non-compressible data */
1150
7
        }
1151
1152
7.94k
        if (instr_codec) {
1153
0
          blosc_set_timestamp(&current);
1154
0
          int32_t instr_size = sizeof(blosc2_instr);
1155
0
          ntbytes += instr_size;
1156
0
          ctbytes += instr_size;
1157
0
          if (ntbytes > destsize) {
1158
0
            return 0;    /* Non-compressible data */
1159
0
          }
1160
0
          _sw32(dest - 4, instr_size);
1161
0
          blosc2_instr *desti = (blosc2_instr *)dest;
1162
0
          memset(desti, 0, sizeof(blosc2_instr));
1163
          // Special values have an overhead of about 1 int32
1164
0
          int32_t ssize = value == 0 ? sizeof(int32_t) : sizeof(int32_t) + 1;
1165
0
          desti->cratio = (float) neblock / (float) ssize;
1166
0
          float ctime = (float) blosc_elapsed_secs(last, current);
1167
0
          desti->cspeed = (float) neblock / ctime;
1168
0
          desti->filter_speed = (float) neblock / filter_time;
1169
0
          desti->flags[0] = 1;    // mark a runlen
1170
0
          dest += instr_size;
1171
0
          continue;
1172
0
        }
1173
1174
        // Encode the repeated byte in the first (LSB) byte of the length of the split.
1175
7.94k
        _sw32(dest - 4, -value);    // write the value in two's complement
1176
7.94k
        if (value > 0) {
1177
          // Mark encoding as a run-length (== 0 is always a 0's run)
1178
4.87k
          ntbytes += 1;
1179
4.87k
          ctbytes += 1;
1180
4.87k
          if (ntbytes > destsize) {
1181
9
            return 0;    /* Non-compressible data */
1182
9
          }
1183
          // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
1184
          // dest[-1] |= 0x80;
1185
4.86k
          dest[0] = 0x1;   // set run-length bit (0) in token
1186
4.86k
          dest += 1;
1187
4.86k
        }
1188
7.93k
        continue;
1189
7.94k
      }
1190
245k
    }
1191
1192
237k
    maxout = neblock;
1193
237k
    if (ntbytes + maxout > destsize && !instr_codec) {
1194
      /* avoid buffer * overrun */
1195
52.9k
      maxout = destsize - ntbytes;
1196
52.9k
      if (maxout <= 0) {
1197
121
        return 0;                  /* non-compressible block */
1198
121
      }
1199
52.9k
    }
1200
237k
    if (dict_training) {
1201
      // We are in the build dict state, so don't compress
1202
      // TODO: copy only a percentage for sampling
1203
0
      memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1204
0
      cbytes = (int32_t)neblock;
1205
0
    }
1206
237k
    else if (context->compcode == BLOSC_BLOSCLZ) {
1207
48.0k
      cbytes = blosclz_compress(context->clevel, _src + j * neblock,
1208
48.0k
                                (int)neblock, dest, maxout, context);
1209
48.0k
    }
1210
189k
    else if (context->compcode == BLOSC_LZ4) {
1211
10.7k
      void *hash_table = NULL;
1212
    #ifdef HAVE_IPP
1213
      hash_table = (void*)thread_context->lz4_hash_table;
1214
    #endif
1215
10.7k
      cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1216
10.7k
                                 (char*)dest, (size_t)maxout, accel, hash_table);
1217
10.7k
    }
1218
178k
    else if (context->compcode == BLOSC_LZ4HC) {
1219
15.8k
      cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1220
15.8k
                                   (char*)dest, (size_t)maxout, context->clevel);
1221
15.8k
    }
1222
162k
  #if defined(HAVE_ZLIB)
1223
162k
    else if (context->compcode == BLOSC_ZLIB) {
1224
37.2k
      cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1225
37.2k
                                  (char*)dest, (size_t)maxout, context->clevel);
1226
37.2k
    }
1227
125k
  #endif /* HAVE_ZLIB */
1228
125k
  #if defined(HAVE_ZSTD)
1229
125k
    else if (context->compcode == BLOSC_ZSTD) {
1230
125k
      cbytes = zstd_wrap_compress(thread_context,
1231
125k
                                  (char*)_src + j * neblock, (size_t)neblock,
1232
125k
                                  (char*)dest, (size_t)maxout, context->clevel);
1233
125k
    }
1234
0
  #endif /* HAVE_ZSTD */
1235
0
    else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
1236
0
      for (int i = 0; i < g_ncodecs; ++i) {
1237
0
        if (g_codecs[i].compcode == context->compcode) {
1238
0
          if (g_codecs[i].encoder == NULL) {
1239
            // Dynamically load codec plugin
1240
0
            if (fill_codec(&g_codecs[i]) < 0) {
1241
0
              BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1242
0
              return BLOSC2_ERROR_CODEC_SUPPORT;
1243
0
            }
1244
0
          }
1245
0
          blosc2_cparams cparams;
1246
0
          blosc2_ctx_get_cparams(context, &cparams);
1247
0
          cbytes = g_codecs[i].encoder(_src + j * neblock,
1248
0
                                        neblock,
1249
0
                                        dest,
1250
0
                                        maxout,
1251
0
                                        context->compcode_meta,
1252
0
                                        &cparams,
1253
0
                                        context->src);
1254
0
          goto urcodecsuccess;
1255
0
        }
1256
0
      }
1257
0
      BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
1258
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
1259
0
    urcodecsuccess:
1260
0
      ;
1261
0
    } else {
1262
0
      blosc2_compcode_to_compname(context->compcode, &compname);
1263
0
      BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
1264
0
                        "Please use one having it.", compname);
1265
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
1266
0
    }
1267
1268
237k
    if (cbytes > maxout) {
1269
      /* Buffer overrun caused by compression (should never happen) */
1270
0
      return BLOSC2_ERROR_WRITE_BUFFER;
1271
0
    }
1272
237k
    if (cbytes < 0) {
1273
      /* cbytes should never be negative */
1274
0
      return BLOSC2_ERROR_DATA;
1275
0
    }
1276
237k
    if (cbytes == 0) {
1277
      // When cbytes is 0, the compressor has not been able to compress anything
1278
72.1k
      cbytes = neblock;
1279
72.1k
    }
1280
1281
237k
    if (instr_codec) {
1282
0
      blosc_set_timestamp(&current);
1283
0
      int32_t instr_size = sizeof(blosc2_instr);
1284
0
      ntbytes += instr_size;
1285
0
      ctbytes += instr_size;
1286
0
      if (ntbytes > destsize) {
1287
0
        return 0;    /* Non-compressible data */
1288
0
      }
1289
0
      _sw32(dest - 4, instr_size);
1290
0
      float ctime = (float)blosc_elapsed_secs(last, current);
1291
0
      blosc2_instr *desti = (blosc2_instr *)dest;
1292
0
      memset(desti, 0, sizeof(blosc2_instr));
1293
      // cratio is computed having into account 1 additional int (csize)
1294
0
      desti->cratio = (float)neblock / (float)(cbytes + sizeof(int32_t));
1295
0
      desti->cspeed = (float)neblock / ctime;
1296
0
      desti->filter_speed = (float) neblock / filter_time;
1297
0
      dest += instr_size;
1298
0
      continue;
1299
0
    }
1300
1301
237k
    if (!dict_training) {
1302
237k
      if (cbytes == neblock) {
1303
        /* The compressor has been unable to compress data at all. */
1304
        /* Before doing the copy, check that we are not running into a
1305
           buffer overflow. */
1306
72.5k
        if ((ntbytes + neblock) > destsize) {
1307
20.9k
          return 0;    /* Non-compressible data */
1308
20.9k
        }
1309
51.6k
        memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1310
51.6k
        cbytes = neblock;
1311
51.6k
      }
1312
216k
      _sw32(dest - 4, cbytes);
1313
216k
    }
1314
216k
    dest += cbytes;
1315
216k
    ntbytes += cbytes;
1316
216k
    ctbytes += cbytes;
1317
216k
  }  /* Closes j < nstreams */
1318
1319
224k
  return ctbytes;
1320
245k
}
1321
1322
1323
/* Process the filter pipeline (decompression mode) */
1324
int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
1325
                      const int32_t offset, uint8_t* src, uint8_t* tmp,
1326
47.2k
                      uint8_t* tmp2, int last_filter_index, int32_t nblock) {
1327
47.2k
  blosc2_context* context = thread_context->parent_context;
1328
47.2k
  int32_t typesize = context->typesize;
1329
47.2k
  uint8_t* filters = context->filters;
1330
47.2k
  uint8_t* filters_meta = context->filters_meta;
1331
47.2k
  uint8_t* _src = src;
1332
47.2k
  uint8_t* _dest = tmp;
1333
47.2k
  uint8_t* _tmp = tmp2;
1334
47.2k
  int errcode = 0;
1335
1336
84.3k
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
1337
    // Delta filter requires the whole chunk ready
1338
84.3k
    int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
1339
84.3k
    if (last_copy_filter && context->postfilter == NULL) {
1340
65.6k
      _dest = dest + offset;
1341
65.6k
    }
1342
84.3k
    int rc = BLOSC2_ERROR_SUCCESS;
1343
84.3k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1344
81.3k
      switch (filters[i]) {
1345
9.23k
        case BLOSC_SHUFFLE:
1346
477k
          for (int j = 0; j <= filters_meta[i]; j++) {
1347
468k
            unshuffle(typesize, bsize, _src, _dest);
1348
            // Cycle filters when required
1349
468k
            if (j < filters_meta[i]) {
1350
459k
              _cycle_buffers(&_src, &_dest, &_tmp);
1351
459k
            }
1352
            // Check whether we have to copy the intermediate _dest buffer to final destination
1353
468k
            if (last_copy_filter && (filters_meta[i] % 2) == 1 && j == filters_meta[i]) {
1354
4.16k
              memcpy(dest + offset, _dest, (unsigned int) bsize);
1355
4.16k
            }
1356
468k
          }
1357
9.23k
          break;
1358
39.4k
        case BLOSC_BITSHUFFLE:
1359
39.4k
          if (bitunshuffle(typesize, bsize, _src, _dest, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
1360
0
            return BLOSC2_ERROR_FILTER_PIPELINE;
1361
0
          }
1362
39.4k
          break;
1363
39.4k
        case BLOSC_DELTA:
1364
14.9k
          if (context->nthreads == 1) {
1365
            /* Serial mode */
1366
14.9k
            delta_decoder(dest, offset, bsize, typesize, _dest);
1367
14.9k
          } else {
1368
            /* Force the thread in charge of the block 0 to go first */
1369
0
            pthread_mutex_lock(&context->delta_mutex);
1370
0
            if (context->dref_not_init) {
1371
0
              if (offset != 0) {
1372
0
                pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
1373
0
              } else {
1374
0
                delta_decoder(dest, offset, bsize, typesize, _dest);
1375
0
                context->dref_not_init = 0;
1376
0
                pthread_cond_broadcast(&context->delta_cv);
1377
0
              }
1378
0
            }
1379
0
            pthread_mutex_unlock(&context->delta_mutex);
1380
0
            if (offset != 0) {
1381
0
              delta_decoder(dest, offset, bsize, typesize, _dest);
1382
0
            }
1383
0
          }
1384
14.9k
          break;
1385
3.70k
        case BLOSC_TRUNC_PREC:
1386
          // TRUNC_PREC filter does not need to be undone
1387
3.70k
          break;
1388
14.0k
        default:
1389
14.0k
          if (filters[i] != BLOSC_NOFILTER) {
1390
259
            BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
1391
0
                              filters[i]);
1392
0
            errcode = -1;
1393
259
          }
1394
81.3k
      }
1395
81.3k
    } else {
1396
        // Look for the filters_meta in user filters and run it
1397
10.8k
        for (uint64_t j = 0; j < g_nfilters; ++j) {
1398
10.6k
          if (g_filters[j].id == filters[i]) {
1399
2.71k
            if (g_filters[j].backward == NULL) {
1400
              // Dynamically load filter
1401
0
              if (fill_filter(&g_filters[j]) < 0) {
1402
0
                BLOSC_TRACE_ERROR("Could not load filter %d.", g_filters[j].id);
1403
0
                return BLOSC2_ERROR_FILTER_PIPELINE;
1404
0
              }
1405
0
            }
1406
2.71k
            if (g_filters[j].backward != NULL) {
1407
2.71k
              blosc2_dparams dparams;
1408
2.71k
              blosc2_ctx_get_dparams(context, &dparams);
1409
2.71k
              rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams, g_filters[j].id);
1410
2.71k
            } else {
1411
0
              BLOSC_TRACE_ERROR("Backward function is NULL");
1412
0
              return BLOSC2_ERROR_FILTER_PIPELINE;
1413
0
            }
1414
2.71k
            if (rc != BLOSC2_ERROR_SUCCESS) {
1415
49
              BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
1416
0
              return rc;
1417
49
            }
1418
2.66k
            goto urfiltersuccess;
1419
2.71k
          }
1420
10.6k
        }
1421
249
      BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
1422
0
      return BLOSC2_ERROR_FILTER_PIPELINE;
1423
2.66k
      urfiltersuccess:;
1424
2.66k
    }
1425
1426
    // Cycle buffers when required
1427
84.0k
    if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
1428
66.5k
      _cycle_buffers(&_src, &_dest, &_tmp);
1429
66.5k
    }
1430
84.0k
    if (last_filter_index == i) {
1431
46.9k
      break;
1432
46.9k
    }
1433
84.0k
  }
1434
1435
  /* Postfilter function */
1436
46.9k
  if (context->postfilter != NULL) {
1437
    // Create new postfilter parameters for this block (must be private for each thread)
1438
0
    blosc2_postfilter_params postparams;
1439
0
    memcpy(&postparams, context->postparams, sizeof(postparams));
1440
0
    postparams.input = _src;
1441
0
    postparams.output = dest + offset;
1442
0
    postparams.size = bsize;
1443
0
    postparams.typesize = typesize;
1444
0
    postparams.offset = nblock * context->blocksize;
1445
0
    postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1446
0
    postparams.nblock = nblock;
1447
0
    postparams.tid = thread_context->tid;
1448
0
    postparams.ttmp = thread_context->tmp;
1449
0
    postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1450
0
    postparams.ctx = context;
1451
1452
0
    if (context->postfilter(&postparams) != 0) {
1453
0
      BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1454
0
      return BLOSC2_ERROR_POSTFILTER;
1455
0
    }
1456
0
  }
1457
1458
46.9k
  return errcode;
1459
46.9k
}
1460
1461
1462
1.56k
static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1463
1.56k
  if (destsize % typesize != 0) {
1464
72
    BLOSC_TRACE_ERROR("destsize can only be a multiple of typesize");
1465
72
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1466
72
  }
1467
1.49k
  int32_t nitems = destsize / typesize;
1468
1.49k
  if (nitems == 0) {
1469
0
    return 0;
1470
0
  }
1471
1472
1.49k
  if (typesize == 4) {
1473
468
    float* dest_ = (float*)dest;
1474
468
    float val = nanf("");
1475
8.27k
    for (int i = 0; i < nitems; i++) {
1476
7.80k
      dest_[i] = val;
1477
7.80k
    }
1478
468
    return nitems;
1479
468
  }
1480
1.02k
  else if (typesize == 8) {
1481
996
    double* dest_ = (double*)dest;
1482
996
    double val = nan("");
1483
7.63k
    for (int i = 0; i < nitems; i++) {
1484
6.63k
      dest_[i] = val;
1485
6.63k
    }
1486
996
    return nitems;
1487
996
  }
1488
1489
29
  BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1490
0
  return BLOSC2_ERROR_DATA;
1491
1.49k
}
1492
1493
1494
3.19k
static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1495
  // destsize can only be a multiple of typesize
1496
3.19k
  int64_t val8;
1497
3.19k
  int64_t* dest8;
1498
3.19k
  int32_t val4;
1499
3.19k
  int32_t* dest4;
1500
3.19k
  int16_t val2;
1501
3.19k
  int16_t* dest2;
1502
3.19k
  int8_t val1;
1503
3.19k
  int8_t* dest1;
1504
1505
3.19k
  if (destsize % typesize != 0) {
1506
29
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1507
29
  }
1508
3.16k
  int32_t nitems = destsize / typesize;
1509
3.16k
  if (nitems == 0) {
1510
0
    return 0;
1511
0
  }
1512
1513
3.16k
  switch (typesize) {
1514
521
    case 8:
1515
521
      val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1516
521
      dest8 = (int64_t*)dest;
1517
4.88k
      for (int i = 0; i < nitems; i++) {
1518
4.36k
        dest8[i] = val8;
1519
4.36k
      }
1520
521
      break;
1521
423
    case 4:
1522
423
      val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1523
423
      dest4 = (int32_t*)dest;
1524
5.97k
      for (int i = 0; i < nitems; i++) {
1525
5.55k
        dest4[i] = val4;
1526
5.55k
      }
1527
423
      break;
1528
520
    case 2:
1529
520
      val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1530
520
      dest2 = (int16_t*)dest;
1531
16.3k
      for (int i = 0; i < nitems; i++) {
1532
15.8k
        dest2[i] = val2;
1533
15.8k
      }
1534
520
      break;
1535
1.13k
    case 1:
1536
1.13k
      val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1537
1.13k
      dest1 = (int8_t*)dest;
1538
870k
      for (int i = 0; i < nitems; i++) {
1539
869k
        dest1[i] = val1;
1540
869k
      }
1541
1.13k
      break;
1542
569
    default:
1543
3.78k
      for (int i = 0; i < nitems; i++) {
1544
3.21k
        memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1545
3.21k
      }
1546
3.16k
  }
1547
1548
3.16k
  return nitems;
1549
3.16k
}
1550
1551
1552
/* Decompress & unshuffle a single block */
1553
static int blosc_d(
1554
    struct thread_context* thread_context, int32_t bsize,
1555
    int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1556
100k
    int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1557
100k
  blosc2_context* context = thread_context->parent_context;
1558
100k
  uint8_t* filters = context->filters;
1559
100k
  uint8_t *tmp3 = thread_context->tmp4;
1560
100k
  int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1561
100k
  int dont_split = (context->header_flags & 0x10) >> 4;
1562
100k
  int32_t chunk_nbytes;
1563
100k
  int32_t chunk_cbytes;
1564
100k
  int nstreams;
1565
100k
  int32_t neblock;
1566
100k
  int32_t nbytes;                /* number of decompressed bytes in split */
1567
100k
  int32_t cbytes;                /* number of compressed bytes in split */
1568
  // int32_t ctbytes = 0;           /* number of compressed bytes in block */
1569
100k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
1570
100k
  uint8_t* _dest;
1571
100k
  int32_t typesize = context->typesize;
1572
100k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1573
100k
  const char* compname;
1574
100k
  int rc;
1575
1576
100k
  if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1577
    // Do not decompress, but act as if we successfully decompressed everything
1578
0
    return bsize;
1579
0
  }
1580
1581
100k
  rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1582
100k
  if (rc < 0) {
1583
0
    return rc;
1584
0
  }
1585
1586
  // In some situations (lazychunks) the context can arrive uninitialized
1587
  // (but BITSHUFFLE needs it for accessing the format of the chunk)
1588
100k
  if (context->src == NULL) {
1589
0
    context->src = src;
1590
0
  }
1591
1592
  // Chunks with special values cannot be lazy
1593
100k
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1594
100k
          (context->blosc2_flags & 0x08u) && !context->special_type);
1595
100k
  if (is_lazy) {
1596
    // The chunk is on disk, so just lazily load the block
1597
2
    if (context->schunk == NULL) {
1598
2
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1599
0
      return BLOSC2_ERROR_INVALID_PARAM;
1600
2
    }
1601
0
    if (context->schunk->frame == NULL) {
1602
0
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1603
0
      return BLOSC2_ERROR_INVALID_PARAM;
1604
0
    }
1605
0
    blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1606
0
    char* urlpath = frame->urlpath;
1607
0
    size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1608
0
    int32_t nchunk;
1609
0
    int64_t chunk_offset;
1610
    // The nchunk and the offset of the current chunk are in the trailer
1611
0
    nchunk = *(int32_t*)(src + trailer_offset);
1612
0
    chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1613
    // Get the csize of the nblock
1614
0
    int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1615
0
    int32_t block_csize = block_csizes[nblock];
1616
    // Read the lazy block on disk
1617
0
    void* fp = NULL;
1618
0
    blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1619
0
    if (io_cb == NULL) {
1620
0
      BLOSC_TRACE_ERROR("Error getting the input/output API");
1621
0
      return BLOSC2_ERROR_PLUGIN_IO;
1622
0
    }
1623
1624
0
    if (frame->sframe) {
1625
      // The chunk is not in the frame
1626
0
      char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1627
0
      BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1628
0
      sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1629
0
      fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1630
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1631
0
      free(chunkpath);
1632
      // The offset of the block is src_offset
1633
0
      io_cb->seek(fp, src_offset, SEEK_SET);
1634
0
    }
1635
0
    else {
1636
0
      fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1637
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1638
      // The offset of the block is src_offset
1639
0
      io_cb->seek(fp, frame->file_offset + chunk_offset + src_offset, SEEK_SET);
1640
0
    }
1641
    // We can make use of tmp3 because it will be used after src is not needed anymore
1642
0
    int64_t rbytes = io_cb->read(tmp3, 1, block_csize, fp);
1643
0
    io_cb->close(fp);
1644
0
    if ((int32_t)rbytes != block_csize) {
1645
0
      BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1646
0
      return BLOSC2_ERROR_READ_BUFFER;
1647
0
    }
1648
0
    src = tmp3;
1649
0
    src_offset = 0;
1650
0
    srcsize = block_csize;
1651
0
  }
1652
1653
  // If the chunk is memcpyed, we just have to copy the block to dest and return
1654
100k
  if (memcpyed) {
1655
14.7k
    int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1656
14.7k
    if (!context->special_type) {
1657
4.60k
      if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1658
0
        return BLOSC2_ERROR_WRITE_BUFFER;
1659
0
      }
1660
4.60k
      if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1661
        /* Not enough input to copy block */
1662
0
        return BLOSC2_ERROR_READ_BUFFER;
1663
0
      }
1664
4.60k
    }
1665
14.7k
    if (!is_lazy) {
1666
14.7k
      src += context->header_overhead + nblock * context->blocksize;
1667
14.7k
    }
1668
14.7k
    _dest = dest + dest_offset;
1669
14.7k
    if (context->postfilter != NULL) {
1670
      // We are making use of a postfilter, so use a temp for destination
1671
0
      _dest = tmp;
1672
0
    }
1673
14.7k
    rc = 0;
1674
14.7k
    switch (context->special_type) {
1675
3.19k
      case BLOSC2_SPECIAL_VALUE:
1676
        // All repeated values
1677
3.19k
        rc = set_values(context->typesize, context->src, _dest, bsize_);
1678
3.19k
        if (rc < 0) {
1679
29
          BLOSC_TRACE_ERROR("set_values failed");
1680
0
          return BLOSC2_ERROR_DATA;
1681
29
        }
1682
3.16k
        break;
1683
3.16k
      case BLOSC2_SPECIAL_NAN:
1684
1.56k
        rc = set_nans(context->typesize, _dest, bsize_);
1685
1.56k
        if (rc < 0) {
1686
101
          BLOSC_TRACE_ERROR("set_nans failed");
1687
0
          return BLOSC2_ERROR_DATA;
1688
101
        }
1689
1.46k
        break;
1690
3.93k
      case BLOSC2_SPECIAL_ZERO:
1691
3.93k
        memset(_dest, 0, bsize_);
1692
3.93k
        break;
1693
1.44k
      case BLOSC2_SPECIAL_UNINIT:
1694
        // We do nothing here
1695
1.44k
        break;
1696
4.60k
      default:
1697
4.60k
        memcpy(_dest, src, bsize_);
1698
14.7k
    }
1699
14.6k
    if (context->postfilter != NULL) {
1700
      // Create new postfilter parameters for this block (must be private for each thread)
1701
0
      blosc2_postfilter_params postparams;
1702
0
      memcpy(&postparams, context->postparams, sizeof(postparams));
1703
0
      postparams.input = tmp;
1704
0
      postparams.output = dest + dest_offset;
1705
0
      postparams.size = bsize;
1706
0
      postparams.typesize = typesize;
1707
0
      postparams.offset = nblock * context->blocksize;
1708
0
      postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1709
0
      postparams.nblock = nblock;
1710
0
      postparams.tid = thread_context->tid;
1711
0
      postparams.ttmp = thread_context->tmp;
1712
0
      postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1713
0
      postparams.ctx = context;
1714
1715
      // Execute the postfilter (the processed block will be copied to dest)
1716
0
      if (context->postfilter(&postparams) != 0) {
1717
0
        BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1718
0
        return BLOSC2_ERROR_POSTFILTER;
1719
0
      }
1720
0
    }
1721
14.6k
    thread_context->zfp_cell_nitems = 0;
1722
1723
14.6k
    return bsize_;
1724
14.6k
  }
1725
1726
85.7k
  if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1727
    /* Invalid block src offset encountered */
1728
664
    return BLOSC2_ERROR_DATA;
1729
664
  }
1730
1731
85.0k
  src += src_offset;
1732
85.0k
  srcsize -= src_offset;
1733
1734
85.0k
  int last_filter_index = last_filter(filters, 'd');
1735
85.0k
  if (instr_codec) {
1736
    // If instrumented, we don't want to run the filters
1737
1.53k
    _dest = dest + dest_offset;
1738
1.53k
  }
1739
83.5k
  else if (((last_filter_index >= 0) &&
1740
83.5k
       (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1741
83.5k
    context->postfilter != NULL) {
1742
    // We are making use of some filter, so use a temp for destination
1743
49.0k
    _dest = tmp;
1744
49.0k
  }
1745
34.4k
  else {
1746
    // If no filters, or only DELTA in pipeline
1747
34.4k
    _dest = dest + dest_offset;
1748
34.4k
  }
1749
1750
  /* The number of compressed data streams for this block */
1751
85.0k
  if (!dont_split && !leftoverblock && !context->use_dict) {
1752
    // We don't want to split when in a training dict state
1753
5.52k
    nstreams = (int32_t)typesize;
1754
5.52k
  }
1755
79.5k
  else {
1756
79.5k
    nstreams = 1;
1757
79.5k
  }
1758
1759
85.0k
  neblock = bsize / nstreams;
1760
85.0k
  if (neblock == 0) {
1761
    /* Not enough space to output bytes */
1762
4
    BLOSC_ERROR(BLOSC2_ERROR_WRITE_BUFFER);
1763
4
  }
1764
164k
  for (int j = 0; j < nstreams; j++) {
1765
87.7k
    if (srcsize < (signed)sizeof(int32_t)) {
1766
      /* Not enough input to read compressed size */
1767
40
      return BLOSC2_ERROR_READ_BUFFER;
1768
40
    }
1769
87.6k
    srcsize -= sizeof(int32_t);
1770
87.6k
    cbytes = sw32_(src);      /* amount of compressed bytes */
1771
87.6k
    if (cbytes > 0) {
1772
80.8k
      if (srcsize < cbytes) {
1773
        /* Not enough input to read compressed bytes */
1774
198
        return BLOSC2_ERROR_READ_BUFFER;
1775
198
      }
1776
80.6k
      srcsize -= cbytes;
1777
80.6k
    }
1778
87.4k
    src += sizeof(int32_t);
1779
    // ctbytes += (signed)sizeof(int32_t);
1780
1781
    /* Uncompress */
1782
87.4k
    if (cbytes == 0) {
1783
      // A run of 0's
1784
4.20k
      memset(_dest, 0, (unsigned int)neblock);
1785
4.20k
      nbytes = neblock;
1786
4.20k
    }
1787
83.2k
    else if (cbytes < 0) {
1788
      // A negative number means some encoding depending on the token that comes next
1789
2.57k
      uint8_t token;
1790
1791
2.57k
      if (srcsize < (signed)sizeof(uint8_t)) {
1792
        // Not enough input to read token */
1793
76
        return BLOSC2_ERROR_READ_BUFFER;
1794
76
      }
1795
2.50k
      srcsize -= sizeof(uint8_t);
1796
1797
2.50k
      token = src[0];
1798
2.50k
      src += 1;
1799
      // ctbytes += 1;
1800
1801
2.50k
      if (token & 0x1) {
1802
        // A run of bytes that are different than 0
1803
2.44k
        if (cbytes < -255) {
1804
          // Runs can only encode a byte
1805
182
          return BLOSC2_ERROR_RUN_LENGTH;
1806
182
        }
1807
2.26k
        uint8_t value = -cbytes;
1808
2.26k
        memset(_dest, value, (unsigned int)neblock);
1809
2.26k
      } else {
1810
60
        BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1811
0
        return BLOSC2_ERROR_RUN_LENGTH;
1812
60
      }
1813
2.26k
      nbytes = neblock;
1814
2.26k
      cbytes = 0;  // everything is encoded in the cbytes token
1815
2.26k
    }
1816
80.6k
    else if (cbytes == neblock) {
1817
35.5k
      memcpy(_dest, src, (unsigned int)neblock);
1818
35.5k
      nbytes = (int32_t)neblock;
1819
35.5k
    }
1820
45.1k
    else {
1821
45.1k
      if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1822
12.0k
        nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1823
12.0k
      }
1824
33.0k
      else if (compformat == BLOSC_LZ4_FORMAT) {
1825
6.45k
        nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1826
6.45k
                                     (char*)_dest, (size_t)neblock);
1827
6.45k
      }
1828
26.5k
  #if defined(HAVE_ZLIB)
1829
26.5k
      else if (compformat == BLOSC_ZLIB_FORMAT) {
1830
9.08k
        nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1831
9.08k
                                      (char*)_dest, (size_t)neblock);
1832
9.08k
      }
1833
17.5k
  #endif /*  HAVE_ZLIB */
1834
17.5k
  #if defined(HAVE_ZSTD)
1835
17.5k
      else if (compformat == BLOSC_ZSTD_FORMAT) {
1836
17.0k
        nbytes = zstd_wrap_decompress(thread_context,
1837
17.0k
                                      (char*)src, (size_t)cbytes,
1838
17.0k
                                      (char*)_dest, (size_t)neblock);
1839
17.0k
      }
1840
412
  #endif /*  HAVE_ZSTD */
1841
412
      else if (compformat == BLOSC_UDCODEC_FORMAT) {
1842
396
        bool getcell = false;
1843
1844
396
#if defined(HAVE_PLUGINS)
1845
396
        if ((context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) &&
1846
396
            (thread_context->zfp_cell_nitems > 0)) {
1847
0
          nbytes = zfp_getcell(thread_context, src, cbytes, _dest, neblock);
1848
0
          if (nbytes < 0) {
1849
0
            return BLOSC2_ERROR_DATA;
1850
0
          }
1851
0
          if (nbytes == thread_context->zfp_cell_nitems * typesize) {
1852
0
            getcell = true;
1853
0
          }
1854
0
        }
1855
396
#endif /* HAVE_PLUGINS */
1856
396
        if (!getcell) {
1857
396
          thread_context->zfp_cell_nitems = 0;
1858
451
          for (int i = 0; i < g_ncodecs; ++i) {
1859
442
            if (g_codecs[i].compcode == context->compcode) {
1860
387
              if (g_codecs[i].decoder == NULL) {
1861
                // Dynamically load codec plugin
1862
1
                if (fill_codec(&g_codecs[i]) < 0) {
1863
1
                  BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1864
0
                  return BLOSC2_ERROR_CODEC_SUPPORT;
1865
1
                }
1866
1
              }
1867
386
              blosc2_dparams dparams;
1868
386
              blosc2_ctx_get_dparams(context, &dparams);
1869
386
              nbytes = g_codecs[i].decoder(src,
1870
386
                                           cbytes,
1871
386
                                           _dest,
1872
386
                                           neblock,
1873
386
                                           context->compcode_meta,
1874
386
                                           &dparams,
1875
386
                                           context->src);
1876
386
              goto urcodecsuccess;
1877
387
            }
1878
442
          }
1879
9
          BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1880
0
          return BLOSC2_ERROR_CODEC_SUPPORT;
1881
396
        }
1882
386
      urcodecsuccess:
1883
386
        ;
1884
386
      }
1885
16
      else {
1886
16
        compname = clibcode_to_clibname(compformat);
1887
16
        BLOSC_TRACE_ERROR(
1888
0
                "Blosc has not been compiled with decompression "
1889
0
                "support for '%s' format.  "
1890
0
                "Please recompile for adding this support.", compname);
1891
0
        return BLOSC2_ERROR_CODEC_SUPPORT;
1892
16
      }
1893
1894
      /* Check that decompressed bytes number is correct */
1895
45.1k
      if ((nbytes != neblock) && (thread_context->zfp_cell_nitems == 0)) {
1896
7.97k
        return BLOSC2_ERROR_DATA;
1897
7.97k
      }
1898
1899
45.1k
    }
1900
79.1k
    src += cbytes;
1901
    // ctbytes += cbytes;
1902
79.1k
    _dest += nbytes;
1903
79.1k
    ntbytes += nbytes;
1904
79.1k
  } /* Closes j < nstreams */
1905
1906
76.5k
  if (!instr_codec) {
1907
75.6k
    if (last_filter_index >= 0 || context->postfilter != NULL) {
1908
      /* Apply regular filter pipeline */
1909
47.2k
      int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1910
47.2k
                                      last_filter_index, nblock);
1911
47.2k
      if (errcode < 0)
1912
438
        return errcode;
1913
47.2k
    }
1914
75.6k
  }
1915
1916
  /* Return the number of uncompressed bytes */
1917
76.0k
  return (int)ntbytes;
1918
76.5k
}
1919
1920
1921
/* Serial version for compression/decompression */
1922
108k
static int serial_blosc(struct thread_context* thread_context) {
1923
108k
  blosc2_context* context = thread_context->parent_context;
1924
108k
  int32_t j, bsize, leftoverblock;
1925
108k
  int32_t cbytes;
1926
108k
  int32_t ntbytes = context->output_bytes;
1927
108k
  int32_t* bstarts = context->bstarts;
1928
108k
  uint8_t* tmp = thread_context->tmp;
1929
108k
  uint8_t* tmp2 = thread_context->tmp2;
1930
108k
  int dict_training = context->use_dict && (context->dict_cdict == NULL);
1931
108k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1932
108k
  if (!context->do_compress && context->special_type) {
1933
    // Fake a runlen as if it was a memcpyed chunk
1934
1.14k
    memcpyed = true;
1935
1.14k
  }
1936
1937
439k
  for (j = 0; j < context->nblocks; j++) {
1938
362k
    if (context->do_compress && !memcpyed && !dict_training) {
1939
245k
      _sw32(bstarts + j, ntbytes);
1940
245k
    }
1941
362k
    bsize = context->blocksize;
1942
362k
    leftoverblock = 0;
1943
362k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1944
8.97k
      bsize = context->leftover;
1945
8.97k
      leftoverblock = 1;
1946
8.97k
    }
1947
362k
    if (context->do_compress) {
1948
261k
      if (memcpyed && !context->prefilter) {
1949
        /* We want to memcpy only */
1950
16.0k
        memcpy(context->dest + context->header_overhead + j * context->blocksize,
1951
16.0k
               context->src + j * context->blocksize, (unsigned int)bsize);
1952
16.0k
        cbytes = (int32_t)bsize;
1953
16.0k
      }
1954
245k
      else {
1955
        /* Regular compression */
1956
245k
        cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
1957
245k
                         context->destsize, context->src, j * context->blocksize,
1958
245k
                         context->dest + ntbytes, tmp, tmp2);
1959
245k
        if (cbytes == 0) {
1960
21.0k
          ntbytes = 0;              /* incompressible data */
1961
21.0k
          break;
1962
21.0k
        }
1963
245k
      }
1964
261k
    }
1965
100k
    else {
1966
      /* Regular decompression */
1967
      // If memcpyed we don't have a bstarts section (because it is not needed)
1968
100k
      int32_t src_offset = memcpyed ?
1969
85.7k
          context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
1970
100k
      cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
1971
100k
                       context->src, context->srcsize, src_offset, j,
1972
100k
                       context->dest, j * context->blocksize, tmp, tmp2);
1973
100k
    }
1974
1975
341k
    if (cbytes < 0) {
1976
9.79k
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
1977
9.79k
      break;
1978
9.79k
    }
1979
331k
    ntbytes += cbytes;
1980
331k
  }
1981
1982
108k
  return ntbytes;
1983
108k
}
1984
1985
static void t_blosc_do_job(void *ctxt);
1986
1987
/* Threaded version for compression/decompression */
1988
0
static int parallel_blosc(blosc2_context* context) {
1989
0
#ifdef BLOSC_POSIX_BARRIERS
1990
0
  int rc;
1991
0
#endif
1992
  /* Set sentinels */
1993
0
  context->thread_giveup_code = 1;
1994
0
  context->thread_nblock = -1;
1995
1996
0
  if (threads_callback) {
1997
0
    threads_callback(threads_callback_data, t_blosc_do_job,
1998
0
                     context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
1999
0
  }
2000
0
  else {
2001
    /* Synchronization point for all threads (wait for initialization) */
2002
0
    WAIT_INIT(-1, context);
2003
2004
    /* Synchronization point for all threads (wait for finalization) */
2005
0
    WAIT_FINISH(-1, context);
2006
0
  }
2007
2008
0
  if (context->thread_giveup_code <= 0) {
2009
    /* Compression/decompression gave up.  Return error code. */
2010
0
    return context->thread_giveup_code;
2011
0
  }
2012
2013
  /* Return the total bytes (de-)compressed in threads */
2014
0
  return (int)context->output_bytes;
2015
0
}
2016
2017
/* initialize a thread_context that has already been allocated */
2018
static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
2019
17.8k
{
2020
17.8k
  int32_t ebsize;
2021
2022
17.8k
  thread_context->parent_context = context;
2023
17.8k
  thread_context->tid = tid;
2024
2025
17.8k
  ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
2026
17.8k
  thread_context->tmp_nbytes = (size_t)4 * ebsize;
2027
17.8k
  thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
2028
17.8k
  BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2029
17.8k
  thread_context->tmp2 = thread_context->tmp + ebsize;
2030
17.8k
  thread_context->tmp3 = thread_context->tmp2 + ebsize;
2031
17.8k
  thread_context->tmp4 = thread_context->tmp3 + ebsize;
2032
17.8k
  thread_context->tmp_blocksize = context->blocksize;
2033
17.8k
  thread_context->zfp_cell_nitems = 0;
2034
17.8k
  thread_context->zfp_cell_start = 0;
2035
17.8k
  #if defined(HAVE_ZSTD)
2036
17.8k
  thread_context->zstd_cctx = NULL;
2037
17.8k
  thread_context->zstd_dctx = NULL;
2038
17.8k
  #endif
2039
2040
  /* Create the hash table for LZ4 in case we are using IPP */
2041
#ifdef HAVE_IPP
2042
  IppStatus status;
2043
  int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
2044
  int hash_size = 0;
2045
  status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
2046
  if (status != ippStsNoErr) {
2047
      BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
2048
  }
2049
  Ipp8u *hash_table = ippsMalloc_8u(hash_size);
2050
  status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
2051
  if (status != ippStsNoErr) {
2052
    BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
2053
  }
2054
  thread_context->lz4_hash_table = hash_table;
2055
#endif
2056
17.8k
  return 0;
2057
17.8k
}
2058
2059
static struct thread_context*
2060
17.8k
create_thread_context(blosc2_context* context, int32_t tid) {
2061
17.8k
  struct thread_context* thread_context;
2062
17.8k
  thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
2063
17.8k
  BLOSC_ERROR_NULL(thread_context, NULL);
2064
17.8k
  int rc = init_thread_context(thread_context, context, tid);
2065
17.8k
  if (rc < 0) {
2066
0
    return NULL;
2067
0
  }
2068
17.8k
  return thread_context;
2069
17.8k
}
2070
2071
/* free members of thread_context, but not thread_context itself */
2072
17.8k
static void destroy_thread_context(struct thread_context* thread_context) {
2073
17.8k
  my_free(thread_context->tmp);
2074
17.8k
#if defined(HAVE_ZSTD)
2075
17.8k
  if (thread_context->zstd_cctx != NULL) {
2076
1.07k
    ZSTD_freeCCtx(thread_context->zstd_cctx);
2077
1.07k
  }
2078
17.8k
  if (thread_context->zstd_dctx != NULL) {
2079
5.44k
    ZSTD_freeDCtx(thread_context->zstd_dctx);
2080
5.44k
  }
2081
17.8k
#endif
2082
#ifdef HAVE_IPP
2083
  if (thread_context->lz4_hash_table != NULL) {
2084
    ippsFree(thread_context->lz4_hash_table);
2085
  }
2086
#endif
2087
17.8k
}
2088
2089
17.8k
void free_thread_context(struct thread_context* thread_context) {
2090
17.8k
  destroy_thread_context(thread_context);
2091
17.8k
  my_free(thread_context);
2092
17.8k
}
2093
2094
2095
108k
int check_nthreads(blosc2_context* context) {
2096
108k
  if (context->nthreads <= 0) {
2097
0
    BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
2098
0
    return BLOSC2_ERROR_INVALID_PARAM;
2099
0
  }
2100
2101
108k
  if (context->new_nthreads != context->nthreads) {
2102
0
    if (context->nthreads > 1) {
2103
0
      release_threadpool(context);
2104
0
    }
2105
0
    context->nthreads = context->new_nthreads;
2106
0
  }
2107
108k
  if (context->new_nthreads > 1 && context->threads_started == 0) {
2108
0
    init_threadpool(context);
2109
0
  }
2110
2111
108k
  return context->nthreads;
2112
108k
}
2113
2114
/* Do the compression or decompression of the buffer depending on the
2115
   global params. */
2116
108k
static int do_job(blosc2_context* context) {
2117
108k
  int32_t ntbytes;
2118
2119
  /* Set sentinels */
2120
108k
  context->dref_not_init = 1;
2121
2122
  /* Check whether we need to restart threads */
2123
108k
  check_nthreads(context);
2124
2125
  /* Run the serial version when nthreads is 1 or when the buffers are
2126
     not larger than blocksize */
2127
108k
  if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
2128
    /* The context for this 'thread' has no been initialized yet */
2129
108k
    if (context->serial_context == NULL) {
2130
16.0k
      context->serial_context = create_thread_context(context, 0);
2131
16.0k
    }
2132
92.1k
    else if (context->blocksize != context->serial_context->tmp_blocksize) {
2133
1.84k
      free_thread_context(context->serial_context);
2134
1.84k
      context->serial_context = create_thread_context(context, 0);
2135
1.84k
    }
2136
108k
    BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
2137
108k
    ntbytes = serial_blosc(context->serial_context);
2138
108k
  }
2139
0
  else {
2140
0
    ntbytes = parallel_blosc(context);
2141
0
  }
2142
2143
108k
  return ntbytes;
2144
108k
}
2145
2146
2147
static int initialize_context_compression(
2148
        blosc2_context* context, const void* src, int32_t srcsize, void* dest,
2149
        int32_t destsize, int clevel, uint8_t const *filters,
2150
        uint8_t const *filters_meta, int32_t typesize, int compressor,
2151
        int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
2152
        int32_t splitmode,
2153
        int tuner_id, void *tuner_params,
2154
64.1k
        blosc2_schunk* schunk) {
2155
2156
  /* Set parameters */
2157
64.1k
  context->do_compress = 1;
2158
64.1k
  context->src = (const uint8_t*)src;
2159
64.1k
  context->srcsize = srcsize;
2160
64.1k
  context->dest = (uint8_t*)dest;
2161
64.1k
  context->output_bytes = 0;
2162
64.1k
  context->destsize = destsize;
2163
64.1k
  context->sourcesize = srcsize;
2164
64.1k
  context->typesize = (int32_t)typesize;
2165
64.1k
  context->filter_flags = filters_to_flags(filters);
2166
449k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
2167
384k
    context->filters[i] = filters[i];
2168
384k
    context->filters_meta[i] = filters_meta[i];
2169
384k
  }
2170
64.1k
  context->compcode = compressor;
2171
64.1k
  context->nthreads = nthreads;
2172
64.1k
  context->new_nthreads = new_nthreads;
2173
64.1k
  context->end_threads = 0;
2174
64.1k
  context->clevel = clevel;
2175
64.1k
  context->schunk = schunk;
2176
64.1k
  context->tuner_params = tuner_params;
2177
64.1k
  context->tuner_id = tuner_id;
2178
64.1k
  context->splitmode = splitmode;
2179
  /* tuner some compression parameters */
2180
64.1k
  context->blocksize = (int32_t)blocksize;
2181
64.1k
  int rc = 0;
2182
64.1k
  if (context->tuner_params != NULL) {
2183
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2184
0
      if (blosc_stune_next_cparams(context) < 0) {
2185
0
        BLOSC_TRACE_ERROR("Error in stune next_cparams func\n");
2186
0
        return BLOSC2_ERROR_TUNER;
2187
0
      }
2188
0
    } else {
2189
0
      for (int i = 0; i < g_ntuners; ++i) {
2190
0
        if (g_tuners[i].id == context->tuner_id) {
2191
0
          if (g_tuners[i].next_cparams == NULL) {
2192
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2193
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2194
0
              return BLOSC2_ERROR_FAILURE;
2195
0
            }
2196
0
          }
2197
0
          if (g_tuners[i].next_cparams(context) < 0) {
2198
0
            BLOSC_TRACE_ERROR("Error in tuner %d next_cparams func\n", context->tuner_id);
2199
0
            return BLOSC2_ERROR_TUNER;
2200
0
          }
2201
0
          if (g_tuners[i].id == BLOSC_BTUNE && context->blocksize == 0) {
2202
            // Call stune for initializing blocksize
2203
0
            if (blosc_stune_next_blocksize(context) < 0) {
2204
0
              BLOSC_TRACE_ERROR("Error in stune next_blocksize func\n");
2205
0
              return BLOSC2_ERROR_TUNER;
2206
0
            }
2207
0
          }
2208
0
          goto urtunersuccess;
2209
0
        }
2210
0
      }
2211
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2212
0
      return BLOSC2_ERROR_INVALID_PARAM;
2213
0
    }
2214
64.1k
  } else {
2215
64.1k
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2216
64.1k
      rc = blosc_stune_next_blocksize(context);
2217
64.1k
    } else {
2218
0
      for (int i = 0; i < g_ntuners; ++i) {
2219
0
        if (g_tuners[i].id == context->tuner_id) {
2220
0
          if (g_tuners[i].next_blocksize == NULL) {
2221
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2222
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2223
0
              return BLOSC2_ERROR_FAILURE;
2224
0
            }
2225
0
          }
2226
0
          rc = g_tuners[i].next_blocksize(context);
2227
0
          goto urtunersuccess;
2228
0
        }
2229
0
      }
2230
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2231
0
      return BLOSC2_ERROR_INVALID_PARAM;
2232
0
    }
2233
64.1k
  }
2234
64.1k
  urtunersuccess:;
2235
64.1k
  if (rc < 0) {
2236
0
    BLOSC_TRACE_ERROR("Error in tuner next_blocksize func\n");
2237
0
    return BLOSC2_ERROR_TUNER;
2238
0
  }
2239
2240
2241
  /* Check buffer size limits */
2242
64.1k
  if (srcsize > BLOSC2_MAX_BUFFERSIZE) {
2243
0
    BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
2244
0
                      BLOSC2_MAX_BUFFERSIZE);
2245
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2246
0
  }
2247
2248
64.1k
  if (destsize < BLOSC2_MAX_OVERHEAD) {
2249
27
    BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
2250
0
                      BLOSC2_MAX_OVERHEAD);
2251
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2252
27
  }
2253
2254
  /* Compression level */
2255
64.1k
  if (clevel < 0 || clevel > 9) {
2256
    /* If clevel not in 0..9, print an error */
2257
0
    BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
2258
0
    return BLOSC2_ERROR_CODEC_PARAM;
2259
0
  }
2260
2261
  /* Check typesize limits */
2262
64.1k
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
2263
    /* If typesize is too large, treat buffer as an 1-byte stream. */
2264
0
    context->typesize = 1;
2265
0
  }
2266
2267
64.1k
  blosc2_calculate_blocks(context);
2268
2269
64.1k
  return 1;
2270
64.1k
}
2271
2272
2273
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
2274
29.3k
                                            int32_t srcsize, void* dest, int32_t destsize) {
2275
29.3k
  int32_t bstarts_end;
2276
2277
29.3k
  context->do_compress = 0;
2278
29.3k
  context->src = (const uint8_t*)src;
2279
29.3k
  context->srcsize = srcsize;
2280
29.3k
  context->dest = (uint8_t*)dest;
2281
29.3k
  context->destsize = destsize;
2282
29.3k
  context->output_bytes = 0;
2283
29.3k
  context->end_threads = 0;
2284
2285
29.3k
  int rc = blosc2_initialize_context_from_header(context, header);
2286
29.3k
  if (rc < 0) {
2287
0
    return rc;
2288
0
  }
2289
2290
  /* Check that we have enough space to decompress */
2291
29.3k
  if (context->sourcesize > (int32_t)context->destsize) {
2292
0
    return BLOSC2_ERROR_WRITE_BUFFER;
2293
0
  }
2294
2295
29.3k
  if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
2296
0
    BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
2297
0
                      " of blocks in chunk (%d).",
2298
0
                      context->block_maskout_nitems, context->nblocks);
2299
0
    return BLOSC2_ERROR_DATA;
2300
0
  }
2301
2302
29.3k
  context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
2303
29.3k
  if (context->special_type > BLOSC2_SPECIAL_LASTID) {
2304
1
    BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
2305
0
                      context->special_type);
2306
0
    return BLOSC2_ERROR_DATA;
2307
1
  }
2308
2309
29.3k
  int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
2310
29.3k
  if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
2311
20
    BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
2312
0
    return BLOSC2_ERROR_DATA;
2313
20
  }
2314
2315
29.3k
  if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
2316
29.3k
      !context->special_type) {
2317
    // A compressed buffer with only a header can only contain a zero-length buffer
2318
0
    return 0;
2319
0
  }
2320
2321
29.3k
  context->bstarts = (int32_t *) (context->src + context->header_overhead);
2322
29.3k
  bstarts_end = context->header_overhead;
2323
29.3k
  if (!context->special_type && !memcpyed) {
2324
    /* If chunk is not special or a memcpyed, we do have a bstarts section */
2325
24.5k
    bstarts_end = (int32_t)(context->header_overhead + (context->nblocks * sizeof(int32_t)));
2326
24.5k
  }
2327
2328
29.3k
  if (srcsize < bstarts_end) {
2329
30
    BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
2330
0
    return BLOSC2_ERROR_READ_BUFFER;
2331
30
  }
2332
29.2k
  srcsize -= bstarts_end;
2333
2334
  /* Read optional dictionary if flag set */
2335
29.2k
  if (context->blosc2_flags & BLOSC2_USEDICT) {
2336
3.51k
#if defined(HAVE_ZSTD)
2337
3.51k
    context->use_dict = 1;
2338
3.51k
    if (context->dict_ddict != NULL) {
2339
      // Free the existing dictionary (probably from another chunk)
2340
0
      ZSTD_freeDDict(context->dict_ddict);
2341
0
    }
2342
    // The trained dictionary is after the bstarts block
2343
3.51k
    if (srcsize < (signed)sizeof(int32_t)) {
2344
3
      BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
2345
0
      return BLOSC2_ERROR_READ_BUFFER;
2346
3
    }
2347
3.50k
    srcsize -= sizeof(int32_t);
2348
    // Read dictionary size
2349
3.50k
    context->dict_size = sw32_(context->src + bstarts_end);
2350
3.50k
    if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
2351
56
      BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
2352
0
      return BLOSC2_ERROR_CODEC_DICT;
2353
56
    }
2354
3.45k
    if (srcsize < (int32_t)context->dict_size) {
2355
27
      BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
2356
0
      return BLOSC2_ERROR_READ_BUFFER;
2357
27
    }
2358
3.42k
    srcsize -= context->dict_size;
2359
    // Read dictionary
2360
3.42k
    context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
2361
3.42k
    context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
2362
3.42k
#endif   // HAVE_ZSTD
2363
3.42k
  }
2364
2365
29.1k
  return 0;
2366
29.2k
}
2367
2368
64.1k
static int write_compression_header(blosc2_context* context, bool extended_header) {
2369
64.1k
  blosc_header header;
2370
64.1k
  int dont_split;
2371
64.1k
  int dict_training = context->use_dict && (context->dict_cdict == NULL);
2372
2373
64.1k
  context->header_flags = 0;
2374
2375
64.1k
  if (context->clevel == 0) {
2376
    /* Compression level 0 means buffer to be memcpy'ed */
2377
367
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2378
367
  }
2379
64.1k
  if (context->sourcesize < BLOSC_MIN_BUFFERSIZE) {
2380
    /* Buffer is too small.  Try memcpy'ing. */
2381
140
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2382
140
  }
2383
2384
64.1k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
2385
64.1k
  if (extended_header) {
2386
    /* Indicate that we are building an extended header */
2387
64.1k
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
2388
64.1k
    context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
2389
    /* Store filter pipeline info at the end of the header */
2390
64.1k
    if (dict_training || memcpyed) {
2391
498
      context->bstarts = NULL;
2392
498
      context->output_bytes = context->header_overhead;
2393
63.6k
    } else {
2394
63.6k
      context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2395
63.6k
      context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
2396
63.6k
    }
2397
64.1k
  } else {
2398
    // Regular header
2399
0
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
2400
0
    if (memcpyed) {
2401
0
      context->bstarts = NULL;
2402
0
      context->output_bytes = context->header_overhead;
2403
0
    } else {
2404
0
      context->bstarts = (int32_t *) (context->dest + context->header_overhead);
2405
0
      context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
2406
0
    }
2407
0
  }
2408
2409
  // when memcpyed bit is set, there is no point in dealing with others
2410
64.1k
  if (!memcpyed) {
2411
63.6k
    if (context->filter_flags & BLOSC_DOSHUFFLE) {
2412
      /* Byte-shuffle is active */
2413
21.8k
      context->header_flags |= BLOSC_DOSHUFFLE;
2414
21.8k
    }
2415
2416
63.6k
    if (context->filter_flags & BLOSC_DOBITSHUFFLE) {
2417
      /* Bit-shuffle is active */
2418
10.9k
      context->header_flags |= BLOSC_DOBITSHUFFLE;
2419
10.9k
    }
2420
2421
63.6k
    if (context->filter_flags & BLOSC_DODELTA) {
2422
      /* Delta is active */
2423
0
      context->header_flags |= BLOSC_DODELTA;
2424
0
    }
2425
2426
63.6k
    dont_split = !split_block(context, context->typesize,
2427
63.6k
                              context->blocksize);
2428
2429
    /* dont_split is in bit 4 */
2430
63.6k
    context->header_flags |= dont_split << 4;
2431
    /* codec starts at bit 5 */
2432
63.6k
    uint8_t compformat = compcode_to_compformat(context->compcode);
2433
63.6k
    context->header_flags |= compformat << 5;
2434
63.6k
  }
2435
2436
  // Create blosc header and store to dest
2437
64.1k
  blosc2_intialize_header_from_context(context, &header, extended_header);
2438
2439
64.1k
  memcpy(context->dest, &header, (extended_header) ?
2440
64.1k
    BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH);
2441
2442
64.1k
  return 1;
2443
64.1k
}
2444
2445
2446
64.1k
static int blosc_compress_context(blosc2_context* context) {
2447
64.1k
  int ntbytes = 0;
2448
64.1k
  blosc_timestamp_t last, current;
2449
64.1k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
2450
2451
64.1k
  blosc_set_timestamp(&last);
2452
2453
64.1k
  if (!memcpyed) {
2454
    /* Do the actual compression */
2455
63.6k
    ntbytes = do_job(context);
2456
63.6k
    if (ntbytes < 0) {
2457
0
      return ntbytes;
2458
0
    }
2459
63.6k
    if (ntbytes == 0) {
2460
      // Try out with a memcpy later on (last chance for fitting src buffer in dest).
2461
21.0k
      context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
2462
21.0k
      memcpyed = true;
2463
21.0k
    }
2464
63.6k
  }
2465
2466
64.1k
  int dont_split = (context->header_flags & 0x10) >> 4;
2467
64.1k
  int nstreams = context->nblocks;
2468
64.1k
  if (!dont_split) {
2469
    // When splitting, the number of streams is computed differently
2470
22.3k
    if (context->leftover) {
2471
265
      nstreams = (context->nblocks - 1) * context->typesize + 1;
2472
265
    }
2473
22.0k
    else {
2474
22.0k
      nstreams *= context->typesize;
2475
22.0k
    }
2476
22.3k
  }
2477
2478
64.1k
  if (memcpyed) {
2479
21.5k
    if (context->sourcesize + context->header_overhead > context->destsize) {
2480
      /* We are exceeding maximum output size */
2481
6.23k
      ntbytes = 0;
2482
6.23k
    }
2483
15.3k
    else {
2484
15.3k
      context->output_bytes = context->header_overhead;
2485
15.3k
      ntbytes = do_job(context);
2486
15.3k
      if (ntbytes < 0) {
2487
0
        return ntbytes;
2488
0
      }
2489
      // Success!  update the memcpy bit in header
2490
15.3k
      context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
2491
      // and clear the memcpy bit in context (for next reuse)
2492
15.3k
      context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
2493
15.3k
    }
2494
21.5k
  }
2495
42.5k
  else {
2496
    // Check whether we have a run for the whole chunk
2497
42.5k
    int start_csizes = context->header_overhead + 4 * context->nblocks;
2498
42.5k
    if (ntbytes == (int)(start_csizes + nstreams * sizeof(int32_t))) {
2499
      // The streams are all zero runs (by construction).  Encode it...
2500
1.55k
      context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
2501
      // ...and assign the new chunk length
2502
1.55k
      ntbytes = context->header_overhead;
2503
1.55k
    }
2504
42.5k
  }
2505
2506
  /* Set the number of compressed bytes in header */
2507
64.1k
  _sw32(context->dest + BLOSC2_CHUNK_CBYTES, ntbytes);
2508
64.1k
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
2509
0
    dont_split = (context->header_flags & 0x10) >> 4;
2510
0
    int32_t blocksize = dont_split ? (int32_t)sizeof(blosc2_instr) : (int32_t)sizeof(blosc2_instr) * context->typesize;
2511
0
    _sw32(context->dest + BLOSC2_CHUNK_NBYTES, nstreams * (int32_t)sizeof(blosc2_instr));
2512
0
    _sw32(context->dest + BLOSC2_CHUNK_BLOCKSIZE, blocksize);
2513
0
  }
2514
2515
  /* Set the number of bytes in dest buffer (might be useful for tuner) */
2516
64.1k
  context->destsize = ntbytes;
2517
2518
64.1k
  if (context->tuner_params != NULL) {
2519
0
    blosc_set_timestamp(&current);
2520
0
    double ctime = blosc_elapsed_secs(last, current);
2521
0
    int rc;
2522
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2523
0
      rc = blosc_stune_update(context, ctime);
2524
0
    } else {
2525
0
      for (int i = 0; i < g_ntuners; ++i) {
2526
0
        if (g_tuners[i].id == context->tuner_id) {
2527
0
          if (g_tuners[i].update == NULL) {
2528
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2529
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2530
0
              return BLOSC2_ERROR_FAILURE;
2531
0
            }
2532
0
          }
2533
0
          rc = g_tuners[i].update(context, ctime);
2534
0
          goto urtunersuccess;
2535
0
        }
2536
0
      }
2537
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2538
0
      return BLOSC2_ERROR_INVALID_PARAM;
2539
0
      urtunersuccess:;
2540
0
    }
2541
0
    if (rc < 0) {
2542
0
      BLOSC_TRACE_ERROR("Error in tuner update func\n");
2543
0
      return BLOSC2_ERROR_TUNER;
2544
0
    }
2545
0
  }
2546
2547
64.1k
  return ntbytes;
2548
64.1k
}
2549
2550
2551
/* The public secure routine for compression with context. */
2552
int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2553
48.1k
                        void* dest, int32_t destsize) {
2554
48.1k
  int error, cbytes;
2555
2556
48.1k
  if (context->do_compress != 1) {
2557
0
    BLOSC_TRACE_ERROR("Context is not meant for compression.  Giving up.");
2558
0
    return BLOSC2_ERROR_INVALID_PARAM;
2559
0
  }
2560
2561
48.1k
  error = initialize_context_compression(
2562
48.1k
          context, src, srcsize, dest, destsize,
2563
48.1k
          context->clevel, context->filters, context->filters_meta,
2564
48.1k
          context->typesize, context->compcode, context->blocksize,
2565
48.1k
          context->new_nthreads, context->nthreads, context->splitmode,
2566
48.1k
          context->tuner_id, context->tuner_params, context->schunk);
2567
48.1k
  if (error <= 0) {
2568
0
    return error;
2569
0
  }
2570
2571
  /* Write the extended header */
2572
48.1k
  error = write_compression_header(context, true);
2573
48.1k
  if (error < 0) {
2574
0
    return error;
2575
0
  }
2576
2577
48.1k
  cbytes = blosc_compress_context(context);
2578
48.1k
  if (cbytes < 0) {
2579
0
    return cbytes;
2580
0
  }
2581
2582
48.1k
  if (context->use_dict && context->dict_cdict == NULL) {
2583
2584
0
    if (context->compcode != BLOSC_ZSTD) {
2585
0
      const char* compname;
2586
0
      compname = clibcode_to_clibname(context->compcode);
2587
0
      BLOSC_TRACE_ERROR("Codec %s does not support dicts.  Giving up.",
2588
0
                        compname);
2589
0
      return BLOSC2_ERROR_CODEC_DICT;
2590
0
    }
2591
2592
0
#ifdef HAVE_ZSTD
2593
    // Build the dictionary out of the filters outcome and compress with it
2594
0
    int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
2595
    // Do not make the dict more than 5% larger than uncompressed buffer
2596
0
    if (dict_maxsize > srcsize / 20) {
2597
0
      dict_maxsize = srcsize / 20;
2598
0
    }
2599
0
    void* samples_buffer = context->dest + context->header_overhead;
2600
0
    unsigned nblocks = 8;  // the minimum that accepts zstd as of 1.4.0
2601
0
    unsigned sample_fraction = 1;  // 1 allows to use most of the chunk for training
2602
0
    size_t sample_size = context->sourcesize / nblocks / sample_fraction;
2603
2604
    // Populate the samples sizes for training the dictionary
2605
0
    size_t* samples_sizes = malloc(nblocks * sizeof(void*));
2606
0
    BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
2607
0
    for (size_t i = 0; i < nblocks; i++) {
2608
0
      samples_sizes[i] = sample_size;
2609
0
    }
2610
2611
    // Train from samples
2612
0
    void* dict_buffer = malloc(dict_maxsize);
2613
0
    BLOSC_ERROR_NULL(dict_buffer, BLOSC2_ERROR_MEMORY_ALLOC);
2614
0
    int32_t dict_actual_size = (int32_t)ZDICT_trainFromBuffer(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks);
2615
2616
    // TODO: experiment with parameters of low-level fast cover algorithm
2617
    // Note that this API is still unstable.  See: https://github.com/facebook/zstd/issues/1599
2618
    // ZDICT_fastCover_params_t fast_cover_params;
2619
    // memset(&fast_cover_params, 0, sizeof(fast_cover_params));
2620
    // fast_cover_params.d = nblocks;
2621
    // fast_cover_params.steps = 4;
2622
    // fast_cover_params.zParams.compressionLevel = context->clevel;
2623
    //size_t dict_actual_size = ZDICT_optimizeTrainFromBuffer_fastCover(dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks, &fast_cover_params);
2624
2625
0
    if (ZDICT_isError(dict_actual_size) != ZSTD_error_no_error) {
2626
0
      BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
2627
0
                        "  Giving up.", ZDICT_getErrorName(dict_actual_size));
2628
0
      return BLOSC2_ERROR_CODEC_DICT;
2629
0
    }
2630
0
    assert(dict_actual_size > 0);
2631
0
    free(samples_sizes);
2632
2633
    // Update bytes counter and pointers to bstarts for the new compressed buffer
2634
0
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2635
0
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
2636
    /* Write the size of trained dict at the end of bstarts */
2637
0
    _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
2638
0
    context->output_bytes += sizeof(int32_t);
2639
    /* Write the trained dict afterwards */
2640
0
    context->dict_buffer = context->dest + context->output_bytes;
2641
0
    memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
2642
0
    context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1);  // TODO: use get_accel()
2643
0
    free(dict_buffer);      // the dictionary is copied in the header now
2644
0
    context->output_bytes += (int32_t)dict_actual_size;
2645
0
    context->dict_size = dict_actual_size;
2646
2647
    /* Compress with dict */
2648
0
    cbytes = blosc_compress_context(context);
2649
2650
    // Invalidate the dictionary for compressing other chunks using the same context
2651
0
    context->dict_buffer = NULL;
2652
0
    ZSTD_freeCDict(context->dict_cdict);
2653
0
    context->dict_cdict = NULL;
2654
0
#endif  // HAVE_ZSTD
2655
0
  }
2656
2657
48.1k
  return cbytes;
2658
48.1k
}
2659
2660
2661
void build_filters(const int doshuffle, const int delta,
2662
18.3k
                   const int32_t typesize, uint8_t* filters) {
2663
2664
  /* Fill the end part of the filter pipeline */
2665
18.3k
  if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2666
0
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2667
18.3k
  if (doshuffle == BLOSC_BITSHUFFLE)
2668
5.53k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2669
18.3k
  if (doshuffle == BLOSC_NOSHUFFLE)
2670
5.96k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_NOSHUFFLE;
2671
18.3k
  if (delta)
2672
0
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2673
18.3k
}
2674
2675
/* The public secure routine for compression. */
2676
int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2677
15.9k
                    const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2678
15.9k
  int error;
2679
15.9k
  int result;
2680
15.9k
  char* envvar;
2681
2682
  /* Check whether the library should be initialized */
2683
15.9k
  if (!g_initlib) blosc2_init();
2684
2685
  /* Check for a BLOSC_CLEVEL environment variable */
2686
15.9k
  envvar = getenv("BLOSC_CLEVEL");
2687
15.9k
  if (envvar != NULL) {
2688
0
    long value;
2689
0
    value = strtol(envvar, NULL, 10);
2690
0
    if ((errno != EINVAL) && (value >= 0)) {
2691
0
      clevel = (int)value;
2692
0
    }
2693
0
    else {
2694
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
2695
0
    }
2696
0
  }
2697
2698
  /* Check for a BLOSC_SHUFFLE environment variable */
2699
0
  envvar = getenv("BLOSC_SHUFFLE");
2700
15.9k
  if (envvar != NULL) {
2701
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
2702
0
      doshuffle = BLOSC_NOSHUFFLE;
2703
0
    }
2704
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
2705
0
      doshuffle = BLOSC_SHUFFLE;
2706
0
    }
2707
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
2708
0
      doshuffle = BLOSC_BITSHUFFLE;
2709
0
    }
2710
0
    else {
2711
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
2712
0
    }
2713
0
  }
2714
2715
  /* Check for a BLOSC_DELTA environment variable */
2716
0
  envvar = getenv("BLOSC_DELTA");
2717
15.9k
  if (envvar != NULL) {
2718
0
    if (strcmp(envvar, "1") == 0) {
2719
0
      blosc2_set_delta(1);
2720
0
    } else if (strcmp(envvar, "0") == 0) {
2721
0
      blosc2_set_delta(0);
2722
0
    }
2723
0
    else {
2724
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
2725
0
    }
2726
0
  }
2727
2728
  /* Check for a BLOSC_TYPESIZE environment variable */
2729
0
  envvar = getenv("BLOSC_TYPESIZE");
2730
15.9k
  if (envvar != NULL) {
2731
0
    long value;
2732
0
    value = strtol(envvar, NULL, 10);
2733
0
    if ((errno != EINVAL) && (value > 0)) {
2734
0
      typesize = (int32_t)value;
2735
0
    }
2736
0
    else {
2737
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
2738
0
    }
2739
0
  }
2740
2741
  /* Check for a BLOSC_COMPRESSOR environment variable */
2742
0
  envvar = getenv("BLOSC_COMPRESSOR");
2743
15.9k
  if (envvar != NULL) {
2744
0
    result = blosc1_set_compressor(envvar);
2745
0
    if (result < 0) {
2746
0
      BLOSC_TRACE_WARNING("BLOSC_COMPRESSOR environment variable '%s' not recognized\n", envvar);
2747
0
    }
2748
0
  }
2749
2750
  /* Check for a BLOSC_BLOCKSIZE environment variable */
2751
0
  envvar = getenv("BLOSC_BLOCKSIZE");
2752
15.9k
  if (envvar != NULL) {
2753
0
    long blocksize;
2754
0
    blocksize = strtol(envvar, NULL, 10);
2755
0
    if ((errno != EINVAL) && (blocksize > 0)) {
2756
0
      blosc1_set_blocksize((size_t) blocksize);
2757
0
    }
2758
0
    else {
2759
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
2760
0
    }
2761
0
  }
2762
2763
  /* Check for a BLOSC_NTHREADS environment variable */
2764
0
  envvar = getenv("BLOSC_NTHREADS");
2765
15.9k
  if (envvar != NULL) {
2766
0
    long nthreads;
2767
0
    nthreads = strtol(envvar, NULL, 10);
2768
0
    if ((errno != EINVAL) && (nthreads > 0)) {
2769
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2770
0
      if (result < 0) {
2771
0
        BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
2772
0
      }
2773
0
    }
2774
0
  }
2775
2776
  /* Check for a BLOSC_SPLITMODE environment variable */
2777
0
  envvar = getenv("BLOSC_SPLITMODE");
2778
15.9k
  if (envvar != NULL) {
2779
0
    int32_t splitmode = -1;
2780
0
    if (strcmp(envvar, "ALWAYS") == 0) {
2781
0
      splitmode = BLOSC_ALWAYS_SPLIT;
2782
0
    }
2783
0
    else if (strcmp(envvar, "NEVER") == 0) {
2784
0
      splitmode = BLOSC_NEVER_SPLIT;
2785
0
    }
2786
0
    else if (strcmp(envvar, "AUTO") == 0) {
2787
0
      splitmode = BLOSC_AUTO_SPLIT;
2788
0
    }
2789
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
2790
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
2791
0
    }
2792
0
    else {
2793
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
2794
0
    }
2795
2796
0
    if (splitmode >= 0) {
2797
0
      blosc1_set_splitmode(splitmode);
2798
0
    }
2799
0
  }
2800
2801
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2802
     that this should be the last env var so that it can take the
2803
     previous ones into account */
2804
0
  envvar = getenv("BLOSC_NOLOCK");
2805
15.9k
  if (envvar != NULL) {
2806
    // TODO: here is the only place that returns an extended header from
2807
    //  a blosc1_compress() call.  This should probably be fixed.
2808
0
    const char *compname;
2809
0
    blosc2_context *cctx;
2810
0
    blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2811
2812
0
    blosc2_compcode_to_compname(g_compressor, &compname);
2813
    /* Create a context for compression */
2814
0
    build_filters(doshuffle, g_delta, typesize, cparams.filters);
2815
    // TODO: cparams can be shared in a multithreaded environment.  do a copy!
2816
0
    cparams.typesize = (uint8_t)typesize;
2817
0
    cparams.compcode = (uint8_t)g_compressor;
2818
0
    cparams.clevel = (uint8_t)clevel;
2819
0
    cparams.nthreads = g_nthreads;
2820
0
    cparams.splitmode = g_splitmode;
2821
0
    cctx = blosc2_create_cctx(cparams);
2822
0
    if (cctx == NULL) {
2823
0
      BLOSC_TRACE_ERROR("Error while creating the compression context");
2824
0
      return BLOSC2_ERROR_NULL_POINTER;
2825
0
    }
2826
    /* Do the actual compression */
2827
0
    result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2828
    /* Release context resources */
2829
0
    blosc2_free_ctx(cctx);
2830
0
    return result;
2831
0
  }
2832
2833
15.9k
  pthread_mutex_lock(&global_comp_mutex);
2834
2835
  /* Initialize a context compression */
2836
15.9k
  uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2837
15.9k
  BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2838
15.9k
  uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2839
15.9k
  BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2840
15.9k
  build_filters(doshuffle, g_delta, typesize, filters);
2841
15.9k
  error = initialize_context_compression(
2842
15.9k
          g_global_context, src, srcsize, dest, destsize, clevel, filters,
2843
15.9k
          filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2844
15.9k
          g_splitmode, g_tuner, NULL, g_schunk);
2845
15.9k
  free(filters);
2846
15.9k
  free(filters_meta);
2847
15.9k
  if (error <= 0) {
2848
27
    pthread_mutex_unlock(&global_comp_mutex);
2849
27
    return error;
2850
27
  }
2851
2852
15.9k
  envvar = getenv("BLOSC_BLOSC1_COMPAT");
2853
15.9k
  if (envvar != NULL) {
2854
    /* Write chunk header without extended header (Blosc1 compatibility mode) */
2855
0
    error = write_compression_header(g_global_context, false);
2856
0
  }
2857
15.9k
  else {
2858
15.9k
    error = write_compression_header(g_global_context, true);
2859
15.9k
  }
2860
15.9k
  if (error < 0) {
2861
0
    pthread_mutex_unlock(&global_comp_mutex);
2862
0
    return error;
2863
0
  }
2864
2865
15.9k
  result = blosc_compress_context(g_global_context);
2866
2867
15.9k
  pthread_mutex_unlock(&global_comp_mutex);
2868
2869
15.9k
  return result;
2870
15.9k
}
2871
2872
2873
/* The public routine for compression. */
2874
int blosc1_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
2875
0
                    const void* src, void* dest, size_t destsize) {
2876
0
  return blosc2_compress(clevel, doshuffle, (int32_t)typesize, src, (int32_t)nbytes, dest, (int32_t)destsize);
2877
0
}
2878
2879
2880
2881
static int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
2882
37.0k
                                                void* dest, int32_t destsize) {
2883
37.0k
  blosc_header header;
2884
37.0k
  int32_t ntbytes;
2885
37.0k
  int rc;
2886
2887
37.0k
  rc = read_chunk_header(src, srcsize, true, &header);
2888
37.0k
  if (rc < 0) {
2889
26
    return rc;
2890
26
  }
2891
2892
37.0k
  if (header.nbytes > destsize) {
2893
    // Not enough space for writing into the destination
2894
7.69k
    return BLOSC2_ERROR_WRITE_BUFFER;
2895
7.69k
  }
2896
2897
29.3k
  rc = initialize_context_decompression(context, &header, src, srcsize, dest, destsize);
2898
29.3k
  if (rc < 0) {
2899
137
    return rc;
2900
137
  }
2901
2902
  /* Do the actual decompression */
2903
29.1k
  ntbytes = do_job(context);
2904
29.1k
  if (ntbytes < 0) {
2905
9.79k
    return ntbytes;
2906
9.79k
  }
2907
2908
19.3k
  assert(ntbytes <= (int32_t)destsize);
2909
19.3k
  return ntbytes;
2910
29.1k
}
2911
2912
2913
/* The public secure routine for decompression with context. */
2914
int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2915
15.5k
                          void* dest, int32_t destsize) {
2916
15.5k
  int result;
2917
2918
15.5k
  if (context->do_compress != 0) {
2919
0
    BLOSC_TRACE_ERROR("Context is not meant for decompression.  Giving up.");
2920
0
    return BLOSC2_ERROR_INVALID_PARAM;
2921
0
  }
2922
2923
15.5k
  result = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);
2924
2925
  // Reset a possible block_maskout
2926
15.5k
  if (context->block_maskout != NULL) {
2927
0
    free(context->block_maskout);
2928
0
    context->block_maskout = NULL;
2929
0
  }
2930
15.5k
  context->block_maskout_nitems = 0;
2931
2932
15.5k
  return result;
2933
15.5k
}
2934
2935
2936
/* The public secure routine for decompression. */
2937
21.5k
int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2938
21.5k
  int result;
2939
21.5k
  char* envvar;
2940
21.5k
  long nthreads;
2941
21.5k
  blosc2_context *dctx;
2942
21.5k
  blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
2943
2944
  /* Check whether the library should be initialized */
2945
21.5k
  if (!g_initlib) blosc2_init();
2946
2947
  /* Check for a BLOSC_NTHREADS environment variable */
2948
21.5k
  envvar = getenv("BLOSC_NTHREADS");
2949
21.5k
  if (envvar != NULL) {
2950
0
    nthreads = strtol(envvar, NULL, 10);
2951
0
    if ((errno != EINVAL)) {
2952
0
      if ((nthreads <= 0) || (nthreads > INT16_MAX)) {
2953
0
        BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
2954
0
        return BLOSC2_ERROR_INVALID_PARAM;
2955
0
      }
2956
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2957
0
      if (result < 0) {
2958
0
        return result;
2959
0
      }
2960
0
    }
2961
0
  }
2962
2963
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2964
     that this should be the last env var so that it can take the
2965
     previous ones into account */
2966
21.5k
  envvar = getenv("BLOSC_NOLOCK");
2967
21.5k
  if (envvar != NULL) {
2968
0
    dparams.nthreads = g_nthreads;
2969
0
    dctx = blosc2_create_dctx(dparams);
2970
0
    if (dctx == NULL) {
2971
0
      BLOSC_TRACE_ERROR("Error while creating the decompression context");
2972
0
      return BLOSC2_ERROR_NULL_POINTER;
2973
0
    }
2974
0
    result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
2975
0
    blosc2_free_ctx(dctx);
2976
0
    return result;
2977
0
  }
2978
2979
21.5k
  pthread_mutex_lock(&global_comp_mutex);
2980
2981
21.5k
  result = blosc_run_decompression_with_context(
2982
21.5k
          g_global_context, src, srcsize, dest, destsize);
2983
2984
21.5k
  pthread_mutex_unlock(&global_comp_mutex);
2985
2986
21.5k
  return result;
2987
21.5k
}
2988
2989
2990
/* The public routine for decompression. */
2991
9.70k
int blosc1_decompress(const void* src, void* dest, size_t destsize) {
2992
9.70k
  return blosc2_decompress(src, INT32_MAX, dest, (int32_t)destsize);
2993
9.70k
}
2994
2995
2996
/* Specific routine optimized for decompression a small number of
2997
   items out of a compressed chunk.  This does not use threads because
2998
   it would affect negatively to performance. */
2999
int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
3000
21
                   int start, int nitems, void* dest, int32_t destsize) {
3001
21
  uint8_t* _src = (uint8_t*)(src);  /* current pos for source buffer */
3002
21
  uint8_t* _dest = (uint8_t*)(dest);
3003
21
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
3004
21
  int32_t bsize, bsize2, ebsize, leftoverblock;
3005
21
  int32_t startb, stopb;
3006
21
  int32_t stop = start + nitems;
3007
21
  int j, rc;
3008
3009
21
  if (nitems == 0) {
3010
    // We have nothing to do
3011
0
    return 0;
3012
0
  }
3013
21
  if (nitems * header->typesize > destsize) {
3014
0
    BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
3015
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3016
0
  }
3017
3018
21
  context->bstarts = (int32_t*)(_src + context->header_overhead);
3019
3020
  /* Check region boundaries */
3021
21
  if ((start < 0) || (start * header->typesize > header->nbytes)) {
3022
0
    BLOSC_TRACE_ERROR("`start` out of bounds.");
3023
0
    return BLOSC2_ERROR_INVALID_PARAM;
3024
0
  }
3025
3026
21
  if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
3027
0
    BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
3028
0
    return BLOSC2_ERROR_INVALID_PARAM;
3029
0
  }
3030
3031
21
  int chunk_memcpy = header->flags & 0x1;
3032
21
  if (!context->special_type && !chunk_memcpy &&
3033
21
      ((uint8_t *)(_src + srcsize) < (uint8_t *)(context->bstarts + context->nblocks))) {
3034
0
    BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
3035
0
    return BLOSC2_ERROR_READ_BUFFER;
3036
0
  }
3037
3038
21
  bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
3039
21
  if (context->special_type) {
3040
    // Fake a runlen as if its a memcpyed chunk
3041
0
    memcpyed = true;
3042
0
  }
3043
3044
21
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
3045
21
                  (context->blosc2_flags & 0x08u) && !context->special_type);
3046
21
  if (memcpyed && !is_lazy && !context->postfilter) {
3047
    // Short-circuit for (non-lazy) memcpyed or special values
3048
21
    ntbytes = nitems * header->typesize;
3049
21
    switch (context->special_type) {
3050
0
      case BLOSC2_SPECIAL_VALUE:
3051
        // All repeated values
3052
0
        rc = set_values(context->typesize, _src, _dest, ntbytes);
3053
0
        if (rc < 0) {
3054
0
          BLOSC_TRACE_ERROR("set_values failed");
3055
0
          return BLOSC2_ERROR_DATA;
3056
0
        }
3057
0
        break;
3058
0
      case BLOSC2_SPECIAL_NAN:
3059
0
        rc = set_nans(context->typesize, _dest, ntbytes);
3060
0
        if (rc < 0) {
3061
0
          BLOSC_TRACE_ERROR("set_nans failed");
3062
0
          return BLOSC2_ERROR_DATA;
3063
0
        }
3064
0
        break;
3065
0
      case BLOSC2_SPECIAL_ZERO:
3066
0
        memset(_dest, 0, ntbytes);
3067
0
        break;
3068
0
      case BLOSC2_SPECIAL_UNINIT:
3069
        // We do nothing here
3070
0
        break;
3071
21
      case BLOSC2_NO_SPECIAL:
3072
21
        _src += context->header_overhead + start * context->typesize;
3073
21
        memcpy(_dest, _src, ntbytes);
3074
21
        break;
3075
0
      default:
3076
0
        BLOSC_TRACE_ERROR("Unhandled special value case");
3077
0
        BLOSC_ERROR(BLOSC2_ERROR_SCHUNK_SPECIAL);
3078
21
    }
3079
21
    return ntbytes;
3080
21
  }
3081
3082
0
  ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
3083
0
  struct thread_context* scontext = context->serial_context;
3084
  /* Resize the temporaries in serial context if needed */
3085
0
  if (header->blocksize > scontext->tmp_blocksize) {
3086
0
    my_free(scontext->tmp);
3087
0
    scontext->tmp_nbytes = (size_t)4 * ebsize;
3088
0
    scontext->tmp = my_malloc(scontext->tmp_nbytes);
3089
0
    BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
3090
0
    scontext->tmp2 = scontext->tmp + ebsize;
3091
0
    scontext->tmp3 = scontext->tmp2 + ebsize;
3092
0
    scontext->tmp4 = scontext->tmp3 + ebsize;
3093
0
    scontext->tmp_blocksize = (int32_t)header->blocksize;
3094
0
  }
3095
3096
0
  for (j = 0; j < context->nblocks; j++) {
3097
0
    bsize = header->blocksize;
3098
0
    leftoverblock = 0;
3099
0
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
3100
0
      bsize = context->leftover;
3101
0
      leftoverblock = 1;
3102
0
    }
3103
3104
    /* Compute start & stop for each block */
3105
0
    startb = start * header->typesize - j * header->blocksize;
3106
0
    stopb = stop * header->typesize - j * header->blocksize;
3107
0
    if (stopb <= 0) {
3108
      // We can exit as soon as this block is beyond stop
3109
0
      break;
3110
0
    }
3111
0
    if (startb >= header->blocksize) {
3112
0
      continue;
3113
0
    }
3114
0
    if (startb < 0) {
3115
0
      startb = 0;
3116
0
    }
3117
0
    if (stopb > header->blocksize) {
3118
0
      stopb = header->blocksize;
3119
0
    }
3120
0
    bsize2 = stopb - startb;
3121
3122
0
#if defined(HAVE_PLUGINS)
3123
0
    if (context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) {
3124
0
      scontext->zfp_cell_start = startb / context->typesize;
3125
0
      scontext->zfp_cell_nitems = nitems;
3126
0
    }
3127
0
#endif /* HAVE_PLUGINS */
3128
3129
    /* Do the actual data copy */
3130
    // Regular decompression.  Put results in tmp2.
3131
    // If the block is aligned and the worst case fits in destination, let's avoid a copy
3132
0
    bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
3133
0
    uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
3134
3135
    // If memcpyed we don't have a bstarts section (because it is not needed)
3136
0
    int32_t src_offset = memcpyed ?
3137
0
      context->header_overhead + j * header->blocksize : sw32_(context->bstarts + j);
3138
3139
0
    int32_t cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
3140
0
                             src, srcsize, src_offset, j,
3141
0
                             tmp2, 0, scontext->tmp, scontext->tmp3);
3142
0
    if (cbytes < 0) {
3143
0
      ntbytes = cbytes;
3144
0
      break;
3145
0
    }
3146
0
    if (scontext->zfp_cell_nitems > 0) {
3147
0
      if (cbytes == bsize2) {
3148
0
        memcpy((uint8_t *) dest, tmp2, (unsigned int) bsize2);
3149
0
      } else if (cbytes == context->blocksize) {
3150
0
        memcpy((uint8_t *) dest, tmp2 + scontext->zfp_cell_start * context->typesize, (unsigned int) bsize2);
3151
0
        cbytes = bsize2;
3152
0
      }
3153
0
    } else if (!get_single_block) {
3154
      /* Copy to destination */
3155
0
      memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
3156
0
    }
3157
0
    ntbytes += bsize2;
3158
0
  }
3159
3160
0
  scontext->zfp_cell_nitems = 0;
3161
3162
0
  return ntbytes;
3163
0
}
3164
3165
21
int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
3166
21
  blosc2_context context;
3167
21
  int result;
3168
3169
  /* Minimally populate the context */
3170
21
  memset(&context, 0, sizeof(blosc2_context));
3171
3172
21
  context.schunk = g_schunk;
3173
21
  context.nthreads = 1;  // force a serial decompression; fixes #95
3174
3175
  /* Call the actual getitem function */
3176
21
  result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
3177
3178
  /* Release resources */
3179
21
  if (context.serial_context != NULL) {
3180
21
    free_thread_context(context.serial_context);
3181
21
  }
3182
21
  return result;
3183
21
}
3184
3185
/* Specific routine optimized for decompression a small number of
3186
   items out of a compressed chunk.  Public non-contextual API. */
3187
0
int blosc1_getitem(const void* src, int start, int nitems, void* dest) {
3188
0
  return blosc2_getitem(src, INT32_MAX, start, nitems, dest, INT32_MAX);
3189
0
}
3190
3191
int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
3192
21
    int start, int nitems, void* dest, int32_t destsize) {
3193
21
  blosc_header header;
3194
21
  int result;
3195
3196
  /* Minimally populate the context */
3197
21
  result = read_chunk_header((uint8_t *) src, srcsize, true, &header);
3198
21
  if (result < 0) {
3199
0
    return result;
3200
0
  }
3201
3202
21
  context->src = src;
3203
21
  context->srcsize = srcsize;
3204
21
  context->dest = dest;
3205
21
  context->destsize = destsize;
3206
3207
21
  result = blosc2_initialize_context_from_header(context, &header);
3208
21
  if (result < 0) {
3209
0
    return result;
3210
0
  }
3211
3212
21
  if (context->serial_context == NULL) {
3213
21
    context->serial_context = create_thread_context(context, 0);
3214
21
  }
3215
21
  BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
3216
  /* Call the actual getitem function */
3217
21
  result = _blosc_getitem(context, &header, src, srcsize, start, nitems, dest, destsize);
3218
3219
21
  return result;
3220
21
}
3221
3222
/* execute single compression/decompression job for a single thread_context */
3223
static void t_blosc_do_job(void *ctxt)
3224
0
{
3225
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3226
0
  blosc2_context* context = thcontext->parent_context;
3227
0
  int32_t cbytes;
3228
0
  int32_t ntdest;
3229
0
  int32_t tblocks;               /* number of blocks per thread */
3230
0
  int32_t tblock;                /* limit block on a thread */
3231
0
  int32_t nblock_;              /* private copy of nblock */
3232
0
  int32_t bsize;
3233
0
  int32_t leftoverblock;
3234
  /* Parameters for threads */
3235
0
  int32_t blocksize;
3236
0
  int32_t ebsize;
3237
0
  int32_t srcsize;
3238
0
  bool compress = context->do_compress != 0;
3239
0
  int32_t maxbytes;
3240
0
  int32_t nblocks;
3241
0
  int32_t leftover;
3242
0
  int32_t leftover2;
3243
0
  int32_t* bstarts;
3244
0
  const uint8_t* src;
3245
0
  uint8_t* dest;
3246
0
  uint8_t* tmp;
3247
0
  uint8_t* tmp2;
3248
0
  uint8_t* tmp3;
3249
3250
  /* Get parameters for this thread before entering the main loop */
3251
0
  blocksize = context->blocksize;
3252
0
  ebsize = blocksize + context->typesize * (int32_t)sizeof(int32_t);
3253
0
  maxbytes = context->destsize;
3254
0
  nblocks = context->nblocks;
3255
0
  leftover = context->leftover;
3256
0
  bstarts = context->bstarts;
3257
0
  src = context->src;
3258
0
  srcsize = context->srcsize;
3259
0
  dest = context->dest;
3260
3261
  /* Resize the temporaries if needed */
3262
0
  if (blocksize > thcontext->tmp_blocksize) {
3263
0
    my_free(thcontext->tmp);
3264
0
    thcontext->tmp_nbytes = (size_t) 4 * ebsize;
3265
0
    thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
3266
0
    thcontext->tmp2 = thcontext->tmp + ebsize;
3267
0
    thcontext->tmp3 = thcontext->tmp2 + ebsize;
3268
0
    thcontext->tmp4 = thcontext->tmp3 + ebsize;
3269
0
    thcontext->tmp_blocksize = blocksize;
3270
0
  }
3271
3272
0
  tmp = thcontext->tmp;
3273
0
  tmp2 = thcontext->tmp2;
3274
0
  tmp3 = thcontext->tmp3;
3275
3276
  // Determine whether we can do a static distribution of workload among different threads
3277
0
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
3278
0
  if (!context->do_compress && context->special_type) {
3279
    // Fake a runlen as if its a memcpyed chunk
3280
0
    memcpyed = true;
3281
0
  }
3282
3283
0
  bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
3284
0
  if (static_schedule) {
3285
      /* Blocks per thread */
3286
0
      tblocks = nblocks / context->nthreads;
3287
0
      leftover2 = nblocks % context->nthreads;
3288
0
      tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
3289
0
      nblock_ = thcontext->tid * tblocks;
3290
0
      tblock = nblock_ + tblocks;
3291
0
      if (tblock > nblocks) {
3292
0
          tblock = nblocks;
3293
0
      }
3294
0
  }
3295
0
  else {
3296
    // Use dynamic schedule via a queue.  Get the next block.
3297
0
    pthread_mutex_lock(&context->count_mutex);
3298
0
    context->thread_nblock++;
3299
0
    nblock_ = context->thread_nblock;
3300
0
    pthread_mutex_unlock(&context->count_mutex);
3301
0
    tblock = nblocks;
3302
0
  }
3303
3304
  /* Loop over blocks */
3305
0
  leftoverblock = 0;
3306
0
  while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
3307
0
    bsize = blocksize;
3308
0
    if (nblock_ == (nblocks - 1) && (leftover > 0)) {
3309
0
      bsize = leftover;
3310
0
      leftoverblock = 1;
3311
0
    }
3312
0
    if (compress) {
3313
0
      if (memcpyed) {
3314
0
        if (!context->prefilter) {
3315
          /* We want to memcpy only */
3316
0
          memcpy(dest + context->header_overhead + nblock_ * blocksize,
3317
0
                 src + nblock_ * blocksize, (unsigned int) bsize);
3318
0
          cbytes = (int32_t) bsize;
3319
0
        }
3320
0
        else {
3321
          /* Only the prefilter has to be executed, and this is done in blosc_c().
3322
           * However, no further actions are needed, so we can put the result
3323
           * directly in dest. */
3324
0
          cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3325
0
                           ebsize, src, nblock_ * blocksize,
3326
0
                           dest + context->header_overhead + nblock_ * blocksize,
3327
0
                           tmp, tmp3);
3328
0
        }
3329
0
      }
3330
0
      else {
3331
        /* Regular compression */
3332
0
        cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3333
0
                          ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
3334
0
      }
3335
0
    }
3336
0
    else {
3337
      /* Regular decompression */
3338
0
      if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
3339
0
          (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
3340
        /* Not enough input to read all `bstarts` */
3341
0
        cbytes = -1;
3342
0
      }
3343
0
      else {
3344
        // If memcpyed we don't have a bstarts section (because it is not needed)
3345
0
        int32_t src_offset = memcpyed ?
3346
0
            context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
3347
0
        cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
3348
0
                          src, srcsize, src_offset, nblock_,
3349
0
                          dest, nblock_ * blocksize, tmp, tmp2);
3350
0
      }
3351
0
    }
3352
3353
    /* Check whether current thread has to giveup */
3354
0
    if (context->thread_giveup_code <= 0) {
3355
0
      break;
3356
0
    }
3357
3358
    /* Check results for the compressed/decompressed block */
3359
0
    if (cbytes < 0) {            /* compr/decompr failure */
3360
      /* Set giveup_code error */
3361
0
      pthread_mutex_lock(&context->count_mutex);
3362
0
      context->thread_giveup_code = cbytes;
3363
0
      pthread_mutex_unlock(&context->count_mutex);
3364
0
      break;
3365
0
    }
3366
3367
0
    if (compress && !memcpyed) {
3368
      /* Start critical section */
3369
0
      pthread_mutex_lock(&context->count_mutex);
3370
0
      ntdest = context->output_bytes;
3371
      // Note: do not use a typical local dict_training variable here
3372
      // because it is probably cached from previous calls if the number of
3373
      // threads does not change (the usual thing).
3374
0
      if (!(context->use_dict && context->dict_cdict == NULL)) {
3375
0
        _sw32(bstarts + nblock_, (int32_t) ntdest);
3376
0
      }
3377
3378
0
      if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
3379
0
        context->thread_giveup_code = 0;  /* incompressible buf */
3380
0
        pthread_mutex_unlock(&context->count_mutex);
3381
0
        break;
3382
0
      }
3383
0
      context->thread_nblock++;
3384
0
      nblock_ = context->thread_nblock;
3385
0
      context->output_bytes += cbytes;
3386
0
      pthread_mutex_unlock(&context->count_mutex);
3387
      /* End of critical section */
3388
3389
      /* Copy the compressed buffer to destination */
3390
0
      memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
3391
0
    }
3392
0
    else if (static_schedule) {
3393
0
      nblock_++;
3394
0
    }
3395
0
    else {
3396
0
      pthread_mutex_lock(&context->count_mutex);
3397
0
      context->thread_nblock++;
3398
0
      nblock_ = context->thread_nblock;
3399
0
      context->output_bytes += cbytes;
3400
0
      pthread_mutex_unlock(&context->count_mutex);
3401
0
    }
3402
3403
0
  } /* closes while (nblock_) */
3404
3405
0
  if (static_schedule) {
3406
0
    pthread_mutex_lock(&context->count_mutex);
3407
0
    context->output_bytes = context->sourcesize;
3408
0
    if (compress) {
3409
0
      context->output_bytes += context->header_overhead;
3410
0
    }
3411
0
    pthread_mutex_unlock(&context->count_mutex);
3412
0
  }
3413
3414
0
}
3415
3416
/* Decompress & unshuffle several blocks in a single thread */
3417
0
static void* t_blosc(void* ctxt) {
3418
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3419
0
  blosc2_context* context = thcontext->parent_context;
3420
0
#ifdef BLOSC_POSIX_BARRIERS
3421
0
  int rc;
3422
0
#endif
3423
3424
0
  while (1) {
3425
    /* Synchronization point for all threads (wait for initialization) */
3426
0
    WAIT_INIT(NULL, context);
3427
3428
0
    if (context->end_threads) {
3429
0
      break;
3430
0
    }
3431
3432
0
    t_blosc_do_job(ctxt);
3433
3434
    /* Meeting point for all threads (wait for finalization) */
3435
0
    WAIT_FINISH(NULL, context);
3436
0
  }
3437
3438
  /* Cleanup our working space and context */
3439
0
  free_thread_context(thcontext);
3440
3441
0
  return (NULL);
3442
0
}
3443
3444
3445
0
int init_threadpool(blosc2_context *context) {
3446
0
  int32_t tid;
3447
0
  int rc2;
3448
3449
  /* Initialize mutex and condition variable objects */
3450
0
  pthread_mutex_init(&context->count_mutex, NULL);
3451
0
  pthread_mutex_init(&context->delta_mutex, NULL);
3452
0
  pthread_mutex_init(&context->nchunk_mutex, NULL);
3453
0
  pthread_cond_init(&context->delta_cv, NULL);
3454
3455
  /* Set context thread sentinels */
3456
0
  context->thread_giveup_code = 1;
3457
0
  context->thread_nblock = -1;
3458
3459
  /* Barrier initialization */
3460
0
#ifdef BLOSC_POSIX_BARRIERS
3461
0
  pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
3462
0
  pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
3463
#else
3464
  pthread_mutex_init(&context->count_threads_mutex, NULL);
3465
  pthread_cond_init(&context->count_threads_cv, NULL);
3466
  context->count_threads = 0;      /* Reset threads counter */
3467
#endif
3468
3469
0
  if (threads_callback) {
3470
      /* Create thread contexts to store data for callback threads */
3471
0
    context->thread_contexts = (struct thread_context *)my_malloc(
3472
0
            context->nthreads * sizeof(struct thread_context));
3473
0
    BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
3474
0
    for (tid = 0; tid < context->nthreads; tid++)
3475
0
      init_thread_context(context->thread_contexts + tid, context, tid);
3476
0
  }
3477
0
  else {
3478
0
    #if !defined(_WIN32)
3479
      /* Initialize and set thread detached attribute */
3480
0
      pthread_attr_init(&context->ct_attr);
3481
0
      pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
3482
0
    #endif
3483
3484
    /* Make space for thread handlers */
3485
0
    context->threads = (pthread_t*)my_malloc(
3486
0
            context->nthreads * sizeof(pthread_t));
3487
0
    BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);
3488
    /* Finally, create the threads */
3489
0
    for (tid = 0; tid < context->nthreads; tid++) {
3490
      /* Create a thread context (will destroy when finished) */
3491
0
      struct thread_context *thread_context = create_thread_context(context, tid);
3492
0
      BLOSC_ERROR_NULL(thread_context, BLOSC2_ERROR_THREAD_CREATE);
3493
0
      #if !defined(_WIN32)
3494
0
        rc2 = pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
3495
0
                            (void*)thread_context);
3496
      #else
3497
        rc2 = pthread_create(&context->threads[tid], NULL, t_blosc,
3498
                            (void *)thread_context);
3499
      #endif
3500
0
      if (rc2) {
3501
0
        BLOSC_TRACE_ERROR("Return code from pthread_create() is %d.\n"
3502
0
                          "\tError detail: %s\n", rc2, strerror(rc2));
3503
0
        return BLOSC2_ERROR_THREAD_CREATE;
3504
0
      }
3505
0
    }
3506
0
  }
3507
3508
  /* We have now started/initialized the threads */
3509
0
  context->threads_started = context->nthreads;
3510
0
  context->new_nthreads = context->nthreads;
3511
3512
0
  return 0;
3513
0
}
3514
3515
int16_t blosc2_get_nthreads(void)
3516
10
{
3517
10
  return g_nthreads;
3518
10
}
3519
3520
28.0k
int16_t blosc2_set_nthreads(int16_t nthreads) {
3521
28.0k
  int16_t ret = g_nthreads;          /* the previous number of threads */
3522
3523
  /* Check whether the library should be initialized */
3524
28.0k
  if (!g_initlib) blosc2_init();
3525
3526
28.0k
 if (nthreads != ret) {
3527
0
   g_nthreads = nthreads;
3528
0
   g_global_context->new_nthreads = nthreads;
3529
0
   int16_t ret2 = check_nthreads(g_global_context);
3530
0
   if (ret2 < 0) {
3531
0
     return ret2;
3532
0
   }
3533
0
 }
3534
3535
28.0k
  return ret;
3536
28.0k
}
3537
3538
3539
const char* blosc1_get_compressor(void)
3540
0
{
3541
0
  const char* compname;
3542
0
  blosc2_compcode_to_compname(g_compressor, &compname);
3543
3544
0
  return compname;
3545
0
}
3546
3547
18.3k
int blosc1_set_compressor(const char* compname) {
3548
18.3k
  int code = blosc2_compname_to_compcode(compname);
3549
18.3k
  if (code >= BLOSC_LAST_CODEC) {
3550
0
    BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3551
0
    BLOSC_ERROR(BLOSC2_ERROR_CODEC_SUPPORT);
3552
0
  }
3553
18.3k
  g_compressor = code;
3554
3555
  /* Check whether the library should be initialized */
3556
18.3k
  if (!g_initlib) blosc2_init();
3557
3558
18.3k
  return code;
3559
18.3k
}
3560
3561
0
void blosc2_set_delta(int dodelta) {
3562
3563
0
  g_delta = dodelta;
3564
3565
  /* Check whether the library should be initialized */
3566
0
  if (!g_initlib) blosc2_init();
3567
3568
0
}
3569
3570
0
const char* blosc2_list_compressors(void) {
3571
0
  static int compressors_list_done = 0;
3572
0
  static char ret[256];
3573
3574
0
  if (compressors_list_done) return ret;
3575
0
  ret[0] = '\0';
3576
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3577
0
  strcat(ret, ",");
3578
0
  strcat(ret, BLOSC_LZ4_COMPNAME);
3579
0
  strcat(ret, ",");
3580
0
  strcat(ret, BLOSC_LZ4HC_COMPNAME);
3581
0
#if defined(HAVE_ZLIB)
3582
0
  strcat(ret, ",");
3583
0
  strcat(ret, BLOSC_ZLIB_COMPNAME);
3584
0
#endif /* HAVE_ZLIB */
3585
0
#if defined(HAVE_ZSTD)
3586
0
  strcat(ret, ",");
3587
0
  strcat(ret, BLOSC_ZSTD_COMPNAME);
3588
0
#endif /* HAVE_ZSTD */
3589
0
  compressors_list_done = 1;
3590
0
  return ret;
3591
0
}
3592
3593
3594
0
const char* blosc2_get_version_string(void) {
3595
0
  return BLOSC2_VERSION_STRING;
3596
0
}
3597
3598
3599
0
int blosc2_get_complib_info(const char* compname, char** complib, char** version) {
3600
0
  int clibcode;
3601
0
  const char* clibname;
3602
0
  const char* clibversion = "unknown";
3603
0
  char sbuffer[256];
3604
3605
0
  clibcode = compname_to_clibcode(compname);
3606
0
  clibname = clibcode_to_clibname(clibcode);
3607
3608
  /* complib version */
3609
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
3610
0
    clibversion = BLOSCLZ_VERSION_STRING;
3611
0
  }
3612
0
  else if (clibcode == BLOSC_LZ4_LIB) {
3613
0
    sprintf(sbuffer, "%d.%d.%d",
3614
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3615
0
    clibversion = sbuffer;
3616
0
  }
3617
0
#if defined(HAVE_ZLIB)
3618
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
3619
0
#ifdef ZLIB_COMPAT
3620
0
    clibversion = ZLIB_VERSION;
3621
#elif defined(HAVE_ZLIB_NG)
3622
    clibversion = ZLIBNG_VERSION;
3623
#else
3624
    clibversion = ZLIB_VERSION;
3625
#endif
3626
0
  }
3627
0
#endif /* HAVE_ZLIB */
3628
0
#if defined(HAVE_ZSTD)
3629
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
3630
0
    sprintf(sbuffer, "%d.%d.%d",
3631
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3632
0
    clibversion = sbuffer;
3633
0
  }
3634
0
#endif /* HAVE_ZSTD */
3635
3636
#ifdef _MSC_VER
3637
  *complib = _strdup(clibname);
3638
  *version = _strdup(clibversion);
3639
#else
3640
0
  *complib = strdup(clibname);
3641
0
  *version = strdup(clibversion);
3642
0
#endif
3643
0
  return clibcode;
3644
0
}
3645
3646
/* Return `nbytes`, `cbytes` and `blocksize` from a compressed buffer. */
3647
21.7k
void blosc1_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
3648
21.7k
  int32_t nbytes32, cbytes32, blocksize32;
3649
21.7k
  blosc2_cbuffer_sizes(cbuffer, &nbytes32, &cbytes32, &blocksize32);
3650
21.7k
  *nbytes = nbytes32;
3651
21.7k
  *cbytes = cbytes32;
3652
21.7k
  *blocksize = blocksize32;
3653
21.7k
}
3654
3655
198k
int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3656
198k
  blosc_header header;
3657
198k
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3658
198k
  if (rc < 0) {
3659
    /* Return zeros if error reading header */
3660
132
    memset(&header, 0, sizeof(header));
3661
132
  }
3662
3663
  /* Read the interesting values */
3664
198k
  if (nbytes != NULL)
3665
198k
    *nbytes = header.nbytes;
3666
198k
  if (cbytes != NULL)
3667
198k
    *cbytes = header.cbytes;
3668
198k
  if (blocksize != NULL)
3669
21.7k
    *blocksize = header.blocksize;
3670
198k
  return rc;
3671
198k
}
3672
3673
11.8k
int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3674
11.8k
  int32_t header_cbytes;
3675
11.8k
  int32_t header_nbytes;
3676
11.8k
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3677
    /* Compressed data should contain enough space for header */
3678
0
    *nbytes = 0;
3679
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3680
0
  }
3681
11.8k
  int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3682
11.8k
  if (rc < 0) {
3683
0
    *nbytes = 0;
3684
0
    return rc;
3685
0
  }
3686
11.8k
  *nbytes = header_nbytes;
3687
11.8k
  if (header_cbytes != (int32_t)cbytes) {
3688
    /* Compressed size from header does not match `cbytes` */
3689
0
    *nbytes = 0;
3690
0
    return BLOSC2_ERROR_INVALID_HEADER;
3691
0
  }
3692
11.8k
  if (*nbytes > BLOSC2_MAX_BUFFERSIZE) {
3693
    /* Uncompressed size is larger than allowed */
3694
42
    *nbytes = 0;
3695
42
    return BLOSC2_ERROR_MEMORY_ALLOC;
3696
42
  }
3697
11.8k
  return 0;
3698
11.8k
}
3699
3700
/* Return `typesize` and `flags` from a compressed buffer. */
3701
0
void blosc1_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3702
0
  blosc_header header;
3703
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3704
0
  if (rc < 0) {
3705
0
    *typesize = *flags = 0;
3706
0
    return;
3707
0
  }
3708
3709
  /* Read the interesting values */
3710
0
  *flags = header.flags;
3711
0
  *typesize = header.typesize;
3712
0
}
3713
3714
3715
/* Return version information from a compressed buffer. */
3716
0
void blosc2_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3717
0
  blosc_header header;
3718
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3719
0
  if (rc < 0) {
3720
0
    *version = *versionlz = 0;
3721
0
    return;
3722
0
  }
3723
3724
  /* Read the version info */
3725
0
  *version = header.version;
3726
0
  *versionlz = header.versionlz;
3727
0
}
3728
3729
3730
/* Return the compressor library/format used in a compressed buffer. */
3731
0
const char* blosc2_cbuffer_complib(const void* cbuffer) {
3732
0
  blosc_header header;
3733
0
  int clibcode;
3734
0
  const char* complib;
3735
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3736
0
  if (rc < 0) {
3737
0
    return NULL;
3738
0
  }
3739
3740
  /* Read the compressor format/library info */
3741
0
  clibcode = (header.flags & 0xe0) >> 5;
3742
0
  complib = clibcode_to_clibname(clibcode);
3743
0
  return complib;
3744
0
}
3745
3746
3747
/* Get the internal blocksize to be used during compression.  0 means
3748
   that an automatic blocksize is computed internally. */
3749
int blosc1_get_blocksize(void)
3750
0
{
3751
0
  return (int)g_force_blocksize;
3752
0
}
3753
3754
3755
/* Force the use of a specific blocksize.  If 0, an automatic
3756
   blocksize will be used (the default). */
3757
3.70k
void blosc1_set_blocksize(size_t blocksize) {
3758
3.70k
  g_force_blocksize = (int32_t)blocksize;
3759
3.70k
}
3760
3761
3762
/* Force the use of a specific split mode. */
3763
void blosc1_set_splitmode(int mode)
3764
0
{
3765
0
  g_splitmode = mode;
3766
0
}
3767
3768
3769
/* Set pointer to super-chunk.  If NULL, no super-chunk will be
3770
   reachable (the default). */
3771
0
void blosc_set_schunk(blosc2_schunk* schunk) {
3772
0
  g_schunk = schunk;
3773
0
  g_global_context->schunk = schunk;
3774
0
}
3775
3776
blosc2_io *blosc2_io_global = NULL;
3777
blosc2_io_cb BLOSC2_IO_CB_DEFAULTS;
3778
3779
14.4k
void blosc2_init(void) {
3780
  /* Return if Blosc is already initialized */
3781
14.4k
  if (g_initlib) return;
3782
3783
14.4k
  BLOSC2_IO_CB_DEFAULTS.id = BLOSC2_IO_FILESYSTEM;
3784
14.4k
  BLOSC2_IO_CB_DEFAULTS.name = "filesystem";
3785
14.4k
  BLOSC2_IO_CB_DEFAULTS.open = (blosc2_open_cb) blosc2_stdio_open;
3786
14.4k
  BLOSC2_IO_CB_DEFAULTS.close = (blosc2_close_cb) blosc2_stdio_close;
3787
14.4k
  BLOSC2_IO_CB_DEFAULTS.tell = (blosc2_tell_cb) blosc2_stdio_tell;
3788
14.4k
  BLOSC2_IO_CB_DEFAULTS.seek = (blosc2_seek_cb) blosc2_stdio_seek;
3789
14.4k
  BLOSC2_IO_CB_DEFAULTS.write = (blosc2_write_cb) blosc2_stdio_write;
3790
14.4k
  BLOSC2_IO_CB_DEFAULTS.read = (blosc2_read_cb) blosc2_stdio_read;
3791
14.4k
  BLOSC2_IO_CB_DEFAULTS.truncate = (blosc2_truncate_cb) blosc2_stdio_truncate;
3792
3793
14.4k
  g_ncodecs = 0;
3794
14.4k
  g_nfilters = 0;
3795
14.4k
  g_ntuners = 0;
3796
3797
14.4k
#if defined(HAVE_PLUGINS)
3798
14.4k
  #include "blosc2/blosc2-common.h"
3799
14.4k
  #include "blosc2/blosc2-stdio.h"
3800
14.4k
  register_codecs();
3801
14.4k
  register_filters();
3802
14.4k
  register_tuners();
3803
14.4k
#endif
3804
14.4k
  pthread_mutex_init(&global_comp_mutex, NULL);
3805
  /* Create a global context */
3806
14.4k
  g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3807
14.4k
  memset(g_global_context, 0, sizeof(blosc2_context));
3808
14.4k
  g_global_context->nthreads = g_nthreads;
3809
14.4k
  g_global_context->new_nthreads = g_nthreads;
3810
14.4k
  g_initlib = 1;
3811
14.4k
}
3812
3813
3814
14.4k
int blosc2_free_resources(void) {
3815
  /* Return if Blosc is not initialized */
3816
14.4k
  if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3817
3818
14.4k
  return release_threadpool(g_global_context);
3819
14.4k
}
3820
3821
3822
14.4k
void blosc2_destroy(void) {
3823
  /* Return if Blosc is not initialized */
3824
14.4k
  if (!g_initlib) return;
3825
3826
14.4k
  blosc2_free_resources();
3827
14.4k
  g_initlib = 0;
3828
14.4k
  blosc2_free_ctx(g_global_context);
3829
3830
14.4k
  pthread_mutex_destroy(&global_comp_mutex);
3831
3832
14.4k
}
3833
3834
3835
33.8k
int release_threadpool(blosc2_context *context) {
3836
33.8k
  int32_t t;
3837
33.8k
  void* status;
3838
33.8k
  int rc;
3839
3840
33.8k
  if (context->threads_started > 0) {
3841
0
    if (threads_callback) {
3842
      /* free context data for user-managed threads */
3843
0
      for (t=0; t<context->threads_started; t++)
3844
0
        destroy_thread_context(context->thread_contexts + t);
3845
0
      my_free(context->thread_contexts);
3846
0
    }
3847
0
    else {
3848
      /* Tell all existing threads to finish */
3849
0
      context->end_threads = 1;
3850
0
      WAIT_INIT(-1, context);
3851
3852
      /* Join exiting threads */
3853
0
      for (t = 0; t < context->threads_started; t++) {
3854
0
        rc = pthread_join(context->threads[t], &status);
3855
0
        if (rc) {
3856
0
          BLOSC_TRACE_ERROR("Return code from pthread_join() is %d\n"
3857
0
                            "\tError detail: %s.", rc, strerror(rc));
3858
0
        }
3859
0
      }
3860
3861
      /* Thread attributes */
3862
0
      #if !defined(_WIN32)
3863
0
        pthread_attr_destroy(&context->ct_attr);
3864
0
      #endif
3865
3866
      /* Release thread handlers */
3867
0
      my_free(context->threads);
3868
0
    }
3869
3870
    /* Release mutex and condition variable objects */
3871
0
    pthread_mutex_destroy(&context->count_mutex);
3872
0
    pthread_mutex_destroy(&context->delta_mutex);
3873
0
    pthread_mutex_destroy(&context->nchunk_mutex);
3874
0
    pthread_cond_destroy(&context->delta_cv);
3875
3876
    /* Barriers */
3877
0
  #ifdef BLOSC_POSIX_BARRIERS
3878
0
    pthread_barrier_destroy(&context->barr_init);
3879
0
    pthread_barrier_destroy(&context->barr_finish);
3880
  #else
3881
    pthread_mutex_destroy(&context->count_threads_mutex);
3882
    pthread_cond_destroy(&context->count_threads_cv);
3883
    context->count_threads = 0;      /* Reset threads counter */
3884
  #endif
3885
3886
    /* Reset flags and counters */
3887
0
    context->end_threads = 0;
3888
0
    context->threads_started = 0;
3889
0
  }
3890
3891
3892
33.8k
  return 0;
3893
33.8k
}
3894
3895
3896
/* Contexts */
3897
3898
/* Create a context for compression */
3899
2.41k
blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3900
2.41k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3901
2.41k
  BLOSC_ERROR_NULL(context, NULL);
3902
3903
  /* Populate the context, using zeros as default values */
3904
2.41k
  memset(context, 0, sizeof(blosc2_context));
3905
2.41k
  context->do_compress = 1;   /* meant for compression */
3906
2.41k
  context->use_dict = cparams.use_dict;
3907
2.41k
  if (cparams.instr_codec) {
3908
0
    context->blosc2_flags = BLOSC2_INSTR_CODEC;
3909
0
  }
3910
3911
16.9k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
3912
14.5k
    context->filters[i] = cparams.filters[i];
3913
14.5k
    context->filters_meta[i] = cparams.filters_meta[i];
3914
3915
14.5k
    if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
3916
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3917
0
                        context->filters[i]);
3918
0
      free(context);
3919
0
      return NULL;
3920
0
    }
3921
14.5k
    if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
3922
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3923
0
                        context->filters[i]);
3924
0
      free(context);
3925
0
      return NULL;
3926
0
    }
3927
14.5k
  }
3928
3929
2.41k
#if defined(HAVE_PLUGINS)
3930
2.41k
#include "blosc2/codecs-registry.h"
3931
2.41k
  if ((context->compcode >= BLOSC_CODEC_ZFP_FIXED_ACCURACY) && (context->compcode <= BLOSC_CODEC_ZFP_FIXED_RATE)) {
3932
0
    for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
3933
0
      if ((context->filters[i] == BLOSC_SHUFFLE) || (context->filters[i] == BLOSC_BITSHUFFLE)) {
3934
0
        BLOSC_TRACE_ERROR("ZFP cannot be run in presence of SHUFFLE / BITSHUFFLE");
3935
0
        return NULL;
3936
0
      }
3937
0
    }
3938
0
  }
3939
2.41k
#endif /* HAVE_PLUGINS */
3940
3941
  /* Check for a BLOSC_SHUFFLE environment variable */
3942
2.41k
  int doshuffle = -1;
3943
2.41k
  char* envvar = getenv("BLOSC_SHUFFLE");
3944
2.41k
  if (envvar != NULL) {
3945
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
3946
0
      doshuffle = BLOSC_NOSHUFFLE;
3947
0
    }
3948
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
3949
0
      doshuffle = BLOSC_SHUFFLE;
3950
0
    }
3951
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
3952
0
      doshuffle = BLOSC_BITSHUFFLE;
3953
0
    }
3954
0
    else {
3955
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
3956
0
    }
3957
0
  }
3958
  /* Check for a BLOSC_DELTA environment variable */
3959
0
  int dodelta = BLOSC_NOFILTER;
3960
2.41k
  envvar = getenv("BLOSC_DELTA");
3961
2.41k
  if (envvar != NULL) {
3962
0
    if (strcmp(envvar, "1") == 0) {
3963
0
      dodelta = BLOSC_DELTA;
3964
0
    } else if (strcmp(envvar, "0") == 0){
3965
0
      dodelta = BLOSC_NOFILTER;
3966
0
    }
3967
0
    else {
3968
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
3969
0
    }
3970
0
  }
3971
  /* Check for a BLOSC_TYPESIZE environment variable */
3972
0
  context->typesize = cparams.typesize;
3973
2.41k
  envvar = getenv("BLOSC_TYPESIZE");
3974
2.41k
  if (envvar != NULL) {
3975
0
    int32_t value;
3976
0
    value = (int32_t) strtol(envvar, NULL, 10);
3977
0
    if ((errno != EINVAL) && (value > 0)) {
3978
0
      context->typesize = value;
3979
0
    }
3980
0
    else {
3981
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
3982
0
    }
3983
0
  }
3984
0
  build_filters(doshuffle, dodelta, context->typesize, context->filters);
3985
3986
2.41k
  context->clevel = cparams.clevel;
3987
  /* Check for a BLOSC_CLEVEL environment variable */
3988
2.41k
  envvar = getenv("BLOSC_CLEVEL");
3989
2.41k
  if (envvar != NULL) {
3990
0
    int value;
3991
0
    value = (int)strtol(envvar, NULL, 10);
3992
0
    if ((errno != EINVAL) && (value >= 0)) {
3993
0
      context->clevel = value;
3994
0
    }
3995
0
    else {
3996
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
3997
0
    }
3998
0
  }
3999
4000
0
  context->compcode = cparams.compcode;
4001
  /* Check for a BLOSC_COMPRESSOR environment variable */
4002
2.41k
  envvar = getenv("BLOSC_COMPRESSOR");
4003
2.41k
  if (envvar != NULL) {
4004
0
    int codec = blosc2_compname_to_compcode(envvar);
4005
0
    if (codec >= BLOSC_LAST_CODEC) {
4006
0
      BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
4007
0
      return NULL;
4008
0
    }
4009
0
    context->compcode = codec;
4010
0
  }
4011
2.41k
  context->compcode_meta = cparams.compcode_meta;
4012
4013
2.41k
  context->blocksize = cparams.blocksize;
4014
  /* Check for a BLOSC_BLOCKSIZE environment variable */
4015
2.41k
  envvar = getenv("BLOSC_BLOCKSIZE");
4016
2.41k
  if (envvar != NULL) {
4017
0
    int32_t blocksize;
4018
0
    blocksize = (int32_t) strtol(envvar, NULL, 10);
4019
0
    if ((errno != EINVAL) && (blocksize > 0)) {
4020
0
      context->blocksize = blocksize;
4021
0
    }
4022
0
    else {
4023
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
4024
0
    }
4025
0
  }
4026
4027
0
  context->nthreads = cparams.nthreads;
4028
  /* Check for a BLOSC_NTHREADS environment variable */
4029
2.41k
  envvar = getenv("BLOSC_NTHREADS");
4030
2.41k
  if (envvar != NULL) {
4031
0
    int16_t nthreads = (int16_t) strtol(envvar, NULL, 10);
4032
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4033
0
      context->nthreads = nthreads;
4034
0
    }
4035
0
    else {
4036
0
      BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
4037
0
    }
4038
0
  }
4039
0
  context->new_nthreads = context->nthreads;
4040
4041
2.41k
  context->splitmode = cparams.splitmode;
4042
  /* Check for a BLOSC_SPLITMODE environment variable */
4043
2.41k
  envvar = getenv("BLOSC_SPLITMODE");
4044
2.41k
  if (envvar != NULL) {
4045
0
    int32_t splitmode = -1;
4046
0
    if (strcmp(envvar, "ALWAYS") == 0) {
4047
0
      splitmode = BLOSC_ALWAYS_SPLIT;
4048
0
    }
4049
0
    else if (strcmp(envvar, "NEVER") == 0) {
4050
0
      splitmode = BLOSC_NEVER_SPLIT;
4051
0
    }
4052
0
    else if (strcmp(envvar, "AUTO") == 0) {
4053
0
      splitmode = BLOSC_AUTO_SPLIT;
4054
0
    }
4055
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
4056
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
4057
0
    }
4058
0
    else {
4059
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
4060
0
    }
4061
0
    if (splitmode >= 0) {
4062
0
      context->splitmode = splitmode;
4063
0
    }
4064
0
  }
4065
4066
0
  context->threads_started = 0;
4067
2.41k
  context->schunk = cparams.schunk;
4068
4069
2.41k
  if (cparams.prefilter != NULL) {
4070
0
    context->prefilter = cparams.prefilter;
4071
0
    context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
4072
0
    BLOSC_ERROR_NULL(context->preparams, NULL);
4073
0
    memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
4074
0
  }
4075
4076
2.41k
  if (cparams.tuner_id <= 0) {
4077
2.41k
    cparams.tuner_id = g_tuner;
4078
2.41k
  } else {
4079
0
    for (int i = 0; i < g_ntuners; ++i) {
4080
0
      if (g_tuners[i].id == cparams.tuner_id) {
4081
0
        if (g_tuners[i].init == NULL) {
4082
0
          if (fill_tuner(&g_tuners[i]) < 0) {
4083
0
            BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4084
0
            return NULL;
4085
0
          }
4086
0
        }
4087
0
        if (g_tuners[i].init(cparams.tuner_params, context, NULL) < 0) {
4088
0
          BLOSC_TRACE_ERROR("Error in user-defined tuner %d init function\n", cparams.tuner_id);
4089
0
          return NULL;
4090
0
        }
4091
0
        goto urtunersuccess;
4092
0
      }
4093
0
    }
4094
0
    BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", cparams.tuner_id);
4095
0
    return NULL;
4096
0
  }
4097
2.41k
  urtunersuccess:;
4098
4099
2.41k
  context->tuner_id = cparams.tuner_id;
4100
4101
2.41k
  context->codec_params = cparams.codec_params;
4102
2.41k
  memcpy(context->filter_params, cparams.filter_params, BLOSC2_MAX_FILTERS * sizeof(void*));
4103
4104
2.41k
  return context;
4105
2.41k
}
4106
4107
/* Create a context for decompression */
4108
2.41k
blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
4109
2.41k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
4110
2.41k
  BLOSC_ERROR_NULL(context, NULL);
4111
4112
  /* Populate the context, using zeros as default values */
4113
2.41k
  memset(context, 0, sizeof(blosc2_context));
4114
2.41k
  context->do_compress = 0;   /* Meant for decompression */
4115
4116
2.41k
  context->nthreads = dparams.nthreads;
4117
2.41k
  char* envvar = getenv("BLOSC_NTHREADS");
4118
2.41k
  if (envvar != NULL) {
4119
0
    long nthreads = strtol(envvar, NULL, 10);
4120
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4121
0
      context->nthreads = (int16_t) nthreads;
4122
0
    }
4123
0
  }
4124
2.41k
  context->new_nthreads = context->nthreads;
4125
4126
2.41k
  context->threads_started = 0;
4127
2.41k
  context->block_maskout = NULL;
4128
2.41k
  context->block_maskout_nitems = 0;
4129
2.41k
  context->schunk = dparams.schunk;
4130
4131
2.41k
  if (dparams.postfilter != NULL) {
4132
0
    context->postfilter = dparams.postfilter;
4133
0
    context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
4134
0
    BLOSC_ERROR_NULL(context->postparams, NULL);
4135
0
    memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
4136
0
  }
4137
4138
2.41k
  return context;
4139
2.41k
}
4140
4141
4142
19.3k
void blosc2_free_ctx(blosc2_context* context) {
4143
19.3k
  release_threadpool(context);
4144
19.3k
  if (context->serial_context != NULL) {
4145
16.0k
    free_thread_context(context->serial_context);
4146
16.0k
  }
4147
19.3k
  if (context->dict_cdict != NULL) {
4148
0
#ifdef HAVE_ZSTD
4149
0
    ZSTD_freeCDict(context->dict_cdict);
4150
0
#endif
4151
0
  }
4152
19.3k
  if (context->dict_ddict != NULL) {
4153
2.65k
#ifdef HAVE_ZSTD
4154
2.65k
    ZSTD_freeDDict(context->dict_ddict);
4155
2.65k
#endif
4156
2.65k
  }
4157
19.3k
  if (context->tuner_params != NULL) {
4158
0
    int rc;
4159
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
4160
0
      rc = blosc_stune_free(context);
4161
0
    } else {
4162
0
      for (int i = 0; i < g_ntuners; ++i) {
4163
0
        if (g_tuners[i].id == context->tuner_id) {
4164
0
          if (g_tuners[i].free == NULL) {
4165
0
            if (fill_tuner(&g_tuners[i]) < 0) {
4166
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4167
0
              return;
4168
0
            }
4169
0
          }
4170
0
          rc = g_tuners[i].free(context);
4171
0
          goto urtunersuccess;
4172
0
        }
4173
0
      }
4174
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
4175
0
      return;
4176
0
      urtunersuccess:;
4177
0
    }
4178
0
    if (rc < 0) {
4179
0
      BLOSC_TRACE_ERROR("Error in user-defined tuner free function\n");
4180
0
      return;
4181
0
    }
4182
0
  }
4183
19.3k
  if (context->prefilter != NULL) {
4184
0
    my_free(context->preparams);
4185
0
  }
4186
19.3k
  if (context->postfilter != NULL) {
4187
0
    my_free(context->postparams);
4188
0
  }
4189
4190
19.3k
  if (context->block_maskout != NULL) {
4191
0
    free(context->block_maskout);
4192
0
  }
4193
19.3k
  my_free(context);
4194
19.3k
}
4195
4196
4197
0
int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
4198
0
  cparams->compcode = ctx->compcode;
4199
0
  cparams->compcode_meta = ctx->compcode_meta;
4200
0
  cparams->clevel = ctx->clevel;
4201
0
  cparams->use_dict = ctx->use_dict;
4202
0
  cparams->instr_codec = ctx->blosc2_flags & BLOSC2_INSTR_CODEC;
4203
0
  cparams->typesize = ctx->typesize;
4204
0
  cparams->nthreads = ctx->nthreads;
4205
0
  cparams->blocksize = ctx->blocksize;
4206
0
  cparams->splitmode = ctx->splitmode;
4207
0
  cparams->schunk = ctx->schunk;
4208
0
  for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
4209
0
    cparams->filters[i] = ctx->filters[i];
4210
0
    cparams->filters_meta[i] = ctx->filters_meta[i];
4211
0
  }
4212
0
  cparams->prefilter = ctx->prefilter;
4213
0
  cparams->preparams = ctx->preparams;
4214
0
  cparams->tuner_id = ctx->tuner_id;
4215
0
  cparams->codec_params = ctx->codec_params;
4216
4217
0
  return BLOSC2_ERROR_SUCCESS;
4218
0
}
4219
4220
4221
3.09k
int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
4222
3.09k
  dparams->nthreads = ctx->nthreads;
4223
3.09k
  dparams->schunk = ctx->schunk;
4224
3.09k
  dparams->postfilter = ctx->postfilter;
4225
3.09k
  dparams->postparams = ctx->postparams;
4226
4227
3.09k
  return BLOSC2_ERROR_SUCCESS;
4228
3.09k
}
4229
4230
4231
/* Set a maskout in decompression context */
4232
0
int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
4233
4234
0
  if (ctx->block_maskout != NULL) {
4235
    // Get rid of a possible mask here
4236
0
    free(ctx->block_maskout);
4237
0
  }
4238
4239
0
  bool *maskout_ = malloc(nblocks);
4240
0
  BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
4241
0
  memcpy(maskout_, maskout, nblocks);
4242
0
  ctx->block_maskout = maskout_;
4243
0
  ctx->block_maskout_nitems = nblocks;
4244
4245
0
  return 0;
4246
0
}
4247
4248
4249
/* Create a chunk made of zeros */
4250
5
int blosc2_chunk_zeros(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4251
5
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4252
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4253
0
    return BLOSC2_ERROR_DATA;
4254
0
  }
4255
4256
5
  if (nbytes % cparams.typesize) {
4257
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4258
0
    return BLOSC2_ERROR_DATA;
4259
0
  }
4260
4261
5
  blosc_header header;
4262
5
  blosc2_context* context = blosc2_create_cctx(cparams);
4263
5
  if (context == NULL) {
4264
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4265
0
    return BLOSC2_ERROR_NULL_POINTER;
4266
0
  }
4267
4268
5
  int error = initialize_context_compression(
4269
5
          context, NULL, nbytes, dest, destsize,
4270
5
          context->clevel, context->filters, context->filters_meta,
4271
5
          context->typesize, context->compcode, context->blocksize,
4272
5
          context->new_nthreads, context->nthreads, context->splitmode,
4273
5
          context->tuner_id, context->tuner_params, context->schunk);
4274
5
  if (error <= 0) {
4275
0
    blosc2_free_ctx(context);
4276
0
    return error;
4277
0
  }
4278
4279
5
  memset(&header, 0, sizeof(header));
4280
5
  header.version = BLOSC2_VERSION_FORMAT;
4281
5
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4282
5
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4283
5
  header.typesize = context->typesize;
4284
5
  header.nbytes = (int32_t)nbytes;
4285
5
  header.blocksize = context->blocksize;
4286
5
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4287
5
  header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4;  // mark chunk as all zeros
4288
5
  memcpy((uint8_t *)dest, &header, sizeof(header));
4289
4290
5
  blosc2_free_ctx(context);
4291
4292
5
  return BLOSC_EXTENDED_HEADER_LENGTH;
4293
5
}
4294
4295
4296
/* Create a chunk made of uninitialized values */
4297
0
int blosc2_chunk_uninit(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4298
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4299
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4300
0
    return BLOSC2_ERROR_DATA;
4301
0
  }
4302
4303
0
  if (nbytes % cparams.typesize) {
4304
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4305
0
    return BLOSC2_ERROR_DATA;
4306
0
  }
4307
4308
0
  blosc_header header;
4309
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4310
0
  if (context == NULL) {
4311
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4312
0
    return BLOSC2_ERROR_NULL_POINTER;
4313
0
  }
4314
0
  int error = initialize_context_compression(
4315
0
          context, NULL, nbytes, dest, destsize,
4316
0
          context->clevel, context->filters, context->filters_meta,
4317
0
          context->typesize, context->compcode, context->blocksize,
4318
0
          context->new_nthreads, context->nthreads, context->splitmode,
4319
0
          context->tuner_id, context->tuner_params, context->schunk);
4320
0
  if (error <= 0) {
4321
0
    blosc2_free_ctx(context);
4322
0
    return error;
4323
0
  }
4324
4325
0
  memset(&header, 0, sizeof(header));
4326
0
  header.version = BLOSC2_VERSION_FORMAT;
4327
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4328
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4329
0
  header.typesize = context->typesize;
4330
0
  header.nbytes = (int32_t)nbytes;
4331
0
  header.blocksize = context->blocksize;
4332
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4333
0
  header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4;  // mark chunk as uninitialized
4334
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4335
4336
0
  blosc2_free_ctx(context);
4337
4338
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4339
0
}
4340
4341
4342
/* Create a chunk made of nans */
4343
0
int blosc2_chunk_nans(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4344
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4345
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4346
0
    return BLOSC2_ERROR_DATA;
4347
0
  }
4348
4349
0
  if (nbytes % cparams.typesize) {
4350
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4351
0
    return BLOSC2_ERROR_DATA;
4352
0
  }
4353
4354
0
  blosc_header header;
4355
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4356
0
  if (context == NULL) {
4357
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4358
0
    return BLOSC2_ERROR_NULL_POINTER;
4359
0
  }
4360
4361
0
  int error = initialize_context_compression(
4362
0
          context, NULL, nbytes, dest, destsize,
4363
0
          context->clevel, context->filters, context->filters_meta,
4364
0
          context->typesize, context->compcode, context->blocksize,
4365
0
          context->new_nthreads, context->nthreads, context->splitmode,
4366
0
          context->tuner_id, context->tuner_params, context->schunk);
4367
0
  if (error <= 0) {
4368
0
    blosc2_free_ctx(context);
4369
0
    return error;
4370
0
  }
4371
4372
0
  memset(&header, 0, sizeof(header));
4373
0
  header.version = BLOSC2_VERSION_FORMAT;
4374
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4375
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4376
0
  header.typesize = context->typesize;
4377
0
  header.nbytes = (int32_t)nbytes;
4378
0
  header.blocksize = context->blocksize;
4379
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4380
0
  header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4;  // mark chunk as all NaNs
4381
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4382
4383
0
  blosc2_free_ctx(context);
4384
4385
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4386
0
}
4387
4388
4389
/* Create a chunk made of repeated values */
4390
int blosc2_chunk_repeatval(blosc2_cparams cparams, const int32_t nbytes,
4391
0
                           void* dest, int32_t destsize, const void* repeatval) {
4392
0
  uint8_t typesize = cparams.typesize;
4393
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + typesize) {
4394
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4395
0
    return BLOSC2_ERROR_DATA;
4396
0
  }
4397
4398
0
  if (nbytes % cparams.typesize) {
4399
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4400
0
    return BLOSC2_ERROR_DATA;
4401
0
  }
4402
4403
0
  blosc_header header;
4404
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4405
0
  if (context == NULL) {
4406
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4407
0
    return BLOSC2_ERROR_NULL_POINTER;
4408
0
  }
4409
4410
0
  int error = initialize_context_compression(
4411
0
          context, NULL, nbytes, dest, destsize,
4412
0
          context->clevel, context->filters, context->filters_meta,
4413
0
          context->typesize, context->compcode, context->blocksize,
4414
0
          context->new_nthreads, context->nthreads, context->splitmode,
4415
0
          context->tuner_id, context->tuner_params, context->schunk);
4416
0
  if (error <= 0) {
4417
0
    blosc2_free_ctx(context);
4418
0
    return error;
4419
0
  }
4420
4421
0
  memset(&header, 0, sizeof(header));
4422
0
  header.version = BLOSC2_VERSION_FORMAT;
4423
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4424
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4425
0
  header.typesize = (uint8_t)typesize;
4426
0
  header.nbytes = (int32_t)nbytes;
4427
0
  header.blocksize = context->blocksize;
4428
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + (int32_t)typesize;
4429
0
  header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4;  // mark chunk as all repeated value
4430
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4431
0
  memcpy((uint8_t *)dest + sizeof(header), repeatval, typesize);
4432
4433
0
  blosc2_free_ctx(context);
4434
4435
0
  return BLOSC_EXTENDED_HEADER_LENGTH + (uint8_t)typesize;
4436
0
}
4437
4438
4439
/* Register filters */
4440
4441
57.9k
int register_filter_private(blosc2_filter *filter) {
4442
57.9k
    BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
4443
57.9k
    if (g_nfilters == UINT8_MAX) {
4444
0
        BLOSC_TRACE_ERROR("Can not register more filters");
4445
0
        return BLOSC2_ERROR_CODEC_SUPPORT;
4446
0
    }
4447
57.9k
    if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
4448
0
        BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
4449
0
        return BLOSC2_ERROR_FAILURE;
4450
0
    }
4451
    /* This condition can never be fulfilled
4452
    if (filter->id > BLOSC2_USER_REGISTERED_FILTERS_STOP) {
4453
        BLOSC_TRACE_ERROR("The id must be less than or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_STOP);
4454
        return BLOSC2_ERROR_FAILURE;
4455
    }
4456
    */
4457
4458
144k
    for (uint64_t i = 0; i < g_nfilters; ++i) {
4459
86.9k
      if (g_filters[i].id == filter->id) {
4460
0
        if (strcmp(g_filters[i].name, filter->name) != 0) {
4461
0
          BLOSC_TRACE_ERROR("The filter (ID: %d) plugin is already registered with name: %s."
4462
0
                            "  Choose another one !", filter->id, g_filters[i].name);
4463
0
          return BLOSC2_ERROR_FAILURE;
4464
0
        }
4465
0
        else {
4466
          // Already registered, so no more actions needed
4467
0
          return BLOSC2_ERROR_SUCCESS;
4468
0
        }
4469
0
      }
4470
86.9k
    }
4471
4472
57.9k
    blosc2_filter *filter_new = &g_filters[g_nfilters++];
4473
57.9k
    memcpy(filter_new, filter, sizeof(blosc2_filter));
4474
4475
57.9k
    return BLOSC2_ERROR_SUCCESS;
4476
57.9k
}
4477
4478
4479
0
int blosc2_register_filter(blosc2_filter *filter) {
4480
0
  if (filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
4481
0
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_START);
4482
0
    return BLOSC2_ERROR_FAILURE;
4483
0
  }
4484
4485
0
  return register_filter_private(filter);
4486
0
}
4487
4488
4489
/* Register codecs */
4490
4491
72.4k
int register_codec_private(blosc2_codec *codec) {
4492
72.4k
    BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
4493
72.4k
    if (g_ncodecs == UINT8_MAX) {
4494
0
      BLOSC_TRACE_ERROR("Can not register more codecs");
4495
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
4496
0
    }
4497
72.4k
    if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
4498
0
      BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
4499
0
      return BLOSC2_ERROR_FAILURE;
4500
0
    }
4501
    /* This condition can never be fulfilled
4502
    if (codec->compcode > BLOSC2_USER_REGISTERED_CODECS_STOP) {
4503
      BLOSC_TRACE_ERROR("The id must be less or equal to %d", BLOSC2_USER_REGISTERED_CODECS_STOP);
4504
      return BLOSC2_ERROR_FAILURE;
4505
    }
4506
     */
4507
4508
217k
    for (int i = 0; i < g_ncodecs; ++i) {
4509
144k
      if (g_codecs[i].compcode == codec->compcode) {
4510
0
        if (strcmp(g_codecs[i].compname, codec->compname) != 0) {
4511
0
          BLOSC_TRACE_ERROR("The codec (ID: %d) plugin is already registered with name: %s."
4512
0
                            "  Choose another one !", codec->compcode, codec->compname);
4513
0
          return BLOSC2_ERROR_CODEC_PARAM;
4514
0
        }
4515
0
        else {
4516
          // Already registered, so no more actions needed
4517
0
          return BLOSC2_ERROR_SUCCESS;
4518
0
        }
4519
0
      }
4520
144k
    }
4521
4522
72.4k
    blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
4523
72.4k
    memcpy(codec_new, codec, sizeof(blosc2_codec));
4524
4525
72.4k
    return BLOSC2_ERROR_SUCCESS;
4526
72.4k
}
4527
4528
4529
0
int blosc2_register_codec(blosc2_codec *codec) {
4530
0
  if (codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
4531
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
4532
0
    return BLOSC2_ERROR_CODEC_PARAM;
4533
0
  }
4534
4535
0
  return register_codec_private(codec);
4536
0
}
4537
4538
4539
/* Register tuners */
4540
4541
14.4k
int register_tuner_private(blosc2_tuner *tuner) {
4542
14.4k
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);
4543
14.4k
  if (g_ntuners == UINT8_MAX) {
4544
0
    BLOSC_TRACE_ERROR("Can not register more tuners");
4545
0
    return BLOSC2_ERROR_CODEC_SUPPORT;
4546
0
  }
4547
14.4k
  if (tuner->id < BLOSC2_GLOBAL_REGISTERED_TUNER_START) {
4548
0
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_TUNER_START);
4549
0
    return BLOSC2_ERROR_FAILURE;
4550
0
  }
4551
4552
14.4k
  for (int i = 0; i < g_ntuners; ++i) {
4553
0
    if (g_tuners[i].id == tuner->id) {
4554
0
      if (strcmp(g_tuners[i].name, tuner->name) != 0) {
4555
0
        BLOSC_TRACE_ERROR("The tuner (ID: %d) plugin is already registered with name: %s."
4556
0
                          "  Choose another one !", tuner->id, g_tuners[i].name);
4557
0
        return BLOSC2_ERROR_FAILURE;
4558
0
      }
4559
0
      else {
4560
        // Already registered, so no more actions needed
4561
0
        return BLOSC2_ERROR_SUCCESS;
4562
0
      }
4563
0
    }
4564
0
  }
4565
4566
14.4k
  blosc2_tuner *tuner_new = &g_tuners[g_ntuners++];
4567
14.4k
  memcpy(tuner_new, tuner, sizeof(blosc2_tuner));
4568
4569
14.4k
  return BLOSC2_ERROR_SUCCESS;
4570
14.4k
}
4571
4572
4573
0
int blosc2_register_tuner(blosc2_tuner *tuner) {
4574
0
  if (tuner->id < BLOSC2_USER_REGISTERED_TUNER_START) {
4575
0
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_TUNER_START);
4576
0
    return BLOSC2_ERROR_FAILURE;
4577
0
  }
4578
4579
0
  return register_tuner_private(tuner);
4580
0
}
4581
4582
4583
1
int _blosc2_register_io_cb(const blosc2_io_cb *io) {
4584
4585
1
  for (uint64_t i = 0; i < g_nio; ++i) {
4586
0
    if (g_ios[i].id == io->id) {
4587
0
      if (strcmp(g_ios[i].name, io->name) != 0) {
4588
0
        BLOSC_TRACE_ERROR("The IO (ID: %d) plugin is already registered with name: %s."
4589
0
                          "  Choose another one !", io->id, g_ios[i].name);
4590
0
        return BLOSC2_ERROR_PLUGIN_IO;
4591
0
      }
4592
0
      else {
4593
        // Already registered, so no more actions needed
4594
0
        return BLOSC2_ERROR_SUCCESS;
4595
0
      }
4596
0
    }
4597
0
  }
4598
4599
1
  blosc2_io_cb *io_new = &g_ios[g_nio++];
4600
1
  memcpy(io_new, io, sizeof(blosc2_io_cb));
4601
4602
1
  return BLOSC2_ERROR_SUCCESS;
4603
1
}
4604
4605
0
int blosc2_register_io_cb(const blosc2_io_cb *io) {
4606
0
  BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
4607
0
  if (g_nio == UINT8_MAX) {
4608
0
    BLOSC_TRACE_ERROR("Can not register more codecs");
4609
0
    return BLOSC2_ERROR_PLUGIN_IO;
4610
0
  }
4611
4612
0
  if (io->id < BLOSC2_IO_REGISTERED) {
4613
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
4614
0
    return BLOSC2_ERROR_PLUGIN_IO;
4615
0
  }
4616
4617
0
  return _blosc2_register_io_cb(io);
4618
0
}
4619
4620
53
blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
4621
53
  for (uint64_t i = 0; i < g_nio; ++i) {
4622
52
    if (g_ios[i].id == id) {
4623
52
      return &g_ios[i];
4624
52
    }
4625
52
  }
4626
1
  if (id == BLOSC2_IO_FILESYSTEM) {
4627
1
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
4628
0
      BLOSC_TRACE_ERROR("Error registering the default IO API");
4629
0
      return NULL;
4630
0
    }
4631
1
    return blosc2_get_io_cb(id);
4632
1
  }
4633
0
  return NULL;
4634
1
}
4635
4636
0
void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) {
4637
0
  if (ndim == 0) {
4638
0
    return;
4639
0
  }
4640
0
  int64_t *strides = malloc(ndim * sizeof(int64_t));
4641
0
  strides[ndim - 1] = 1;
4642
0
  for (int j = ndim - 2; j >= 0; --j) {
4643
0
      strides[j] = shape[j + 1] * strides[j + 1];
4644
0
  }
4645
4646
0
  index[0] = i / strides[0];
4647
0
  for (int j = 1; j < ndim; ++j) {
4648
0
      index[j] = (i % strides[j - 1]) / strides[j];
4649
0
  }
4650
0
  free(strides);
4651
0
}
4652
4653
0
void blosc2_multidim_to_unidim(const int64_t *index, int8_t ndim, const int64_t *strides, int64_t *i) {
4654
0
  *i = 0;
4655
0
  for (int j = 0; j < ndim; ++j) {
4656
0
    *i += index[j] * strides[j];
4657
0
  }
4658
0
}