Coverage Report

Created: 2025-12-23 06:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/c-blosc2/blosc/blosc2.c
Line
Count
Source
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Copyright (c) 2021  Blosc Development Team <blosc@blosc.org>
5
  https://blosc.org
6
  License: BSD 3-Clause (see LICENSE.txt)
7
8
  See LICENSE.txt for details about copyright and rights to use.
9
**********************************************************************/
10
11
12
#include "blosc2.h"
13
#include "blosc-private.h"
14
#include "../plugins/codecs/zfp/blosc2-zfp.h"
15
#include "frame.h"
16
#include "b2nd-private.h"
17
#include "schunk-private.h"
18
19
#if defined(USING_CMAKE)
20
  #include "config.h"
21
#endif /*  USING_CMAKE */
22
#include "context.h"
23
24
#include "shuffle.h"
25
#include "delta.h"
26
#include "trunc-prec.h"
27
#include "blosclz.h"
28
#include "stune.h"
29
#include "blosc2/codecs-registry.h"
30
#include "blosc2/filters-registry.h"
31
#include "blosc2/tuners-registry.h"
32
33
#include "lz4.h"
34
#include "lz4hc.h"
35
#ifdef HAVE_IPP
36
  #include <ipps.h>
37
  #include <ippdc.h>
38
#endif
39
#if defined(HAVE_ZLIB_NG)
40
#ifdef ZLIB_COMPAT
41
  #include "zlib.h"
42
#else
43
  #include "zlib-ng.h"
44
#endif
45
#elif defined(HAVE_ZLIB)
46
  #include "zlib.h"
47
#endif /*  HAVE_MINIZ */
48
#if defined(HAVE_ZSTD)
49
  #include "zstd.h"
50
  #include "zstd_errors.h"
51
  // #include "cover.h"  // for experimenting with fast cover training for building dicts
52
  #include "zdict.h"
53
#endif /*  HAVE_ZSTD */
54
55
#if defined(_WIN32) && !defined(__MINGW32__)
56
  #include <windows.h>
57
  #include <malloc.h>
58
  #include <process.h>
59
  #define getpid _getpid
60
#endif  /* _WIN32 */
61
62
#include "threading.h"
63
64
#include <assert.h>
#include <errno.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
71
72
73
/* Synchronization variables */
74
75
/* Global context for non-contextual API */
76
static blosc2_context* g_global_context;
77
static blosc2_pthread_mutex_t global_comp_mutex;
78
static int g_compressor = BLOSC_BLOSCLZ;
79
static int g_delta = 0;
80
/* The default splitmode */
81
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
82
/* the compressor to use by default */
83
static int16_t g_nthreads = 1;
84
static int32_t g_force_blocksize = 0;
85
static int g_initlib = 0;
86
static blosc2_schunk* g_schunk = NULL;   /* the pointer to super-chunk */
87
88
blosc2_codec g_codecs[256] = {0};
89
uint8_t g_ncodecs = 0;
90
91
static blosc2_filter g_filters[256] = {0};
92
static uint64_t g_nfilters = 0;
93
94
static blosc2_io_cb g_ios[256] = {0};
95
static uint64_t g_nio = 0;
96
97
blosc2_tuner g_tuners[256] = {0};
98
int g_ntuners = 0;
99
100
static int g_tuner = BLOSC_STUNE;
101
102
// Forward declarations
103
int init_threadpool(blosc2_context *context);
104
int release_threadpool(blosc2_context *context);
105
106
/* Macros for synchronization
 *
 * WAIT_INIT / WAIT_FINISH make the calling thread rendezvous with the other
 * threads of CONTEXT_PTR.  Each expands to a single do/while(0) statement:
 *  - with BLOSC_POSIX_BARRIERS they wait on a pthread barrier, assign the
 *    result to a variable named `rc` that the caller must have declared, and
 *    make the enclosing function `return RET_VAL` when the wait fails;
 *  - otherwise they emulate a barrier with count_threads_mutex and
 *    count_threads_cv, and RET_VAL is not used.
 * NOTE(review): the condition-variable variants call cond_wait once without
 * a predicate loop, so a spurious wakeup could release a thread early --
 * confirm whether callers tolerate this.
 */

/* Wait until all threads are initialized */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                \
  do {                                                                 \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);              \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc);   \
      return((RET_VAL));                                               \
    }                                                                  \
  } while (0)
#else
/* Threads arriving while count_threads < nthreads increment the counter and
 * sleep; the thread that finds the counter already at nthreads broadcasts. */
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                \
  do {                                                                 \
    blosc2_pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);    \
    if ((CONTEXT_PTR)->count_threads < (CONTEXT_PTR)->nthreads) {      \
      (CONTEXT_PTR)->count_threads++;                                  \
      blosc2_pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,       \
                               &(CONTEXT_PTR)->count_threads_mutex);   \
    }                                                                  \
    else {                                                             \
      blosc2_pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv); \
    }                                                                  \
    blosc2_pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);  \
  } while (0)
#endif

/* Wait for all threads to finish */
#ifdef BLOSC_POSIX_BARRIERS
/* Reports `rc` on failure, like WAIT_INIT, so both barriers trace alike. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                              \
  do {                                                                 \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);            \
    if (rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (finish): %d", rc); \
      return((RET_VAL));                                               \
    }                                                                  \
  } while (0)
#else
/* Mirror of WAIT_INIT: threads count down and sleep; the thread that finds
 * the counter at 0 broadcasts. */
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                              \
  do {                                                                 \
    blosc2_pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);    \
    if ((CONTEXT_PTR)->count_threads > 0) {                            \
      (CONTEXT_PTR)->count_threads--;                                  \
      blosc2_pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,       \
                               &(CONTEXT_PTR)->count_threads_mutex);   \
    }                                                                  \
    else {                                                             \
      blosc2_pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv); \
    }                                                                  \
    blosc2_pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);  \
  } while (0)
#endif
159
160
161
/* global variable to change threading backend from Blosc-managed to caller-managed */
162
static blosc_threads_callback threads_callback = 0;
163
static void *threads_callback_data = 0;
164
165
166
/* non-threadsafe function should be called before any other Blosc function in
167
   order to change how threads are managed */
168
void blosc2_set_threads_callback(blosc_threads_callback callback, void *callback_data)
169
0
{
170
0
  threads_callback = callback;
171
0
  threads_callback_data = callback_data;
172
0
}
173
174
175
/* Portable aligned allocation.
 *
 * Requests `size` bytes aligned to 32 bytes (AVX2-friendly) from
 * _aligned_malloc on Windows or posix_memalign on POSIX systems; other
 * platforms fall back to plain malloc without an alignment guarantee.
 * Returns NULL, after tracing an error, when the allocation fails.  Release
 * the block with my_free(), never with free() directly. */
static uint8_t* my_malloc(size_t size) {
  void* mem = NULL;
  int err = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  mem = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* posix_memalign reports failure through its return code */
  err = posix_memalign(&mem, 32, size);
#else
  mem = malloc(size);
#endif  /* _WIN32 */

  if (err == 0 && mem != NULL) {
    return (uint8_t*)mem;
  }
  BLOSC_TRACE_ERROR("Error allocating memory!");
  return NULL;
}
198
199
200
/* Release a block obtained from my_malloc().  Windows blocks come from
 * _aligned_malloc() and must go back through _aligned_free(); everywhere
 * else plain free() is the matching release call.  NULL is accepted. */
static void my_free(void* block) {
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* _WIN32 */
}
208
209
210
/*
211
 * Conversion routines between compressor and compression libraries
212
 */
213
214
/* Return the library code associated with the compressor name */
215
0
static int compname_to_clibcode(const char* compname) {
216
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
217
0
    return BLOSC_BLOSCLZ_LIB;
218
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
219
0
    return BLOSC_LZ4_LIB;
220
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
221
0
    return BLOSC_LZ4_LIB;
222
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
223
0
    return BLOSC_ZLIB_LIB;
224
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
225
0
    return BLOSC_ZSTD_LIB;
226
0
  for (int i = 0; i < g_ncodecs; ++i) {
227
0
    if (strcmp(compname, g_codecs[i].compname) == 0)
228
0
      return g_codecs[i].complib;
229
0
  }
230
0
  return BLOSC2_ERROR_NOT_FOUND;
231
0
}
232
233
/* Return the library name associated with the compressor code */
234
14
static const char* clibcode_to_clibname(int clibcode) {
235
14
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
236
14
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
237
14
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
238
14
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
239
98
  for (int i = 0; i < g_ncodecs; ++i) {
240
84
    if (clibcode == g_codecs[i].complib)
241
0
      return g_codecs[i].compname;
242
84
  }
243
14
  return NULL;                  /* should never happen */
244
14
}
245
246
247
/*
248
 * Conversion routines between compressor names and compressor codes
249
 */
250
251
/* Get the compressor name associated with the compressor code */
252
0
int blosc2_compcode_to_compname(int compcode, const char** compname) {
253
0
  int code = -1;    /* -1 means non-existent compressor code */
254
0
  const char* name = NULL;
255
256
  /* Map the compressor code */
257
0
  if (compcode == BLOSC_BLOSCLZ)
258
0
    name = BLOSC_BLOSCLZ_COMPNAME;
259
0
  else if (compcode == BLOSC_LZ4)
260
0
    name = BLOSC_LZ4_COMPNAME;
261
0
  else if (compcode == BLOSC_LZ4HC)
262
0
    name = BLOSC_LZ4HC_COMPNAME;
263
0
  else if (compcode == BLOSC_ZLIB)
264
0
    name = BLOSC_ZLIB_COMPNAME;
265
0
  else if (compcode == BLOSC_ZSTD)
266
0
    name = BLOSC_ZSTD_COMPNAME;
267
0
  else {
268
0
    for (int i = 0; i < g_ncodecs; ++i) {
269
0
      if (compcode == g_codecs[i].compcode) {
270
0
        name = g_codecs[i].compname;
271
0
        break;
272
0
      }
273
0
    }
274
0
  }
275
276
0
  *compname = name;
277
278
  /* Guess if there is support for this code */
279
0
  if (compcode == BLOSC_BLOSCLZ)
280
0
    code = BLOSC_BLOSCLZ;
281
0
  else if (compcode == BLOSC_LZ4)
282
0
    code = BLOSC_LZ4;
283
0
  else if (compcode == BLOSC_LZ4HC)
284
0
    code = BLOSC_LZ4HC;
285
0
#if defined(HAVE_ZLIB)
286
0
  else if (compcode == BLOSC_ZLIB)
287
0
    code = BLOSC_ZLIB;
288
0
#endif /* HAVE_ZLIB */
289
0
#if defined(HAVE_ZSTD)
290
0
  else if (compcode == BLOSC_ZSTD)
291
0
    code = BLOSC_ZSTD;
292
0
#endif /* HAVE_ZSTD */
293
0
  else if (compcode >= BLOSC_LAST_CODEC)
294
0
    code = compcode;
295
0
  return code;
296
0
}
297
298
/* Get the compressor code for the compressor name. -1 if it is not available */
299
15.1k
int blosc2_compname_to_compcode(const char* compname) {
300
15.1k
  int code = -1;  /* -1 means non-existent compressor code */
301
302
15.1k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
303
2.76k
    code = BLOSC_BLOSCLZ;
304
2.76k
  }
305
12.3k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
306
507
    code = BLOSC_LZ4;
307
507
  }
308
11.8k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
309
2.86k
    code = BLOSC_LZ4HC;
310
2.86k
  }
311
8.96k
#if defined(HAVE_ZLIB)
312
8.96k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
313
2.19k
    code = BLOSC_ZLIB;
314
2.19k
  }
315
6.76k
#endif /*  HAVE_ZLIB */
316
6.76k
#if defined(HAVE_ZSTD)
317
6.76k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
318
6.76k
    code = BLOSC_ZSTD;
319
6.76k
  }
320
0
#endif /*  HAVE_ZSTD */
321
0
  else{
322
0
    for (int i = 0; i < g_ncodecs; ++i) {
323
0
      if (strcmp(compname, g_codecs[i].compname) == 0) {
324
0
        code = g_codecs[i].compcode;
325
0
        break;
326
0
      }
327
0
    }
328
0
  }
329
15.1k
  return code;
330
15.1k
}
331
332
333
/* Convert compressor code to blosc compressor format code */
334
51.0k
static int compcode_to_compformat(int compcode) {
335
51.0k
  switch (compcode) {
336
38.7k
    case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
337
504
    case BLOSC_LZ4:     return BLOSC_LZ4_FORMAT;
338
2.86k
    case BLOSC_LZ4HC:   return BLOSC_LZ4HC_FORMAT;
339
340
0
#if defined(HAVE_ZLIB)
341
2.19k
    case BLOSC_ZLIB:    return BLOSC_ZLIB_FORMAT;
342
0
#endif /*  HAVE_ZLIB */
343
344
0
#if defined(HAVE_ZSTD)
345
6.75k
    case BLOSC_ZSTD:    return BLOSC_ZSTD_FORMAT;
346
0
      break;
347
0
#endif /*  HAVE_ZSTD */
348
0
    default:
349
0
      return BLOSC_UDCODEC_FORMAT;
350
51.0k
  }
351
0
  BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
352
0
}
353
354
355
/* Convert compressor code to blosc compressor format version */
356
51.5k
static int compcode_to_compversion(int compcode) {
357
  /* Write compressor format */
358
51.5k
  switch (compcode) {
359
39.2k
    case BLOSC_BLOSCLZ:
360
39.2k
      return BLOSC_BLOSCLZ_VERSION_FORMAT;
361
505
    case BLOSC_LZ4:
362
505
      return BLOSC_LZ4_VERSION_FORMAT;
363
2.86k
    case BLOSC_LZ4HC:
364
2.86k
      return BLOSC_LZ4HC_VERSION_FORMAT;
365
366
0
#if defined(HAVE_ZLIB)
367
2.19k
    case BLOSC_ZLIB:
368
2.19k
      return BLOSC_ZLIB_VERSION_FORMAT;
369
0
      break;
370
0
#endif /*  HAVE_ZLIB */
371
372
0
#if defined(HAVE_ZSTD)
373
6.75k
    case BLOSC_ZSTD:
374
6.75k
      return BLOSC_ZSTD_VERSION_FORMAT;
375
0
      break;
376
0
#endif /*  HAVE_ZSTD */
377
0
    default:
378
0
      for (int i = 0; i < g_ncodecs; ++i) {
379
0
        if (compcode == g_codecs[i].compcode) {
380
0
          return g_codecs[i].version;
381
0
        }
382
0
      }
383
51.5k
  }
384
0
  return BLOSC2_ERROR_FAILURE;
385
51.5k
}
386
387
388
/* LZ4 compression wrapper.
 *
 * Compresses `input` into at most `maxout` bytes.  Returns the compressed
 * size, or 0 when the result does not fit.  `accel` is ignored: the IPP
 * path has no acceleration knob, so the plain LZ4 path pins acceleration
 * to 1 to behave the same.  `hash_table` is used only by the IPP path,
 * where it is mandatory (BLOSC2_ERROR_INVALID_PARAM if NULL); other IPP
 * failures yield BLOSC2_ERROR_FAILURE. */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel, void* hash_table) {
  BLOSC_UNUSED_PARAM(accel);
#ifdef HAVE_IPP
  if (hash_table == NULL) {
    return BLOSC2_ERROR_INVALID_PARAM;  // the hash table should always be initialized
  }
  int src_len = (int)input_length;
  int dst_len = (int)maxout;
  IppStatus st = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &src_len,
                                      (Ipp8u*)output, &dst_len, (Ipp8u*)hash_table);
  if (st == ippStsDstSizeLessExpected) {
    return 0;  // we cannot compress in required outlen
  }
  if (st != ippStsNoErr) {
    return BLOSC2_ERROR_FAILURE;  // an unexpected error happened
  }
  return dst_len;
#else
  BLOSC_UNUSED_PARAM(hash_table);
  /* acceleration 1 == acceleration deactivated, matching IPP behaviour */
  return LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 1);
#endif
}
416
417
418
static int lz4hc_wrap_compress(const char* input, size_t input_length,
419
24.2k
                               char* output, size_t maxout, int clevel) {
420
24.2k
  int cbytes;
421
24.2k
  if (input_length > (size_t)(UINT32_C(2) << 30))
422
0
    return BLOSC2_ERROR_2GB_LIMIT;
423
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
424
   * but levels larger than 9 do not buy much compression. */
425
24.2k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
426
24.2k
                           clevel);
427
24.2k
  return cbytes;
428
24.2k
}
429
430
431
/* LZ4 decompression wrapper.
 *
 * Succeeds only when exactly `maxout` bytes are produced.  Returns maxout
 * on success and 0 on any decoder error or size mismatch. */
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
                               char* output, size_t maxout) {
  const int expected = (int)maxout;
  int produced;
#ifdef HAVE_IPP
  int dst_len = expected;
  IppStatus st = ippsDecodeLZ4_8u((const Ipp8u*)input, (int)compressed_length,
                                  (Ipp8u*)output, &dst_len);
  produced = (st == ippStsNoErr) ? dst_len : -dst_len;
#else
  produced = LZ4_decompress_safe(input, output, (int)compressed_length, expected);
#endif
  return (produced == expected) ? expected : 0;
}
449
450
#if defined(HAVE_ZLIB)
451
/* zlib is not very respectful with sharing name space with others.
452
 Fortunately, its names do not collide with those already in blosc. */
453
static int zlib_wrap_compress(const char* input, size_t input_length,
454
30.0k
                              char* output, size_t maxout, int clevel) {
455
30.0k
  int status;
456
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
457
  size_t cl = maxout;
458
  status = zng_compress2(
459
      (uint8_t*)output, &cl, (uint8_t*)input, input_length, clevel);
460
#else
461
30.0k
  uLongf cl = (uLongf)maxout;
462
30.0k
  status = compress2(
463
30.0k
      (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
464
30.0k
#endif
465
30.0k
  if (status != Z_OK) {
466
6.49k
    return 0;
467
6.49k
  }
468
23.5k
  return (int)cl;
469
30.0k
}
470
471
static int zlib_wrap_decompress(const char* input, size_t compressed_length,
472
5.46k
                                char* output, size_t maxout) {
473
5.46k
  int status;
474
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
475
  size_t ul = maxout;
476
  status = zng_uncompress(
477
      (uint8_t*)output, &ul, (uint8_t*)input, compressed_length);
478
#else
479
5.46k
  uLongf ul = (uLongf)maxout;
480
5.46k
  status = uncompress(
481
5.46k
      (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
482
5.46k
#endif
483
5.46k
  if (status != Z_OK) {
484
1.45k
    return 0;
485
1.45k
  }
486
4.01k
  return (int)ul;
487
5.46k
}
488
#endif /*  HAVE_ZLIB */
489
490
491
#if defined(HAVE_ZSTD)
492
static int zstd_wrap_compress(struct thread_context* thread_context,
493
                              const char* input, size_t input_length,
494
122k
                              char* output, size_t maxout, int clevel) {
495
122k
  size_t code;
496
122k
  blosc2_context* context = thread_context->parent_context;
497
498
122k
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
499
  /* Make the level 8 close enough to maxCLevel */
500
122k
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
501
502
122k
  if (thread_context->zstd_cctx == NULL) {
503
908
    thread_context->zstd_cctx = ZSTD_createCCtx();
504
908
  }
505
506
122k
  if (context->use_dict) {
507
0
    assert(context->dict_cdict != NULL);
508
0
    code = ZSTD_compress_usingCDict(
509
0
            thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
510
0
            input_length, context->dict_cdict);
511
122k
  } else {
512
122k
    code = ZSTD_compressCCtx(thread_context->zstd_cctx,
513
122k
        (void*)output, maxout, (void*)input, input_length, clevel);
514
122k
  }
515
122k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
516
    // Blosc will just memcpy this buffer
517
22.8k
    return 0;
518
22.8k
  }
519
99.5k
  return (int)code;
520
122k
}
521
522
static int zstd_wrap_decompress(struct thread_context* thread_context,
523
                                const char* input, size_t compressed_length,
524
10.9k
                                char* output, size_t maxout) {
525
10.9k
  size_t code;
526
10.9k
  blosc2_context* context = thread_context->parent_context;
527
528
10.9k
  if (thread_context->zstd_dctx == NULL) {
529
5.24k
    thread_context->zstd_dctx = ZSTD_createDCtx();
530
5.24k
  }
531
532
10.9k
  if (context->use_dict) {
533
2.31k
    assert(context->dict_ddict != NULL);
534
2.31k
    code = ZSTD_decompress_usingDDict(
535
2.31k
            thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
536
2.31k
            compressed_length, context->dict_ddict);
537
8.64k
  } else {
538
8.64k
    code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
539
8.64k
        (void*)output, maxout, (void*)input, compressed_length);
540
8.64k
  }
541
10.9k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
542
5.02k
    BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'.  Giving up.",
543
5.02k
                      ZDICT_getErrorName(code));
544
5.02k
    return 0;
545
5.02k
  }
546
5.92k
  return (int)code;
547
10.9k
}
548
#endif /*  HAVE_ZSTD */
549
550
/* Compute acceleration for blosclz */
551
231k
static int get_accel(const blosc2_context* context) {
552
231k
  int clevel = context->clevel;
553
554
231k
  if (context->compcode == BLOSC_LZ4) {
555
    /* This acceleration setting based on discussions held in:
556
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
557
     */
558
6.86k
    return (10 - clevel);
559
6.86k
  }
560
225k
  return 1;
561
231k
}
562
563
564
1.83M
int do_nothing(uint8_t filter, char cmode) {
565
1.83M
  if (cmode == 'c') {
566
1.39M
    return (filter == BLOSC_NOFILTER);
567
1.39M
  } else {
568
    // TRUNC_PREC do not have to be applied during decompression
569
446k
    return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
570
446k
  }
571
1.83M
}
572
573
574
77.4k
int next_filter(const uint8_t* filters, int current_filter, char cmode) {
575
113k
  for (int i = current_filter - 1; i >= 0; i--) {
576
113k
    if (!do_nothing(filters[i], cmode)) {
577
77.4k
      return filters[i];
578
77.4k
    }
579
113k
  }
580
0
  return BLOSC_NOFILTER;
581
77.4k
}
582
583
584
287k
int last_filter(const uint8_t* filters, char cmode) {
585
287k
  int last_index = -1;
586
2.01M
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
587
1.72M
    if (!do_nothing(filters[i], cmode))  {
588
214k
      last_index = i;
589
214k
    }
590
1.72M
  }
591
287k
  return last_index;
592
287k
}
593
594
595
/* Convert filter pipeline to filter flags */
596
73.5k
static uint8_t filters_to_flags(const uint8_t* filters) {
597
73.5k
  uint8_t flags = 0;
598
599
515k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
600
441k
    switch (filters[i]) {
601
20.7k
      case BLOSC_SHUFFLE:
602
20.7k
        flags |= BLOSC_DOSHUFFLE;
603
20.7k
        break;
604
20.4k
      case BLOSC_BITSHUFFLE:
605
20.4k
        flags |= BLOSC_DOBITSHUFFLE;
606
20.4k
        break;
607
2.87k
      case BLOSC_DELTA:
608
2.87k
        flags |= BLOSC_DODELTA;
609
2.87k
        break;
610
397k
      default :
611
397k
        break;
612
441k
    }
613
441k
  }
614
73.5k
  return flags;
615
73.5k
}
616
617
618
/* Convert filter flags to filter pipeline */
619
293k
static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
620
  /* Initialize the filter pipeline */
621
293k
  memset(filters, 0, BLOSC2_MAX_FILTERS);
622
  /* Fill the filter pipeline */
623
293k
  if (flags & BLOSC_DOSHUFFLE)
624
273k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
625
293k
  if (flags & BLOSC_DOBITSHUFFLE)
626
271k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
627
293k
  if (flags & BLOSC_DODELTA)
628
28.3k
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
629
293k
}
630
631
632
/* Get filter flags from header flags */
633
static uint8_t get_filter_flags(const uint8_t header_flags,
634
5.58k
                                const int32_t typesize) {
635
5.58k
  uint8_t flags = 0;
636
637
5.58k
  if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
638
1.01k
    flags |= BLOSC_DOSHUFFLE;
639
1.01k
  }
640
5.58k
  if (header_flags & BLOSC_DOBITSHUFFLE) {
641
1.42k
    flags |= BLOSC_DOBITSHUFFLE;
642
1.42k
  }
643
5.58k
  if (header_flags & BLOSC_DODELTA) {
644
1.98k
    flags |= BLOSC_DODELTA;
645
1.98k
  }
646
5.58k
  if (header_flags & BLOSC_MEMCPYED) {
647
37
    flags |= BLOSC_MEMCPYED;
648
37
  }
649
5.58k
  return flags;
650
5.58k
}
651
652
/* In-memory image of a Blosc chunk header.
 *
 * read_chunk_header() memcpy()s raw header bytes straight into this struct,
 * so field order and sizes mirror the serialized layout.  The first
 * BLOSC_MIN_HEADER_LENGTH bytes form the minimal header.  The rest is the
 * Blosc2 extended header, read only when both shuffle flag bits are set.
 * Do not reorder or resize fields.
 * NOTE(review): this relies on the compiler inserting no padding; the
 * int32_t fields sit at 4-byte-aligned offsets.  Re-check if fields change. */
typedef struct blosc_header_s {
  uint8_t version;
  uint8_t versionlz;
  uint8_t flags;          // filter bits; compressor format in the top bits (flags >> 5)
  uint8_t typesize;
  int32_t nbytes;         // uncompressed size; converted to host order when read
  int32_t blocksize;
  int32_t cbytes;         // compressed size of the whole chunk, header included
  // Extended Blosc2 header
  uint8_t filters[BLOSC2_MAX_FILTERS];
  uint8_t udcompcode;     // actual codec code when the format is BLOSC_UDCODEC_FORMAT
  uint8_t compcode_meta;
  uint8_t filters_meta[BLOSC2_MAX_FILTERS];
  uint8_t reserved2;
  uint8_t blosc2_flags;   // special-value type in bits 4+ (masked); 0x08 marks a lazy chunk
} blosc_header;
668
669
670
/* Parse and validate a chunk header from `src` into `header`.
 *
 * The minimal header is always read, with its 32-bit fields converted to
 * host byte order.  When `extended_header` is true and both BLOSC_DOSHUFFLE
 * and BLOSC_DOBITSHUFFLE are set, the Blosc2 extended part is read as well.
 * Otherwise the filter pipeline is rebuilt from the legacy flag bits.
 *
 * Returns one of:
 *  - 0 on success;
 *  - BLOSC2_ERROR_READ_BUFFER when `srcsize` is too small;
 *  - BLOSC2_ERROR_VERSION_SUPPORT for a header from a newer format;
 *  - BLOSC2_ERROR_INVALID_HEADER for inconsistent fields, including a
 *    negative `nbytes`. */
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
{
  memset(header, 0, sizeof(blosc_header));

  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);

  bool little_endian = is_little_endian();

  if (!little_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    header->cbytes = bswap32_(header->cbytes);
  }

  if (header->version > BLOSC2_VERSION_FORMAT) {
    /* Version from future */
    return BLOSC2_ERROR_VERSION_SUPPORT;
  }
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  /* A negative uncompressed size is never valid.  Without this check it
   * would pass the blocksize test below, which only compares against a
   * positive nbytes, and later feed the block-count arithmetic. */
  if (header->nbytes < 0) {
    BLOSC_TRACE_ERROR("`nbytes` is negative.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->typesize == 0) {
    BLOSC_TRACE_ERROR("`typesize` is zero.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  /* Read extended header if it is wanted */
  if ((extended_header) && (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE)) {
    if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
      return BLOSC2_ERROR_INVALID_HEADER;
    }
    if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
      return BLOSC2_ERROR_READ_BUFFER;
    }

    memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
      BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);

    int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    if (special_type != 0) {
      if (special_type == BLOSC2_SPECIAL_VALUE) {
        // In this case, the actual type size must be derived from the cbytes
        int32_t typesize = header->cbytes - BLOSC_EXTENDED_HEADER_LENGTH;
        if (typesize <= 0) {
          BLOSC_TRACE_ERROR("`typesize` is zero or negative");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (typesize > BLOSC2_MAXTYPESIZE) {
          BLOSC_TRACE_ERROR("`typesize` is greater than maximum allowed");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (typesize > header->nbytes) {
          BLOSC_TRACE_ERROR("`typesize` is greater than `nbytes`");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (header->nbytes % typesize != 0) {
          BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
      }
      else {
        if (header->nbytes % header->typesize != 0) {
          BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
      }
    }
    // The number of filters depends on the version of the header. Blosc2 alpha series
    // did not initialize filters to zero beyond the max supported.
    if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
      header->filters[5] = 0;
      header->filters_meta[5] = 0;
    }
  }
  else {
    flags_to_filters(header->flags, header->filters);
  }
  return 0;
}
765
766
79.1k
/* Derive the block count of the source buffer.  leftover is the size of a
 * trailing partial block, and a non-zero leftover counts as one extra block
 * in nblocks. */
static inline void blosc2_calculate_blocks(blosc2_context* context) {
  context->nblocks = context->sourcesize / context->blocksize;
  context->leftover = context->sourcesize % context->blocksize;
  if (context->leftover > 0) {
    context->nblocks++;
  }
}
773
774
27.6k
/* Populate `context` from a header parsed by read_chunk_header().
 *
 * Sizes, flags and the compressor code are copied first; a user-defined
 * codec's real code comes from udcompcode.  The block layout is then
 * derived.  For an extended header the stored filter pipeline and metadata
 * are copied.  For a regular header the pipeline is rebuilt from the flag
 * bits.
 * NOTE(review): special_type, filters_meta and compcode_meta are only
 * written on the extended-header path -- confirm they are reset elsewhere
 * when a context is reused.
 *
 * Returns 0, or BLOSC2_ERROR_INVALID_HEADER when a non-lazy chunk claims
 * more compressed bytes (cbytes) than the source buffer holds (srcsize). */
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
  context->header_flags = header->flags;
  context->typesize = header->typesize;
  context->sourcesize = header->nbytes;
  context->blocksize = header->blocksize;
  context->blosc2_flags = header->blosc2_flags;

  /* The top bits of the flags hold the compressor format */
  int compformat = header->flags >> 5;
  context->compcode = (compformat == BLOSC_UDCODEC_FORMAT) ? header->udcompcode : compformat;

  blosc2_calculate_blocks(context);

  bool is_lazy = false;
  bool extended = (context->header_flags & BLOSC_DOSHUFFLE) &&
                  (context->header_flags & BLOSC_DOBITSHUFFLE);
  if (extended) {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    memcpy(context->filters, header->filters, BLOSC2_MAX_FILTERS);
    memcpy(context->filters_meta, header->filters_meta, BLOSC2_MAX_FILTERS);
    context->compcode_meta = header->compcode_meta;
    context->filter_flags = filters_to_flags(header->filters);
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    /* bit 3 of blosc2_flags marks a lazy chunk */
    is_lazy = (context->blosc2_flags & 0x08u) != 0;
  } else {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    context->filter_flags = get_filter_flags(context->header_flags, context->typesize);
    flags_to_filters(context->header_flags, context->filters);
  }

  // Some checks for malformed headers
  if (is_lazy || header->cbytes <= context->srcsize) {
    return 0;
  }
  return BLOSC2_ERROR_INVALID_HEADER;
}
814
815
816
0
/* Load the plugin library of a dynamically registered filter and resolve
 * its forward/backward functions through the library's exported `info`
 * descriptor.
 *
 * On failure the library is closed again.  On success the handle is left
 * open, because the resolved pointers point into it.  A plugin that does
 * not export `info` is now rejected instead of being dereferenced through
 * a NULL pointer.
 *
 * Returns BLOSC2_ERROR_SUCCESS or BLOSC2_ERROR_FAILURE. */
int fill_filter(blosc2_filter *filter) {
  char libpath[PATH_MAX];
  void *lib = load_lib(filter->name, libpath);
  if(lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  filter_info *info = dlsym(lib, "info");
  if (info == NULL) {
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", filter->name);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }
  filter->forward = dlsym(lib, info->forward);
  filter->backward = dlsym(lib, info->backward);

  if (filter->forward == NULL || filter->backward == NULL){
    BLOSC_TRACE_ERROR("Wrong library loaded");
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
836
837
838
1
/*
 * Resolve the encoder/decoder callbacks of a user-defined codec from its
 * plugin library (located by load_lib() from codec->compname).  The plugin
 * must export an `info` symbol naming the two entry points.
 *
 * Returns BLOSC2_ERROR_SUCCESS when both callbacks were resolved, or
 * BLOSC2_ERROR_FAILURE otherwise.  On failure the library is closed and both
 * callbacks are reset to NULL.  On success the handle stays open because the
 * resolved function pointers point into it.
 */
int fill_codec(blosc2_codec *codec) {
  char libpath[PATH_MAX];
  void *lib = load_lib(codec->compname, libpath);
  if (lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library for codec `%s`", codec->compname);
    return BLOSC2_ERROR_FAILURE;
  }

  codec_info *info = dlsym(lib, "info");
  if (info == NULL) {
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", codec->compname);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  codec->encoder = dlsym(lib, info->encoder);
  codec->decoder = dlsym(lib, info->decoder);
  if (codec->encoder == NULL || codec->decoder == NULL) {
    BLOSC_TRACE_ERROR("encoder or decoder cannot be loaded from plugin `%s`", codec->compname);
    // Callers only reload the plugin when encoder/decoder is NULL.  A
    // half-resolved pointer would dangle after dlclose(), so clear both.
    codec->encoder = NULL;
    codec->decoder = NULL;
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
863
864
865
0
/*
 * Resolve the five callbacks of a user-defined tuner (init, update,
 * next_blocksize, free, next_cparams) from its plugin library, located by
 * load_lib() from tuner->name.  The plugin must export an `info` symbol
 * naming the entry points.
 *
 * Returns BLOSC2_ERROR_SUCCESS when every callback was resolved, or
 * BLOSC2_ERROR_FAILURE otherwise.  On failure the library is closed and all
 * callbacks are reset to NULL so none of them dangles into the unloaded
 * library.
 */
int fill_tuner(blosc2_tuner *tuner) {
  char libpath[PATH_MAX] = {0};
  void *lib = load_lib(tuner->name, libpath);
  if (lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  tuner_info *info = dlsym(lib, "info");
  if (info == NULL) {
    // Without the descriptor the entry point names are unknown; never deref NULL.
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", tuner->name);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }
  tuner->init = dlsym(lib, info->init);
  tuner->update = dlsym(lib, info->update);
  tuner->next_blocksize = dlsym(lib, info->next_blocksize);
  tuner->free = dlsym(lib, info->free);
  tuner->next_cparams = dlsym(lib, info->next_cparams);

  if (tuner->init == NULL || tuner->update == NULL || tuner->next_blocksize == NULL || tuner->free == NULL
      || tuner->next_cparams == NULL) {
    BLOSC_TRACE_ERROR("Wrong library loaded");
    // Do not leave pointers into a library that is about to be unloaded.
    tuner->init = NULL;
    tuner->update = NULL;
    tuner->next_blocksize = NULL;
    tuner->free = NULL;
    tuner->next_cparams = NULL;
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
889
890
891
51.5k
/*
 * Populate `header` with the chunk header fields derived from `context`.
 *
 * The header is zeroed first.  nbytes and blocksize are byte-swapped on
 * big-endian hosts so they are stored little-endian.  cbytes is not set here;
 * it is written after compression.  When `extended_header` is true, the
 * following are also stored:
 *   - the filter pipeline (ids and meta),
 *   - the codec id and meta,
 *   - the blosc2 flag bits for a big-endian host, dictionary use and codec
 *     instrumentation.
 *
 * Always returns 0.
 */
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
  const int little_endian = is_little_endian();
  int32_t nbytes = (int32_t)context->sourcesize;
  int32_t blocksize = (int32_t)context->blocksize;

  if (!little_endian) {
    nbytes = bswap32_(nbytes);
    blocksize = bswap32_(blocksize);
  }

  memset(header, 0, sizeof(blosc_header));
  header->version = BLOSC2_VERSION_FORMAT;
  header->versionlz = compcode_to_compversion(context->compcode);
  header->flags = context->header_flags;
  header->typesize = (uint8_t)context->typesize;
  header->nbytes = nbytes;
  header->blocksize = blocksize;

  if (!extended_header) {
    return 0;
  }

  /* The extended header carries the full filter pipeline description */
  memcpy(header->filters, context->filters, BLOSC2_MAX_FILTERS);
  memcpy(header->filters_meta, context->filters_meta, BLOSC2_MAX_FILTERS);
  header->udcompcode = context->compcode;
  header->compcode_meta = context->compcode_meta;

  if (!little_endian) {
    header->blosc2_flags |= BLOSC2_BIGENDIAN;
  }
  if (context->use_dict) {
    header->blosc2_flags |= BLOSC2_USEDICT;
  }
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    header->blosc2_flags |= BLOSC2_INSTR_CODEC;
  }

  return 0;
}
930
931
191k
/*
 * Rotate three buffer pointers in place:
 *   src <- dest, dest <- tmp, tmp <- previous src.
 * Used between filter stages so the output of one stage becomes the input
 * of the next.
 */
void _cycle_buffers(uint8_t **src, uint8_t **dest, uint8_t **tmp) {
  uint8_t *const previous_src = *src;

  *src = *dest;
  *dest = *tmp;
  *tmp = previous_src;
}
937
938
uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
939
                          const uint8_t* src, const int32_t offset,
940
136k
                          uint8_t* dest, uint8_t* tmp) {
941
136k
  blosc2_context* context = thread_context->parent_context;
942
136k
  uint8_t* _src = (uint8_t*)src + offset;
943
136k
  uint8_t* _tmp = tmp;
944
136k
  uint8_t* _dest = dest;
945
136k
  int32_t typesize = context->typesize;
946
136k
  uint8_t* filters = context->filters;
947
136k
  uint8_t* filters_meta = context->filters_meta;
948
136k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
949
950
  /* Prefilter function */
951
136k
  if (context->prefilter != NULL) {
952
    /* Set unwritten values to zero */
953
0
    memset(_dest, 0, bsize);
954
    // Create new prefilter parameters for this block (must be private for each thread)
955
0
    blosc2_prefilter_params preparams;
956
0
    memcpy(&preparams, context->preparams, sizeof(preparams));
957
0
    preparams.input = _src;
958
0
    preparams.output = _dest;
959
0
    preparams.output_size = bsize;
960
0
    preparams.output_typesize = typesize;
961
0
    preparams.output_offset = offset;
962
0
    preparams.nblock = offset / context->blocksize;
963
0
    preparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
964
0
    preparams.tid = thread_context->tid;
965
0
    preparams.ttmp = thread_context->tmp;
966
0
    preparams.ttmp_nbytes = thread_context->tmp_nbytes;
967
0
    preparams.ctx = context;
968
969
0
    if (context->prefilter(&preparams) != 0) {
970
0
      BLOSC_TRACE_ERROR("Execution of prefilter function failed");
971
0
      return NULL;
972
0
    }
973
974
0
    if (memcpyed) {
975
      // No more filters are required
976
0
      return _dest;
977
0
    }
978
0
    _cycle_buffers(&_src, &_dest, &_tmp);
979
0
  }
980
981
  /* Process the filter pipeline */
982
958k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
983
821k
    int rc = BLOSC2_ERROR_SUCCESS;
984
821k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
985
821k
      switch (filters[i]) {
986
12.6k
        case BLOSC_SHUFFLE:
987
12.6k
          blosc2_shuffle(typesize, bsize, _src, _dest);
988
12.6k
          break;
989
124k
        case BLOSC_BITSHUFFLE:
990
124k
          if (blosc2_bitshuffle(typesize, bsize, _src, _dest) < 0) {
991
0
            return NULL;
992
0
          }
993
124k
          break;
994
124k
        case BLOSC_DELTA:
995
0
          delta_encoder(src, offset, bsize, typesize, _src, _dest);
996
0
          break;
997
0
        case BLOSC_TRUNC_PREC:
998
0
          if (truncate_precision(filters_meta[i], typesize, bsize, _src, _dest) < 0) {
999
0
            return NULL;
1000
0
          }
1001
0
          break;
1002
684k
        default:
1003
684k
          if (filters[i] != BLOSC_NOFILTER) {
1004
0
            BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
1005
0
            return NULL;
1006
0
          }
1007
821k
      }
1008
821k
    }
1009
0
    else {
1010
      // Look for the filters_meta in user filters and run it
1011
0
      for (uint64_t j = 0; j < g_nfilters; ++j) {
1012
0
        if (g_filters[j].id == filters[i]) {
1013
0
          if (g_filters[j].forward == NULL) {
1014
            // Dynamically load library
1015
0
            if (fill_filter(&g_filters[j]) < 0) {
1016
0
              BLOSC_TRACE_ERROR("Could not load filter %d\n", g_filters[j].id);
1017
0
              return NULL;
1018
0
            }
1019
0
          }
1020
0
          if (g_filters[j].forward != NULL) {
1021
0
            blosc2_cparams cparams;
1022
0
            blosc2_ctx_get_cparams(context, &cparams);
1023
0
            rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams, g_filters[j].id);
1024
0
          } else {
1025
0
            BLOSC_TRACE_ERROR("Forward function is NULL");
1026
0
            return NULL;
1027
0
          }
1028
0
          if (rc != BLOSC2_ERROR_SUCCESS) {
1029
0
            BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
1030
0
            return NULL;
1031
0
          }
1032
0
          goto urfiltersuccess;
1033
0
        }
1034
0
      }
1035
0
      BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
1036
0
      return NULL;
1037
1038
0
      urfiltersuccess:;
1039
1040
0
    }
1041
1042
    // Cycle buffers when required
1043
821k
    if (filters[i] != BLOSC_NOFILTER) {
1044
136k
      _cycle_buffers(&_src, &_dest, &_tmp);
1045
136k
    }
1046
821k
  }
1047
136k
  return _src;
1048
136k
}
1049
1050
1051
// Optimized version for detecting runs.  It compares 8-byte values wherever possible.
1052
231k
/*
 * Return true when every byte in [ip, ip_bound) equals *ip, i.e. the range
 * is a run of one repeated byte.  An empty range is reported as a run and
 * no memory is read.
 *
 * Bytes are compared 8 at a time against a copy of the first byte broadcast
 * to all 8 bytes.  The tail (at most 8 bytes) is then checked one byte at a
 * time.
 *
 * The 8-byte loads go through memcpy.  This makes them valid at any
 * alignment and avoids the strict-aliasing violation of `*(int64_t*)ip`.
 * Compilers lower a fixed-size memcpy to a single load.
 */
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
  if (ip == ip_bound) {
    return true;
  }
  const uint8_t x = *ip;
  uint64_t pattern;
  memset(&pattern, x, sizeof(pattern));

  // Compare remaining length instead of computing `ip_bound - 8`, which
  // would form an out-of-range pointer for ranges shorter than 8 bytes.
  while (ip_bound - ip > 8) {
    uint64_t word;
    memcpy(&word, ip, sizeof(word));
    if (word != pattern) {
      return false;  // values differ: not a run
    }
    ip += 8;
  }

  /* Look into the remainder */
  while (ip < ip_bound && *ip == x) {
    ip++;
  }
  return ip == ip_bound;
}
1075
1076
1077
/* Shuffle & compress a single block */
1078
/*
 * Filter and compress one block.
 *
 * The block is `bsize` bytes at `src + offset`.  Output is written at `dest`;
 * `ntbytes` is the number of bytes already used in the destination and
 * `destsize` its capacity.  `tmp`/`tmp2` are scratch buffers for the filter
 * pipeline.
 *
 * Streams: the block is split into `typesize` streams unless splitting is
 * disabled, this is the leftover block, or a dictionary is being trained.
 * Each stream is preceded by a 4-byte int32 header written with _sw32().
 * That header holds one of:
 *   - the compressed size;
 *   - a negated byte value for a run;
 *   - sizeof(blosc2_instr), in instrumentation mode.
 *
 * Returns:
 *   - the number of bytes written for the block (ctbytes);
 *   - bsize when only the prefilter output is needed (memcpyed chunk);
 *   - 0 when the block does not fit in `destsize` (non-compressible);
 *   - a negative BLOSC2_ERROR_* code on failure.
 */
static int blosc_c(struct thread_context* thread_context, int32_t bsize,
                   int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
                   const uint8_t* src, const int32_t offset, uint8_t* dest,
                   uint8_t* tmp, uint8_t* tmp2) {
  blosc2_context* context = thread_context->parent_context;
  int dont_split = (context->header_flags & 0x10) >> 4;
  // Dictionary training: a dict is requested but not built yet, so blocks are copied raw
  int dict_training = context->use_dict && context->dict_cdict == NULL;
  int32_t j, neblock, nstreams;
  int32_t cbytes;                   /* number of compressed bytes in split */
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
  int32_t maxout;
  int32_t typesize = context->typesize;
  const char* compname;
  int accel;
  const uint8_t* _src;
  uint8_t *_tmp = tmp, *_tmp2 = tmp2;
  int last_filter_index = last_filter(context->filters, 'c');
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
  blosc_timestamp_t last, current;
  float filter_time = 0.f;

  if (instr_codec) {
    blosc_set_timestamp(&last);
  }

  // Run the prefilter and/or filter pipeline if any is configured
  if (last_filter_index >= 0 || context->prefilter != NULL) {
    /* Apply the filter pipeline just for the prefilter */
    if (memcpyed && context->prefilter != NULL) {
      // We only need the prefilter output (written straight into dest)
      _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2);
      if (_src == NULL) {
        return BLOSC2_ERROR_FILTER_PIPELINE;
      }
      return bsize;
    }
    /* Apply regular filter pipeline */
    _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2);
    if (_src == NULL) {
      return BLOSC2_ERROR_FILTER_PIPELINE;
    }
  } else {
    _src = src + offset;
  }

  if (instr_codec) {
    blosc_set_timestamp(&current);
    filter_time = (float) blosc_elapsed_secs(last, current);
    last = current;
  }

  assert(context->clevel > 0);

  /* Calculate acceleration for different compressors */
  accel = get_accel(context);

  /* The number of compressed data streams for this block */
  if (!dont_split && !leftoverblock && !dict_training) {
    nstreams = (int32_t)typesize;
  }
  else {
    nstreams = 1;
  }
  neblock = bsize / nstreams;
  for (j = 0; j < nstreams; j++) {
    if (instr_codec) {
      blosc_set_timestamp(&last);
    }
    if (!dict_training) {
      // Reserve the int32 stream header; it is filled in with _sw32(dest - 4, ...)
      dest += sizeof(int32_t);
      ntbytes += sizeof(int32_t);
      ctbytes += sizeof(int32_t);

      const uint8_t *ip = (uint8_t *) _src + j * neblock;
      const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;

      // Runs are only encoded specially when the chunk uses the extended header
      if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
        // A run
        int32_t value = _src[j * neblock];
        if (ntbytes > destsize) {
          return 0;    /* Non-compressible data */
        }

        if (instr_codec) {
          // Instrumentation mode: emit a blosc2_instr record instead of data
          blosc_set_timestamp(&current);
          int32_t instr_size = sizeof(blosc2_instr);
          ntbytes += instr_size;
          ctbytes += instr_size;
          if (ntbytes > destsize) {
            return 0;    /* Non-compressible data */
          }
          _sw32(dest - 4, instr_size);
          blosc2_instr *desti = (blosc2_instr *)dest;
          memset(desti, 0, sizeof(blosc2_instr));
          // Special values have an overhead of about 1 int32
          int32_t ssize = value == 0 ? sizeof(int32_t) : sizeof(int32_t) + 1;
          desti->cratio = (float) neblock / (float) ssize;
          float ctime = (float) blosc_elapsed_secs(last, current);
          desti->cspeed = (float) neblock / ctime;
          desti->filter_speed = (float) neblock / filter_time;
          desti->flags[0] = 1;    // mark a runlen
          dest += instr_size;
          continue;
        }

        // Encode the repeated byte in the first (LSB) byte of the length of the split.
        _sw32(dest - 4, -value);    // write the value in two's complement
        if (value > 0) {
          // Mark encoding as a run-length (== 0 is always a 0's run)
          ntbytes += 1;
          ctbytes += 1;
          if (ntbytes > destsize) {
            return 0;    /* Non-compressible data */
          }
          // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
          // dest[-1] |= 0x80;
          dest[0] = 0x1;   // set run-length bit (0) in token
          dest += 1;
        }
        continue;
      }
    }

    maxout = neblock;
    // NOTE(review): with instr_codec this clamp is skipped, so the codec is
    // given neblock bytes of output room even past destsize; confirm dest is
    // sized for that in instrumentation mode.
    if (ntbytes + maxout > destsize && !instr_codec) {
      /* avoid buffer overrun */
      maxout = destsize - ntbytes;
      if (maxout <= 0) {
        return 0;                  /* non-compressible block */
      }
    }
    if (dict_training) {
      // We are in the build dict state, so don't compress
      // TODO: copy only a percentage for sampling
      memcpy(dest, _src + j * neblock, (unsigned int)neblock);
      cbytes = (int32_t)neblock;
    }
    else if (context->compcode == BLOSC_BLOSCLZ) {
      cbytes = blosclz_compress(context->clevel, _src + j * neblock,
                                (int)neblock, dest, maxout, context);
    }
    else if (context->compcode == BLOSC_LZ4) {
      void *hash_table = NULL;
    #ifdef HAVE_IPP
      hash_table = (void*)thread_context->lz4_hash_table;
    #endif
      cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                 (char*)dest, (size_t)maxout, accel, hash_table);
    }
    else if (context->compcode == BLOSC_LZ4HC) {
      cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                   (char*)dest, (size_t)maxout, context->clevel);
    }
  #if defined(HAVE_ZLIB)
    else if (context->compcode == BLOSC_ZLIB) {
      cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
  #endif /* HAVE_ZLIB */
  #if defined(HAVE_ZSTD)
    else if (context->compcode == BLOSC_ZSTD) {
      cbytes = zstd_wrap_compress(thread_context,
                                  (char*)_src + j * neblock, (size_t)neblock,
                                  (char*)dest, (size_t)maxout, context->clevel);
    }
  #endif /* HAVE_ZSTD */
    else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
      // User-defined codec: look it up in the registry (first match wins)
      for (int i = 0; i < g_ncodecs; ++i) {
        if (g_codecs[i].compcode == context->compcode) {
          if (g_codecs[i].encoder == NULL) {
            // Dynamically load codec plugin
            if (fill_codec(&g_codecs[i]) < 0) {
              BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
              return BLOSC2_ERROR_CODEC_SUPPORT;
            }
          }
          blosc2_cparams cparams;
          blosc2_ctx_get_cparams(context, &cparams);
          cbytes = g_codecs[i].encoder(_src + j * neblock,
                                        neblock,
                                        dest,
                                        maxout,
                                        context->compcode_meta,
                                        &cparams,
                                        context->src);
          goto urcodecsuccess;
        }
      }
      BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
      return BLOSC2_ERROR_CODEC_SUPPORT;
    urcodecsuccess:
      ;
    } else {
      blosc2_compcode_to_compname(context->compcode, &compname);
      BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
                        "Please use one having it.", compname);
      return BLOSC2_ERROR_CODEC_SUPPORT;
    }

    if (cbytes > maxout) {
      /* Buffer overrun caused by compression (should never happen) */
      return BLOSC2_ERROR_WRITE_BUFFER;
    }
    if (cbytes < 0) {
      /* cbytes should never be negative */
      return BLOSC2_ERROR_DATA;
    }
    if (cbytes == 0) {
      // When cbytes is 0, the compressor has not been able to compress anything
      cbytes = neblock;
    }

    if (instr_codec) {
      // Instrumentation mode: replace the stream payload with a blosc2_instr record
      blosc_set_timestamp(&current);
      int32_t instr_size = sizeof(blosc2_instr);
      ntbytes += instr_size;
      ctbytes += instr_size;
      if (ntbytes > destsize) {
        return 0;    /* Non-compressible data */
      }
      _sw32(dest - 4, instr_size);
      float ctime = (float)blosc_elapsed_secs(last, current);
      blosc2_instr *desti = (blosc2_instr *)dest;
      memset(desti, 0, sizeof(blosc2_instr));
      // cratio takes into account 1 additional int (csize)
      desti->cratio = (float)neblock / (float)(cbytes + sizeof(int32_t));
      desti->cspeed = (float)neblock / ctime;
      desti->filter_speed = (float) neblock / filter_time;
      dest += instr_size;
      continue;
    }

    if (!dict_training) {
      if (cbytes == neblock) {
        /* The compressor has been unable to compress data at all. */
        /* Before doing the copy, check that we are not running into a
           buffer overflow. */
        if ((ntbytes + neblock) > destsize) {
          return 0;    /* Non-compressible data */
        }
        memcpy(dest, _src + j * neblock, (unsigned int)neblock);
        cbytes = neblock;
      }
      // Fill in the reserved stream header with the stream's stored size
      _sw32(dest - 4, cbytes);
    }
    dest += cbytes;
    ntbytes += cbytes;
    ctbytes += cbytes;
  }  /* Closes j < nstreams */

  return ctbytes;
}
1331
1332
1333
/* Process the filter pipeline (decompression mode) */
1334
int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
1335
                      const int32_t offset, uint8_t* src, uint8_t* tmp,
1336
33.1k
                      uint8_t* tmp2, int last_filter_index, int32_t nblock) {
1337
33.1k
  blosc2_context* context = thread_context->parent_context;
1338
33.1k
  int32_t typesize = context->typesize;
1339
33.1k
  uint8_t* filters = context->filters;
1340
33.1k
  uint8_t* filters_meta = context->filters_meta;
1341
33.1k
  uint8_t* _src = src;
1342
33.1k
  uint8_t* _dest = tmp;
1343
33.1k
  uint8_t* _tmp = tmp2;
1344
33.1k
  int errcode = 0;
1345
1346
71.1k
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
1347
    // Delta filter requires the whole chunk ready
1348
71.1k
    int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
1349
71.1k
    if (last_copy_filter && context->postfilter == NULL) {
1350
48.9k
      _dest = dest + offset;
1351
48.9k
    }
1352
71.1k
    int rc = BLOSC2_ERROR_SUCCESS;
1353
71.1k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1354
66.7k
      switch (filters[i]) {
1355
10.4k
        case BLOSC_SHUFFLE:
1356
10.4k
          blosc2_unshuffle(typesize, bsize, _src, _dest);
1357
10.4k
          break;
1358
25.3k
        case BLOSC_BITSHUFFLE:
1359
25.3k
          if (bitunshuffle(typesize, bsize, _src, _dest, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
1360
0
            return BLOSC2_ERROR_FILTER_PIPELINE;
1361
0
          }
1362
25.3k
          break;
1363
25.3k
        case BLOSC_DELTA:
1364
14.7k
          if (context->nthreads == 1) {
1365
            /* Serial mode */
1366
14.7k
            delta_decoder(dest, offset, bsize, typesize, _dest);
1367
14.7k
          } else {
1368
            /* Force the thread in charge of the block 0 to go first */
1369
0
            blosc2_pthread_mutex_lock(&context->delta_mutex);
1370
0
            if (context->dref_not_init) {
1371
0
              if (offset != 0) {
1372
0
                blosc2_pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
1373
0
              } else {
1374
0
                delta_decoder(dest, offset, bsize, typesize, _dest);
1375
0
                context->dref_not_init = 0;
1376
0
                blosc2_pthread_cond_broadcast(&context->delta_cv);
1377
0
              }
1378
0
            }
1379
0
            blosc2_pthread_mutex_unlock(&context->delta_mutex);
1380
0
            if (offset != 0) {
1381
0
              delta_decoder(dest, offset, bsize, typesize, _dest);
1382
0
            }
1383
0
          }
1384
14.7k
          break;
1385
3.76k
        case BLOSC_TRUNC_PREC:
1386
          // TRUNC_PREC filter does not need to be undone
1387
3.76k
          break;
1388
12.4k
        default:
1389
12.4k
          if (filters[i] != BLOSC_NOFILTER) {
1390
138
            BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
1391
138
                              filters[i]);
1392
138
            errcode = -1;
1393
138
          }
1394
66.7k
      }
1395
66.7k
    } else {
1396
        // Look for the filters_meta in user filters and run it
1397
18.0k
        for (uint64_t j = 0; j < g_nfilters; ++j) {
1398
17.9k
          if (g_filters[j].id == filters[i]) {
1399
4.22k
            if (g_filters[j].backward == NULL) {
1400
              // Dynamically load filter
1401
0
              if (fill_filter(&g_filters[j]) < 0) {
1402
0
                BLOSC_TRACE_ERROR("Could not load filter %d.", g_filters[j].id);
1403
0
                return BLOSC2_ERROR_FILTER_PIPELINE;
1404
0
              }
1405
0
            }
1406
4.22k
            if (g_filters[j].backward != NULL) {
1407
4.22k
              blosc2_dparams dparams;
1408
4.22k
              blosc2_ctx_get_dparams(context, &dparams);
1409
4.22k
              rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams, g_filters[j].id);
1410
4.22k
            } else {
1411
0
              BLOSC_TRACE_ERROR("Backward function is NULL");
1412
0
              return BLOSC2_ERROR_FILTER_PIPELINE;
1413
0
            }
1414
4.22k
            if (rc != BLOSC2_ERROR_SUCCESS) {
1415
38
              BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
1416
38
              return rc;
1417
38
            }
1418
4.18k
            goto urfiltersuccess;
1419
4.22k
          }
1420
17.9k
        }
1421
159
      BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
1422
159
      return BLOSC2_ERROR_FILTER_PIPELINE;
1423
4.18k
      urfiltersuccess:;
1424
4.18k
    }
1425
1426
    // Cycle buffers when required
1427
70.9k
    if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
1428
54.8k
      _cycle_buffers(&_src, &_dest, &_tmp);
1429
54.8k
    }
1430
70.9k
    if (last_filter_index == i) {
1431
32.9k
      break;
1432
32.9k
    }
1433
70.9k
  }
1434
1435
  /* Postfilter function */
1436
32.9k
  if (context->postfilter != NULL) {
1437
    // Create new postfilter parameters for this block (must be private for each thread)
1438
0
    blosc2_postfilter_params postparams;
1439
0
    memcpy(&postparams, context->postparams, sizeof(postparams));
1440
0
    postparams.input = _src;
1441
0
    postparams.output = dest + offset;
1442
0
    postparams.size = bsize;
1443
0
    postparams.typesize = typesize;
1444
0
    postparams.offset = nblock * context->blocksize;
1445
0
    postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1446
0
    postparams.nblock = nblock;
1447
0
    postparams.tid = thread_context->tid;
1448
0
    postparams.ttmp = thread_context->tmp;
1449
0
    postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1450
0
    postparams.ctx = context;
1451
1452
0
    if (context->postfilter(&postparams) != 0) {
1453
0
      BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1454
0
      return BLOSC2_ERROR_POSTFILTER;
1455
0
    }
1456
0
  }
1457
1458
32.9k
  return errcode;
1459
32.9k
}
1460
1461
1462
1.14k
static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1463
1.14k
  if (destsize % typesize != 0) {
1464
44
    BLOSC_TRACE_ERROR("destsize can only be a multiple of typesize");
1465
44
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1466
44
  }
1467
1.10k
  int32_t nitems = destsize / typesize;
1468
1.10k
  if (nitems == 0) {
1469
0
    return 0;
1470
0
  }
1471
1472
1.10k
  if (typesize == 4) {
1473
605
    float* dest_ = (float*)dest;
1474
605
    float val = nanf("");
1475
4.41k
    for (int i = 0; i < nitems; i++) {
1476
3.81k
      dest_[i] = val;
1477
3.81k
    }
1478
605
    return nitems;
1479
605
  }
1480
497
  else if (typesize == 8) {
1481
423
    double* dest_ = (double*)dest;
1482
423
    double val = nan("");
1483
1.90k
    for (int i = 0; i < nitems; i++) {
1484
1.48k
      dest_[i] = val;
1485
1.48k
    }
1486
423
    return nitems;
1487
423
  }
1488
1489
74
  BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1490
74
  return BLOSC2_ERROR_DATA;
1491
1.10k
}
1492
1493
1494
231
static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1495
#if defined(BLOSC_STRICT_ALIGN)
1496
  if (destsize % typesize != 0) {
1497
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1498
  }
1499
  int32_t nitems = destsize / typesize;
1500
  if (nitems == 0) {
1501
    return 0;
1502
  }
1503
  for (int i = 0; i < nitems; i++) {
1504
    memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1505
  }
1506
#else
1507
  // destsize can only be a multiple of typesize
1508
231
  int64_t val8;
1509
231
  int64_t* dest8;
1510
231
  int32_t val4;
1511
231
  int32_t* dest4;
1512
231
  int16_t val2;
1513
231
  int16_t* dest2;
1514
231
  int8_t val1;
1515
231
  int8_t* dest1;
1516
1517
231
  if (destsize % typesize != 0) {
1518
9
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1519
9
  }
1520
222
  int32_t nitems = destsize / typesize;
1521
222
  if (nitems == 0) {
1522
0
    return 0;
1523
0
  }
1524
1525
222
  switch (typesize) {
1526
15
    case 8:
1527
15
      val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1528
15
      dest8 = (int64_t*)dest;
1529
43
      for (int i = 0; i < nitems; i++) {
1530
28
        dest8[i] = val8;
1531
28
      }
1532
15
      break;
1533
29
    case 4:
1534
29
      val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1535
29
      dest4 = (int32_t*)dest;
1536
95
      for (int i = 0; i < nitems; i++) {
1537
66
        dest4[i] = val4;
1538
66
      }
1539
29
      break;
1540
64
    case 2:
1541
64
      val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1542
64
      dest2 = (int16_t*)dest;
1543
289
      for (int i = 0; i < nitems; i++) {
1544
225
        dest2[i] = val2;
1545
225
      }
1546
64
      break;
1547
75
    case 1:
1548
75
      val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1549
75
      dest1 = (int8_t*)dest;
1550
290
      for (int i = 0; i < nitems; i++) {
1551
215
        dest1[i] = val1;
1552
215
      }
1553
75
      break;
1554
39
    default:
1555
133
      for (int i = 0; i < nitems; i++) {
1556
94
        memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1557
94
      }
1558
222
  }
1559
222
#endif
1560
1561
222
  return nitems;
1562
222
}
1563
1564
1565
/* Decompress & unshuffle a single block */
1566
static int blosc_d(
1567
    struct thread_context* thread_context, int32_t bsize,
1568
    int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1569
196k
    int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1570
196k
  blosc2_context* context = thread_context->parent_context;
1571
196k
  uint8_t* filters = context->filters;
1572
196k
  uint8_t *tmp3 = thread_context->tmp4;
1573
196k
  int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1574
196k
  int dont_split = (context->header_flags & 0x10) >> 4;
1575
196k
  int32_t chunk_nbytes;
1576
196k
  int32_t chunk_cbytes;
1577
196k
  int nstreams;
1578
196k
  int32_t neblock;
1579
196k
  int32_t nbytes;                /* number of decompressed bytes in split */
1580
196k
  int32_t cbytes;                /* number of compressed bytes in split */
1581
  // int32_t ctbytes = 0;           /* number of compressed bytes in block */
1582
196k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
1583
196k
  uint8_t* _dest;
1584
196k
  int32_t typesize = context->typesize;
1585
196k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1586
196k
  const char* compname;
1587
196k
  int rc;
1588
1589
196k
  if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1590
    // Do not decompress, but act as if we successfully decompressed everything
1591
0
    return bsize;
1592
0
  }
1593
1594
196k
  rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1595
196k
  if (rc < 0) {
1596
0
    return rc;
1597
0
  }
1598
196k
  if (context->special_type == BLOSC2_SPECIAL_VALUE) {
1599
    // We need the actual typesize in this case, but it cannot be encoded in the header, so derive it from cbytes
1600
231
    typesize = chunk_cbytes - context->header_overhead;
1601
231
  }
1602
1603
  // In some situations (lazychunks) the context can arrive uninitialized
1604
  // (but BITSHUFFLE needs it for accessing the format of the chunk)
1605
196k
  if (context->src == NULL) {
1606
0
    context->src = src;
1607
0
  }
1608
1609
  // Chunks with special values cannot be lazy
1610
196k
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1611
189k
          (context->blosc2_flags & 0x08u) && !context->special_type);
1612
196k
  if (is_lazy) {
1613
    // The chunk is on disk, so just lazily load the block
1614
1
    if (context->schunk == NULL) {
1615
1
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1616
1
      return BLOSC2_ERROR_INVALID_PARAM;
1617
1
    }
1618
0
    if (context->schunk->frame == NULL) {
1619
0
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1620
0
      return BLOSC2_ERROR_INVALID_PARAM;
1621
0
    }
1622
0
    blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1623
0
    char* urlpath = frame->urlpath;
1624
0
    size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1625
0
    int32_t nchunk;
1626
0
    int64_t chunk_offset;
1627
    // The nchunk and the offset of the current chunk are in the trailer
1628
0
    nchunk = *(int32_t*)(src + trailer_offset);
1629
0
    chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1630
    // Get the csize of the nblock
1631
0
    int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1632
0
    int32_t block_csize = block_csizes[nblock];
1633
    // Read the lazy block on disk
1634
0
    void* fp = NULL;
1635
0
    blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1636
0
    if (io_cb == NULL) {
1637
0
      BLOSC_TRACE_ERROR("Error getting the input/output API");
1638
0
      return BLOSC2_ERROR_PLUGIN_IO;
1639
0
    }
1640
1641
0
    int64_t io_pos = 0;
1642
0
    if (frame->sframe) {
1643
      // The chunk is not in the frame
1644
0
      char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1645
0
      BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1646
0
      sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1647
0
      fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1648
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1649
0
      free(chunkpath);
1650
      // The offset of the block is src_offset
1651
0
      io_pos = src_offset;
1652
0
    }
1653
0
    else {
1654
0
      fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1655
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1656
      // The offset of the block is src_offset
1657
0
      io_pos = frame->file_offset + chunk_offset + src_offset;
1658
0
    }
1659
    // We can make use of tmp3 because it will be used after src is not needed anymore
1660
0
    int64_t rbytes = io_cb->read((void**)&tmp3, 1, block_csize, io_pos, fp);
1661
0
    io_cb->close(fp);
1662
0
    if ((int32_t)rbytes != block_csize) {
1663
0
      BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1664
0
      return BLOSC2_ERROR_READ_BUFFER;
1665
0
    }
1666
0
    src = tmp3;
1667
0
    src_offset = 0;
1668
0
    srcsize = block_csize;
1669
0
  }
1670
1671
  // If the chunk is memcpyed, we just have to copy the block to dest and return
1672
196k
  if (memcpyed) {
1673
140k
    int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1674
140k
    if (!context->special_type) {
1675
3.29k
      if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1676
0
        return BLOSC2_ERROR_WRITE_BUFFER;
1677
0
      }
1678
3.29k
      if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1679
        /* Not enough input to copy block */
1680
0
        return BLOSC2_ERROR_READ_BUFFER;
1681
0
      }
1682
3.29k
    }
1683
140k
    if (!is_lazy) {
1684
140k
      src += context->header_overhead + nblock * context->blocksize;
1685
140k
    }
1686
140k
    _dest = dest + dest_offset;
1687
140k
    if (context->postfilter != NULL) {
1688
      // We are making use of a postfilter, so use a temp for destination
1689
0
      _dest = tmp;
1690
0
    }
1691
140k
    rc = 0;
1692
140k
    switch (context->special_type) {
1693
231
      case BLOSC2_SPECIAL_VALUE:
1694
        // All repeated values
1695
231
        rc = set_values(typesize, context->src, _dest, bsize_);
1696
231
        if (rc < 0) {
1697
9
          BLOSC_TRACE_ERROR("set_values failed");
1698
9
          return BLOSC2_ERROR_DATA;
1699
9
        }
1700
222
        break;
1701
1.14k
      case BLOSC2_SPECIAL_NAN:
1702
1.14k
        rc = set_nans(context->typesize, _dest, bsize_);
1703
1.14k
        if (rc < 0) {
1704
118
          BLOSC_TRACE_ERROR("set_nans failed");
1705
118
          return BLOSC2_ERROR_DATA;
1706
118
        }
1707
1.02k
        break;
1708
133k
      case BLOSC2_SPECIAL_ZERO:
1709
133k
        memset(_dest, 0, bsize_);
1710
133k
        break;
1711
2.36k
      case BLOSC2_SPECIAL_UNINIT:
1712
        // We do nothing here
1713
2.36k
        break;
1714
3.29k
      default:
1715
3.29k
        memcpy(_dest, src, bsize_);
1716
140k
    }
1717
140k
    if (context->postfilter != NULL) {
1718
      // Create new postfilter parameters for this block (must be private for each thread)
1719
0
      blosc2_postfilter_params postparams;
1720
0
      memcpy(&postparams, context->postparams, sizeof(postparams));
1721
0
      postparams.input = tmp;
1722
0
      postparams.output = dest + dest_offset;
1723
0
      postparams.size = bsize;
1724
0
      postparams.typesize = typesize;
1725
0
      postparams.offset = nblock * context->blocksize;
1726
0
      postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1727
0
      postparams.nblock = nblock;
1728
0
      postparams.tid = thread_context->tid;
1729
0
      postparams.ttmp = thread_context->tmp;
1730
0
      postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1731
0
      postparams.ctx = context;
1732
1733
      // Execute the postfilter (the processed block will be copied to dest)
1734
0
      if (context->postfilter(&postparams) != 0) {
1735
0
        BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1736
0
        return BLOSC2_ERROR_POSTFILTER;
1737
0
      }
1738
0
    }
1739
140k
    thread_context->zfp_cell_nitems = 0;
1740
1741
140k
    return bsize_;
1742
140k
  }
1743
1744
56.1k
  if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1745
    /* Invalid block src offset encountered */
1746
575
    return BLOSC2_ERROR_DATA;
1747
575
  }
1748
1749
55.5k
  src += src_offset;
1750
55.5k
  srcsize -= src_offset;
1751
1752
55.5k
  int last_filter_index = last_filter(filters, 'd');
1753
55.5k
  if (instr_codec) {
1754
    // If instrumented, we don't want to run the filters
1755
2.12k
    _dest = dest + dest_offset;
1756
2.12k
  }
1757
53.4k
  else if (((last_filter_index >= 0) &&
1758
39.3k
       (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1759
34.5k
    context->postfilter != NULL) {
1760
    // We are making use of some filter, so use a temp for destination
1761
34.5k
    _dest = tmp;
1762
34.5k
  }
1763
18.8k
  else {
1764
    // If no filters, or only DELTA in pipeline
1765
18.8k
    _dest = dest + dest_offset;
1766
18.8k
  }
1767
1768
  /* The number of compressed data streams for this block */
1769
55.5k
  if (!dont_split && !leftoverblock) {
1770
7.45k
    nstreams = context->typesize;
1771
7.45k
  }
1772
48.1k
  else {
1773
48.1k
    nstreams = 1;
1774
48.1k
  }
1775
1776
55.5k
  neblock = bsize / nstreams;
1777
55.5k
  if (neblock == 0) {
1778
    /* Not enough space to output bytes */
1779
6
    BLOSC_ERROR(BLOSC2_ERROR_WRITE_BUFFER);
1780
6
  }
1781
104k
  for (int j = 0; j < nstreams; j++) {
1782
57.6k
    if (srcsize < (signed)sizeof(int32_t)) {
1783
      /* Not enough input to read compressed size */
1784
32
      return BLOSC2_ERROR_READ_BUFFER;
1785
32
    }
1786
57.5k
    srcsize -= sizeof(int32_t);
1787
57.5k
    cbytes = sw32_(src);      /* amount of compressed bytes */
1788
57.5k
    if (cbytes > 0) {
1789
50.9k
      if (srcsize < cbytes) {
1790
        /* Not enough input to read compressed bytes */
1791
184
        return BLOSC2_ERROR_READ_BUFFER;
1792
184
      }
1793
50.7k
      srcsize -= cbytes;
1794
50.7k
    }
1795
57.4k
    src += sizeof(int32_t);
1796
    // ctbytes += (signed)sizeof(int32_t);
1797
1798
    /* Uncompress */
1799
57.4k
    if (cbytes == 0) {
1800
      // A run of 0's
1801
3.50k
      memset(_dest, 0, (unsigned int)neblock);
1802
3.50k
      nbytes = neblock;
1803
3.50k
    }
1804
53.8k
    else if (cbytes < 0) {
1805
      // A negative number means some encoding depending on the token that comes next
1806
3.09k
      uint8_t token;
1807
1808
3.09k
      if (srcsize < (signed)sizeof(uint8_t)) {
1809
        // Not enough input to read token */
1810
80
        return BLOSC2_ERROR_READ_BUFFER;
1811
80
      }
1812
3.01k
      srcsize -= sizeof(uint8_t);
1813
1814
3.01k
      token = src[0];
1815
3.01k
      src += 1;
1816
      // ctbytes += 1;
1817
1818
3.01k
      if (token & 0x1) {
1819
        // A run of bytes that are different than 0
1820
2.98k
        if (cbytes < -255) {
1821
          // Runs can only encode a byte
1822
188
          return BLOSC2_ERROR_RUN_LENGTH;
1823
188
        }
1824
2.79k
        uint8_t value = -cbytes;
1825
2.79k
        memset(_dest, value, (unsigned int)neblock);
1826
2.79k
      } else {
1827
39
        BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1828
39
        return BLOSC2_ERROR_RUN_LENGTH;
1829
39
      }
1830
2.79k
      nbytes = neblock;
1831
2.79k
      cbytes = 0;  // everything is encoded in the cbytes token
1832
2.79k
    }
1833
50.7k
    else if (cbytes == neblock) {
1834
17.1k
      memcpy(_dest, src, (unsigned int)neblock);
1835
17.1k
      nbytes = (int32_t)neblock;
1836
17.1k
    }
1837
33.6k
    else {
1838
33.6k
      if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1839
12.8k
        nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1840
12.8k
      }
1841
20.8k
      else if (compformat == BLOSC_LZ4_FORMAT) {
1842
2.84k
        nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1843
2.84k
                                     (char*)_dest, (size_t)neblock);
1844
2.84k
      }
1845
18.0k
  #if defined(HAVE_ZLIB)
1846
18.0k
      else if (compformat == BLOSC_ZLIB_FORMAT) {
1847
5.46k
        nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1848
5.46k
                                      (char*)_dest, (size_t)neblock);
1849
5.46k
      }
1850
12.5k
  #endif /*  HAVE_ZLIB */
1851
12.5k
  #if defined(HAVE_ZSTD)
1852
12.5k
      else if (compformat == BLOSC_ZSTD_FORMAT) {
1853
10.9k
        nbytes = zstd_wrap_decompress(thread_context,
1854
10.9k
                                      (char*)src, (size_t)cbytes,
1855
10.9k
                                      (char*)_dest, (size_t)neblock);
1856
10.9k
      }
1857
1.59k
  #endif /*  HAVE_ZSTD */
1858
1.59k
      else if (compformat == BLOSC_UDCODEC_FORMAT) {
1859
1.58k
        bool getcell = false;
1860
1861
1.58k
#if defined(HAVE_PLUGINS)
1862
1.58k
        if ((context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) &&
1863
1
            (thread_context->zfp_cell_nitems > 0)) {
1864
0
          nbytes = zfp_getcell(thread_context, src, cbytes, _dest, neblock);
1865
0
          if (nbytes < 0) {
1866
0
            return BLOSC2_ERROR_DATA;
1867
0
          }
1868
0
          if (nbytes == thread_context->zfp_cell_nitems * typesize) {
1869
0
            getcell = true;
1870
0
          }
1871
0
        }
1872
1.58k
#endif /* HAVE_PLUGINS */
1873
1.58k
        if (!getcell) {
1874
1.58k
          thread_context->zfp_cell_nitems = 0;
1875
1.69k
          for (int i = 0; i < g_ncodecs; ++i) {
1876
1.68k
            if (g_codecs[i].compcode == context->compcode) {
1877
1.56k
              if (g_codecs[i].decoder == NULL) {
1878
                // Dynamically load codec plugin
1879
1
                if (fill_codec(&g_codecs[i]) < 0) {
1880
1
                  BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1881
1
                  return BLOSC2_ERROR_CODEC_SUPPORT;
1882
1
                }
1883
1
              }
1884
1.56k
              blosc2_dparams dparams;
1885
1.56k
              blosc2_ctx_get_dparams(context, &dparams);
1886
1.56k
              nbytes = g_codecs[i].decoder(src,
1887
1.56k
                                           cbytes,
1888
1.56k
                                           _dest,
1889
1.56k
                                           neblock,
1890
1.56k
                                           context->compcode_meta,
1891
1.56k
                                           &dparams,
1892
1.56k
                                           context->src);
1893
1.56k
              goto urcodecsuccess;
1894
1.56k
            }
1895
1.68k
          }
1896
18
          BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1897
18
          return BLOSC2_ERROR_CODEC_SUPPORT;
1898
1.58k
        }
1899
1.56k
      urcodecsuccess:
1900
1.56k
        ;
1901
1.56k
      }
1902
14
      else {
1903
14
        compname = clibcode_to_clibname(compformat);
1904
14
        BLOSC_TRACE_ERROR(
1905
14
                "Blosc has not been compiled with decompression "
1906
14
                "support for '%s' format.  "
1907
14
                "Please recompile for adding this support.", compname);
1908
14
        return BLOSC2_ERROR_CODEC_SUPPORT;
1909
14
      }
1910
1911
      /* Check that decompressed bytes number is correct */
1912
33.6k
      if ((nbytes != neblock) && (thread_context->zfp_cell_nitems == 0)) {
1913
7.96k
        return BLOSC2_ERROR_DATA;
1914
7.96k
      }
1915
1916
33.6k
    }
1917
49.0k
    src += cbytes;
1918
    // ctbytes += cbytes;
1919
49.0k
    _dest += nbytes;
1920
49.0k
    ntbytes += nbytes;
1921
49.0k
  } /* Closes j < nstreams */
1922
1923
47.0k
  if (!instr_codec) {
1924
45.8k
    if (last_filter_index >= 0 || context->postfilter != NULL) {
1925
      /* Apply regular filter pipeline */
1926
33.1k
      int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1927
33.1k
                                      last_filter_index, nblock);
1928
33.1k
      if (errcode < 0)
1929
282
        return errcode;
1930
33.1k
    }
1931
45.8k
  }
1932
1933
  /* Return the number of uncompressed bytes */
1934
46.7k
  return (int)ntbytes;
1935
47.0k
}
1936
1937
1938
/* Serial version for compression/decompression */
1939
89.2k
static int serial_blosc(struct thread_context* thread_context) {
1940
89.2k
  blosc2_context* context = thread_context->parent_context;
1941
89.2k
  int32_t j, bsize, leftoverblock;
1942
89.2k
  int32_t cbytes;
1943
89.2k
  int32_t ntbytes = context->output_bytes;
1944
89.2k
  int32_t* bstarts = context->bstarts;
1945
89.2k
  uint8_t* tmp = thread_context->tmp;
1946
89.2k
  uint8_t* tmp2 = thread_context->tmp2;
1947
89.2k
  int dict_training = context->use_dict && (context->dict_cdict == NULL);
1948
89.2k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1949
89.2k
  if (!context->do_compress && context->special_type) {
1950
    // Fake a runlen as if it was a memcpyed chunk
1951
1.06k
    memcpyed = true;
1952
1.06k
  }
1953
1954
504k
  for (j = 0; j < context->nblocks; j++) {
1955
439k
    if (context->do_compress && !memcpyed && !dict_training) {
1956
231k
      _sw32(bstarts + j, ntbytes);
1957
231k
    }
1958
439k
    bsize = context->blocksize;
1959
439k
    leftoverblock = 0;
1960
439k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1961
7.36k
      bsize = context->leftover;
1962
7.36k
      leftoverblock = 1;
1963
7.36k
    }
1964
439k
    if (context->do_compress) {
1965
243k
      if (memcpyed && !context->prefilter) {
1966
        /* We want to memcpy only */
1967
11.3k
        memcpy(context->dest + context->header_overhead + j * context->blocksize,
1968
11.3k
               context->src + j * context->blocksize, (unsigned int)bsize);
1969
11.3k
        cbytes = (int32_t)bsize;
1970
11.3k
      }
1971
231k
      else {
1972
        /* Regular compression */
1973
231k
        cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
1974
231k
                         context->destsize, context->src, j * context->blocksize,
1975
231k
                         context->dest + ntbytes, tmp, tmp2);
1976
231k
        if (cbytes == 0) {
1977
15.1k
          ntbytes = 0;              /* incompressible data */
1978
15.1k
          break;
1979
15.1k
        }
1980
231k
      }
1981
243k
    }
1982
196k
    else {
1983
      /* Regular decompression */
1984
      // If memcpyed we don't have a bstarts section (because it is not needed)
1985
196k
      int32_t src_offset = memcpyed ?
1986
140k
          context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
1987
196k
      cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
1988
196k
                       context->src, context->srcsize, src_offset, j,
1989
196k
                       context->dest, j * context->blocksize, tmp, tmp2);
1990
196k
    }
1991
1992
424k
    if (cbytes < 0) {
1993
9.51k
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
1994
9.51k
      break;
1995
9.51k
    }
1996
415k
    ntbytes += cbytes;
1997
415k
  }
1998
1999
89.2k
  return ntbytes;
2000
89.2k
}
2001
2002
static void t_blosc_do_job(void *ctxt);
2003
2004
/* Threaded version for compression/decompression */
2005
0
static int parallel_blosc(blosc2_context* context) {
2006
0
#ifdef BLOSC_POSIX_BARRIERS
2007
0
  int rc;
2008
0
#endif
2009
  /* Set sentinels */
2010
0
  context->thread_giveup_code = 1;
2011
0
  context->thread_nblock = -1;
2012
2013
0
  if (threads_callback) {
2014
0
    threads_callback(threads_callback_data, t_blosc_do_job,
2015
0
                     context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
2016
0
  }
2017
0
  else {
2018
    /* Synchronization point for all threads (wait for initialization) */
2019
0
    WAIT_INIT(-1, context);
2020
2021
    /* Synchronization point for all threads (wait for finalization) */
2022
0
    WAIT_FINISH(-1, context);
2023
0
  }
2024
2025
0
  if (context->thread_giveup_code <= 0) {
2026
    /* Compression/decompression gave up.  Return error code. */
2027
0
    return context->thread_giveup_code;
2028
0
  }
2029
2030
  /* Return the total bytes (de-)compressed in threads */
2031
0
  return (int)context->output_bytes;
2032
0
}
2033
2034
/* initialize a thread_context that has already been allocated */
2035
static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
2036
17.0k
{
2037
17.0k
  int32_t ebsize;
2038
2039
17.0k
  thread_context->parent_context = context;
2040
17.0k
  thread_context->tid = tid;
2041
2042
17.0k
  ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
2043
17.0k
  thread_context->tmp_nbytes = (size_t)4 * ebsize;
2044
17.0k
  thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
2045
17.0k
  BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2046
17.0k
  thread_context->tmp2 = thread_context->tmp + ebsize;
2047
17.0k
  thread_context->tmp3 = thread_context->tmp2 + ebsize;
2048
17.0k
  thread_context->tmp4 = thread_context->tmp3 + ebsize;
2049
17.0k
  thread_context->tmp_blocksize = context->blocksize;
2050
17.0k
  thread_context->zfp_cell_nitems = 0;
2051
17.0k
  thread_context->zfp_cell_start = 0;
2052
17.0k
  #if defined(HAVE_ZSTD)
2053
17.0k
  thread_context->zstd_cctx = NULL;
2054
17.0k
  thread_context->zstd_dctx = NULL;
2055
17.0k
  #endif
2056
2057
  /* Create the hash table for LZ4 in case we are using IPP */
2058
#ifdef HAVE_IPP
2059
  IppStatus status;
2060
  int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
2061
  int hash_size = 0;
2062
  status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
2063
  if (status != ippStsNoErr) {
2064
      BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
2065
  }
2066
  Ipp8u *hash_table = ippsMalloc_8u(hash_size);
2067
  status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
2068
  if (status != ippStsNoErr) {
2069
    BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
2070
  }
2071
  thread_context->lz4_hash_table = hash_table;
2072
#endif
2073
17.0k
  return 0;
2074
17.0k
}
2075
2076
static struct thread_context*
2077
17.0k
create_thread_context(blosc2_context* context, int32_t tid) {
2078
17.0k
  struct thread_context* thread_context;
2079
17.0k
  thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
2080
17.0k
  BLOSC_ERROR_NULL(thread_context, NULL);
2081
17.0k
  int rc = init_thread_context(thread_context, context, tid);
2082
17.0k
  if (rc < 0) {
2083
0
    return NULL;
2084
0
  }
2085
17.0k
  return thread_context;
2086
17.0k
}
2087
2088
/* free members of thread_context, but not thread_context itself */
2089
17.0k
static void destroy_thread_context(struct thread_context* thread_context) {
2090
17.0k
  my_free(thread_context->tmp);
2091
17.0k
#if defined(HAVE_ZSTD)
2092
17.0k
  if (thread_context->zstd_cctx != NULL) {
2093
907
    ZSTD_freeCCtx(thread_context->zstd_cctx);
2094
907
  }
2095
17.0k
  if (thread_context->zstd_dctx != NULL) {
2096
5.24k
    ZSTD_freeDCtx(thread_context->zstd_dctx);
2097
5.24k
  }
2098
17.0k
#endif
2099
#ifdef HAVE_IPP
2100
  if (thread_context->lz4_hash_table != NULL) {
2101
    ippsFree(thread_context->lz4_hash_table);
2102
  }
2103
#endif
2104
17.0k
}
2105
2106
17.0k
/* Destroy a thread_context obtained from create_thread_context() and free the
 * struct itself.  A NULL argument is ignored, mirroring free(). */
void free_thread_context(struct thread_context* thread_context) {
  if (thread_context == NULL) {
    return;
  }
  destroy_thread_context(thread_context);
  my_free(thread_context);
}
2110
2111
2112
89.2k
/* Apply a pending thread-count change and make sure the pool is running.
 *
 * When new_nthreads differs from nthreads, releases the current pool (if one
 * was in use) and adopts new_nthreads.  Then starts a pool if more than one
 * thread is requested and none is started yet.  Returns the (possibly
 * updated) nthreads, BLOSC2_ERROR_INVALID_PARAM for a non-positive nthreads,
 * or the negative code from init_threadpool when the pool cannot start.
 */
int check_nthreads(blosc2_context* context) {
  if (context->nthreads <= 0) {
    BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  if (context->new_nthreads != context->nthreads) {
    if (context->nthreads > 1) {
      release_threadpool(context);
    }
    context->nthreads = context->new_nthreads;
  }
  if (context->new_nthreads > 1 && context->threads_started == 0) {
    // Report a failed pool start instead of silently continuing without threads
    int rc = init_threadpool(context);
    if (rc < 0) {
      return rc;
    }
  }

  return context->nthreads;
}
2130
2131
/* Do the compression or decompression of the buffer depending on the
2132
   global params. */
2133
89.2k
static int do_job(blosc2_context* context) {
2134
89.2k
  int32_t ntbytes;
2135
2136
  /* Set sentinels */
2137
89.2k
  context->dref_not_init = 1;
2138
2139
  /* Check whether we need to restart threads */
2140
89.2k
  check_nthreads(context);
2141
2142
  /* Run the serial version when nthreads is 1 or when the buffers are
2143
     not larger than blocksize */
2144
89.2k
  if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
2145
    /* The context for this 'thread' has no been initialized yet */
2146
89.2k
    if (context->serial_context == NULL) {
2147
15.3k
      context->serial_context = create_thread_context(context, 0);
2148
15.3k
    }
2149
73.8k
    else if (context->blocksize != context->serial_context->tmp_blocksize) {
2150
1.67k
      free_thread_context(context->serial_context);
2151
1.67k
      context->serial_context = create_thread_context(context, 0);
2152
1.67k
    }
2153
89.2k
    BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
2154
89.2k
    ntbytes = serial_blosc(context->serial_context);
2155
89.2k
  }
2156
0
  else {
2157
0
    ntbytes = parallel_blosc(context);
2158
0
  }
2159
2160
89.2k
  return ntbytes;
2161
89.2k
}
2162
2163
2164
static int initialize_context_compression(
2165
        blosc2_context* context, const void* src, int32_t srcsize, void* dest,
2166
        int32_t destsize, int clevel, uint8_t const *filters,
2167
        uint8_t const *filters_meta, int32_t typesize, int compressor,
2168
        int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
2169
        int32_t splitmode,
2170
        int tuner_id, void *tuner_params,
2171
51.5k
        blosc2_schunk* schunk) {
2172
2173
  /* Set parameters */
2174
51.5k
  context->do_compress = 1;
2175
51.5k
  context->src = (const uint8_t*)src;
2176
51.5k
  context->srcsize = srcsize;
2177
51.5k
  context->dest = (uint8_t*)dest;
2178
51.5k
  context->output_bytes = 0;
2179
51.5k
  context->destsize = destsize;
2180
51.5k
  context->sourcesize = srcsize;
2181
51.5k
  context->typesize = typesize;
2182
51.5k
  context->filter_flags = filters_to_flags(filters);
2183
361k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
2184
309k
    context->filters[i] = filters[i];
2185
309k
    context->filters_meta[i] = filters_meta[i];
2186
309k
  }
2187
51.5k
  context->compcode = compressor;
2188
51.5k
  context->nthreads = nthreads;
2189
51.5k
  context->new_nthreads = new_nthreads;
2190
51.5k
  context->end_threads = 0;
2191
51.5k
  context->clevel = clevel;
2192
51.5k
  context->schunk = schunk;
2193
51.5k
  context->tuner_params = tuner_params;
2194
51.5k
  context->tuner_id = tuner_id;
2195
51.5k
  context->splitmode = splitmode;
2196
  /* tuner some compression parameters */
2197
51.5k
  context->blocksize = (int32_t)blocksize;
2198
51.5k
  int rc = 0;
2199
51.5k
  if (context->tuner_params != NULL) {
2200
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2201
0
      if (blosc_stune_next_cparams(context) < 0) {
2202
0
        BLOSC_TRACE_ERROR("Error in stune next_cparams func\n");
2203
0
        return BLOSC2_ERROR_TUNER;
2204
0
      }
2205
0
    } else {
2206
0
      for (int i = 0; i < g_ntuners; ++i) {
2207
0
        if (g_tuners[i].id == context->tuner_id) {
2208
0
          if (g_tuners[i].next_cparams == NULL) {
2209
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2210
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2211
0
              return BLOSC2_ERROR_FAILURE;
2212
0
            }
2213
0
          }
2214
0
          if (g_tuners[i].next_cparams(context) < 0) {
2215
0
            BLOSC_TRACE_ERROR("Error in tuner %d next_cparams func\n", context->tuner_id);
2216
0
            return BLOSC2_ERROR_TUNER;
2217
0
          }
2218
0
          if (g_tuners[i].id == BLOSC_BTUNE && context->blocksize == 0) {
2219
            // Call stune for initializing blocksize
2220
0
            if (blosc_stune_next_blocksize(context) < 0) {
2221
0
              BLOSC_TRACE_ERROR("Error in stune next_blocksize func\n");
2222
0
              return BLOSC2_ERROR_TUNER;
2223
0
            }
2224
0
          }
2225
0
          goto urtunersuccess;
2226
0
        }
2227
0
      }
2228
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2229
0
      return BLOSC2_ERROR_INVALID_PARAM;
2230
0
    }
2231
51.5k
  } else {
2232
51.5k
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2233
51.5k
      rc = blosc_stune_next_blocksize(context);
2234
51.5k
    } else {
2235
0
      for (int i = 0; i < g_ntuners; ++i) {
2236
0
        if (g_tuners[i].id == context->tuner_id) {
2237
0
          if (g_tuners[i].next_blocksize == NULL) {
2238
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2239
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2240
0
              return BLOSC2_ERROR_FAILURE;
2241
0
            }
2242
0
          }
2243
0
          rc = g_tuners[i].next_blocksize(context);
2244
0
          goto urtunersuccess;
2245
0
        }
2246
0
      }
2247
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2248
0
      return BLOSC2_ERROR_INVALID_PARAM;
2249
0
    }
2250
51.5k
  }
2251
51.5k
  urtunersuccess:;
2252
51.5k
  if (rc < 0) {
2253
0
    BLOSC_TRACE_ERROR("Error in tuner next_blocksize func\n");
2254
0
    return BLOSC2_ERROR_TUNER;
2255
0
  }
2256
2257
2258
  /* Check buffer size limits */
2259
51.5k
  if (srcsize > BLOSC2_MAX_BUFFERSIZE) {
2260
0
    BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
2261
0
                      BLOSC2_MAX_BUFFERSIZE);
2262
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2263
0
  }
2264
2265
51.5k
  if (destsize < BLOSC2_MAX_OVERHEAD) {
2266
31
    BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
2267
31
                      BLOSC2_MAX_OVERHEAD);
2268
31
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2269
31
  }
2270
2271
  /* Compression level */
2272
51.5k
  if (clevel < 0 || clevel > 9) {
2273
    /* If clevel not in 0..9, print an error */
2274
0
    BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
2275
0
    return BLOSC2_ERROR_CODEC_PARAM;
2276
0
  }
2277
2278
  /* Check typesize limits */
2279
51.5k
  if (context->typesize > BLOSC2_MAXTYPESIZE) {
2280
    // If typesize is too large for Blosc2, return an error
2281
0
    BLOSC_TRACE_ERROR("Typesize cannot exceed %d bytes.", BLOSC2_MAXTYPESIZE);
2282
0
    return BLOSC2_ERROR_INVALID_PARAM;
2283
0
  }
2284
  /* Now, cap typesize so that blosc2 split machinery can continue to work */
2285
51.5k
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
2286
    /* If typesize is too large, treat buffer as an 1-byte stream. */
2287
0
    context->typesize = 1;
2288
0
  }
2289
2290
51.5k
  blosc2_calculate_blocks(context);
2291
2292
51.5k
  return 1;
2293
51.5k
}
2294
2295
2296
/*
 * Prepare `context` for decompressing the chunk stored in `src` into `dest`.
 *
 * `header` must already hold the header parsed from `src`.  The routine records
 * the source/destination buffers, loads the header into the context, validates
 * the header against the buffer sizes, locates the bstarts section and, when
 * the chunk carries a ZSTD dictionary (BLOSC2_USEDICT), loads it into
 * `context->dict_ddict`.
 *
 * Returns 0 on success (also for a header-only, zero-length chunk) or a
 * negative BLOSC2_ERROR_* code.
 */
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
                                            int32_t srcsize, void* dest, int32_t destsize) {
  int32_t bstarts_end;

  context->do_compress = 0;
  context->src = (const uint8_t*)src;
  context->srcsize = srcsize;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->output_bytes = 0;
  context->end_threads = 0;

  int rc = blosc2_initialize_context_from_header(context, header);
  if (rc < 0) {
    return rc;
  }

  /* Check that we have enough space to decompress */
  if (context->sourcesize > (int32_t)context->destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
    BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
                      " of blocks in chunk (%d).",
                      context->block_maskout_nitems, context->nblocks);
    return BLOSC2_ERROR_DATA;
  }

  context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
  if (context->special_type > BLOSC2_SPECIAL_LASTID) {
    BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
                      context->special_type);
    return BLOSC2_ERROR_DATA;
  }

  int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
  if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
    BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
    return BLOSC2_ERROR_DATA;
  }

  if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
      !context->special_type) {
    // A compressed buffer with only a header can only contain a zero-length buffer
    return 0;
  }

  context->bstarts = (int32_t *) (context->src + context->header_overhead);

  /* Compute the end of the bstarts section in 64-bit arithmetic, so that a
     header announcing a huge number of blocks cannot wrap the int32_t offset
     around and slip past the bounds check below. */
  int64_t bstarts_end64 = context->header_overhead;
  if (!context->special_type && !memcpyed) {
    /* If chunk is not special or a memcpyed, we do have a bstarts section */
    bstarts_end64 += (int64_t)context->nblocks * (int64_t)sizeof(int32_t);
  }

  if ((int64_t)srcsize < bstarts_end64) {
    BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
    return BLOSC2_ERROR_READ_BUFFER;
  }
  bstarts_end = (int32_t)bstarts_end64;  // fits: it is bounded by srcsize
  srcsize -= bstarts_end;

  /* Read optional dictionary if flag set */
  if (context->blosc2_flags & BLOSC2_USEDICT) {
#if defined(HAVE_ZSTD)
    context->use_dict = 1;
    if (context->dict_ddict != NULL) {
      // Free the existing dictionary (probably from another chunk) and forget
      // it right away: any early return below would otherwise leave a
      // dangling pointer that the next call would free a second time.
      ZSTD_freeDDict(context->dict_ddict);
      context->dict_ddict = NULL;
    }
    // The trained dictionary is after the bstarts block
    if (srcsize < (signed)sizeof(int32_t)) {
      BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= sizeof(int32_t);
    // Read dictionary size
    context->dict_size = sw32_(context->src + bstarts_end);
    if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
      BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
    if (srcsize < (int32_t)context->dict_size) {
      BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= context->dict_size;
    // Read dictionary
    context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
    context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
    if (context->dict_ddict == NULL) {
      BLOSC_TRACE_ERROR("Cannot create the ZSTD decompression dictionary.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
#endif   // HAVE_ZSTD
  }

  return 0;
}
2390
2391
51.5k
/*
 * Compute the chunk header for the compression about to run and store it at
 * the beginning of context->dest.
 *
 * Updates context->header_flags, context->header_overhead, context->bstarts
 * and context->output_bytes.  `extended_header` selects the Blosc2 extended
 * header (BLOSC_EXTENDED_HEADER_LENGTH bytes) instead of the Blosc1-compatible
 * one (BLOSC_MIN_HEADER_LENGTH bytes).  Always returns 1.
 */
static int write_compression_header(blosc2_context* context, bool extended_header) {
  blosc_header chunk_header;
  const bool training_dict = context->use_dict && (context->dict_cdict == NULL);

  /* Start from a clean slate; clevel 0 and buffers that are too small are
     stored with a plain memcpy. */
  context->header_flags = 0;
  if ((context->clevel == 0) || (context->sourcesize < BLOSC_MIN_BUFFERSIZE)) {
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
  }
  const bool plain_copy = (context->header_flags & (uint8_t)BLOSC_MEMCPYED) != 0;

  /* Choose the header size and whether a bstarts table follows the header. */
  bool without_bstarts;
  if (!extended_header) {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    without_bstarts = plain_copy;
  }
  else {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    /* Setting both shuffle bits at once marks an extended header */
    context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
    without_bstarts = training_dict || plain_copy;
  }

  if (without_bstarts) {
    context->bstarts = NULL;
    context->output_bytes = context->header_overhead;
  }
  else {
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
  }

  /* For a plain copy, the filter and codec bits are meaningless. */
  if (!plain_copy) {
    /* Mirror the active shuffle / bitshuffle / delta filters into the flags */
    context->header_flags |= (uint8_t)(context->filter_flags &
                                       (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE | BLOSC_DODELTA));
    /* bit 4: blocks are not split */
    const int no_split = !split_block(context, context->typesize, context->blocksize);
    context->header_flags |= no_split << 4;
    /* bits 5 and up: codec format */
    const uint8_t compformat = compcode_to_compformat(context->compcode);
    context->header_flags |= compformat << 5;
  }

  /* Serialize the header into the first bytes of dest */
  blosc2_intialize_header_from_context(context, &chunk_header, extended_header);
  const size_t header_len = extended_header ? BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH;
  memcpy(context->dest, &chunk_header, header_len);

  return 1;
}
2467
2468
2469
51.5k
/*
 * Run the compression job prepared by write_compression_header() and finalize
 * the chunk header stored in context->dest.
 *
 * If the codec pass produces no output (0 bytes), a plain memcpy of the source
 * is attempted instead, provided it fits in context->destsize.  A codec output
 * whose size is exactly header + bstarts + one int32_t per stream is turned
 * into a header-only BLOSC2_SPECIAL_ZERO chunk.  When a tuner is configured,
 * it is fed the elapsed compression time.
 *
 * Returns the compressed size (0 when the output does not fit) or a negative
 * error code.
 */
static int blosc_compress_context(blosc2_context* context) {
  blosc_timestamp_t t_start, t_end;
  int cbytes = 0;
  bool use_memcpy = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  blosc_set_timestamp(&t_start);

  if (!use_memcpy) {
    /* Regular codec pass */
    cbytes = do_job(context);
    if (cbytes < 0) {
      return cbytes;
    }
    if (cbytes == 0) {
      /* Nothing produced: try a memcpy below (last chance for src to fit in dest) */
      context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
      use_memcpy = true;
    }
  }

  /* Stream count: one per block, or one per typesize byte when blocks are
     split (with the last, leftover block counted as a single stream). */
  const bool split = ((context->header_flags >> 4) & 1) == 0;
  int nstreams;
  if (!split) {
    nstreams = context->nblocks;
  }
  else if (context->leftover) {
    nstreams = (context->nblocks - 1) * context->typesize + 1;
  }
  else {
    nstreams = context->nblocks * context->typesize;
  }

  if (!use_memcpy) {
    /* An output made only of zero runs is encoded as a header-only special chunk */
    const int start_csizes = context->header_overhead + 4 * context->nblocks;
    if (cbytes == (int)(start_csizes + nstreams * sizeof(int32_t))) {
      context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
      cbytes = context->header_overhead;
    }
  }
  else if (context->sourcesize + context->header_overhead > context->destsize) {
    /* Even a plain copy would exceed the maximum output size */
    cbytes = 0;
  }
  else {
    context->output_bytes = context->header_overhead;
    cbytes = do_job(context);
    if (cbytes < 0) {
      return cbytes;
    }
    /* Persist the memcpy flag in the stored header... */
    context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
    /* ...and drop it from the context so that the context can be reused */
    context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
  }

  /* Store the number of compressed bytes in the header */
  _sw32(context->dest + BLOSC2_CHUNK_CBYTES, cbytes);

  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    /* Express nbytes and blocksize in units of blosc2_instr records */
    const int instr_unsplit = (context->header_flags >> 4) & 1;
    const int32_t instr_size = (int32_t)sizeof(blosc2_instr);
    _sw32(context->dest + BLOSC2_CHUNK_NBYTES, nstreams * instr_size);
    _sw32(context->dest + BLOSC2_CHUNK_BLOCKSIZE,
          instr_unsplit ? instr_size : instr_size * context->typesize);
  }

  /* Remember how many bytes ended up in dest (might be useful for tuner) */
  context->destsize = cbytes;

  if (context->tuner_params == NULL) {
    return cbytes;
  }

  blosc_set_timestamp(&t_end);
  const double elapsed = blosc_elapsed_secs(t_start, t_end);
  int tuner_rc;
  if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
    tuner_rc = blosc_stune_update(context, elapsed);
  }
  else {
    /* Look up the first registered tuner with a matching id */
    int slot = -1;
    for (int k = 0; k < g_ntuners && slot < 0; ++k) {
      if (g_tuners[k].id == context->tuner_id) {
        slot = k;
      }
    }
    if (slot < 0) {
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
      return BLOSC2_ERROR_INVALID_PARAM;
    }
    if (g_tuners[slot].update == NULL && fill_tuner(&g_tuners[slot]) < 0) {
      BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[slot].id);
      return BLOSC2_ERROR_FAILURE;
    }
    tuner_rc = g_tuners[slot].update(context, elapsed);
  }

  if (tuner_rc < 0) {
    BLOSC_TRACE_ERROR("Error in tuner update func\n");
    return BLOSC2_ERROR_TUNER;
  }

  return cbytes;
}
2572
2573
2574
/* The public secure routine for compression with context. */
2575
int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2576
38.8k
                        void* dest, int32_t destsize) {
2577
38.8k
  int error, cbytes;
2578
2579
38.8k
  if (context->do_compress != 1) {
2580
0
    BLOSC_TRACE_ERROR("Context is not meant for compression.  Giving up.");
2581
0
    return BLOSC2_ERROR_INVALID_PARAM;
2582
0
  }
2583
2584
38.8k
  error = initialize_context_compression(
2585
38.8k
          context, src, srcsize, dest, destsize,
2586
38.8k
          context->clevel, context->filters, context->filters_meta,
2587
38.8k
          context->typesize, context->compcode, context->blocksize,
2588
38.8k
          context->new_nthreads, context->nthreads, context->splitmode,
2589
38.8k
          context->tuner_id, context->tuner_params, context->schunk);
2590
38.8k
  if (error <= 0) {
2591
0
    return error;
2592
0
  }
2593
2594
  /* Write the extended header */
2595
38.8k
  error = write_compression_header(context, true);
2596
38.8k
  if (error < 0) {
2597
0
    return error;
2598
0
  }
2599
2600
38.8k
  cbytes = blosc_compress_context(context);
2601
38.8k
  if (cbytes < 0) {
2602
0
    return cbytes;
2603
0
  }
2604
2605
38.8k
  if (context->use_dict && context->dict_cdict == NULL) {
2606
2607
0
    if (context->compcode != BLOSC_ZSTD) {
2608
0
      const char* compname;
2609
0
      compname = clibcode_to_clibname(context->compcode);
2610
0
      BLOSC_TRACE_ERROR("Codec %s does not support dicts.  Giving up.",
2611
0
                        compname);
2612
0
      return BLOSC2_ERROR_CODEC_DICT;
2613
0
    }
2614
2615
0
#ifdef HAVE_ZSTD
2616
    // Build the dictionary out of the filters outcome and compress with it
2617
0
    int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
2618
    // Do not make the dict more than 5% larger than uncompressed buffer
2619
0
    if (dict_maxsize > srcsize / 20) {
2620
0
      dict_maxsize = srcsize / 20;
2621
0
    }
2622
0
    void* samples_buffer = context->dest + context->header_overhead;
2623
0
    unsigned nblocks = (unsigned)context->nblocks;
2624
0
    int dont_split = (context->header_flags & 0x10) >> 4;
2625
0
    if (!dont_split) {
2626
0
      nblocks = nblocks * context->typesize;
2627
0
    }
2628
0
    if (nblocks < 8) {
2629
0
      nblocks = 8;  // the minimum that accepts zstd as of 1.4.0
2630
0
    }
2631
2632
    // 1 allows to use most of the chunk for training, but it is slower,
2633
    // and it does not always seem to improve compression ratio.
2634
    // Let's use 16, which is faster and still gives good results
2635
    // on test_dict_schunk.c, but this seems very dependent on the data.
2636
0
    unsigned sample_fraction = 16;
2637
0
    size_t sample_size = context->sourcesize / nblocks / sample_fraction;
2638
2639
    // Populate the samples sizes for training the dictionary
2640
0
    size_t* samples_sizes = malloc(nblocks * sizeof(void*));
2641
0
    BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
2642
0
    for (size_t i = 0; i < nblocks; i++) {
2643
0
      samples_sizes[i] = sample_size;
2644
0
    }
2645
2646
    // Train from samples
2647
0
    void* dict_buffer = malloc(dict_maxsize);
2648
0
    BLOSC_ERROR_NULL(dict_buffer, BLOSC2_ERROR_MEMORY_ALLOC);
2649
0
    int32_t dict_actual_size = (int32_t)ZDICT_trainFromBuffer(
2650
0
        dict_buffer, dict_maxsize,
2651
0
        samples_buffer, samples_sizes, nblocks);
2652
2653
    // TODO: experiment with parameters of low-level fast cover algorithm
2654
    // Note that this API is still unstable.  See: https://github.com/facebook/zstd/issues/1599
2655
    // ZDICT_fastCover_params_t fast_cover_params;
2656
    // memset(&fast_cover_params, 0, sizeof(fast_cover_params));
2657
    // fast_cover_params.d = nblocks;
2658
    // fast_cover_params.steps = 4;
2659
    // fast_cover_params.zParams.compressionLevel = context->clevel;
2660
    // size_t dict_actual_size = ZDICT_optimizeTrainFromBuffer_fastCover(
2661
    //   dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks,
2662
    //   &fast_cover_params);
2663
2664
0
    if (ZDICT_isError(dict_actual_size) != ZSTD_error_no_error) {
2665
0
      BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
2666
0
                        "  Giving up.", ZDICT_getErrorName(dict_actual_size));
2667
0
      return BLOSC2_ERROR_CODEC_DICT;
2668
0
    }
2669
0
    assert(dict_actual_size > 0);
2670
0
    free(samples_sizes);
2671
2672
    // Update bytes counter and pointers to bstarts for the new compressed buffer
2673
0
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
2674
0
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
2675
    /* Write the size of trained dict at the end of bstarts */
2676
0
    _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
2677
0
    context->output_bytes += sizeof(int32_t);
2678
    /* Write the trained dict afterwards */
2679
0
    context->dict_buffer = context->dest + context->output_bytes;
2680
0
    memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
2681
0
    context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1);  // TODO: use get_accel()
2682
0
    free(dict_buffer);      // the dictionary is copied in the header now
2683
0
    context->output_bytes += (int32_t)dict_actual_size;
2684
0
    context->dict_size = dict_actual_size;
2685
2686
    /* Compress with dict */
2687
0
    cbytes = blosc_compress_context(context);
2688
2689
    // Invalidate the dictionary for compressing other chunks using the same context
2690
0
    context->dict_buffer = NULL;
2691
0
    ZSTD_freeCDict(context->dict_cdict);
2692
0
    context->dict_cdict = NULL;
2693
0
#endif  // HAVE_ZSTD
2694
0
  }
2695
2696
38.8k
  return cbytes;
2697
38.8k
}
2698
2699
2700
void build_filters(const int doshuffle, const int delta,
2701
15.1k
                   const int32_t typesize, uint8_t* filters) {
2702
2703
  /* Fill the end part of the filter pipeline */
2704
15.1k
  if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2705
0
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2706
15.1k
  if (doshuffle == BLOSC_BITSHUFFLE)
2707
4.84k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2708
15.1k
  if (doshuffle == BLOSC_NOSHUFFLE)
2709
4.43k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_NOSHUFFLE;
2710
15.1k
  if (delta)
2711
0
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2712
15.1k
}
2713
2714
/* The public secure routine for compression. */
2715
int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2716
12.7k
                    const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2717
12.7k
  int error;
2718
12.7k
  int result;
2719
12.7k
  char* envvar;
2720
2721
  /* Check whether the library should be initialized */
2722
12.7k
  if (!g_initlib) blosc2_init();
2723
2724
  /* Check for a BLOSC_CLEVEL environment variable */
2725
12.7k
  envvar = getenv("BLOSC_CLEVEL");
2726
12.7k
  if (envvar != NULL) {
2727
0
    long value;
2728
0
    errno = 0; /* To distinguish success/failure after call */
2729
0
    value = strtol(envvar, NULL, 10);
2730
0
    if ((errno != EINVAL) && (value >= 0)) {
2731
0
      clevel = (int)value;
2732
0
    }
2733
0
    else {
2734
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
2735
0
    }
2736
0
  }
2737
2738
  /* Check for a BLOSC_SHUFFLE environment variable */
2739
12.7k
  envvar = getenv("BLOSC_SHUFFLE");
2740
12.7k
  if (envvar != NULL) {
2741
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
2742
0
      doshuffle = BLOSC_NOSHUFFLE;
2743
0
    }
2744
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
2745
0
      doshuffle = BLOSC_SHUFFLE;
2746
0
    }
2747
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
2748
0
      doshuffle = BLOSC_BITSHUFFLE;
2749
0
    }
2750
0
    else {
2751
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
2752
0
    }
2753
0
  }
2754
2755
  /* Check for a BLOSC_DELTA environment variable */
2756
12.7k
  envvar = getenv("BLOSC_DELTA");
2757
12.7k
  if (envvar != NULL) {
2758
0
    if (strcmp(envvar, "1") == 0) {
2759
0
      blosc2_set_delta(1);
2760
0
    } else if (strcmp(envvar, "0") == 0) {
2761
0
      blosc2_set_delta(0);
2762
0
    }
2763
0
    else {
2764
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
2765
0
    }
2766
0
  }
2767
2768
  /* Check for a BLOSC_TYPESIZE environment variable */
2769
12.7k
  envvar = getenv("BLOSC_TYPESIZE");
2770
12.7k
  if (envvar != NULL) {
2771
0
    long value;
2772
0
    errno = 0; /* To distinguish success/failure after call */
2773
0
    value = strtol(envvar, NULL, 10);
2774
0
    if ((errno != EINVAL) && (value > 0)) {
2775
0
      typesize = (int32_t)value;
2776
0
    }
2777
0
    else {
2778
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
2779
0
    }
2780
0
  }
2781
2782
  /* Check for a BLOSC_COMPRESSOR environment variable */
2783
12.7k
  envvar = getenv("BLOSC_COMPRESSOR");
2784
12.7k
  if (envvar != NULL) {
2785
0
    result = blosc1_set_compressor(envvar);
2786
0
    if (result < 0) {
2787
0
      BLOSC_TRACE_WARNING("BLOSC_COMPRESSOR environment variable '%s' not recognized\n", envvar);
2788
0
    }
2789
0
  }
2790
2791
  /* Check for a BLOSC_BLOCKSIZE environment variable */
2792
12.7k
  envvar = getenv("BLOSC_BLOCKSIZE");
2793
12.7k
  if (envvar != NULL) {
2794
0
    long blocksize;
2795
0
    errno = 0; /* To distinguish success/failure after call */
2796
0
    blocksize = strtol(envvar, NULL, 10);
2797
0
    if ((errno != EINVAL) && (blocksize > 0)) {
2798
0
      blosc1_set_blocksize((size_t) blocksize);
2799
0
    }
2800
0
    else {
2801
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
2802
0
    }
2803
0
  }
2804
2805
  /* Check for a BLOSC_NTHREADS environment variable */
2806
12.7k
  envvar = getenv("BLOSC_NTHREADS");
2807
12.7k
  if (envvar != NULL) {
2808
0
    long nthreads;
2809
0
    errno = 0; /* To distinguish success/failure after call */
2810
0
    nthreads = strtol(envvar, NULL, 10);
2811
0
    if ((errno != EINVAL) && (nthreads > 0)) {
2812
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2813
0
      if (result < 0) {
2814
0
        BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
2815
0
      }
2816
0
    }
2817
0
  }
2818
2819
  /* Check for a BLOSC_SPLITMODE environment variable */
2820
12.7k
  envvar = getenv("BLOSC_SPLITMODE");
2821
12.7k
  if (envvar != NULL) {
2822
0
    int32_t splitmode = -1;
2823
0
    if (strcmp(envvar, "ALWAYS") == 0) {
2824
0
      splitmode = BLOSC_ALWAYS_SPLIT;
2825
0
    }
2826
0
    else if (strcmp(envvar, "NEVER") == 0) {
2827
0
      splitmode = BLOSC_NEVER_SPLIT;
2828
0
    }
2829
0
    else if (strcmp(envvar, "AUTO") == 0) {
2830
0
      splitmode = BLOSC_AUTO_SPLIT;
2831
0
    }
2832
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
2833
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
2834
0
    }
2835
0
    else {
2836
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
2837
0
    }
2838
2839
0
    if (splitmode >= 0) {
2840
0
      blosc1_set_splitmode(splitmode);
2841
0
    }
2842
0
  }
2843
2844
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2845
     that this should be the last env var so that it can take the
2846
     previous ones into account */
2847
12.7k
  envvar = getenv("BLOSC_NOLOCK");
2848
12.7k
  if (envvar != NULL) {
2849
    // TODO: here is the only place that returns an extended header from
2850
    //  a blosc1_compress() call.  This should probably be fixed.
2851
0
    const char *compname;
2852
0
    blosc2_context *cctx;
2853
0
    blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2854
2855
0
    blosc2_compcode_to_compname(g_compressor, &compname);
2856
    /* Create a context for compression */
2857
0
    build_filters(doshuffle, g_delta, typesize, cparams.filters);
2858
    // TODO: cparams can be shared in a multithreaded environment.  do a copy!
2859
0
    cparams.typesize = (uint8_t)typesize;
2860
0
    cparams.compcode = (uint8_t)g_compressor;
2861
0
    cparams.clevel = (uint8_t)clevel;
2862
0
    cparams.nthreads = g_nthreads;
2863
0
    cparams.splitmode = g_splitmode;
2864
0
    cctx = blosc2_create_cctx(cparams);
2865
0
    if (cctx == NULL) {
2866
0
      BLOSC_TRACE_ERROR("Error while creating the compression context");
2867
0
      return BLOSC2_ERROR_NULL_POINTER;
2868
0
    }
2869
    /* Do the actual compression */
2870
0
    result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2871
    /* Release context resources */
2872
0
    blosc2_free_ctx(cctx);
2873
0
    return result;
2874
0
  }
2875
2876
12.7k
  blosc2_pthread_mutex_lock(&global_comp_mutex);
2877
2878
  /* Initialize a context compression */
2879
12.7k
  uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2880
12.7k
  BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2881
12.7k
  uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2882
12.7k
  BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2883
12.7k
  build_filters(doshuffle, g_delta, typesize, filters);
2884
12.7k
  error = initialize_context_compression(
2885
12.7k
          g_global_context, src, srcsize, dest, destsize, clevel, filters,
2886
12.7k
          filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2887
12.7k
          g_splitmode, g_tuner, NULL, g_schunk);
2888
12.7k
  free(filters);
2889
12.7k
  free(filters_meta);
2890
12.7k
  if (error <= 0) {
2891
31
    blosc2_pthread_mutex_unlock(&global_comp_mutex);
2892
31
    return error;
2893
31
  }
2894
2895
12.7k
  envvar = getenv("BLOSC_BLOSC1_COMPAT");
2896
12.7k
  if (envvar != NULL) {
2897
    /* Write chunk header without extended header (Blosc1 compatibility mode) */
2898
0
    error = write_compression_header(g_global_context, false);
2899
0
  }
2900
12.7k
  else {
2901
12.7k
    error = write_compression_header(g_global_context, true);
2902
12.7k
  }
2903
12.7k
  if (error < 0) {
2904
0
    blosc2_pthread_mutex_unlock(&global_comp_mutex);
2905
0
    return error;
2906
0
  }
2907
2908
12.7k
  result = blosc_compress_context(g_global_context);
2909
2910
12.7k
  blosc2_pthread_mutex_unlock(&global_comp_mutex);
2911
2912
12.7k
  return result;
2913
12.7k
}
2914
2915
2916
/* The public routine for compression. */
2917
int blosc1_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
2918
0
                    const void* src, void* dest, size_t destsize) {
2919
0
  return blosc2_compress(clevel, doshuffle, (int32_t)typesize, src, (int32_t)nbytes, dest, (int32_t)destsize);
2920
0
}
2921
2922
2923
2924
/*
 * Decompress the chunk in `src` (`srcsize` bytes available) into `dest`
 * (capacity `destsize`) using `context`.
 *
 * The chunk header is parsed first.  The call is refused with
 * BLOSC2_ERROR_WRITE_BUFFER when the announced uncompressed size exceeds
 * `destsize`.  Returns the number of decompressed bytes or a negative error
 * code.
 */
static int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
                                                void* dest, int32_t destsize) {
  blosc_header chunk_header;

  int status = read_chunk_header(src, srcsize, true, &chunk_header);
  if (status < 0) {
    return status;
  }

  /* Refuse early when dest cannot hold the uncompressed data */
  if (chunk_header.nbytes > destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  status = initialize_context_decompression(context, &chunk_header, src, srcsize, dest, destsize);
  if (status < 0) {
    return status;
  }

  /* Do the actual decompression */
  const int32_t nbytes_out = do_job(context);
  if (nbytes_out < 0) {
    return nbytes_out;
  }
  assert(nbytes_out <= (int32_t)destsize);
  return nbytes_out;
}
2954
2955
2956
/* The public secure routine for decompression with context. */
2957
int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
2958
15.1k
                          void* dest, int32_t destsize) {
2959
15.1k
  int result;
2960
2961
15.1k
  if (context->do_compress != 0) {
2962
0
    BLOSC_TRACE_ERROR("Context is not meant for decompression.  Giving up.");
2963
0
    return BLOSC2_ERROR_INVALID_PARAM;
2964
0
  }
2965
2966
15.1k
  result = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);
2967
2968
  // Reset a possible block_maskout
2969
15.1k
  if (context->block_maskout != NULL) {
2970
0
    free(context->block_maskout);
2971
0
    context->block_maskout = NULL;
2972
0
  }
2973
15.1k
  context->block_maskout_nitems = 0;
2974
2975
15.1k
  return result;
2976
15.1k
}
2977
2978
2979
/* The public secure routine for decompression. */
2980
19.0k
int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2981
19.0k
  int result;
2982
19.0k
  char* envvar;
2983
19.0k
  long nthreads;
2984
19.0k
  blosc2_context *dctx;
2985
19.0k
  blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
2986
2987
  /* Check whether the library should be initialized */
2988
19.0k
  if (!g_initlib) blosc2_init();
2989
2990
  /* Check for a BLOSC_NTHREADS environment variable */
2991
19.0k
  envvar = getenv("BLOSC_NTHREADS");
2992
19.0k
  if (envvar != NULL) {
2993
0
    errno = 0; /* To distinguish success/failure after call */
2994
0
    nthreads = strtol(envvar, NULL, 10);
2995
0
    if ((errno != EINVAL)) {
2996
0
      if ((nthreads <= 0) || (nthreads > INT16_MAX)) {
2997
0
        BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
2998
0
        return BLOSC2_ERROR_INVALID_PARAM;
2999
0
      }
3000
0
      result = blosc2_set_nthreads((int16_t) nthreads);
3001
0
      if (result < 0) {
3002
0
        return result;
3003
0
      }
3004
0
    }
3005
0
  }
3006
3007
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
3008
     that this should be the last env var so that it can take the
3009
     previous ones into account */
3010
19.0k
  envvar = getenv("BLOSC_NOLOCK");
3011
19.0k
  if (envvar != NULL) {
3012
0
    dparams.nthreads = g_nthreads;
3013
0
    dctx = blosc2_create_dctx(dparams);
3014
0
    if (dctx == NULL) {
3015
0
      BLOSC_TRACE_ERROR("Error while creating the decompression context");
3016
0
      return BLOSC2_ERROR_NULL_POINTER;
3017
0
    }
3018
0
    result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
3019
0
    blosc2_free_ctx(dctx);
3020
0
    return result;
3021
0
  }
3022
3023
19.0k
  blosc2_pthread_mutex_lock(&global_comp_mutex);
3024
3025
19.0k
  result = blosc_run_decompression_with_context(
3026
19.0k
          g_global_context, src, srcsize, dest, destsize);
3027
3028
19.0k
  blosc2_pthread_mutex_unlock(&global_comp_mutex);
3029
3030
19.0k
  return result;
3031
19.0k
}
3032
3033
3034
/*
 * The public routine for decompression (Blosc1 API).
 *
 * The Blosc1 API has no source-size argument, so INT32_MAX is passed as
 * srcsize; reads are then bounded only by the sizes recorded in the chunk
 * header.  A `destsize` above INT32_MAX is clamped to INT32_MAX instead of
 * being wrapped by the int32_t conversion (which would produce a negative or
 * too-small capacity).  Clamping is safe because it only under-reports the
 * space available in `dest`.
 */
int blosc1_decompress(const void* src, void* dest, size_t destsize) {
  int32_t dest_capacity = (destsize > (size_t)INT32_MAX) ? INT32_MAX : (int32_t)destsize;
  return blosc2_decompress(src, INT32_MAX, dest, dest_capacity);
}
3038
3039
3040
/* Specific routine optimized for decompression a small number of
3041
   items out of a compressed chunk.  This does not use threads because
3042
   it would affect negatively to performance. */
3043
int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
3044
15
                   int start, int nitems, void* dest, int32_t destsize) {
3045
15
  uint8_t* _src = (uint8_t*)(src);  /* current pos for source buffer */
3046
15
  uint8_t* _dest = (uint8_t*)(dest);
3047
15
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
3048
15
  int32_t bsize, bsize2, ebsize, leftoverblock;
3049
15
  int32_t startb, stopb;
3050
15
  int32_t stop = start + nitems;
3051
15
  int j, rc;
3052
3053
15
  if (nitems == 0) {
3054
    // We have nothing to do
3055
0
    return 0;
3056
0
  }
3057
15
  if (nitems * header->typesize > destsize) {
3058
0
    BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
3059
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3060
0
  }
3061
3062
15
  int32_t* bstarts = (int32_t*)(_src + context->header_overhead);
3063
3064
  /* Check region boundaries */
3065
15
  if ((start < 0) || (start * header->typesize > header->nbytes)) {
3066
0
    BLOSC_TRACE_ERROR("`start` out of bounds.");
3067
0
    return BLOSC2_ERROR_INVALID_PARAM;
3068
0
  }
3069
3070
15
  if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
3071
0
    BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
3072
0
    return BLOSC2_ERROR_INVALID_PARAM;
3073
0
  }
3074
3075
15
  int chunk_memcpy = header->flags & 0x1;
3076
15
  if (!context->special_type && !chunk_memcpy &&
3077
0
      ((uint8_t *)(_src + srcsize) < (uint8_t *)(bstarts + context->nblocks))) {
3078
0
    BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
3079
0
    return BLOSC2_ERROR_READ_BUFFER;
3080
0
  }
3081
3082
15
  bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
3083
15
  if (context->special_type) {
3084
    // Fake a runlen as if its a memcpyed chunk
3085
0
    memcpyed = true;
3086
0
  }
3087
3088
15
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
3089
15
                  (context->blosc2_flags & 0x08u) && !context->special_type);
3090
15
  if (memcpyed && !is_lazy && !context->postfilter) {
3091
    // Short-circuit for (non-lazy) memcpyed or special values
3092
15
    ntbytes = nitems * header->typesize;
3093
15
    switch (context->special_type) {
3094
0
      case BLOSC2_SPECIAL_VALUE:
3095
        // All repeated values
3096
0
        rc = set_values(context->typesize, _src, _dest, ntbytes);
3097
0
        if (rc < 0) {
3098
0
          BLOSC_TRACE_ERROR("set_values failed");
3099
0
          return BLOSC2_ERROR_DATA;
3100
0
        }
3101
0
        break;
3102
0
      case BLOSC2_SPECIAL_NAN:
3103
0
        rc = set_nans(context->typesize, _dest, ntbytes);
3104
0
        if (rc < 0) {
3105
0
          BLOSC_TRACE_ERROR("set_nans failed");
3106
0
          return BLOSC2_ERROR_DATA;
3107
0
        }
3108
0
        break;
3109
0
      case BLOSC2_SPECIAL_ZERO:
3110
0
        memset(_dest, 0, ntbytes);
3111
0
        break;
3112
0
      case BLOSC2_SPECIAL_UNINIT:
3113
        // We do nothing here
3114
0
        break;
3115
15
      case BLOSC2_NO_SPECIAL:
3116
15
        _src += context->header_overhead + start * context->typesize;
3117
15
        memcpy(_dest, _src, ntbytes);
3118
15
        break;
3119
0
      default:
3120
0
        BLOSC_TRACE_ERROR("Unhandled special value case");
3121
0
        BLOSC_ERROR(BLOSC2_ERROR_SCHUNK_SPECIAL);
3122
15
    }
3123
15
    return ntbytes;
3124
15
  }
3125
3126
0
  ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
3127
0
  struct thread_context* scontext = context->serial_context;
3128
  /* Resize the temporaries in serial context if needed */
3129
0
  if (header->blocksize > scontext->tmp_blocksize) {
3130
0
    my_free(scontext->tmp);
3131
0
    scontext->tmp_nbytes = (size_t)4 * ebsize;
3132
0
    scontext->tmp = my_malloc(scontext->tmp_nbytes);
3133
0
    BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
3134
0
    scontext->tmp2 = scontext->tmp + ebsize;
3135
0
    scontext->tmp3 = scontext->tmp2 + ebsize;
3136
0
    scontext->tmp4 = scontext->tmp3 + ebsize;
3137
0
    scontext->tmp_blocksize = (int32_t)header->blocksize;
3138
0
  }
3139
3140
0
  for (j = 0; j < context->nblocks; j++) {
3141
0
    bsize = header->blocksize;
3142
0
    leftoverblock = 0;
3143
0
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
3144
0
      bsize = context->leftover;
3145
0
      leftoverblock = 1;
3146
0
    }
3147
3148
    /* Compute start & stop for each block */
3149
0
    startb = start * header->typesize - j * header->blocksize;
3150
0
    stopb = stop * header->typesize - j * header->blocksize;
3151
0
    if (stopb <= 0) {
3152
      // We can exit as soon as this block is beyond stop
3153
0
      break;
3154
0
    }
3155
0
    if (startb >= header->blocksize) {
3156
0
      continue;
3157
0
    }
3158
0
    if (startb < 0) {
3159
0
      startb = 0;
3160
0
    }
3161
0
    if (stopb > header->blocksize) {
3162
0
      stopb = header->blocksize;
3163
0
    }
3164
0
    bsize2 = stopb - startb;
3165
3166
0
#if defined(HAVE_PLUGINS)
3167
0
    if (context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) {
3168
0
      scontext->zfp_cell_start = startb / context->typesize;
3169
0
      scontext->zfp_cell_nitems = nitems;
3170
0
    }
3171
0
#endif /* HAVE_PLUGINS */
3172
3173
    /* Do the actual data copy */
3174
    // Regular decompression.  Put results in tmp2.
3175
    // If the block is aligned and the worst case fits in destination, let's avoid a copy
3176
0
    bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
3177
0
    uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
3178
3179
    // If memcpyed we don't have a bstarts section (because it is not needed)
3180
0
    int32_t src_offset = memcpyed ?
3181
0
      context->header_overhead + j * header->blocksize : sw32_(bstarts + j);
3182
3183
0
    int32_t cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
3184
0
                             src, srcsize, src_offset, j,
3185
0
                             tmp2, 0, scontext->tmp, scontext->tmp3);
3186
0
    if (cbytes < 0) {
3187
0
      ntbytes = cbytes;
3188
0
      break;
3189
0
    }
3190
0
    if (scontext->zfp_cell_nitems > 0) {
3191
0
      if (cbytes == bsize2) {
3192
0
        memcpy((uint8_t *) dest, tmp2, (unsigned int) bsize2);
3193
0
      } else if (cbytes == context->blocksize) {
3194
0
        memcpy((uint8_t *) dest, tmp2 + scontext->zfp_cell_start * context->typesize, (unsigned int) bsize2);
3195
0
        cbytes = bsize2;
3196
0
      }
3197
0
    } else if (!get_single_block) {
3198
      /* Copy to destination */
3199
0
      memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
3200
0
    }
3201
0
    ntbytes += bsize2;
3202
0
  }
3203
3204
0
  scontext->zfp_cell_nitems = 0;
3205
3206
0
  return ntbytes;
3207
0
}
3208
3209
15
int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
3210
15
  blosc2_context context;
3211
15
  int result;
3212
3213
  /* Minimally populate the context */
3214
15
  memset(&context, 0, sizeof(blosc2_context));
3215
3216
15
  context.schunk = g_schunk;
3217
15
  context.nthreads = 1;  // force a serial decompression; fixes #95
3218
3219
  /* Call the actual getitem function */
3220
15
  result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
3221
3222
  /* Release resources */
3223
15
  if (context.serial_context != NULL) {
3224
15
    free_thread_context(context.serial_context);
3225
15
  }
3226
15
  return result;
3227
15
}
3228
3229
/* Blosc1-compatible routine for extracting a small number of items from a
   compressed chunk (public, non-contextual API).  The Blosc1 signature has
   no buffer sizes, so both the source and destination sizes are passed as
   INT32_MAX; the caller must supply large-enough buffers. */
int blosc1_getitem(const void* src, int start, int nitems, void* dest) {
  const int32_t unbounded = INT32_MAX;
  return blosc2_getitem(src, unbounded, start, nitems, dest, unbounded);
}
3234
3235
/* Contextual getitem: parse the chunk header of `src`, (re)initialize
   `context` from it, make sure a serial thread context exists, and then
   extract items [start, start + nitems) into `dest`.
   Returns the number of bytes written or a negative error code. */
int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
    int start, int nitems, void* dest, int32_t destsize) {
  blosc_header hdr;

  int rc = read_chunk_header((uint8_t *) src, srcsize, true, &hdr);
  if (rc < 0) {
    return rc;
  }

  /* Minimally populate the context */
  context->src = src;
  context->srcsize = srcsize;
  context->dest = dest;
  context->destsize = destsize;

  rc = blosc2_initialize_context_from_header(context, &hdr);
  if (rc < 0) {
    return rc;
  }

  /* Lazily create the single-threaded working context */
  if (context->serial_context == NULL) {
    context->serial_context = create_thread_context(context, 0);
  }
  BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);

  /* Call the actual getitem function */
  return _blosc_getitem(context, &hdr, src, srcsize, start, nitems, dest, destsize);
}
3265
3266
/* execute single compression/decompression job for a single thread_context */
3267
static void t_blosc_do_job(void *ctxt)
3268
0
{
3269
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3270
0
  blosc2_context* context = thcontext->parent_context;
3271
0
  int32_t cbytes;
3272
0
  int32_t ntdest;
3273
0
  int32_t tblocks;               /* number of blocks per thread */
3274
0
  int32_t tblock;                /* limit block on a thread */
3275
0
  int32_t nblock_;              /* private copy of nblock */
3276
0
  int32_t bsize;
3277
0
  int32_t leftoverblock;
3278
  /* Parameters for threads */
3279
0
  int32_t blocksize;
3280
0
  int32_t ebsize;
3281
0
  int32_t srcsize;
3282
0
  bool compress = context->do_compress != 0;
3283
0
  int32_t maxbytes;
3284
0
  int32_t nblocks;
3285
0
  int32_t leftover;
3286
0
  int32_t leftover2;
3287
0
  int32_t* bstarts;
3288
0
  const uint8_t* src;
3289
0
  uint8_t* dest;
3290
0
  uint8_t* tmp;
3291
0
  uint8_t* tmp2;
3292
0
  uint8_t* tmp3;
3293
3294
  /* Get parameters for this thread before entering the main loop */
3295
0
  blocksize = context->blocksize;
3296
0
  ebsize = blocksize + context->typesize * (int32_t)sizeof(int32_t);
3297
0
  maxbytes = context->destsize;
3298
0
  nblocks = context->nblocks;
3299
0
  leftover = context->leftover;
3300
0
  bstarts = context->bstarts;
3301
0
  src = context->src;
3302
0
  srcsize = context->srcsize;
3303
0
  dest = context->dest;
3304
3305
  /* Resize the temporaries if needed */
3306
0
  if (blocksize > thcontext->tmp_blocksize) {
3307
0
    my_free(thcontext->tmp);
3308
0
    thcontext->tmp_nbytes = (size_t) 4 * ebsize;
3309
0
    thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
3310
0
    thcontext->tmp2 = thcontext->tmp + ebsize;
3311
0
    thcontext->tmp3 = thcontext->tmp2 + ebsize;
3312
0
    thcontext->tmp4 = thcontext->tmp3 + ebsize;
3313
0
    thcontext->tmp_blocksize = blocksize;
3314
0
  }
3315
3316
0
  tmp = thcontext->tmp;
3317
0
  tmp2 = thcontext->tmp2;
3318
0
  tmp3 = thcontext->tmp3;
3319
3320
  // Determine whether we can do a static distribution of workload among different threads
3321
0
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
3322
0
  if (!context->do_compress && context->special_type) {
3323
    // Fake a runlen as if its a memcpyed chunk
3324
0
    memcpyed = true;
3325
0
  }
3326
3327
0
  bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
3328
0
  if (static_schedule) {
3329
      /* Blocks per thread */
3330
0
      tblocks = nblocks / context->nthreads;
3331
0
      leftover2 = nblocks % context->nthreads;
3332
0
      tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
3333
0
      nblock_ = thcontext->tid * tblocks;
3334
0
      tblock = nblock_ + tblocks;
3335
0
      if (tblock > nblocks) {
3336
0
          tblock = nblocks;
3337
0
      }
3338
0
  }
3339
0
  else {
3340
    // Use dynamic schedule via a queue.  Get the next block.
3341
0
    blosc2_pthread_mutex_lock(&context->count_mutex);
3342
0
    context->thread_nblock++;
3343
0
    nblock_ = context->thread_nblock;
3344
0
    blosc2_pthread_mutex_unlock(&context->count_mutex);
3345
0
    tblock = nblocks;
3346
0
  }
3347
3348
  /* Loop over blocks */
3349
0
  leftoverblock = 0;
3350
0
  while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
3351
0
    bsize = blocksize;
3352
0
    if (nblock_ == (nblocks - 1) && (leftover > 0)) {
3353
0
      bsize = leftover;
3354
0
      leftoverblock = 1;
3355
0
    }
3356
0
    if (compress) {
3357
0
      if (memcpyed) {
3358
0
        if (!context->prefilter) {
3359
          /* We want to memcpy only */
3360
0
          memcpy(dest + context->header_overhead + nblock_ * blocksize,
3361
0
                 src + nblock_ * blocksize, (unsigned int) bsize);
3362
0
          cbytes = (int32_t) bsize;
3363
0
        }
3364
0
        else {
3365
          /* Only the prefilter has to be executed, and this is done in blosc_c().
3366
           * However, no further actions are needed, so we can put the result
3367
           * directly in dest. */
3368
0
          cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3369
0
                           ebsize, src, nblock_ * blocksize,
3370
0
                           dest + context->header_overhead + nblock_ * blocksize,
3371
0
                           tmp, tmp3);
3372
0
        }
3373
0
      }
3374
0
      else {
3375
        /* Regular compression */
3376
0
        cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3377
0
                          ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
3378
0
      }
3379
0
    }
3380
0
    else {
3381
      /* Regular decompression */
3382
0
      if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
3383
0
          (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
3384
        /* Not enough input to read all `bstarts` */
3385
0
        cbytes = -1;
3386
0
      }
3387
0
      else {
3388
        // If memcpyed we don't have a bstarts section (because it is not needed)
3389
0
        int32_t src_offset = memcpyed ?
3390
0
            context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
3391
0
        cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
3392
0
                          src, srcsize, src_offset, nblock_,
3393
0
                          dest, nblock_ * blocksize, tmp, tmp2);
3394
0
      }
3395
0
    }
3396
3397
    /* Check whether current thread has to giveup */
3398
0
    if (context->thread_giveup_code <= 0) {
3399
0
      break;
3400
0
    }
3401
3402
    /* Check results for the compressed/decompressed block */
3403
0
    if (cbytes < 0) {            /* compr/decompr failure */
3404
      /* Set giveup_code error */
3405
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3406
0
      context->thread_giveup_code = cbytes;
3407
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3408
0
      break;
3409
0
    }
3410
3411
0
    if (compress && !memcpyed) {
3412
      /* Start critical section */
3413
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3414
0
      ntdest = context->output_bytes;
3415
      // Note: do not use a typical local dict_training variable here
3416
      // because it is probably cached from previous calls if the number of
3417
      // threads does not change (the usual thing).
3418
0
      if (!(context->use_dict && context->dict_cdict == NULL)) {
3419
0
        _sw32(bstarts + nblock_, (int32_t) ntdest);
3420
0
      }
3421
3422
0
      if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
3423
0
        context->thread_giveup_code = 0;  /* incompressible buf */
3424
0
        blosc2_pthread_mutex_unlock(&context->count_mutex);
3425
0
        break;
3426
0
      }
3427
0
      context->thread_nblock++;
3428
0
      nblock_ = context->thread_nblock;
3429
0
      context->output_bytes += cbytes;
3430
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3431
      /* End of critical section */
3432
3433
      /* Copy the compressed buffer to destination */
3434
0
      memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
3435
0
    }
3436
0
    else if (static_schedule) {
3437
0
      nblock_++;
3438
0
    }
3439
0
    else {
3440
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3441
0
      context->thread_nblock++;
3442
0
      nblock_ = context->thread_nblock;
3443
0
      context->output_bytes += cbytes;
3444
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3445
0
    }
3446
3447
0
  } /* closes while (nblock_) */
3448
3449
0
  if (static_schedule) {
3450
0
    blosc2_pthread_mutex_lock(&context->count_mutex);
3451
0
    context->output_bytes = context->sourcesize;
3452
0
    if (compress) {
3453
0
      context->output_bytes += context->header_overhead;
3454
0
    }
3455
0
    blosc2_pthread_mutex_unlock(&context->count_mutex);
3456
0
  }
3457
3458
0
}
3459
3460
/* Decompress & unshuffle several blocks in a single thread */
3461
0
static void* t_blosc(void* ctxt) {
3462
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3463
0
  blosc2_context* context = thcontext->parent_context;
3464
0
#ifdef BLOSC_POSIX_BARRIERS
3465
0
  int rc;
3466
0
#endif
3467
3468
0
  while (1) {
3469
    /* Synchronization point for all threads (wait for initialization) */
3470
0
    WAIT_INIT(NULL, context);
3471
3472
0
    if (context->end_threads) {
3473
0
      break;
3474
0
    }
3475
3476
0
    t_blosc_do_job(ctxt);
3477
3478
    /* Meeting point for all threads (wait for finalization) */
3479
0
    WAIT_FINISH(NULL, context);
3480
0
  }
3481
3482
  /* Cleanup our working space and context */
3483
0
  free_thread_context(thcontext);
3484
3485
0
  return (NULL);
3486
0
}
3487
3488
3489
0
/* Initialize the synchronization primitives of `context` and either prepare
   per-thread contexts for user-managed threads (when `threads_callback` is
   set) or start `context->nthreads` internal worker threads.
   Returns 0 on success or a negative BLOSC2_ERROR_* code. */
int init_threadpool(blosc2_context *context) {
  int32_t tid;
  int rc2;

  /* Initialize mutex and condition variable objects */
  blosc2_pthread_mutex_init(&context->count_mutex, NULL);
  blosc2_pthread_mutex_init(&context->delta_mutex, NULL);
  blosc2_pthread_mutex_init(&context->nchunk_mutex, NULL);
  blosc2_pthread_cond_init(&context->delta_cv, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization (workers plus the calling thread) */
#ifdef BLOSC_POSIX_BARRIERS
  pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
  pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
#else
  blosc2_pthread_mutex_init(&context->count_threads_mutex, NULL);
  blosc2_pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

  if (threads_callback) {
    /* Create thread contexts to store data for callback threads */
    context->thread_contexts = (struct thread_context *)my_malloc(
            context->nthreads * sizeof(struct thread_context));
    BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
    for (tid = 0; tid < context->nthreads; tid++)
      init_thread_context(context->thread_contexts + tid, context, tid);
  }
  else {
    #if !defined(_WIN32)
      /* Initialize thread attributes: threads are created joinable so that
         release_threadpool() can join them */
      pthread_attr_init(&context->ct_attr);
      pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
    #endif

    /* Make space for thread handlers */
    context->threads = (blosc2_pthread_t*)my_malloc(
            context->nthreads * sizeof(blosc2_pthread_t));
    BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);
    /* Finally, create the threads */
    for (tid = 0; tid < context->nthreads; tid++) {
      /* Create a thread context (the thread frees it when finished) */
      struct thread_context *thread_context = create_thread_context(context, tid);
      BLOSC_ERROR_NULL(thread_context, BLOSC2_ERROR_THREAD_CREATE);
      #if !defined(_WIN32)
        rc2 = blosc2_pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
                            (void*)thread_context);
      #else
        rc2 = blosc2_pthread_create(&context->threads[tid], NULL, t_blosc,
                            (void *)thread_context);
      #endif
      if (rc2) {
        BLOSC_TRACE_ERROR("Return code from blosc2_pthread_create() is %d.\n"
                          "\tError detail: %s\n", rc2, strerror(rc2));
        /* No thread took ownership of this context, so free it here */
        free_thread_context(thread_context);
        return BLOSC2_ERROR_THREAD_CREATE;
      }
    }
  }

  /* We have now started/initialized the threads */
  context->threads_started = context->nthreads;
  context->new_nthreads = context->nthreads;

  return 0;
}
3558
3559
int16_t blosc2_get_nthreads(void)
3560
8
{
3561
8
  return g_nthreads;
3562
8
}
3563
3564
24.3k
int16_t blosc2_set_nthreads(int16_t nthreads) {
3565
24.3k
  int16_t ret = g_nthreads;          /* the previous number of threads */
3566
3567
  /* Check whether the library should be initialized */
3568
24.3k
  if (!g_initlib) blosc2_init();
3569
3570
24.3k
 if (nthreads != ret) {
3571
0
   g_nthreads = nthreads;
3572
0
   g_global_context->new_nthreads = nthreads;
3573
0
   int16_t ret2 = check_nthreads(g_global_context);
3574
0
   if (ret2 < 0) {
3575
0
     return ret2;
3576
0
   }
3577
0
 }
3578
3579
24.3k
  return ret;
3580
24.3k
}
3581
3582
3583
const char* blosc1_get_compressor(void)
3584
0
{
3585
0
  const char* compname;
3586
0
  blosc2_compcode_to_compname(g_compressor, &compname);
3587
3588
0
  return compname;
3589
0
}
3590
3591
15.1k
int blosc1_set_compressor(const char* compname) {
3592
15.1k
  int code = blosc2_compname_to_compcode(compname);
3593
15.1k
  if (code >= BLOSC_LAST_CODEC) {
3594
0
    BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3595
0
    BLOSC_ERROR(BLOSC2_ERROR_CODEC_SUPPORT);
3596
0
  }
3597
15.1k
  g_compressor = code;
3598
3599
  /* Check whether the library should be initialized */
3600
15.1k
  if (!g_initlib) blosc2_init();
3601
3602
15.1k
  return code;
3603
15.1k
}
3604
3605
0
void blosc2_set_delta(int dodelta) {
3606
3607
0
  g_delta = dodelta;
3608
3609
  /* Check whether the library should be initialized */
3610
0
  if (!g_initlib) blosc2_init();
3611
3612
0
}
3613
3614
0
const char* blosc2_list_compressors(void) {
3615
0
  static int compressors_list_done = 0;
3616
0
  static char ret[256];
3617
3618
0
  if (compressors_list_done) return ret;
3619
0
  ret[0] = '\0';
3620
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3621
0
  strcat(ret, ",");
3622
0
  strcat(ret, BLOSC_LZ4_COMPNAME);
3623
0
  strcat(ret, ",");
3624
0
  strcat(ret, BLOSC_LZ4HC_COMPNAME);
3625
0
#if defined(HAVE_ZLIB)
3626
0
  strcat(ret, ",");
3627
0
  strcat(ret, BLOSC_ZLIB_COMPNAME);
3628
0
#endif /* HAVE_ZLIB */
3629
0
#if defined(HAVE_ZSTD)
3630
0
  strcat(ret, ",");
3631
0
  strcat(ret, BLOSC_ZSTD_COMPNAME);
3632
0
#endif /* HAVE_ZSTD */
3633
0
  compressors_list_done = 1;
3634
0
  return ret;
3635
0
}
3636
3637
3638
0
const char* blosc2_get_version_string(void) {
3639
0
  return BLOSC2_VERSION_STRING;
3640
0
}
3641
3642
3643
0
int blosc2_get_complib_info(const char* compname, char** complib, char** version) {
3644
0
  int clibcode;
3645
0
  const char* clibname;
3646
0
  const char* clibversion = "unknown";
3647
0
  char sbuffer[256];
3648
3649
0
  clibcode = compname_to_clibcode(compname);
3650
0
  clibname = clibcode_to_clibname(clibcode);
3651
3652
  /* complib version */
3653
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
3654
0
    clibversion = BLOSCLZ_VERSION_STRING;
3655
0
  }
3656
0
  else if (clibcode == BLOSC_LZ4_LIB) {
3657
0
    sprintf(sbuffer, "%d.%d.%d",
3658
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3659
0
    clibversion = sbuffer;
3660
0
  }
3661
0
#if defined(HAVE_ZLIB)
3662
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
3663
0
#ifdef ZLIB_COMPAT
3664
0
    clibversion = ZLIB_VERSION;
3665
#elif defined(HAVE_ZLIB_NG)
3666
    clibversion = ZLIBNG_VERSION;
3667
#else
3668
    clibversion = ZLIB_VERSION;
3669
#endif
3670
0
  }
3671
0
#endif /* HAVE_ZLIB */
3672
0
#if defined(HAVE_ZSTD)
3673
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
3674
0
    sprintf(sbuffer, "%d.%d.%d",
3675
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3676
0
    clibversion = sbuffer;
3677
0
  }
3678
0
#endif /* HAVE_ZSTD */
3679
3680
#ifdef _MSC_VER
3681
  *complib = _strdup(clibname);
3682
  *version = _strdup(clibversion);
3683
#else
3684
0
  *complib = strdup(clibname);
3685
0
  *version = strdup(clibversion);
3686
0
#endif
3687
0
  return clibcode;
3688
0
}
3689
3690
/* Blosc1 wrapper around blosc2_cbuffer_sizes(): report the uncompressed
   size, compressed size and block size of `cbuffer` as size_t values.
   Errors are not reported; if the header cannot be read all three values
   come back as 0. */
void blosc1_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
  int32_t n32;
  int32_t c32;
  int32_t b32;

  (void)blosc2_cbuffer_sizes(cbuffer, &n32, &c32, &b32);
  *nbytes = (size_t)n32;
  *cbytes = (size_t)c32;
  *blocksize = (size_t)b32;
}
3698
3699
282k
int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3700
282k
  blosc_header header;
3701
282k
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3702
282k
  if (rc < 0) {
3703
    /* Return zeros if error reading header */
3704
124
    memset(&header, 0, sizeof(header));
3705
124
  }
3706
3707
  /* Read the interesting values */
3708
282k
  if (nbytes != NULL)
3709
282k
    *nbytes = header.nbytes;
3710
282k
  if (cbytes != NULL)
3711
281k
    *cbytes = header.cbytes;
3712
282k
  if (blocksize != NULL)
3713
19.3k
    *blocksize = header.blocksize;
3714
282k
  return rc;
3715
282k
}
3716
3717
11.3k
int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3718
11.3k
  int32_t header_cbytes;
3719
11.3k
  int32_t header_nbytes;
3720
11.3k
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3721
    /* Compressed data should contain enough space for header */
3722
0
    *nbytes = 0;
3723
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3724
0
  }
3725
11.3k
  int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3726
11.3k
  if (rc < 0) {
3727
0
    *nbytes = 0;
3728
0
    return rc;
3729
0
  }
3730
11.3k
  *nbytes = header_nbytes;
3731
11.3k
  if (header_cbytes != (int32_t)cbytes) {
3732
    /* Compressed size from header does not match `cbytes` */
3733
0
    *nbytes = 0;
3734
0
    return BLOSC2_ERROR_INVALID_HEADER;
3735
0
  }
3736
11.3k
  if (*nbytes > BLOSC2_MAX_BUFFERSIZE) {
3737
    /* Uncompressed size is larger than allowed */
3738
52
    *nbytes = 0;
3739
52
    return BLOSC2_ERROR_MEMORY_ALLOC;
3740
52
  }
3741
11.3k
  return 0;
3742
11.3k
}
3743
3744
/* Return `typesize` and `flags` from a compressed buffer. */
3745
0
void blosc1_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3746
0
  blosc_header header;
3747
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3748
0
  if (rc < 0) {
3749
0
    *typesize = *flags = 0;
3750
0
    return;
3751
0
  }
3752
3753
  /* Read the interesting values */
3754
0
  *flags = header.flags;
3755
0
  *typesize = header.typesize;
3756
0
}
3757
3758
3759
/* Return version information from a compressed buffer. */
3760
0
void blosc2_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3761
0
  blosc_header header;
3762
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3763
0
  if (rc < 0) {
3764
0
    *version = *versionlz = 0;
3765
0
    return;
3766
0
  }
3767
3768
  /* Read the version info */
3769
0
  *version = header.version;
3770
0
  *versionlz = header.versionlz;
3771
0
}
3772
3773
3774
/* Return the compressor library/format used in a compressed buffer. */
3775
0
const char* blosc2_cbuffer_complib(const void* cbuffer) {
3776
0
  blosc_header header;
3777
0
  int clibcode;
3778
0
  const char* complib;
3779
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3780
0
  if (rc < 0) {
3781
0
    return NULL;
3782
0
  }
3783
3784
  /* Read the compressor format/library info */
3785
0
  clibcode = (header.flags & 0xe0) >> 5;
3786
0
  complib = clibcode_to_clibname(clibcode);
3787
0
  return complib;
3788
0
}
3789
3790
3791
/* Get the internal blocksize to be used during compression.  0 means
3792
   that an automatic blocksize is computed internally. */
3793
int blosc1_get_blocksize(void)
3794
0
{
3795
0
  return (int)g_force_blocksize;
3796
0
}
3797
3798
3799
/* Force the use of a specific blocksize.  If 0, an automatic
3800
   blocksize will be used (the default). */
3801
3.04k
void blosc1_set_blocksize(size_t blocksize) {
3802
3.04k
  g_force_blocksize = (int32_t)blocksize;
3803
3.04k
}
3804
3805
3806
/* Force the use of a specific split mode. */
3807
void blosc1_set_splitmode(int mode)
3808
0
{
3809
0
  g_splitmode = mode;
3810
0
}
3811
3812
3813
/* Set pointer to super-chunk.  If NULL, no super-chunk will be
3814
   reachable (the default). */
3815
0
void blosc_set_schunk(blosc2_schunk* schunk) {
3816
0
  g_schunk = schunk;
3817
0
  g_global_context->schunk = schunk;
3818
0
}
3819
3820
blosc2_io *blosc2_io_global = NULL;
3821
blosc2_io_cb BLOSC2_IO_CB_DEFAULTS;
3822
blosc2_io_cb BLOSC2_IO_CB_MMAP;
3823
3824
int _blosc2_register_io_cb(const blosc2_io_cb *io);
3825
3826
13.9k
void blosc2_init(void) {
3827
  /* Return if Blosc is already initialized */
3828
13.9k
  if (g_initlib) return;
3829
3830
13.9k
  BLOSC2_IO_CB_DEFAULTS.id = BLOSC2_IO_FILESYSTEM;
3831
13.9k
  BLOSC2_IO_CB_DEFAULTS.name = "filesystem";
3832
13.9k
  BLOSC2_IO_CB_DEFAULTS.is_allocation_necessary = true;
3833
13.9k
  BLOSC2_IO_CB_DEFAULTS.open = (blosc2_open_cb) blosc2_stdio_open;
3834
13.9k
  BLOSC2_IO_CB_DEFAULTS.close = (blosc2_close_cb) blosc2_stdio_close;
3835
13.9k
  BLOSC2_IO_CB_DEFAULTS.size = (blosc2_size_cb) blosc2_stdio_size;
3836
13.9k
  BLOSC2_IO_CB_DEFAULTS.write = (blosc2_write_cb) blosc2_stdio_write;
3837
13.9k
  BLOSC2_IO_CB_DEFAULTS.read = (blosc2_read_cb) blosc2_stdio_read;
3838
13.9k
  BLOSC2_IO_CB_DEFAULTS.truncate = (blosc2_truncate_cb) blosc2_stdio_truncate;
3839
13.9k
  BLOSC2_IO_CB_DEFAULTS.destroy = (blosc2_destroy_cb) blosc2_stdio_destroy;
3840
3841
13.9k
  _blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS);
3842
3843
13.9k
  BLOSC2_IO_CB_MMAP.id = BLOSC2_IO_FILESYSTEM_MMAP;
3844
13.9k
  BLOSC2_IO_CB_MMAP.name = "filesystem_mmap";
3845
13.9k
  BLOSC2_IO_CB_MMAP.is_allocation_necessary = false;
3846
13.9k
  BLOSC2_IO_CB_MMAP.open = (blosc2_open_cb) blosc2_stdio_mmap_open;
3847
13.9k
  BLOSC2_IO_CB_MMAP.close = (blosc2_close_cb) blosc2_stdio_mmap_close;
3848
13.9k
  BLOSC2_IO_CB_MMAP.read = (blosc2_read_cb) blosc2_stdio_mmap_read;
3849
13.9k
  BLOSC2_IO_CB_MMAP.size = (blosc2_size_cb) blosc2_stdio_mmap_size;
3850
13.9k
  BLOSC2_IO_CB_MMAP.write = (blosc2_write_cb) blosc2_stdio_mmap_write;
3851
13.9k
  BLOSC2_IO_CB_MMAP.truncate = (blosc2_truncate_cb) blosc2_stdio_mmap_truncate;
3852
13.9k
  BLOSC2_IO_CB_MMAP.destroy = (blosc2_destroy_cb) blosc2_stdio_mmap_destroy;
3853
3854
13.9k
  _blosc2_register_io_cb(&BLOSC2_IO_CB_MMAP);
3855
3856
13.9k
  g_ncodecs = 0;
3857
13.9k
  g_nfilters = 0;
3858
13.9k
  g_ntuners = 0;
3859
3860
13.9k
#if defined(HAVE_PLUGINS)
3861
13.9k
  #include "blosc2/blosc2-common.h"
3862
13.9k
  #include "blosc2/blosc2-stdio.h"
3863
13.9k
  register_codecs();
3864
13.9k
  register_filters();
3865
13.9k
  register_tuners();
3866
13.9k
#endif
3867
13.9k
  blosc2_pthread_mutex_init(&global_comp_mutex, NULL);
3868
  /* Create a global context */
3869
13.9k
  g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3870
13.9k
  memset(g_global_context, 0, sizeof(blosc2_context));
3871
13.9k
  g_global_context->nthreads = g_nthreads;
3872
13.9k
  g_global_context->new_nthreads = g_nthreads;
3873
13.9k
  g_initlib = 1;
3874
13.9k
}
3875
3876
3877
13.9k
int blosc2_free_resources(void) {
3878
  /* Return if Blosc is not initialized */
3879
13.9k
  if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3880
3881
13.9k
  return release_threadpool(g_global_context);
3882
13.9k
}
3883
3884
3885
13.9k
void blosc2_destroy(void) {
3886
  /* Return if Blosc is not initialized */
3887
13.9k
  if (!g_initlib) return;
3888
3889
13.9k
  blosc2_free_resources();
3890
13.9k
  g_initlib = 0;
3891
13.9k
  blosc2_free_ctx(g_global_context);
3892
3893
13.9k
  blosc2_pthread_mutex_destroy(&global_comp_mutex);
3894
3895
13.9k
}
3896
3897
3898
32.4k
int release_threadpool(blosc2_context *context) {
3899
32.4k
  int32_t t;
3900
32.4k
  void* status;
3901
32.4k
  int rc;
3902
3903
32.4k
  if (context->threads_started > 0) {
3904
0
    if (threads_callback) {
3905
      /* free context data for user-managed threads */
3906
0
      for (t=0; t<context->threads_started; t++)
3907
0
        destroy_thread_context(context->thread_contexts + t);
3908
0
      my_free(context->thread_contexts);
3909
0
    }
3910
0
    else {
3911
      /* Tell all existing threads to finish */
3912
0
      context->end_threads = 1;
3913
0
      WAIT_INIT(-1, context);
3914
3915
      /* Join exiting threads */
3916
0
      for (t = 0; t < context->threads_started; t++) {
3917
0
        rc = blosc2_pthread_join(context->threads[t], &status);
3918
0
        if (rc) {
3919
0
          BLOSC_TRACE_ERROR("Return code from blosc2_pthread_join() is %d\n"
3920
0
                            "\tError detail: %s.", rc, strerror(rc));
3921
0
        }
3922
0
      }
3923
3924
      /* Thread attributes */
3925
0
      #if !defined(_WIN32)
3926
0
        pthread_attr_destroy(&context->ct_attr);
3927
0
      #endif
3928
3929
      /* Release thread handlers */
3930
0
      my_free(context->threads);
3931
0
    }
3932
3933
    /* Release mutex and condition variable objects */
3934
0
    blosc2_pthread_mutex_destroy(&context->count_mutex);
3935
0
    blosc2_pthread_mutex_destroy(&context->delta_mutex);
3936
0
    blosc2_pthread_mutex_destroy(&context->nchunk_mutex);
3937
0
    blosc2_pthread_cond_destroy(&context->delta_cv);
3938
3939
    /* Barriers */
3940
0
  #ifdef BLOSC_POSIX_BARRIERS
3941
0
    pthread_barrier_destroy(&context->barr_init);
3942
0
    pthread_barrier_destroy(&context->barr_finish);
3943
  #else
3944
    blosc2_pthread_mutex_destroy(&context->count_threads_mutex);
3945
    blosc2_pthread_cond_destroy(&context->count_threads_cv);
3946
    context->count_threads = 0;      /* Reset threads counter */
3947
  #endif
3948
3949
    /* Reset flags and counters */
3950
0
    context->end_threads = 0;
3951
0
    context->threads_started = 0;
3952
0
  }
3953
3954
3955
32.4k
  return 0;
3956
32.4k
}
3957
3958
3959
/* Contexts */
3960
3961
/* Create a context for compression */
3962
2.33k
blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3963
2.33k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3964
2.33k
  BLOSC_ERROR_NULL(context, NULL);
3965
3966
  /* Populate the context, using zeros as default values */
3967
2.33k
  memset(context, 0, sizeof(blosc2_context));
3968
2.33k
  context->do_compress = 1;   /* meant for compression */
3969
2.33k
  context->use_dict = cparams.use_dict;
3970
2.33k
  if (cparams.instr_codec) {
3971
0
    context->blosc2_flags = BLOSC2_INSTR_CODEC;
3972
0
  }
3973
3974
16.3k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
3975
14.0k
    context->filters[i] = cparams.filters[i];
3976
14.0k
    context->filters_meta[i] = cparams.filters_meta[i];
3977
3978
14.0k
    if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
3979
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3980
0
                        context->filters[i]);
3981
0
      free(context);
3982
0
      return NULL;
3983
0
    }
3984
14.0k
    if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
3985
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
3986
0
                        context->filters[i]);
3987
0
      free(context);
3988
0
      return NULL;
3989
0
    }
3990
14.0k
  }
3991
3992
2.33k
#if defined(HAVE_PLUGINS)
3993
2.33k
#include "blosc2/codecs-registry.h"
3994
2.33k
  if ((context->compcode >= BLOSC_CODEC_ZFP_FIXED_ACCURACY) && (context->compcode <= BLOSC_CODEC_ZFP_FIXED_RATE)) {
3995
0
    for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
3996
0
      if ((context->filters[i] == BLOSC_SHUFFLE) || (context->filters[i] == BLOSC_BITSHUFFLE)) {
3997
0
        BLOSC_TRACE_ERROR("ZFP cannot be run in presence of SHUFFLE / BITSHUFFLE");
3998
0
        return NULL;
3999
0
      }
4000
0
    }
4001
0
  }
4002
2.33k
#endif /* HAVE_PLUGINS */
4003
4004
  /* Check for a BLOSC_SHUFFLE environment variable */
4005
2.33k
  int doshuffle = -1;
4006
2.33k
  char* envvar = getenv("BLOSC_SHUFFLE");
4007
2.33k
  if (envvar != NULL) {
4008
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
4009
0
      doshuffle = BLOSC_NOSHUFFLE;
4010
0
    }
4011
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
4012
0
      doshuffle = BLOSC_SHUFFLE;
4013
0
    }
4014
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
4015
0
      doshuffle = BLOSC_BITSHUFFLE;
4016
0
    }
4017
0
    else {
4018
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
4019
0
    }
4020
0
  }
4021
  /* Check for a BLOSC_DELTA environment variable */
4022
2.33k
  int dodelta = BLOSC_NOFILTER;
4023
2.33k
  envvar = getenv("BLOSC_DELTA");
4024
2.33k
  if (envvar != NULL) {
4025
0
    if (strcmp(envvar, "1") == 0) {
4026
0
      dodelta = BLOSC_DELTA;
4027
0
    } else if (strcmp(envvar, "0") == 0){
4028
0
      dodelta = BLOSC_NOFILTER;
4029
0
    }
4030
0
    else {
4031
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
4032
0
    }
4033
0
  }
4034
  /* Check for a BLOSC_TYPESIZE environment variable */
4035
2.33k
  context->typesize = cparams.typesize;
4036
2.33k
  envvar = getenv("BLOSC_TYPESIZE");
4037
2.33k
  if (envvar != NULL) {
4038
0
    int32_t value;
4039
0
    errno = 0; /* To distinguish success/failure after call */
4040
0
    value = (int32_t) strtol(envvar, NULL, 10);
4041
0
    if ((errno != EINVAL) && (value > 0)) {
4042
0
      context->typesize = value;
4043
0
    }
4044
0
    else {
4045
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
4046
0
    }
4047
0
  }
4048
2.33k
  build_filters(doshuffle, dodelta, context->typesize, context->filters);
4049
4050
2.33k
  context->clevel = cparams.clevel;
4051
  /* Check for a BLOSC_CLEVEL environment variable */
4052
2.33k
  envvar = getenv("BLOSC_CLEVEL");
4053
2.33k
  if (envvar != NULL) {
4054
0
    int value;
4055
0
    errno = 0; /* To distinguish success/failure after call */
4056
0
    value = (int)strtol(envvar, NULL, 10);
4057
0
    if ((errno != EINVAL) && (value >= 0)) {
4058
0
      context->clevel = value;
4059
0
    }
4060
0
    else {
4061
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
4062
0
    }
4063
0
  }
4064
4065
2.33k
  context->compcode = cparams.compcode;
4066
  /* Check for a BLOSC_COMPRESSOR environment variable */
4067
2.33k
  envvar = getenv("BLOSC_COMPRESSOR");
4068
2.33k
  if (envvar != NULL) {
4069
0
    int codec = blosc2_compname_to_compcode(envvar);
4070
0
    if (codec >= BLOSC_LAST_CODEC) {
4071
0
      BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
4072
0
      return NULL;
4073
0
    }
4074
0
    context->compcode = codec;
4075
0
  }
4076
2.33k
  context->compcode_meta = cparams.compcode_meta;
4077
4078
2.33k
  context->blocksize = cparams.blocksize;
4079
  /* Check for a BLOSC_BLOCKSIZE environment variable */
4080
2.33k
  envvar = getenv("BLOSC_BLOCKSIZE");
4081
2.33k
  if (envvar != NULL) {
4082
0
    int32_t blocksize;
4083
0
    errno = 0; /* To distinguish success/failure after call */
4084
0
    blocksize = (int32_t) strtol(envvar, NULL, 10);
4085
0
    if ((errno != EINVAL) && (blocksize > 0)) {
4086
0
      context->blocksize = blocksize;
4087
0
    }
4088
0
    else {
4089
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
4090
0
    }
4091
0
  }
4092
4093
2.33k
  context->nthreads = cparams.nthreads;
4094
  /* Check for a BLOSC_NTHREADS environment variable */
4095
2.33k
  envvar = getenv("BLOSC_NTHREADS");
4096
2.33k
  if (envvar != NULL) {
4097
0
    errno = 0; /* To distinguish success/failure after call */
4098
0
    int16_t nthreads = (int16_t) strtol(envvar, NULL, 10);
4099
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4100
0
      context->nthreads = nthreads;
4101
0
    }
4102
0
    else {
4103
0
      BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
4104
0
    }
4105
0
  }
4106
2.33k
  context->new_nthreads = context->nthreads;
4107
4108
2.33k
  context->splitmode = cparams.splitmode;
4109
  /* Check for a BLOSC_SPLITMODE environment variable */
4110
2.33k
  envvar = getenv("BLOSC_SPLITMODE");
4111
2.33k
  if (envvar != NULL) {
4112
0
    int32_t splitmode = -1;
4113
0
    if (strcmp(envvar, "ALWAYS") == 0) {
4114
0
      splitmode = BLOSC_ALWAYS_SPLIT;
4115
0
    }
4116
0
    else if (strcmp(envvar, "NEVER") == 0) {
4117
0
      splitmode = BLOSC_NEVER_SPLIT;
4118
0
    }
4119
0
    else if (strcmp(envvar, "AUTO") == 0) {
4120
0
      splitmode = BLOSC_AUTO_SPLIT;
4121
0
    }
4122
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
4123
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
4124
0
    }
4125
0
    else {
4126
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
4127
0
    }
4128
0
    if (splitmode >= 0) {
4129
0
      context->splitmode = splitmode;
4130
0
    }
4131
0
  }
4132
4133
2.33k
  context->threads_started = 0;
4134
2.33k
  context->schunk = cparams.schunk;
4135
4136
2.33k
  if (cparams.prefilter != NULL) {
4137
0
    context->prefilter = cparams.prefilter;
4138
0
    context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
4139
0
    BLOSC_ERROR_NULL(context->preparams, NULL);
4140
0
    memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
4141
0
  }
4142
4143
2.33k
  if (cparams.tuner_id <= 0) {
4144
2.33k
    cparams.tuner_id = g_tuner;
4145
2.33k
  } else {
4146
0
    for (int i = 0; i < g_ntuners; ++i) {
4147
0
      if (g_tuners[i].id == cparams.tuner_id) {
4148
0
        if (g_tuners[i].init == NULL) {
4149
0
          if (fill_tuner(&g_tuners[i]) < 0) {
4150
0
            BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4151
0
            return NULL;
4152
0
          }
4153
0
        }
4154
0
        if (g_tuners[i].init(cparams.tuner_params, context, NULL) < 0) {
4155
0
          BLOSC_TRACE_ERROR("Error in user-defined tuner %d init function\n", cparams.tuner_id);
4156
0
          return NULL;
4157
0
        }
4158
0
        goto urtunersuccess;
4159
0
      }
4160
0
    }
4161
0
    BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", cparams.tuner_id);
4162
0
    return NULL;
4163
0
  }
4164
2.33k
  urtunersuccess:;
4165
4166
2.33k
  context->tuner_id = cparams.tuner_id;
4167
4168
2.33k
  context->codec_params = cparams.codec_params;
4169
2.33k
  memcpy(context->filter_params, cparams.filter_params, BLOSC2_MAX_FILTERS * sizeof(void*));
4170
4171
2.33k
  return context;
4172
2.33k
}
4173
4174
/* Create a context for decompression */
4175
2.33k
blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
4176
2.33k
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
4177
2.33k
  BLOSC_ERROR_NULL(context, NULL);
4178
4179
  /* Populate the context, using zeros as default values */
4180
2.33k
  memset(context, 0, sizeof(blosc2_context));
4181
2.33k
  context->do_compress = 0;   /* Meant for decompression */
4182
4183
2.33k
  context->nthreads = dparams.nthreads;
4184
2.33k
  char* envvar = getenv("BLOSC_NTHREADS");
4185
2.33k
  if (envvar != NULL) {
4186
0
    errno = 0; /* To distinguish success/failure after call */
4187
0
    long nthreads = strtol(envvar, NULL, 10);
4188
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4189
0
      context->nthreads = (int16_t) nthreads;
4190
0
    }
4191
0
  }
4192
2.33k
  context->new_nthreads = context->nthreads;
4193
4194
2.33k
  context->threads_started = 0;
4195
2.33k
  context->block_maskout = NULL;
4196
2.33k
  context->block_maskout_nitems = 0;
4197
2.33k
  context->schunk = dparams.schunk;
4198
4199
2.33k
  if (dparams.postfilter != NULL) {
4200
0
    context->postfilter = dparams.postfilter;
4201
0
    context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
4202
0
    BLOSC_ERROR_NULL(context->postparams, NULL);
4203
0
    memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
4204
0
  }
4205
4206
2.33k
  return context;
4207
2.33k
}
4208
4209
4210
18.5k
void blosc2_free_ctx(blosc2_context* context) {
4211
18.5k
  release_threadpool(context);
4212
18.5k
  if (context->serial_context != NULL) {
4213
15.3k
    free_thread_context(context->serial_context);
4214
15.3k
  }
4215
18.5k
  if (context->dict_cdict != NULL) {
4216
0
#ifdef HAVE_ZSTD
4217
0
    ZSTD_freeCDict(context->dict_cdict);
4218
0
#endif
4219
0
  }
4220
18.5k
  if (context->dict_ddict != NULL) {
4221
2.14k
#ifdef HAVE_ZSTD
4222
2.14k
    ZSTD_freeDDict(context->dict_ddict);
4223
2.14k
#endif
4224
2.14k
  }
4225
18.5k
  if (context->tuner_params != NULL) {
4226
0
    int rc;
4227
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
4228
0
      rc = blosc_stune_free(context);
4229
0
    } else {
4230
0
      for (int i = 0; i < g_ntuners; ++i) {
4231
0
        if (g_tuners[i].id == context->tuner_id) {
4232
0
          if (g_tuners[i].free == NULL) {
4233
0
            if (fill_tuner(&g_tuners[i]) < 0) {
4234
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4235
0
              return;
4236
0
            }
4237
0
          }
4238
0
          rc = g_tuners[i].free(context);
4239
0
          goto urtunersuccess;
4240
0
        }
4241
0
      }
4242
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
4243
0
      return;
4244
0
      urtunersuccess:;
4245
0
    }
4246
0
    if (rc < 0) {
4247
0
      BLOSC_TRACE_ERROR("Error in user-defined tuner free function\n");
4248
0
      return;
4249
0
    }
4250
0
  }
4251
18.5k
  if (context->prefilter != NULL) {
4252
0
    my_free(context->preparams);
4253
0
  }
4254
18.5k
  if (context->postfilter != NULL) {
4255
0
    my_free(context->postparams);
4256
0
  }
4257
4258
18.5k
  if (context->block_maskout != NULL) {
4259
0
    free(context->block_maskout);
4260
0
  }
4261
18.5k
  my_free(context);
4262
18.5k
}
4263
4264
4265
0
int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
4266
0
  cparams->compcode = ctx->compcode;
4267
0
  cparams->compcode_meta = ctx->compcode_meta;
4268
0
  cparams->clevel = ctx->clevel;
4269
0
  cparams->use_dict = ctx->use_dict;
4270
0
  cparams->instr_codec = ctx->blosc2_flags & BLOSC2_INSTR_CODEC;
4271
0
  cparams->typesize = ctx->typesize;
4272
0
  cparams->nthreads = ctx->nthreads;
4273
0
  cparams->blocksize = ctx->blocksize;
4274
0
  cparams->splitmode = ctx->splitmode;
4275
0
  cparams->schunk = ctx->schunk;
4276
0
  for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
4277
0
    cparams->filters[i] = ctx->filters[i];
4278
0
    cparams->filters_meta[i] = ctx->filters_meta[i];
4279
0
  }
4280
0
  cparams->prefilter = ctx->prefilter;
4281
0
  cparams->preparams = ctx->preparams;
4282
0
  cparams->tuner_id = ctx->tuner_id;
4283
0
  cparams->codec_params = ctx->codec_params;
4284
4285
0
  return BLOSC2_ERROR_SUCCESS;
4286
0
}
4287
4288
4289
5.78k
int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
4290
5.78k
  dparams->nthreads = ctx->nthreads;
4291
5.78k
  dparams->schunk = ctx->schunk;
4292
5.78k
  dparams->postfilter = ctx->postfilter;
4293
5.78k
  dparams->postparams = ctx->postparams;
4294
4295
5.78k
  return BLOSC2_ERROR_SUCCESS;
4296
5.78k
}
4297
4298
4299
/* Set a maskout in decompression context */
4300
0
int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
4301
4302
0
  if (ctx->block_maskout != NULL) {
4303
    // Get rid of a possible mask here
4304
0
    free(ctx->block_maskout);
4305
0
  }
4306
4307
0
  bool *maskout_ = malloc(nblocks);
4308
0
  BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
4309
0
  memcpy(maskout_, maskout, nblocks);
4310
0
  ctx->block_maskout = maskout_;
4311
0
  ctx->block_maskout_nitems = nblocks;
4312
4313
0
  return 0;
4314
0
}
4315
4316
4317
/* Create a chunk made of zeros */
4318
3
int blosc2_chunk_zeros(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4319
3
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4320
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4321
0
    return BLOSC2_ERROR_DATA;
4322
0
  }
4323
4324
3
  if ((nbytes > 0) && (nbytes % cparams.typesize)) {
4325
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4326
0
    return BLOSC2_ERROR_DATA;
4327
0
  }
4328
4329
3
  blosc_header header;
4330
3
  blosc2_context* context = blosc2_create_cctx(cparams);
4331
3
  if (context == NULL) {
4332
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4333
0
    return BLOSC2_ERROR_NULL_POINTER;
4334
0
  }
4335
4336
3
  int error = initialize_context_compression(
4337
3
          context, NULL, nbytes, dest, destsize,
4338
3
          context->clevel, context->filters, context->filters_meta,
4339
3
          context->typesize, context->compcode, context->blocksize,
4340
3
          context->new_nthreads, context->nthreads, context->splitmode,
4341
3
          context->tuner_id, context->tuner_params, context->schunk);
4342
3
  if (error <= 0) {
4343
0
    blosc2_free_ctx(context);
4344
0
    return error;
4345
0
  }
4346
4347
3
  memset(&header, 0, sizeof(header));
4348
3
  header.version = BLOSC2_VERSION_FORMAT;
4349
3
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4350
3
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4351
3
  header.typesize = context->typesize;
4352
3
  header.nbytes = (int32_t)nbytes;
4353
3
  header.blocksize = context->blocksize;
4354
3
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4355
3
  header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4;  // mark chunk as all zeros
4356
3
  memcpy((uint8_t *)dest, &header, sizeof(header));
4357
4358
3
  blosc2_free_ctx(context);
4359
4360
3
  return BLOSC_EXTENDED_HEADER_LENGTH;
4361
3
}
4362
4363
4364
/* Create a chunk made of uninitialized values */
4365
0
int blosc2_chunk_uninit(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4366
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4367
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4368
0
    return BLOSC2_ERROR_DATA;
4369
0
  }
4370
4371
0
  if (nbytes % cparams.typesize) {
4372
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4373
0
    return BLOSC2_ERROR_DATA;
4374
0
  }
4375
4376
0
  blosc_header header;
4377
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4378
0
  if (context == NULL) {
4379
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4380
0
    return BLOSC2_ERROR_NULL_POINTER;
4381
0
  }
4382
0
  int error = initialize_context_compression(
4383
0
          context, NULL, nbytes, dest, destsize,
4384
0
          context->clevel, context->filters, context->filters_meta,
4385
0
          context->typesize, context->compcode, context->blocksize,
4386
0
          context->new_nthreads, context->nthreads, context->splitmode,
4387
0
          context->tuner_id, context->tuner_params, context->schunk);
4388
0
  if (error <= 0) {
4389
0
    blosc2_free_ctx(context);
4390
0
    return error;
4391
0
  }
4392
4393
0
  memset(&header, 0, sizeof(header));
4394
0
  header.version = BLOSC2_VERSION_FORMAT;
4395
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4396
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4397
0
  header.typesize = context->typesize;
4398
0
  header.nbytes = (int32_t)nbytes;
4399
0
  header.blocksize = context->blocksize;
4400
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4401
0
  header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4;  // mark chunk as uninitialized
4402
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4403
4404
0
  blosc2_free_ctx(context);
4405
4406
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4407
0
}
4408
4409
4410
/* Create a chunk made of nans */
4411
0
int blosc2_chunk_nans(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4412
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4413
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4414
0
    return BLOSC2_ERROR_DATA;
4415
0
  }
4416
4417
0
  if (nbytes % cparams.typesize) {
4418
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4419
0
    return BLOSC2_ERROR_DATA;
4420
0
  }
4421
4422
0
  blosc_header header;
4423
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4424
0
  if (context == NULL) {
4425
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4426
0
    return BLOSC2_ERROR_NULL_POINTER;
4427
0
  }
4428
4429
0
  int error = initialize_context_compression(
4430
0
          context, NULL, nbytes, dest, destsize,
4431
0
          context->clevel, context->filters, context->filters_meta,
4432
0
          context->typesize, context->compcode, context->blocksize,
4433
0
          context->new_nthreads, context->nthreads, context->splitmode,
4434
0
          context->tuner_id, context->tuner_params, context->schunk);
4435
0
  if (error <= 0) {
4436
0
    blosc2_free_ctx(context);
4437
0
    return error;
4438
0
  }
4439
4440
0
  memset(&header, 0, sizeof(header));
4441
0
  header.version = BLOSC2_VERSION_FORMAT;
4442
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4443
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4444
0
  header.typesize = context->typesize;
4445
0
  header.nbytes = (int32_t)nbytes;
4446
0
  header.blocksize = context->blocksize;
4447
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4448
0
  header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4;  // mark chunk as all NaNs
4449
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4450
4451
0
  blosc2_free_ctx(context);
4452
4453
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4454
0
}
4455
4456
4457
/* Create a chunk made of repeated values */
4458
int blosc2_chunk_repeatval(blosc2_cparams cparams, const int32_t nbytes,
4459
0
                           void* dest, int32_t destsize, const void* repeatval) {
4460
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize) {
4461
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4462
0
    return BLOSC2_ERROR_DATA;
4463
0
  }
4464
4465
0
  if (nbytes % cparams.typesize) {
4466
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4467
0
    return BLOSC2_ERROR_DATA;
4468
0
  }
4469
4470
0
  blosc_header header;
4471
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4472
0
  if (context == NULL) {
4473
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4474
0
    return BLOSC2_ERROR_NULL_POINTER;
4475
0
  }
4476
4477
0
  int error = initialize_context_compression(
4478
0
          context, NULL, nbytes, dest, destsize,
4479
0
          context->clevel, context->filters, context->filters_meta,
4480
0
          context->typesize, context->compcode, context->blocksize,
4481
0
          context->new_nthreads, context->nthreads, context->splitmode,
4482
0
          context->tuner_id, context->tuner_params, context->schunk);
4483
0
  if (error <= 0) {
4484
0
    blosc2_free_ctx(context);
4485
0
    return error;
4486
0
  }
4487
4488
0
  memset(&header, 0, sizeof(header));
4489
0
  header.version = BLOSC2_VERSION_FORMAT;
4490
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4491
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4492
0
  header.typesize = context->typesize;
4493
0
  header.nbytes = (int32_t)nbytes;
4494
0
  header.blocksize = context->blocksize;
4495
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize;
4496
0
  header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4;  // mark chunk as all repeated value
4497
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4498
0
  memcpy((uint8_t *)dest + sizeof(header), repeatval, cparams.typesize);
4499
4500
0
  blosc2_free_ctx(context);
4501
4502
0
  return BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize;
4503
0
}
4504
4505
4506
/* Register filters */
4507
4508
69.5k
int register_filter_private(blosc2_filter *filter) {
4509
69.5k
    BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);
4510
69.5k
    if (g_nfilters == UINT8_MAX) {
4511
0
        BLOSC_TRACE_ERROR("Can not register more filters");
4512
0
        return BLOSC2_ERROR_CODEC_SUPPORT;
4513
0
    }
4514
69.5k
    if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
4515
0
        BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
4516
0
        return BLOSC2_ERROR_FAILURE;
4517
0
    }
4518
    /* This condition can never be fulfilled
4519
    if (filter->id > BLOSC2_USER_REGISTERED_FILTERS_STOP) {
4520
        BLOSC_TRACE_ERROR("The id must be less than or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_STOP);
4521
        return BLOSC2_ERROR_FAILURE;
4522
    }
4523
    */
4524
4525
208k
    for (uint64_t i = 0; i < g_nfilters; ++i) {
4526
139k
      if (g_filters[i].id == filter->id) {
4527
0
        if (strcmp(g_filters[i].name, filter->name) != 0) {
4528
0
          BLOSC_TRACE_ERROR("The filter (ID: %d) plugin is already registered with name: %s."
4529
0
                            "  Choose another one !", filter->id, g_filters[i].name);
4530
0
          return BLOSC2_ERROR_FAILURE;
4531
0
        }
4532
0
        else {
4533
          // Already registered, so no more actions needed
4534
0
          return BLOSC2_ERROR_SUCCESS;
4535
0
        }
4536
0
      }
4537
139k
    }
4538
4539
69.5k
    blosc2_filter *filter_new = &g_filters[g_nfilters++];
4540
69.5k
    memcpy(filter_new, filter, sizeof(blosc2_filter));
4541
4542
69.5k
    return BLOSC2_ERROR_SUCCESS;
4543
69.5k
}
4544
4545
4546
0
int blosc2_register_filter(blosc2_filter *filter) {
4547
0
  if (filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
4548
0
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_START);
4549
0
    return BLOSC2_ERROR_FAILURE;
4550
0
  }
4551
4552
0
  return register_filter_private(filter);
4553
0
}
4554
4555
4556
/* Register codecs */
4557
4558
83.4k
int register_codec_private(blosc2_codec *codec) {
4559
83.4k
    BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
4560
83.4k
    if (g_ncodecs == UINT8_MAX) {
4561
0
      BLOSC_TRACE_ERROR("Can not register more codecs");
4562
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
4563
0
    }
4564
83.4k
    if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
4565
0
      BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
4566
0
      return BLOSC2_ERROR_FAILURE;
4567
0
    }
4568
    /* This condition can never be fulfilled
4569
    if (codec->compcode > BLOSC2_USER_REGISTERED_CODECS_STOP) {
4570
      BLOSC_TRACE_ERROR("The id must be less or equal to %d", BLOSC2_USER_REGISTERED_CODECS_STOP);
4571
      return BLOSC2_ERROR_FAILURE;
4572
    }
4573
     */
4574
4575
292k
    for (int i = 0; i < g_ncodecs; ++i) {
4576
208k
      if (g_codecs[i].compcode == codec->compcode) {
4577
0
        if (strcmp(g_codecs[i].compname, codec->compname) != 0) {
4578
0
          BLOSC_TRACE_ERROR("The codec (ID: %d) plugin is already registered with name: %s."
4579
0
                            "  Choose another one !", codec->compcode, codec->compname);
4580
0
          return BLOSC2_ERROR_CODEC_PARAM;
4581
0
        }
4582
0
        else {
4583
          // Already registered, so no more actions needed
4584
0
          return BLOSC2_ERROR_SUCCESS;
4585
0
        }
4586
0
      }
4587
208k
    }
4588
4589
83.4k
    blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
4590
83.4k
    memcpy(codec_new, codec, sizeof(blosc2_codec));
4591
4592
83.4k
    return BLOSC2_ERROR_SUCCESS;
4593
83.4k
}
4594
4595
4596
0
int blosc2_register_codec(blosc2_codec *codec) {
4597
0
  if (codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
4598
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
4599
0
    return BLOSC2_ERROR_CODEC_PARAM;
4600
0
  }
4601
4602
0
  return register_codec_private(codec);
4603
0
}
4604
4605
4606
/* Register tuners */
4607
4608
13.9k
int register_tuner_private(blosc2_tuner *tuner) {
4609
13.9k
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);
4610
13.9k
  if (g_ntuners == UINT8_MAX) {
4611
0
    BLOSC_TRACE_ERROR("Can not register more tuners");
4612
0
    return BLOSC2_ERROR_CODEC_SUPPORT;
4613
0
  }
4614
13.9k
  if (tuner->id < BLOSC2_GLOBAL_REGISTERED_TUNER_START) {
4615
0
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_TUNER_START);
4616
0
    return BLOSC2_ERROR_FAILURE;
4617
0
  }
4618
4619
13.9k
  for (int i = 0; i < g_ntuners; ++i) {
4620
0
    if (g_tuners[i].id == tuner->id) {
4621
0
      if (strcmp(g_tuners[i].name, tuner->name) != 0) {
4622
0
        BLOSC_TRACE_ERROR("The tuner (ID: %d) plugin is already registered with name: %s."
4623
0
                          "  Choose another one !", tuner->id, g_tuners[i].name);
4624
0
        return BLOSC2_ERROR_FAILURE;
4625
0
      }
4626
0
      else {
4627
        // Already registered, so no more actions needed
4628
0
        return BLOSC2_ERROR_SUCCESS;
4629
0
      }
4630
0
    }
4631
0
  }
4632
4633
13.9k
  blosc2_tuner *tuner_new = &g_tuners[g_ntuners++];
4634
13.9k
  memcpy(tuner_new, tuner, sizeof(blosc2_tuner));
4635
4636
13.9k
  return BLOSC2_ERROR_SUCCESS;
4637
13.9k
}
4638
4639
4640
0
int blosc2_register_tuner(blosc2_tuner *tuner) {
4641
0
  if (tuner->id < BLOSC2_USER_REGISTERED_TUNER_START) {
4642
0
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_TUNER_START);
4643
0
    return BLOSC2_ERROR_FAILURE;
4644
0
  }
4645
4646
0
  return register_tuner_private(tuner);
4647
0
}
4648
4649
4650
27.8k
int _blosc2_register_io_cb(const blosc2_io_cb *io) {
4651
4652
41.7k
  for (uint64_t i = 0; i < g_nio; ++i) {
4653
41.7k
    if (g_ios[i].id == io->id) {
4654
27.8k
      if (strcmp(g_ios[i].name, io->name) != 0) {
4655
0
        BLOSC_TRACE_ERROR("The IO (ID: %d) plugin is already registered with name: %s."
4656
0
                          "  Choose another one !", io->id, g_ios[i].name);
4657
0
        return BLOSC2_ERROR_PLUGIN_IO;
4658
0
      }
4659
27.8k
      else {
4660
        // Already registered, so no more actions needed
4661
27.8k
        return BLOSC2_ERROR_SUCCESS;
4662
27.8k
      }
4663
27.8k
    }
4664
41.7k
  }
4665
4666
8
  blosc2_io_cb *io_new = &g_ios[g_nio++];
4667
8
  memcpy(io_new, io, sizeof(blosc2_io_cb));
4668
4669
8
  return BLOSC2_ERROR_SUCCESS;
4670
27.8k
}
4671
4672
0
int blosc2_register_io_cb(const blosc2_io_cb *io) {
4673
0
  BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
4674
0
  if (g_nio == UINT8_MAX) {
4675
0
    BLOSC_TRACE_ERROR("Can not register more codecs");
4676
0
    return BLOSC2_ERROR_PLUGIN_IO;
4677
0
  }
4678
4679
0
  if (io->id < BLOSC2_IO_REGISTERED) {
4680
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
4681
0
    return BLOSC2_ERROR_PLUGIN_IO;
4682
0
  }
4683
4684
0
  return _blosc2_register_io_cb(io);
4685
0
}
4686
4687
2.37k
blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
4688
  // If g_initlib is not set by blosc2_init() this function will try to read
4689
  // uninitialized memory. We should therefore always return NULL in that case
4690
2.37k
  if (!g_initlib) {
4691
0
    return NULL;
4692
0
  }
4693
2.37k
  for (uint64_t i = 0; i < g_nio; ++i) {
4694
2.37k
    if (g_ios[i].id == id) {
4695
2.37k
      return &g_ios[i];
4696
2.37k
    }
4697
2.37k
  }
4698
0
  if (id == BLOSC2_IO_FILESYSTEM) {
4699
0
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
4700
0
      BLOSC_TRACE_ERROR("Error registering the default IO API");
4701
0
      return NULL;
4702
0
    }
4703
0
    return blosc2_get_io_cb(id);
4704
0
  }
4705
0
  else if (id == BLOSC2_IO_FILESYSTEM_MMAP) {
4706
0
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_MMAP) < 0) {
4707
0
      BLOSC_TRACE_ERROR("Error registering the mmap IO API");
4708
0
      return NULL;
4709
0
    }
4710
0
    return blosc2_get_io_cb(id);
4711
0
  }
4712
0
  return NULL;
4713
0
}
4714
4715
0
void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) {
4716
0
  if (ndim == 0) {
4717
0
    return;
4718
0
  }
4719
0
  assert(ndim <= B2ND_MAX_DIM);
4720
0
  int64_t strides[B2ND_MAX_DIM];
4721
4722
0
  strides[ndim - 1] = 1;
4723
0
  for (int j = ndim - 2; j >= 0; --j) {
4724
0
      strides[j] = shape[j + 1] * strides[j + 1];
4725
0
  }
4726
4727
0
  index[0] = i / strides[0];
4728
0
  for (int j = 1; j < ndim; ++j) {
4729
0
      index[j] = (i % strides[j - 1]) / strides[j];
4730
0
  }
4731
0
}
4732
4733
0
/* Collapse per-dimension coordinates into a single flat offset.
 *
 * Stores sum(strides[d] * index[d]) for d in [0, ndim) into *i. A
 * non-positive ndim yields 0. The running sum is kept in *i itself.
 */
void blosc2_multidim_to_unidim(const int64_t *index, int8_t ndim, const int64_t *strides, int64_t *i) {
  *i = 0;
  int dim = 0;
  while (dim < ndim) {
    *i += strides[dim] * index[dim];
    ++dim;
  }
}
4739
4740
0
/* Compute the chunks touched by the slice [start, stop) of `schunk`.
 *
 * If the super-chunk has neither a "b2nd" metalayer nor the legacy
 * "caterva" one (kept for backward compatibility), it is treated as
 * one-dimensional: *start and *stop are forwarded to
 * schunk_get_slice_nchunks(). Otherwise a temporary b2nd array view is
 * built over it and b2nd_get_slice_nchunks() does the work.
 *
 * Returns the callee's result, or a negative error code
 * (BLOSC2_ERROR_NULL_POINTER for a NULL schunk, or the b2nd_from_schunk()
 * failure code).
 */
int blosc2_get_slice_nchunks(blosc2_schunk* schunk, int64_t *start, int64_t *stop, int64_t **chunks_idx) {
  BLOSC_ERROR_NULL(schunk, BLOSC2_ERROR_NULL_POINTER);

  if (blosc2_meta_exists(schunk, "b2nd") < 0 && blosc2_meta_exists(schunk, "caterva") < 0) {
    return schunk_get_slice_nchunks(schunk, *start, *stop, chunks_idx);
  }

  b2nd_array_t *array = NULL;
  int rc = b2nd_from_schunk(schunk, &array);
  if (rc < 0) {
    BLOSC_TRACE_ERROR("Could not get b2nd array from schunk.");
    return rc;
  }

  rc = b2nd_get_slice_nchunks(array, start, stop, chunks_idx);
  // Detach the caller's super-chunk so b2nd_free() releases only the wrapper
  array->sc = NULL;
  b2nd_free(array);
  return rc;
}
4761
4762
0
blosc2_cparams blosc2_get_blosc2_cparams_defaults(void) {
4763
0
  return BLOSC2_CPARAMS_DEFAULTS;
4764
0
};
4765
4766
0
blosc2_dparams blosc2_get_blosc2_dparams_defaults(void) {
4767
0
  return BLOSC2_DPARAMS_DEFAULTS;
4768
0
};
4769
4770
0
blosc2_storage blosc2_get_blosc2_storage_defaults(void) {
4771
0
  return BLOSC2_STORAGE_DEFAULTS;
4772
0
};
4773
4774
0
blosc2_io blosc2_get_blosc2_io_defaults(void) {
4775
0
  return BLOSC2_IO_DEFAULTS;
4776
0
};
4777
4778
0
blosc2_stdio_mmap blosc2_get_blosc2_stdio_mmap_defaults(void) {
4779
0
  return BLOSC2_STDIO_MMAP_DEFAULTS;
4780
0
};
4781
4782
79
const char *blosc2_error_string(int error_code) {
4783
79
  switch (error_code) {
4784
73
    case BLOSC2_ERROR_FAILURE:
4785
73
      return "Generic failure";
4786
0
    case BLOSC2_ERROR_STREAM:
4787
0
      return "Bad stream";
4788
0
    case BLOSC2_ERROR_DATA:
4789
0
      return "Invalid data";
4790
0
    case BLOSC2_ERROR_MEMORY_ALLOC:
4791
0
      return "Memory alloc/realloc failure";
4792
0
    case BLOSC2_ERROR_READ_BUFFER:
4793
0
      return "Not enough space to read";
4794
6
    case BLOSC2_ERROR_WRITE_BUFFER:
4795
6
      return "Not enough space to write";
4796
0
    case BLOSC2_ERROR_CODEC_SUPPORT:
4797
0
      return "Codec not supported";
4798
0
    case BLOSC2_ERROR_CODEC_PARAM:
4799
0
      return "Invalid parameter supplied to codec";
4800
0
    case BLOSC2_ERROR_CODEC_DICT:
4801
0
      return "Codec dictionary error";
4802
0
    case BLOSC2_ERROR_VERSION_SUPPORT:
4803
0
      return "Version not supported";
4804
0
    case BLOSC2_ERROR_INVALID_HEADER:
4805
0
      return "Invalid value in header";
4806
0
    case BLOSC2_ERROR_INVALID_PARAM:
4807
0
      return "Invalid parameter supplied to function";
4808
0
    case BLOSC2_ERROR_FILE_READ:
4809
0
      return "File read failure";
4810
0
    case BLOSC2_ERROR_FILE_WRITE:
4811
0
      return "File write failure";
4812
0
    case BLOSC2_ERROR_FILE_OPEN:
4813
0
      return "File open failure";
4814
0
    case BLOSC2_ERROR_NOT_FOUND:
4815
0
      return "Not found";
4816
0
    case BLOSC2_ERROR_RUN_LENGTH:
4817
0
      return "Bad run length encoding";
4818
0
    case BLOSC2_ERROR_FILTER_PIPELINE:
4819
0
      return "Filter pipeline error";
4820
0
    case BLOSC2_ERROR_CHUNK_INSERT:
4821
0
      return "Chunk insert failure";
4822
0
    case BLOSC2_ERROR_CHUNK_APPEND:
4823
0
      return "Chunk append failure";
4824
0
    case BLOSC2_ERROR_CHUNK_UPDATE:
4825
0
      return "Chunk update failure";
4826
0
    case BLOSC2_ERROR_2GB_LIMIT:
4827
0
      return "Sizes larger than 2gb not supported";
4828
0
    case BLOSC2_ERROR_SCHUNK_COPY:
4829
0
      return "Super-chunk copy failure";
4830
0
    case BLOSC2_ERROR_FRAME_TYPE:
4831
0
      return "Wrong type for frame";
4832
0
    case BLOSC2_ERROR_FILE_TRUNCATE:
4833
0
      return "File truncate failure";
4834
0
    case BLOSC2_ERROR_THREAD_CREATE:
4835
0
      return "Thread or thread context creation failure";
4836
0
    case BLOSC2_ERROR_POSTFILTER:
4837
0
      return "Postfilter failure";
4838
0
    case BLOSC2_ERROR_FRAME_SPECIAL:
4839
0
      return "Special frame failure";
4840
0
    case BLOSC2_ERROR_SCHUNK_SPECIAL:
4841
0
      return "Special super-chunk failure";
4842
0
    case BLOSC2_ERROR_PLUGIN_IO:
4843
0
      return "IO plugin error";
4844
0
    case BLOSC2_ERROR_FILE_REMOVE:
4845
0
      return "Remove file failure";
4846
0
    case BLOSC2_ERROR_NULL_POINTER:
4847
0
      return "Pointer is null";
4848
0
    case BLOSC2_ERROR_INVALID_INDEX:
4849
0
      return "Invalid index";
4850
0
    case BLOSC2_ERROR_METALAYER_NOT_FOUND:
4851
0
      return "Metalayer has not been found";
4852
0
    case BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED:
4853
0
      return "Maximum buffersize exceeded";
4854
0
    case BLOSC2_ERROR_TUNER:
4855
0
      return "Tuner failure";
4856
0
    default:
4857
0
      return "Unknown error";
4858
79
  }
4859
79
}