Coverage Report

Created: 2026-01-22 06:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/c-blosc2/blosc/blosc2.c
Line
Count
Source
1
/*********************************************************************
2
  Blosc - Blocked Shuffling and Compression Library
3
4
  Copyright (c) 2021  Blosc Development Team <blosc@blosc.org>
5
  https://blosc.org
6
  License: BSD 3-Clause (see LICENSE.txt)
7
8
  See LICENSE.txt for details about copyright and rights to use.
9
**********************************************************************/
10
11
12
#include "blosc2.h"
13
#include "blosc-private.h"
14
#include "../plugins/codecs/zfp/blosc2-zfp.h"
15
#include "frame.h"
16
#include "b2nd-private.h"
17
#include "schunk-private.h"
18
19
#if defined(USING_CMAKE)
20
  #include "config.h"
21
#endif /*  USING_CMAKE */
22
#include "context.h"
23
24
#include "shuffle.h"
25
#include "delta.h"
26
#include "trunc-prec.h"
27
#include "blosclz.h"
28
#include "stune.h"
29
#include "blosc2/codecs-registry.h"
30
#include "blosc2/filters-registry.h"
31
#include "blosc2/tuners-registry.h"
32
33
#include "lz4.h"
34
#include "lz4hc.h"
35
#ifdef HAVE_IPP
36
  #include <ipps.h>
37
  #include <ippdc.h>
38
#endif
39
#if defined(HAVE_ZLIB_NG)
40
#ifdef ZLIB_COMPAT
41
  #include "zlib.h"
42
#else
43
  #include "zlib-ng.h"
44
#endif
45
#elif defined(HAVE_ZLIB)
46
  #include "zlib.h"
47
#endif /*  HAVE_ZLIB_NG || HAVE_ZLIB */
48
#if defined(HAVE_ZSTD)
49
  #include "zstd.h"
50
  #include "zstd_errors.h"
51
  // #include "cover.h"  // for experimenting with fast cover training for building dicts
52
  #include "zdict.h"
53
#endif /*  HAVE_ZSTD */
54
55
#if defined(_WIN32) && !defined(__MINGW32__)
56
  #include <windows.h>
57
  #include <malloc.h>
58
  #include <process.h>
59
  #define getpid _getpid
60
#endif  /* _WIN32 */
61
62
#include "threading.h"
63
64
#include <stdio.h>
65
#include <stdlib.h>
66
#include <errno.h>
67
#include <string.h>
68
#include <sys/types.h>
69
#include <assert.h>
70
#include <math.h>
71
#include <stdint.h>
72
73
74
/* Synchronization variables */
75
76
/* Global context for non-contextual API */
77
static blosc2_context* g_global_context;
78
static blosc2_pthread_mutex_t global_comp_mutex;  /* NOTE(review): presumably serializes use of the global context -- confirm */
79
static int g_compressor = BLOSC_BLOSCLZ;  /* the compressor to use by default */
80
static int g_delta = 0;  /* NOTE(review): presumably the delta filter default (off) -- confirm */
81
/* The default splitmode */
82
static int32_t g_splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
83
/* The number of threads to use by default */
84
static int16_t g_nthreads = 1;
85
static int32_t g_force_blocksize = 0;  /* NOTE(review): presumably 0 means "no forced blocksize" -- confirm */
86
static int g_initlib = 0;  /* NOTE(review): presumably non-zero once the library is initialized -- confirm */
87
static blosc2_schunk* g_schunk = NULL;   /* the pointer to super-chunk */
88
89
blosc2_codec g_codecs[256] = {0};  /* codec table scanned by the name/code conversion routines in this file */
90
uint8_t g_ncodecs = 0;  /* number of entries of g_codecs that are scanned */
91
92
static blosc2_filter g_filters[256] = {0};  /* NOTE(review): presumably the filter table -- confirm */
93
static uint64_t g_nfilters = 0;
94
95
static blosc2_io_cb g_ios[256] = {0};  /* NOTE(review): presumably the I/O callback table -- confirm */
96
static uint64_t g_nio = 0;
97
98
blosc2_tuner g_tuners[256] = {0};  /* NOTE(review): presumably the tuner table -- confirm */
99
int g_ntuners = 0;
100
101
static int g_tuner = BLOSC_STUNE;  /* NOTE(review): presumably the tuner used by default -- confirm */
102
103
// Forward declarations
104
int init_threadpool(blosc2_context *context);
105
int release_threadpool(blosc2_context *context);
106
107
/* Macros for synchronization */
108
109
/* Wait until all threads are initialized.
 *
 * With POSIX barriers the macro waits on `barr_init`; it assigns to a variable
 * named `rc` that must already exist in the expanding function, and makes that
 * function return RET_VAL when the barrier wait fails.
 *
 * Without barriers it is emulated with `count_threads`, its mutex and its
 * condition variable: every arrival increments the counter and sleeps until
 * the arrival that finds the counter at `nthreads` broadcasts.  RET_VAL is
 * not used by this variant.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                    \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_init);                 \
    if (!(rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD)) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (init): %d", rc);      \
      return((RET_VAL));                                                  \
    }                                                                     \
  } while (0)
#else
#define WAIT_INIT(RET_VAL, CONTEXT_PTR)                                   \
  do {                                                                    \
    blosc2_pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);       \
    if ((CONTEXT_PTR)->count_threads >= (CONTEXT_PTR)->nthreads) {        \
      blosc2_pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);    \
    }                                                                     \
    else {                                                                \
      (CONTEXT_PTR)->count_threads++;                                     \
      blosc2_pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,          \
                               &(CONTEXT_PTR)->count_threads_mutex);      \
    }                                                                     \
    blosc2_pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);     \
  } while (0)
#endif
134
135
/* Wait for all threads to finish.
 *
 * With POSIX barriers the macro waits on `barr_finish`; it assigns to a
 * variable named `rc` that must already exist in the expanding function, and
 * makes that function return RET_VAL when the barrier wait fails.
 *
 * Without barriers it is emulated with `count_threads`, its mutex and its
 * condition variable: every arrival decrements the counter and sleeps until
 * the arrival that finds the counter at zero broadcasts.  RET_VAL is not used
 * by this variant.
 */
#ifdef BLOSC_POSIX_BARRIERS
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  do {                                                                    \
    rc = pthread_barrier_wait(&(CONTEXT_PTR)->barr_finish);               \
    if (!(rc == 0 || rc == PTHREAD_BARRIER_SERIAL_THREAD)) {              \
      BLOSC_TRACE_ERROR("Could not wait on barrier (finish)");            \
      return((RET_VAL));                                                  \
    }                                                                     \
  } while (0)
#else
#define WAIT_FINISH(RET_VAL, CONTEXT_PTR)                                 \
  do {                                                                    \
    blosc2_pthread_mutex_lock(&(CONTEXT_PTR)->count_threads_mutex);       \
    if ((CONTEXT_PTR)->count_threads <= 0) {                              \
      blosc2_pthread_cond_broadcast(&(CONTEXT_PTR)->count_threads_cv);    \
    }                                                                     \
    else {                                                                \
      (CONTEXT_PTR)->count_threads--;                                     \
      blosc2_pthread_cond_wait(&(CONTEXT_PTR)->count_threads_cv,          \
                               &(CONTEXT_PTR)->count_threads_mutex);      \
    }                                                                     \
    blosc2_pthread_mutex_unlock(&(CONTEXT_PTR)->count_threads_mutex);     \
  } while (0)
#endif
160
161
162
/* global variable to change threading backend from Blosc-managed to caller-managed */
163
static blosc_threads_callback threads_callback = 0;
164
static void *threads_callback_data = 0;
165
166
167
/* non-threadsafe function should be called before any other Blosc function in
168
   order to change how threads are managed */
169
void blosc2_set_threads_callback(blosc_threads_callback callback, void *callback_data)
170
0
{
171
0
  threads_callback = callback;
172
0
  threads_callback_data = callback_data;
173
0
}
174
175
176
/* Portable allocation helper.
 *
 * Requests 32-byte alignment (because AVX2 is supported) through
 * _aligned_malloc() on Windows or posix_memalign() where available, and falls
 * back to plain malloc() elsewhere.  Returns NULL, after tracing an error,
 * when the allocation fails.  Release the result with my_free().
 */
static uint8_t* my_malloc(size_t size) {
  void* mem = NULL;
  int status = 0;

#if defined(_WIN32)
  /* A (void *) cast needed for avoiding a warning with MINGW :-/ */
  mem = (void *)_aligned_malloc(size, 32);
#elif _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
  /* Platform does have an implementation of posix_memalign */
  status = posix_memalign(&mem, 32, size);
#else
  mem = malloc(size);
#endif  /* _WIN32 */

  if (status == 0 && mem != NULL) {
    return (uint8_t*)mem;
  }

  BLOSC_TRACE_ERROR("Error allocating memory!");
  return NULL;
}
199
200
201
/* Give back a block obtained from my_malloc(), using the deallocator that
   matches the allocator selected there for the current platform */
static void my_free(void* block) {
#if !defined(_WIN32)
  free(block);
#else
  _aligned_free(block);
#endif  /* !_WIN32 */
}
209
210
211
/*
212
 * Conversion routines between compressor and compression libraries
213
 */
214
215
/* Return the library code associated with the compressor name */
216
0
static int compname_to_clibcode(const char* compname) {
217
0
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0)
218
0
    return BLOSC_BLOSCLZ_LIB;
219
0
  if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0)
220
0
    return BLOSC_LZ4_LIB;
221
0
  if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0)
222
0
    return BLOSC_LZ4_LIB;
223
0
  if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0)
224
0
    return BLOSC_ZLIB_LIB;
225
0
  if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0)
226
0
    return BLOSC_ZSTD_LIB;
227
0
  for (int i = 0; i < g_ncodecs; ++i) {
228
0
    if (strcmp(compname, g_codecs[i].compname) == 0)
229
0
      return g_codecs[i].complib;
230
0
  }
231
0
  return BLOSC2_ERROR_NOT_FOUND;
232
0
}
233
234
/* Return the library name associated with the compressor code */
235
0
static const char* clibcode_to_clibname(int clibcode) {
236
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) return BLOSC_BLOSCLZ_LIBNAME;
237
0
  if (clibcode == BLOSC_LZ4_LIB) return BLOSC_LZ4_LIBNAME;
238
0
  if (clibcode == BLOSC_ZLIB_LIB) return BLOSC_ZLIB_LIBNAME;
239
0
  if (clibcode == BLOSC_ZSTD_LIB) return BLOSC_ZSTD_LIBNAME;
240
0
  for (int i = 0; i < g_ncodecs; ++i) {
241
0
    if (clibcode == g_codecs[i].complib)
242
0
      return g_codecs[i].compname;
243
0
  }
244
0
  return NULL;                  /* should never happen */
245
0
}
246
247
248
/*
249
 * Conversion routines between compressor names and compressor codes
250
 */
251
252
/* Get the compressor name associated with the compressor code */
253
0
int blosc2_compcode_to_compname(int compcode, const char** compname) {
254
0
  int code = -1;    /* -1 means non-existent compressor code */
255
0
  const char* name = NULL;
256
257
  /* Map the compressor code */
258
0
  if (compcode == BLOSC_BLOSCLZ)
259
0
    name = BLOSC_BLOSCLZ_COMPNAME;
260
0
  else if (compcode == BLOSC_LZ4)
261
0
    name = BLOSC_LZ4_COMPNAME;
262
0
  else if (compcode == BLOSC_LZ4HC)
263
0
    name = BLOSC_LZ4HC_COMPNAME;
264
0
  else if (compcode == BLOSC_ZLIB)
265
0
    name = BLOSC_ZLIB_COMPNAME;
266
0
  else if (compcode == BLOSC_ZSTD)
267
0
    name = BLOSC_ZSTD_COMPNAME;
268
0
  else {
269
0
    for (int i = 0; i < g_ncodecs; ++i) {
270
0
      if (compcode == g_codecs[i].compcode) {
271
0
        name = g_codecs[i].compname;
272
0
        break;
273
0
      }
274
0
    }
275
0
  }
276
277
0
  *compname = name;
278
279
  /* Guess if there is support for this code */
280
0
  if (compcode == BLOSC_BLOSCLZ)
281
0
    code = BLOSC_BLOSCLZ;
282
0
  else if (compcode == BLOSC_LZ4)
283
0
    code = BLOSC_LZ4;
284
0
  else if (compcode == BLOSC_LZ4HC)
285
0
    code = BLOSC_LZ4HC;
286
0
#if defined(HAVE_ZLIB)
287
0
  else if (compcode == BLOSC_ZLIB)
288
0
    code = BLOSC_ZLIB;
289
0
#endif /* HAVE_ZLIB */
290
0
#if defined(HAVE_ZSTD)
291
0
  else if (compcode == BLOSC_ZSTD)
292
0
    code = BLOSC_ZSTD;
293
0
#endif /* HAVE_ZSTD */
294
0
  else if (compcode >= BLOSC_LAST_CODEC)
295
0
    code = compcode;
296
0
  return code;
297
0
}
298
299
/* Get the compressor code for the compressor name. -1 if it is not available */
300
15.0k
int blosc2_compname_to_compcode(const char* compname) {
301
15.0k
  int code = -1;  /* -1 means non-existent compressor code */
302
303
15.0k
  if (strcmp(compname, BLOSC_BLOSCLZ_COMPNAME) == 0) {
304
511
    code = BLOSC_BLOSCLZ;
305
511
  }
306
14.5k
  else if (strcmp(compname, BLOSC_LZ4_COMPNAME) == 0) {
307
616
    code = BLOSC_LZ4;
308
616
  }
309
13.9k
  else if (strcmp(compname, BLOSC_LZ4HC_COMPNAME) == 0) {
310
3.34k
    code = BLOSC_LZ4HC;
311
3.34k
  }
312
10.6k
#if defined(HAVE_ZLIB)
313
10.6k
  else if (strcmp(compname, BLOSC_ZLIB_COMPNAME) == 0) {
314
2.44k
    code = BLOSC_ZLIB;
315
2.44k
  }
316
8.17k
#endif /*  HAVE_ZLIB */
317
8.17k
#if defined(HAVE_ZSTD)
318
8.17k
  else if (strcmp(compname, BLOSC_ZSTD_COMPNAME) == 0) {
319
8.17k
    code = BLOSC_ZSTD;
320
8.17k
  }
321
0
#endif /*  HAVE_ZSTD */
322
0
  else{
323
0
    for (int i = 0; i < g_ncodecs; ++i) {
324
0
      if (strcmp(compname, g_codecs[i].compname) == 0) {
325
0
        code = g_codecs[i].compcode;
326
0
        break;
327
0
      }
328
0
    }
329
0
  }
330
15.0k
  return code;
331
15.0k
}
332
333
334
/* Convert compressor code to blosc compressor format code */
335
15.0k
static int compcode_to_compformat(int compcode) {
336
15.0k
  switch (compcode) {
337
504
    case BLOSC_BLOSCLZ: return BLOSC_BLOSCLZ_FORMAT;
338
613
    case BLOSC_LZ4:     return BLOSC_LZ4_FORMAT;
339
3.33k
    case BLOSC_LZ4HC:   return BLOSC_LZ4HC_FORMAT;
340
341
0
#if defined(HAVE_ZLIB)
342
2.43k
    case BLOSC_ZLIB:    return BLOSC_ZLIB_FORMAT;
343
0
#endif /*  HAVE_ZLIB */
344
345
0
#if defined(HAVE_ZSTD)
346
8.15k
    case BLOSC_ZSTD:    return BLOSC_ZSTD_FORMAT;
347
0
      break;
348
0
#endif /*  HAVE_ZSTD */
349
0
    default:
350
0
      return BLOSC_UDCODEC_FORMAT;
351
15.0k
  }
352
0
  BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
353
0
}
354
355
356
/* Convert compressor code to blosc compressor format version */
357
15.0k
static int compcode_to_compversion(int compcode) {
358
  /* Write compressor format */
359
15.0k
  switch (compcode) {
360
505
    case BLOSC_BLOSCLZ:
361
505
      return BLOSC_BLOSCLZ_VERSION_FORMAT;
362
614
    case BLOSC_LZ4:
363
614
      return BLOSC_LZ4_VERSION_FORMAT;
364
3.33k
    case BLOSC_LZ4HC:
365
3.33k
      return BLOSC_LZ4HC_VERSION_FORMAT;
366
367
0
#if defined(HAVE_ZLIB)
368
2.43k
    case BLOSC_ZLIB:
369
2.43k
      return BLOSC_ZLIB_VERSION_FORMAT;
370
0
      break;
371
0
#endif /*  HAVE_ZLIB */
372
373
0
#if defined(HAVE_ZSTD)
374
8.16k
    case BLOSC_ZSTD:
375
8.16k
      return BLOSC_ZSTD_VERSION_FORMAT;
376
0
      break;
377
0
#endif /*  HAVE_ZSTD */
378
0
    default:
379
0
      for (int i = 0; i < g_ncodecs; ++i) {
380
0
        if (compcode == g_codecs[i].compcode) {
381
0
          return g_codecs[i].version;
382
0
        }
383
0
      }
384
15.0k
  }
385
0
  return BLOSC2_ERROR_FAILURE;
386
15.0k
}
387
388
389
/* Compress `input_length` bytes from `input` into `output` (capacity `maxout`)
 * in LZ4 format.
 *
 * With IPP the encoder needs an initialized `hash_table`; otherwise the plain
 * LZ4 fast compressor is called.  `accel` is ignored in both builds: the
 * non-IPP path always uses acceleration 1 to match the IPP behaviour.
 *
 * Returns the number of bytes written.  In the IPP build, 0 means the data
 * did not fit in `maxout` and a negative BLOSC2 error code reports a failure;
 * otherwise the result of LZ4_compress_fast() is returned as is.
 */
static int lz4_wrap_compress(const char* input, size_t input_length,
                             char* output, size_t maxout, int accel, void* hash_table) {
  BLOSC_UNUSED_PARAM(accel);
#ifdef HAVE_IPP
  if (hash_table == NULL) {
    return BLOSC2_ERROR_INVALID_PARAM;  // the hash table should always be initialized
  }
  int dst_len = (int)maxout;
  int src_len = (int)input_length;
  // I have not found any function that uses `accel` like in `LZ4_compress_fast`, but
  // the IPP LZ4Safe call does a pretty good job on compressing well, so let's use it
  IppStatus status = ippsEncodeLZ4Safe_8u((const Ipp8u*)input, &src_len,
                                          (Ipp8u*)output, &dst_len, (Ipp8u*)hash_table);
  if (status == ippStsDstSizeLessExpected) {
    return 0;  // we cannot compress in required outlen
  }
  if (status != ippStsNoErr) {
    return BLOSC2_ERROR_FAILURE;  // an unexpected error happened
  }
  return dst_len;
#else
  BLOSC_UNUSED_PARAM(hash_table);
  // acceleration deactivated (fixed to 1) to match IPP behaviour
  return LZ4_compress_fast(input, output, (int)input_length, (int)maxout, 1);
#endif
}
417
418
419
static int lz4hc_wrap_compress(const char* input, size_t input_length,
420
30.2k
                               char* output, size_t maxout, int clevel) {
421
30.2k
  int cbytes;
422
30.2k
  if (input_length > (size_t)(UINT32_C(2) << 30))
423
0
    return BLOSC2_ERROR_2GB_LIMIT;
424
  /* clevel for lz4hc goes up to 12, at least in LZ4 1.7.5
425
   * but levels larger than 9 do not buy much compression. */
426
30.2k
  cbytes = LZ4_compress_HC(input, output, (int)input_length, (int)maxout,
427
30.2k
                           clevel);
428
30.2k
  return cbytes;
429
30.2k
}
430
431
432
/* Decompress an LZ4 buffer of `compressed_length` bytes into `output`.
 *
 * The decoder must produce exactly `maxout` bytes: in that case `maxout` is
 * returned, and any other outcome (including decoder errors) yields 0.
 */
static int lz4_wrap_decompress(const char* input, size_t compressed_length,
                               char* output, size_t maxout) {
  const int expected = (int)maxout;
  int decoded;
#ifdef HAVE_IPP
  int dst_len = (int)maxout;
  int src_len = (int)compressed_length;
  IppStatus status = ippsDecodeLZ4_8u((const Ipp8u*)input, src_len, (Ipp8u*)output, &dst_len);
  //status = ippsDecodeLZ4Dict_8u((const Ipp8u*)input, &inlen, (Ipp8u*)output, 0, &outlen, NULL, 1 << 16);
  decoded = (status == ippStsNoErr) ? dst_len : -dst_len;
#else
  decoded = LZ4_decompress_safe(input, output, (int)compressed_length, (int)maxout);
#endif
  return (decoded == expected) ? expected : 0;
}
450
451
#if defined(HAVE_ZLIB)
452
/* zlib is not very respectful with sharing name space with others.
453
 Fortunately, its names do not collide with those already in blosc. */
454
static int zlib_wrap_compress(const char* input, size_t input_length,
455
33.7k
                              char* output, size_t maxout, int clevel) {
456
33.7k
  int status;
457
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
458
  size_t cl = maxout;
459
  status = zng_compress2(
460
      (uint8_t*)output, &cl, (uint8_t*)input, input_length, clevel);
461
#else
462
33.7k
  uLongf cl = (uLongf)maxout;
463
33.7k
  status = compress2(
464
33.7k
      (Bytef*)output, &cl, (Bytef*)input, (uLong)input_length, clevel);
465
33.7k
#endif
466
33.7k
  if (status != Z_OK) {
467
7.49k
    return 0;
468
7.49k
  }
469
26.2k
  return (int)cl;
470
33.7k
}
471
472
static int zlib_wrap_decompress(const char* input, size_t compressed_length,
473
4.40k
                                char* output, size_t maxout) {
474
4.40k
  int status;
475
#if defined(HAVE_ZLIB_NG) && ! defined(ZLIB_COMPAT)
476
  size_t ul = maxout;
477
  status = zng_uncompress(
478
      (uint8_t*)output, &ul, (uint8_t*)input, compressed_length);
479
#else
480
4.40k
  uLongf ul = (uLongf)maxout;
481
4.40k
  status = uncompress(
482
4.40k
      (Bytef*)output, &ul, (Bytef*)input, (uLong)compressed_length);
483
4.40k
#endif
484
4.40k
  if (status != Z_OK) {
485
0
    return 0;
486
0
  }
487
4.40k
  return (int)ul;
488
4.40k
}
489
#endif /*  HAVE_ZLIB */
490
491
492
#if defined(HAVE_ZSTD)
493
static int zstd_wrap_compress(struct thread_context* thread_context,
494
                              const char* input, size_t input_length,
495
173k
                              char* output, size_t maxout, int clevel) {
496
173k
  size_t code;
497
173k
  blosc2_context* context = thread_context->parent_context;
498
499
173k
  clevel = (clevel < 9) ? clevel * 2 - 1 : ZSTD_maxCLevel();
500
  /* Make the level 8 close enough to maxCLevel */
501
173k
  if (clevel == 8) clevel = ZSTD_maxCLevel() - 2;
502
503
173k
  if (thread_context->zstd_cctx == NULL) {
504
1.02k
    thread_context->zstd_cctx = ZSTD_createCCtx();
505
1.02k
  }
506
507
173k
  if (context->use_dict) {
508
0
    assert(context->dict_cdict != NULL);
509
0
    code = ZSTD_compress_usingCDict(
510
0
            thread_context->zstd_cctx, (void*)output, maxout, (void*)input,
511
0
            input_length, context->dict_cdict);
512
173k
  } else {
513
173k
    code = ZSTD_compressCCtx(thread_context->zstd_cctx,
514
173k
        (void*)output, maxout, (void*)input, input_length, clevel);
515
173k
  }
516
173k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
517
    // Blosc will just memcpy this buffer
518
35.0k
    return 0;
519
35.0k
  }
520
138k
  return (int)code;
521
173k
}
522
523
static int zstd_wrap_decompress(struct thread_context* thread_context,
524
                                const char* input, size_t compressed_length,
525
5.92k
                                char* output, size_t maxout) {
526
5.92k
  size_t code;
527
5.92k
  blosc2_context* context = thread_context->parent_context;
528
529
5.92k
  if (thread_context->zstd_dctx == NULL) {
530
1
    thread_context->zstd_dctx = ZSTD_createDCtx();
531
1
  }
532
533
5.92k
  if (context->use_dict) {
534
0
    assert(context->dict_ddict != NULL);
535
0
    code = ZSTD_decompress_usingDDict(
536
0
            thread_context->zstd_dctx, (void*)output, maxout, (void*)input,
537
0
            compressed_length, context->dict_ddict);
538
5.92k
  } else {
539
5.92k
    code = ZSTD_decompressDCtx(thread_context->zstd_dctx,
540
5.92k
        (void*)output, maxout, (void*)input, compressed_length);
541
5.92k
  }
542
5.92k
  if (ZSTD_isError(code) != ZSTD_error_no_error) {
543
0
    BLOSC_TRACE_ERROR("Error in ZSTD decompression: '%s'.  Giving up.",
544
0
                      ZDICT_getErrorName(code));
545
0
    return 0;
546
0
  }
547
5.92k
  return (int)code;
548
5.92k
}
549
#endif /*  HAVE_ZSTD */
550
551
/* Compute acceleration for blosclz */
552
254k
static int get_accel(const blosc2_context* context) {
553
254k
  int clevel = context->clevel;
554
555
254k
  if (context->compcode == BLOSC_LZ4) {
556
    /* This acceleration setting based on discussions held in:
557
     * https://groups.google.com/forum/#!topic/lz4c/zosy90P8MQw
558
     */
559
7.36k
    return (10 - clevel);
560
7.36k
  }
561
246k
  return 1;
562
254k
}
563
564
565
1.73M
int do_nothing(uint8_t filter, char cmode) {
566
1.73M
  if (cmode == 'c') {
567
1.52M
    return (filter == BLOSC_NOFILTER);
568
1.52M
  } else {
569
    // TRUNC_PREC do not have to be applied during decompression
570
211k
    return ((filter == BLOSC_NOFILTER) || (filter == BLOSC_TRUNC_PREC));
571
211k
  }
572
1.73M
}
573
574
575
22.2k
int next_filter(const uint8_t* filters, int current_filter, char cmode) {
576
22.2k
  for (int i = current_filter - 1; i >= 0; i--) {
577
22.2k
    if (!do_nothing(filters[i], cmode)) {
578
22.2k
      return filters[i];
579
22.2k
    }
580
22.2k
  }
581
0
  return BLOSC_NOFILTER;
582
22.2k
}
583
584
585
285k
int last_filter(const uint8_t* filters, char cmode) {
586
285k
  int last_index = -1;
587
2.00M
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
588
1.71M
    if (!do_nothing(filters[i], cmode))  {
589
173k
      last_index = i;
590
173k
    }
591
1.71M
  }
592
285k
  return last_index;
593
285k
}
594
595
596
/* Convert filter pipeline to filter flags */
597
16.5k
static uint8_t filters_to_flags(const uint8_t* filters) {
598
16.5k
  uint8_t flags = 0;
599
600
115k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
601
99.1k
    switch (filters[i]) {
602
0
      case BLOSC_SHUFFLE:
603
0
        flags |= BLOSC_DOSHUFFLE;
604
0
        break;
605
6.18k
      case BLOSC_BITSHUFFLE:
606
6.18k
        flags |= BLOSC_DOBITSHUFFLE;
607
6.18k
        break;
608
0
      case BLOSC_DELTA:
609
0
        flags |= BLOSC_DODELTA;
610
0
        break;
611
93.0k
      default :
612
93.0k
        break;
613
99.1k
    }
614
99.1k
  }
615
16.5k
  return flags;
616
16.5k
}
617
618
619
/* Convert filter flags to filter pipeline */
620
40.6k
static void flags_to_filters(const uint8_t flags, uint8_t* filters) {
621
  /* Initialize the filter pipeline */
622
40.6k
  memset(filters, 0, BLOSC2_MAX_FILTERS);
623
  /* Fill the filter pipeline */
624
40.6k
  if (flags & BLOSC_DOSHUFFLE)
625
40.6k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
626
40.6k
  if (flags & BLOSC_DOBITSHUFFLE)
627
40.6k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
628
40.6k
  if (flags & BLOSC_DODELTA)
629
0
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
630
40.6k
}
631
632
633
/* Get filter flags from header flags */
634
static uint8_t get_filter_flags(const uint8_t header_flags,
635
0
                                const int32_t typesize) {
636
0
  uint8_t flags = 0;
637
638
0
  if ((header_flags & BLOSC_DOSHUFFLE) && (typesize > 1)) {
639
0
    flags |= BLOSC_DOSHUFFLE;
640
0
  }
641
0
  if (header_flags & BLOSC_DOBITSHUFFLE) {
642
0
    flags |= BLOSC_DOBITSHUFFLE;
643
0
  }
644
0
  if (header_flags & BLOSC_DODELTA) {
645
0
    flags |= BLOSC_DODELTA;
646
0
  }
647
0
  if (header_flags & BLOSC_MEMCPYED) {
648
0
    flags |= BLOSC_MEMCPYED;
649
0
  }
650
0
  return flags;
651
0
}
652
653
/* In-memory image of a chunk header.  read_chunk_header() fills it by copying
   the first BLOSC_MIN_HEADER_LENGTH bytes of the chunk over the start of the
   struct and, for extended headers, the bytes up to BLOSC_EXTENDED_HEADER_LENGTH
   right after them, so the field order and sizes must not be changed. */
typedef struct blosc_header_s {
654
  uint8_t version;  /* format version; rejected when above BLOSC2_VERSION_FORMAT */
655
  uint8_t versionlz;  /* NOTE(review): presumably the codec format version -- confirm */
656
  uint8_t flags;  /* BLOSC_DO* / BLOSC_MEMCPYED bits; `flags >> 5` is the compressor format code */
657
  uint8_t typesize;  /* size of the type; zero is rejected */
658
  int32_t nbytes;  /* uncompressed size */
659
  int32_t blocksize;  /* block size; must be > 0 and <= BLOSC2_MAXBLOCKSIZE */
660
  int32_t cbytes;  /* NOTE(review): presumably the compressed size of the whole chunk, header included -- confirm */
661
  // Extended Blosc2 header
662
  uint8_t filters[BLOSC2_MAX_FILTERS];  /* filter pipeline */
663
  uint8_t udcompcode;  /* compressor code used when the format code is BLOSC_UDCODEC_FORMAT */
664
  uint8_t compcode_meta;  /* copied into the context as `compcode_meta` */
665
  uint8_t filters_meta[BLOSC2_MAX_FILTERS];  /* one meta byte per filter slot */
666
  uint8_t reserved2;
667
  uint8_t blosc2_flags;  /* bit 0x08: lazy chunk; `(blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK`: special type */
668
} blosc_header;
669
670
671
/* Parse and validate the header of the chunk starting at `src`.
 *
 * `srcsize` is the number of readable bytes in `src`.  `header` is zeroed and
 * then filled with the minimal header; the 32-bit size fields are
 * byte-swapped on hosts that are not little-endian.  When `extended_header`
 * is true and the chunk flags have both SHUFFLE and BITSHUFFLE set, the
 * extended part of the header is read and validated too; otherwise the
 * filter pipeline is derived from the flags.
 *
 * Returns 0 on success, or a negative code:
 *   BLOSC2_ERROR_READ_BUFFER     - `srcsize` is too small for the header
 *   BLOSC2_ERROR_VERSION_SUPPORT - the format version is newer than supported
 *   BLOSC2_ERROR_INVALID_HEADER  - a header field holds an invalid value
 */
int read_chunk_header(const uint8_t* src, int32_t srcsize, bool extended_header, blosc_header* header)
{
  memset(header, 0, sizeof(blosc_header));

  if (srcsize < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("Not enough space to read Blosc header.");
    return BLOSC2_ERROR_READ_BUFFER;
  }

  memcpy(header, src, BLOSC_MIN_HEADER_LENGTH);

  bool little_endian = is_little_endian();

  if (!little_endian) {
    header->nbytes = bswap32_(header->nbytes);
    header->blocksize = bswap32_(header->blocksize);
    header->cbytes = bswap32_(header->cbytes);
  }

  if (header->version > BLOSC2_VERSION_FORMAT) {
    /* Version from future */
    return BLOSC2_ERROR_VERSION_SUPPORT;
  }
  if (header->cbytes < BLOSC_MIN_HEADER_LENGTH) {
    BLOSC_TRACE_ERROR("`cbytes` is too small to read min header.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->nbytes < 0) {
    /* A negative size can only come from a corrupted or hostile header, and
     * it would slip through the blocksize check right below */
    BLOSC_TRACE_ERROR("`nbytes` is negative");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize <= 0 || (header->nbytes > 0 && (header->blocksize > header->nbytes))) {
    BLOSC_TRACE_ERROR("`blocksize` is zero or greater than uncompressed size");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->blocksize > BLOSC2_MAXBLOCKSIZE) {
    BLOSC_TRACE_ERROR("`blocksize` greater than maximum allowed");
    return BLOSC2_ERROR_INVALID_HEADER;
  }
  if (header->typesize == 0) {
    BLOSC_TRACE_ERROR("`typesize` is zero.");
    return BLOSC2_ERROR_INVALID_HEADER;
  }

  /* Read extended header if it is wanted */
  if ((extended_header) && (header->flags & BLOSC_DOSHUFFLE) && (header->flags & BLOSC_DOBITSHUFFLE)) {
    if (header->cbytes < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("`cbytes` is too small to read extended header.");
      return BLOSC2_ERROR_INVALID_HEADER;
    }
    if (srcsize < BLOSC_EXTENDED_HEADER_LENGTH) {
      BLOSC_TRACE_ERROR("Not enough space to read Blosc extended header.");
      return BLOSC2_ERROR_READ_BUFFER;
    }

    /* The extended fields follow the minimal header, both in the chunk and in the struct */
    memcpy((uint8_t *)header + BLOSC_MIN_HEADER_LENGTH, src + BLOSC_MIN_HEADER_LENGTH,
      BLOSC_EXTENDED_HEADER_LENGTH - BLOSC_MIN_HEADER_LENGTH);

    int32_t special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
    if (special_type != 0) {
      if (special_type == BLOSC2_SPECIAL_VALUE) {
        // In this case, the actual type size must be derived from the cbytes
        int32_t typesize = header->cbytes - BLOSC_EXTENDED_HEADER_LENGTH;
        if (typesize <= 0) {
          BLOSC_TRACE_ERROR("`typesize` is zero or negative");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (typesize > BLOSC2_MAXTYPESIZE) {
          BLOSC_TRACE_ERROR("`typesize` is greater than maximum allowed");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (typesize > header->nbytes) {
          BLOSC_TRACE_ERROR("`typesize` is greater than `nbytes`");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
        if (header->nbytes % typesize != 0) {
          BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
      }
      else {
        if (header->nbytes % header->typesize != 0) {
          BLOSC_TRACE_ERROR("`nbytes` is not a multiple of typesize");
          return BLOSC2_ERROR_INVALID_HEADER;
        }
      }
    }
    // The number of filters depends on the version of the header. Blosc2 alpha series
    // did not initialize filters to zero beyond the max supported.
    if (header->version == BLOSC2_VERSION_FORMAT_ALPHA) {
      header->filters[5] = 0;
      header->filters_meta[5] = 0;
    }
  }
  else {
    flags_to_filters(header->flags, header->filters);
  }
  return 0;
}
766
767
16.5k
/* Compute number of blocks in buffer.
 *
 * `leftover` is the size of the trailing partial block (0 when `sourcesize`
 * is a multiple of `blocksize`) and `nblocks` counts that partial block too.
 * `context->blocksize` must be non-zero.
 */
static inline void blosc2_calculate_blocks(blosc2_context* context) {
  context->leftover = context->sourcesize % context->blocksize;
  context->nblocks = context->sourcesize / context->blocksize;
  if (context->leftover > 0) {
    /* One more block for the partial tail */
    context->nblocks = context->nblocks + 1;
  }
}
774
775
1.44k
/* Set up `context` from an already parsed chunk `header`.
 *
 * Copies the sizes and flags, resolves the compressor code (taken from
 * `udcompcode` when the format code is BLOSC_UDCODEC_FORMAT), computes the
 * block layout and fills the filter pipeline, either from the extended header
 * or from the flags of a minimal header.
 *
 * Returns 0 on success, or BLOSC2_ERROR_INVALID_HEADER when the chunk is not
 * lazy and its `cbytes` is larger than `context->srcsize`.
 */
static int blosc2_initialize_context_from_header(blosc2_context* context, blosc_header* header) {
  const int format = header->flags >> 5;
  bool is_lazy = false;

  context->header_flags = header->flags;
  context->typesize = header->typesize;
  context->sourcesize = header->nbytes;
  context->blocksize = header->blocksize;
  context->blosc2_flags = header->blosc2_flags;
  context->compcode = (format == BLOSC_UDCODEC_FORMAT) ? header->udcompcode : format;
  blosc2_calculate_blocks(context);

  /* Both shuffle bits set at once is what marks an extended header */
  const bool extended = (context->header_flags & BLOSC_DOSHUFFLE) &&
                        (context->header_flags & BLOSC_DOBITSHUFFLE);
  if (!extended) {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    context->filter_flags = get_filter_flags(context->header_flags, context->typesize);
    flags_to_filters(context->header_flags, context->filters);
  }
  else {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;

    memcpy(context->filters, header->filters, BLOSC2_MAX_FILTERS);
    memcpy(context->filters_meta, header->filters_meta, BLOSC2_MAX_FILTERS);
    context->compcode_meta = header->compcode_meta;

    context->filter_flags = filters_to_flags(header->filters);
    context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;

    is_lazy = (context->blosc2_flags & 0x08u) != 0;
  }

  // Some checks for malformed headers
  if (is_lazy || header->cbytes <= context->srcsize) {
    return 0;
  }
  return BLOSC2_ERROR_INVALID_HEADER;
}
815
816
817
0
/* Resolve the `forward` and `backward` functions of a filter plugin.
 *
 * The library is located with load_lib() from `filter->name`.  Its `info`
 * symbol supplies the names of the two entry points, which are then looked up
 * and stored in `filter`.
 *
 * Returns BLOSC2_ERROR_SUCCESS, or BLOSC2_ERROR_FAILURE when the library, its
 * `info` symbol or one of the entry points cannot be loaded.  The library
 * handle is closed on failure and stays open on success.
 */
int fill_filter(blosc2_filter *filter) {
  char libpath[PATH_MAX];
  void *lib = load_lib(filter->name, libpath);
  if(lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  filter_info *info = dlsym(lib, "info");
  if (info == NULL) {
    /* Without `info` the entry point names are unknown; never dereference it */
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from the filter plugin");
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }
  filter->forward = dlsym(lib, info->forward);
  filter->backward = dlsym(lib, info->backward);

  if (filter->forward == NULL || filter->backward == NULL){
    BLOSC_TRACE_ERROR("Wrong library loaded");
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
837
838
839
0
/* Resolve the encoder/decoder entry points of a dynamically loaded codec
 * plugin.
 *
 * The library for `codec->compname` is loaded, its exported `info`
 * descriptor is looked up, and the symbol names found there are used to fill
 * in `codec->encoder` and `codec->decoder`.
 *
 * Returns BLOSC2_ERROR_SUCCESS when both callbacks are available and
 * BLOSC2_ERROR_FAILURE otherwise (the library is closed again on every
 * failure that happens after it was opened).
 */
int fill_codec(blosc2_codec *codec) {
  char plugin_path[PATH_MAX];
  void *handle = load_lib(codec->compname, plugin_path);
  if (handle == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library for codec `%s`", codec->compname);
    return BLOSC2_ERROR_FAILURE;
  }

  codec_info *plugin_info = dlsym(handle, "info");
  if (plugin_info == NULL) {
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", codec->compname);
    dlclose(handle);
    return BLOSC2_ERROR_FAILURE;
  }

  codec->encoder = dlsym(handle, plugin_info->encoder);
  codec->decoder = dlsym(handle, plugin_info->decoder);
  if (codec->encoder != NULL && codec->decoder != NULL) {
    /* Placeholder kept from the original author: if a `.free` hook for codec
       params is ever added, a second descriptor could be looked up here:

         codecparams_info *info2 = dlsym(lib, "info2");
         codec->free = (info2 != NULL) ? dlsym(lib, info2->free) : NULL;

       so that new plugins (e.g. openzl) can release codec_params when the
       context is destroyed in blosc2_free_ctx. */
    return BLOSC2_ERROR_SUCCESS;
  }

  BLOSC_TRACE_ERROR("encoder or decoder cannot be loaded from plugin `%s`", codec->compname);
  dlclose(handle);
  return BLOSC2_ERROR_FAILURE;
}
876
877
878
0
/* Resolve the entry points of a dynamically loaded tuner plugin.
 *
 * Loads the library associated with `tuner->name`, looks up its exported
 * `info` descriptor and uses the symbol names stored there to fill in the
 * `init`, `update`, `next_blocksize`, `free` and `next_cparams` callbacks.
 *
 * Returns BLOSC2_ERROR_SUCCESS when every callback was resolved, or
 * BLOSC2_ERROR_FAILURE when the library, the `info` descriptor or any of the
 * callbacks cannot be found.  On success the library handle is intentionally
 * left open, since the resolved function pointers live inside it.
 */
int fill_tuner(blosc2_tuner *tuner) {
  char libpath[PATH_MAX] = {0};
  void *lib = load_lib(tuner->name, libpath);
  if (lib == NULL) {
    BLOSC_TRACE_ERROR("Error while loading the library");
    return BLOSC2_ERROR_FAILURE;
  }

  tuner_info *info = dlsym(lib, "info");
  if (info == NULL) {
    // Without the descriptor there is no way to know the callback names;
    // dereferencing it would crash, so bail out as fill_codec() does.
    BLOSC_TRACE_ERROR("`info` symbol cannot be loaded from plugin `%s`", tuner->name);
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }
  tuner->init = dlsym(lib, info->init);
  tuner->update = dlsym(lib, info->update);
  tuner->next_blocksize = dlsym(lib, info->next_blocksize);
  tuner->free = dlsym(lib, info->free);
  tuner->next_cparams = dlsym(lib, info->next_cparams);

  if (tuner->init == NULL || tuner->update == NULL || tuner->next_blocksize == NULL || tuner->free == NULL
      || tuner->next_cparams == NULL) {
    BLOSC_TRACE_ERROR("Wrong library loaded");
    dlclose(lib);
    return BLOSC2_ERROR_FAILURE;
  }

  return BLOSC2_ERROR_SUCCESS;
}
902
903
904
15.0k
/* Build a fresh chunk header out of the compression context.
 *
 * `header` is zeroed first and then populated with the format version, the
 * codec version, the header flags, the typesize, the uncompressed size and
 * the blocksize taken from `context`.  The two 32-bit size fields are
 * byte-swapped on big-endian hosts; `cbytes` is left for the caller to fill
 * in once compression has finished.
 *
 * When `extended_header` is true the filter pipeline, the codec code/meta and
 * the blosc2 flags (big-endian, dictionary, instrumented codec) are stored
 * too.
 *
 * Always returns 0.
 */
static int blosc2_intialize_header_from_context(blosc2_context* context, blosc_header* header, bool extended_header) {
  memset(header, 0, sizeof(blosc_header));

  int32_t stored_nbytes = (int32_t)context->sourcesize;
  int32_t stored_blocksize = (int32_t)context->blocksize;

  header->version = BLOSC2_VERSION_FORMAT;
  header->versionlz = compcode_to_compversion(context->compcode);
  header->flags = context->header_flags;
  header->typesize = (uint8_t)context->typesize;

  const bool big_endian = !is_little_endian();
  if (big_endian) {
    // Size fields are swapped on big-endian hosts (cbytes is written after compression)
    stored_nbytes = bswap32_(stored_nbytes);
    stored_blocksize = bswap32_(stored_blocksize);
  }
  header->nbytes = stored_nbytes;
  header->blocksize = stored_blocksize;

  if (!extended_header) {
    return 0;
  }

  /* Store filter pipeline info at the end of the header */
  memcpy(header->filters, context->filters, BLOSC2_MAX_FILTERS);
  memcpy(header->filters_meta, context->filters_meta, BLOSC2_MAX_FILTERS);
  header->udcompcode = context->compcode;
  header->compcode_meta = context->compcode_meta;

  if (big_endian) {
    header->blosc2_flags |= BLOSC2_BIGENDIAN;
  }
  if (context->use_dict) {
    header->blosc2_flags |= BLOSC2_USEDICT;
  }
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    header->blosc2_flags |= BLOSC2_INSTR_CODEC;
  }

  return 0;
}
943
944
173k
/* Rotate three buffer pointers in place: `*src` takes the old `*dest`,
 * `*dest` takes the old `*tmp`, and `*tmp` takes the old `*src`. */
void _cycle_buffers(uint8_t **src, uint8_t **dest, uint8_t **tmp) {
  uint8_t *previous_src = *src;

  *src = *dest;
  *dest = *tmp;
  *tmp = previous_src;
}
950
951
uint8_t* pipeline_forward(struct thread_context* thread_context, const int32_t bsize,
952
                          const uint8_t* src, const int32_t offset,
953
151k
                          uint8_t* dest, uint8_t* tmp) {
954
151k
  blosc2_context* context = thread_context->parent_context;
955
151k
  uint8_t* _src = (uint8_t*)src + offset;
956
151k
  uint8_t* _tmp = tmp;
957
151k
  uint8_t* _dest = dest;
958
151k
  int32_t typesize = context->typesize;
959
151k
  uint8_t* filters = context->filters;
960
151k
  uint8_t* filters_meta = context->filters_meta;
961
151k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
962
151k
  bool output_is_disposable = (context->preparams != NULL) ? context->preparams->output_is_disposable : false;
963
  
964
  /* Prefilter function */
965
151k
  if (context->prefilter != NULL) {
966
    /* Set unwritten values to zero */
967
0
    if (!output_is_disposable) {
968
0
      memset(_dest, 0, bsize);
969
0
    }
970
    // Create new prefilter parameters for this block (must be private for each thread)
971
0
    blosc2_prefilter_params preparams;
972
0
    memcpy(&preparams, context->preparams, sizeof(preparams));
973
0
    preparams.input = _src;
974
0
    preparams.output = _dest;
975
0
    preparams.output_size = bsize;
976
0
    preparams.output_typesize = typesize;
977
0
    preparams.output_offset = offset;
978
0
    preparams.nblock = offset / context->blocksize;
979
0
    preparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
980
0
    preparams.tid = thread_context->tid;
981
0
    preparams.ttmp = thread_context->tmp;
982
0
    preparams.ttmp_nbytes = thread_context->tmp_nbytes;
983
0
    preparams.ctx = context;
984
0
    preparams.output_is_disposable = output_is_disposable;
985
986
0
    if (context->prefilter(&preparams) != 0) {
987
0
      if (output_is_disposable) {
988
        // Output is going to be discarded; no more filters are required
989
0
        BLOSC_TRACE_INFO("Output is disposable");
990
0
        return _dest;
991
0
      }
992
0
      BLOSC_TRACE_ERROR("Execution of prefilter function failed");
993
0
      return NULL;
994
0
    }
995
996
0
    if (memcpyed) {
997
      // No more filters are required
998
0
      return _dest;
999
0
    }
1000
0
    _cycle_buffers(&_src, &_dest, &_tmp);
1001
0
  }
1002
1003
  /* Process the filter pipeline */
1004
1.05M
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
1005
908k
    int rc = BLOSC2_ERROR_SUCCESS;
1006
908k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1007
908k
      switch (filters[i]) {
1008
0
        case BLOSC_SHUFFLE:
1009
0
          blosc2_shuffle(typesize, bsize, _src, _dest);
1010
0
          break;
1011
151k
        case BLOSC_BITSHUFFLE:
1012
151k
          if (blosc2_bitshuffle(typesize, bsize, _src, _dest) < 0) {
1013
0
            return NULL;
1014
0
          }
1015
151k
          break;
1016
151k
        case BLOSC_DELTA:
1017
0
          delta_encoder(src, offset, bsize, typesize, _src, _dest);
1018
0
          break;
1019
0
        case BLOSC_TRUNC_PREC:
1020
0
          if (truncate_precision(filters_meta[i], typesize, bsize, _src, _dest) < 0) {
1021
0
            return NULL;
1022
0
          }
1023
0
          break;
1024
757k
        default:
1025
757k
          if (filters[i] != BLOSC_NOFILTER) {
1026
0
            BLOSC_TRACE_ERROR("Filter %d not handled during compression\n", filters[i]);
1027
0
            return NULL;
1028
0
          }
1029
908k
      }
1030
908k
    }
1031
0
    else {
1032
      // Look for the filters_meta in user filters and run it
1033
0
      for (uint64_t j = 0; j < g_nfilters; ++j) {
1034
0
        if (g_filters[j].id == filters[i]) {
1035
0
          if (g_filters[j].forward == NULL) {
1036
            // Dynamically load library
1037
0
            if (fill_filter(&g_filters[j]) < 0) {
1038
0
              BLOSC_TRACE_ERROR("Could not load filter %d\n", g_filters[j].id);
1039
0
              return NULL;
1040
0
            }
1041
0
          }
1042
0
          if (g_filters[j].forward != NULL) {
1043
0
            blosc2_cparams cparams;
1044
0
            blosc2_ctx_get_cparams(context, &cparams);
1045
0
            rc = g_filters[j].forward(_src, _dest, bsize, filters_meta[i], &cparams, g_filters[j].id);
1046
0
          } else {
1047
0
            BLOSC_TRACE_ERROR("Forward function is NULL");
1048
0
            return NULL;
1049
0
          }
1050
0
          if (rc != BLOSC2_ERROR_SUCCESS) {
1051
0
            BLOSC_TRACE_ERROR("User-defined filter %d failed during compression\n", filters[i]);
1052
0
            return NULL;
1053
0
          }
1054
0
          goto urfiltersuccess;
1055
0
        }
1056
0
      }
1057
0
      BLOSC_TRACE_ERROR("User-defined filter %d not found during compression\n", filters[i]);
1058
0
      return NULL;
1059
1060
0
      urfiltersuccess:;
1061
1062
0
    }
1063
1064
    // Cycle buffers when required
1065
908k
    if (filters[i] != BLOSC_NOFILTER) {
1066
151k
      _cycle_buffers(&_src, &_dest, &_tmp);
1067
151k
    }
1068
908k
  }
1069
151k
  return _src;
1070
151k
}
1071
1072
1073
// Optimized version for detecting runs.  It compares 8 bytes values wherever possible.
//
// Returns true when every byte in [ip, ip_bound) equals the first byte, and
// false otherwise.  The range must contain at least one byte, because *ip is
// read unconditionally.
static bool get_run(const uint8_t* ip, const uint8_t* ip_bound) {
  const uint8_t x = *ip;
  int64_t value, value2;
  /* Broadcast the value for every byte in a 64-bit register */
  memset(&value, x, 8);
  /* Compare the pointer difference instead of computing `ip_bound - 8`, which
     would point before the start of the buffer for inputs shorter than 8 bytes. */
  while ((ip_bound - ip) > 8) {
    /* memcpy is an alignment- and aliasing-safe way of loading 8 bytes;
       compilers turn it into a single load where the target allows it. */
    memcpy(&value2, ip, 8);
    if (value != value2) {
      // Values differ.  We don't have a run.
      return false;
    }
    ip += 8;
  }
  /* Look into the remainder */
  while ((ip < ip_bound) && (*ip == x)) ip++;
  return ip == ip_bound;
}
1097
1098
1099
/* Shuffle & compress a single block */
1100
static int blosc_c(struct thread_context* thread_context, int32_t bsize,
1101
                   int32_t leftoverblock, int32_t ntbytes, int32_t destsize,
1102
                   const uint8_t* src, const int32_t offset, uint8_t* dest,
1103
254k
                   uint8_t* tmp, uint8_t* tmp2) {
1104
254k
  blosc2_context* context = thread_context->parent_context;
1105
254k
  int dont_split = (context->header_flags & 0x10) >> 4;
1106
254k
  int dict_training = context->use_dict && context->dict_cdict == NULL;
1107
254k
  int32_t j, neblock, nstreams;
1108
254k
  int32_t cbytes;                   /* number of compressed bytes in split */
1109
254k
  int32_t ctbytes = 0;              /* number of compressed bytes in block */
1110
254k
  int32_t maxout;
1111
254k
  int32_t typesize = context->typesize;
1112
254k
  bool output_is_disposable = (context->preparams != NULL) ? context->preparams->output_is_disposable : false;
1113
254k
  const char* compname;
1114
254k
  int accel;
1115
254k
  const uint8_t* _src;
1116
254k
  uint8_t *_tmp = tmp, *_tmp2 = tmp2;
1117
254k
  int last_filter_index = last_filter(context->filters, 'c');
1118
254k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1119
254k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1120
254k
  blosc_timestamp_t last, current;
1121
254k
  float filter_time = 0.f;
1122
1123
254k
  if (instr_codec) {
1124
0
    blosc_set_timestamp(&last);
1125
0
  }
1126
1127
  // See whether we have a run here
1128
254k
  if (last_filter_index >= 0 || context->prefilter != NULL) {
1129
    /* Apply the filter pipeline just for the prefilter */
1130
151k
    if (memcpyed && context->prefilter != NULL) {
1131
      // We only need the prefilter output
1132
0
      _src = pipeline_forward(thread_context, bsize, src, offset, dest, _tmp2);
1133
0
      if (_src == NULL) {
1134
0
        return BLOSC2_ERROR_FILTER_PIPELINE;
1135
0
      }
1136
0
      return bsize;
1137
0
    }
1138
    /* Apply regular filter pipeline */
1139
151k
    _src = pipeline_forward(thread_context, bsize, src, offset, _tmp, _tmp2);
1140
151k
    if (_src == NULL) {
1141
0
      return BLOSC2_ERROR_FILTER_PIPELINE;
1142
0
    }
1143
151k
  } else {
1144
102k
    _src = src + offset;
1145
102k
  }
1146
1147
254k
  if (instr_codec) {
1148
0
    blosc_set_timestamp(&current);
1149
0
    filter_time = (float) blosc_elapsed_secs(last, current);
1150
0
    last = current;
1151
0
  }
1152
1153
254k
  assert(context->clevel > 0);
1154
1155
  /* Calculate acceleration for different compressors */
1156
254k
  accel = get_accel(context);
1157
1158
  /* The number of compressed data streams for this block */
1159
254k
  if (!dont_split && !leftoverblock && !dict_training) {
1160
0
    nstreams = (int32_t)typesize;
1161
0
  }
1162
254k
  else {
1163
254k
    nstreams = 1;
1164
254k
  }
1165
254k
  neblock = bsize / nstreams;
1166
502k
  for (j = 0; j < nstreams; j++) {
1167
254k
    if (instr_codec) {
1168
0
      blosc_set_timestamp(&last);
1169
0
    }
1170
254k
    if (!dict_training) {
1171
254k
      dest += sizeof(int32_t);
1172
254k
      ntbytes += sizeof(int32_t);
1173
254k
      ctbytes += sizeof(int32_t);
1174
1175
254k
      if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && output_is_disposable) {
1176
        // Simulate a run of 0s
1177
0
        BLOSC_TRACE_INFO("Output is disposable, simulating a run of 0s");
1178
0
        memset(dest - 4, 0, sizeof(int32_t));
1179
0
        continue;
1180
0
      }
1181
1182
254k
      const uint8_t *ip = (uint8_t *) _src + j * neblock;
1183
254k
      const uint8_t *ipbound = (uint8_t *) _src + (j + 1) * neblock;
1184
1185
254k
      if (context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH && get_run(ip, ipbound)) {
1186
        // A run
1187
4.02k
        int32_t value = _src[j * neblock];
1188
4.02k
        if (ntbytes > destsize) {
1189
5
          return 0;    /* Non-compressible data */
1190
5
        }
1191
1192
4.02k
        if (instr_codec) {
1193
0
          blosc_set_timestamp(&current);
1194
0
          int32_t instr_size = sizeof(blosc2_instr);
1195
0
          ntbytes += instr_size;
1196
0
          ctbytes += instr_size;
1197
0
          if (ntbytes > destsize) {
1198
0
            return 0;    /* Non-compressible data */
1199
0
          }
1200
0
          _sw32(dest - 4, instr_size);
1201
0
          blosc2_instr *desti = (blosc2_instr *)dest;
1202
0
          memset(desti, 0, sizeof(blosc2_instr));
1203
          // Special values have an overhead of about 1 int32
1204
0
          int32_t ssize = value == 0 ? sizeof(int32_t) : sizeof(int32_t) + 1;
1205
0
          desti->cratio = (float) neblock / (float) ssize;
1206
0
          float ctime = (float) blosc_elapsed_secs(last, current);
1207
0
          desti->cspeed = (float) neblock / ctime;
1208
0
          desti->filter_speed = (float) neblock / filter_time;
1209
0
          desti->flags[0] = 1;    // mark a runlen
1210
0
          dest += instr_size;
1211
0
          continue;
1212
0
        }
1213
1214
        // Encode the repeated byte in the first (LSB) byte of the length of the split.
1215
4.02k
        _sw32(dest - 4, -value);    // write the value in two's complement
1216
4.02k
        if (value > 0) {
1217
          // Mark encoding as a run-length (== 0 is always a 0's run)
1218
2.53k
          ntbytes += 1;
1219
2.53k
          ctbytes += 1;
1220
2.53k
          if (ntbytes > destsize) {
1221
7
            return 0;    /* Non-compressible data */
1222
7
          }
1223
          // Set MSB bit (sign) to 1 (not really necessary here, but for demonstration purposes)
1224
          // dest[-1] |= 0x80;
1225
2.52k
          dest[0] = 0x1;   // set run-length bit (0) in token
1226
2.52k
          dest += 1;
1227
2.52k
        }
1228
4.01k
        continue;
1229
4.02k
      }
1230
254k
    }
1231
1232
250k
    maxout = neblock;
1233
250k
    if (ntbytes + maxout > destsize && !instr_codec) {
1234
      /* avoid buffer * overrun */
1235
10.1k
      maxout = destsize - ntbytes;
1236
10.1k
      if (maxout <= 0) {
1237
109
        return 0;                  /* non-compressible block */
1238
109
      }
1239
10.1k
    }
1240
250k
    if (dict_training) {
1241
      // We are in the build dict state, so don't compress
1242
      // TODO: copy only a percentage for sampling
1243
0
      memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1244
0
      cbytes = (int32_t)neblock;
1245
0
    }
1246
250k
    else if (context->compcode == BLOSC_BLOSCLZ) {
1247
5.52k
      cbytes = blosclz_compress(context->clevel, _src + j * neblock,
1248
5.52k
                                (int)neblock, dest, maxout, context);
1249
5.52k
    }
1250
244k
    else if (context->compcode == BLOSC_LZ4) {
1251
6.98k
      void *hash_table = NULL;
1252
    #ifdef HAVE_IPP
1253
      hash_table = (void*)thread_context->lz4_hash_table;
1254
    #endif
1255
6.98k
      cbytes = lz4_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1256
6.98k
                                 (char*)dest, (size_t)maxout, accel, hash_table);
1257
6.98k
    }
1258
237k
    else if (context->compcode == BLOSC_LZ4HC) {
1259
30.2k
      cbytes = lz4hc_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1260
30.2k
                                   (char*)dest, (size_t)maxout, context->clevel);
1261
30.2k
    }
1262
207k
  #if defined(HAVE_ZLIB)
1263
207k
    else if (context->compcode == BLOSC_ZLIB) {
1264
33.7k
      cbytes = zlib_wrap_compress((char*)_src + j * neblock, (size_t)neblock,
1265
33.7k
                                  (char*)dest, (size_t)maxout, context->clevel);
1266
33.7k
    }
1267
173k
  #endif /* HAVE_ZLIB */
1268
173k
  #if defined(HAVE_ZSTD)
1269
173k
    else if (context->compcode == BLOSC_ZSTD) {
1270
173k
      cbytes = zstd_wrap_compress(thread_context,
1271
173k
                                  (char*)_src + j * neblock, (size_t)neblock,
1272
173k
                                  (char*)dest, (size_t)maxout, context->clevel);
1273
173k
    }
1274
0
  #endif /* HAVE_ZSTD */
1275
0
    else if (context->compcode > BLOSC2_DEFINED_CODECS_STOP) {
1276
0
      for (int i = 0; i < g_ncodecs; ++i) {
1277
0
        if (g_codecs[i].compcode == context->compcode) {
1278
0
          if (g_codecs[i].encoder == NULL) {
1279
            // Dynamically load codec plugin
1280
0
            if (fill_codec(&g_codecs[i]) < 0) {
1281
0
              BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1282
0
              return BLOSC2_ERROR_CODEC_SUPPORT;
1283
0
            }
1284
0
          }
1285
0
          blosc2_cparams cparams;
1286
0
          blosc2_ctx_get_cparams(context, &cparams);
1287
0
          cbytes = g_codecs[i].encoder(_src + j * neblock,
1288
0
                                        neblock,
1289
0
                                        dest,
1290
0
                                        maxout,
1291
0
                                        context->compcode_meta,
1292
0
                                        &cparams,
1293
0
                                        context->src);
1294
0
          goto urcodecsuccess;
1295
0
        }
1296
0
      }
1297
0
      BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during compression", context->compcode);
1298
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
1299
0
    urcodecsuccess:
1300
0
      ;
1301
0
    } else {
1302
0
      blosc2_compcode_to_compname(context->compcode, &compname);
1303
0
      BLOSC_TRACE_ERROR("Blosc has not been compiled with '%s' compression support."
1304
0
                        "Please use one having it.", compname);
1305
0
      return BLOSC2_ERROR_CODEC_SUPPORT;
1306
0
    }
1307
1308
250k
    if (cbytes > maxout) {
1309
      /* Buffer overrun caused by compression (should never happen) */
1310
0
      return BLOSC2_ERROR_WRITE_BUFFER;
1311
0
    }
1312
250k
    if (cbytes < 0) {
1313
      /* cbytes should never be negative */
1314
0
      return BLOSC2_ERROR_DATA;
1315
0
    }
1316
250k
    if (cbytes == 0) {
1317
      // When cbytes is 0, the compressor has not been able to compress anything
1318
52.3k
      cbytes = neblock;
1319
52.3k
    }
1320
1321
250k
    if (instr_codec) {
1322
0
      blosc_set_timestamp(&current);
1323
0
      int32_t instr_size = sizeof(blosc2_instr);
1324
0
      ntbytes += instr_size;
1325
0
      ctbytes += instr_size;
1326
0
      if (ntbytes > destsize) {
1327
0
        return 0;    /* Non-compressible data */
1328
0
      }
1329
0
      _sw32(dest - 4, instr_size);
1330
0
      float ctime = (float)blosc_elapsed_secs(last, current);
1331
0
      blosc2_instr *desti = (blosc2_instr *)dest;
1332
0
      memset(desti, 0, sizeof(blosc2_instr));
1333
      // cratio is computed having into account 1 additional int (csize)
1334
0
      desti->cratio = (float)neblock / (float)(cbytes + sizeof(int32_t));
1335
0
      desti->cspeed = (float)neblock / ctime;
1336
0
      desti->filter_speed = (float) neblock / filter_time;
1337
0
      dest += instr_size;
1338
0
      continue;
1339
0
    }
1340
1341
250k
    if (!dict_training) {
1342
250k
      if (cbytes == neblock) {
1343
        /* The compressor has been unable to compress data at all. */
1344
        /* Before doing the copy, check that we are not running into a
1345
           buffer overflow. */
1346
52.7k
        if ((ntbytes + neblock) > destsize) {
1347
5.85k
          return 0;    /* Non-compressible data */
1348
5.85k
        }
1349
46.8k
        memcpy(dest, _src + j * neblock, (unsigned int)neblock);
1350
46.8k
        cbytes = neblock;
1351
46.8k
      }
1352
244k
      _sw32(dest - 4, cbytes);
1353
244k
    }
1354
244k
    dest += cbytes;
1355
244k
    ntbytes += cbytes;
1356
244k
    ctbytes += cbytes;
1357
244k
  }  /* Closes j < nstreams */
1358
1359
248k
  return ctbytes;
1360
254k
}
1361
1362
1363
/* Process the filter pipeline (decompression mode) */
1364
int pipeline_backward(struct thread_context* thread_context, const int32_t bsize, uint8_t* dest,
1365
                      const int32_t offset, uint8_t* src, uint8_t* tmp,
1366
22.2k
                      uint8_t* tmp2, int last_filter_index, int32_t nblock) {
1367
22.2k
  blosc2_context* context = thread_context->parent_context;
1368
22.2k
  int32_t typesize = context->typesize;
1369
22.2k
  uint8_t* filters = context->filters;
1370
22.2k
  uint8_t* filters_meta = context->filters_meta;
1371
22.2k
  uint8_t* _src = src;
1372
22.2k
  uint8_t* _dest = tmp;
1373
22.2k
  uint8_t* _tmp = tmp2;
1374
22.2k
  int errcode = 0;
1375
1376
22.2k
  for (int i = BLOSC2_MAX_FILTERS - 1; i >= 0; i--) {
1377
    // Delta filter requires the whole chunk ready
1378
22.2k
    int last_copy_filter = (last_filter_index == i) || (next_filter(filters, i, 'd') == BLOSC_DELTA);
1379
22.2k
    if (last_copy_filter && context->postfilter == NULL) {
1380
22.2k
      _dest = dest + offset;
1381
22.2k
    }
1382
22.2k
    int rc = BLOSC2_ERROR_SUCCESS;
1383
22.2k
    if (filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
1384
22.2k
      switch (filters[i]) {
1385
0
        case BLOSC_SHUFFLE:
1386
0
          blosc2_unshuffle(typesize, bsize, _src, _dest);
1387
0
          break;
1388
22.2k
        case BLOSC_BITSHUFFLE:
1389
22.2k
          if (bitunshuffle(typesize, bsize, _src, _dest, context->src[BLOSC2_CHUNK_VERSION]) < 0) {
1390
0
            return BLOSC2_ERROR_FILTER_PIPELINE;
1391
0
          }
1392
22.2k
          break;
1393
22.2k
        case BLOSC_DELTA:
1394
0
          if (context->nthreads == 1) {
1395
            /* Serial mode */
1396
0
            delta_decoder(dest, offset, bsize, typesize, _dest);
1397
0
          } else {
1398
            /* Force the thread in charge of the block 0 to go first */
1399
0
            blosc2_pthread_mutex_lock(&context->delta_mutex);
1400
0
            if (context->dref_not_init) {
1401
0
              if (offset != 0) {
1402
0
                blosc2_pthread_cond_wait(&context->delta_cv, &context->delta_mutex);
1403
0
              } else {
1404
0
                delta_decoder(dest, offset, bsize, typesize, _dest);
1405
0
                context->dref_not_init = 0;
1406
0
                blosc2_pthread_cond_broadcast(&context->delta_cv);
1407
0
              }
1408
0
            }
1409
0
            blosc2_pthread_mutex_unlock(&context->delta_mutex);
1410
0
            if (offset != 0) {
1411
0
              delta_decoder(dest, offset, bsize, typesize, _dest);
1412
0
            }
1413
0
          }
1414
0
          break;
1415
0
        case BLOSC_TRUNC_PREC:
1416
          // TRUNC_PREC filter does not need to be undone
1417
0
          break;
1418
0
        default:
1419
0
          if (filters[i] != BLOSC_NOFILTER) {
1420
0
            BLOSC_TRACE_ERROR("Filter %d not handled during decompression.",
1421
0
                              filters[i]);
1422
0
            errcode = -1;
1423
0
          }
1424
22.2k
      }
1425
22.2k
    } else {
1426
        // Look for the filters_meta in user filters and run it
1427
0
        for (uint64_t j = 0; j < g_nfilters; ++j) {
1428
0
          if (g_filters[j].id == filters[i]) {
1429
0
            if (g_filters[j].backward == NULL) {
1430
              // Dynamically load filter
1431
0
              if (fill_filter(&g_filters[j]) < 0) {
1432
0
                BLOSC_TRACE_ERROR("Could not load filter %d.", g_filters[j].id);
1433
0
                return BLOSC2_ERROR_FILTER_PIPELINE;
1434
0
              }
1435
0
            }
1436
0
            if (g_filters[j].backward != NULL) {
1437
0
              blosc2_dparams dparams;
1438
0
              blosc2_ctx_get_dparams(context, &dparams);
1439
0
              rc = g_filters[j].backward(_src, _dest, bsize, filters_meta[i], &dparams, g_filters[j].id);
1440
0
            } else {
1441
0
              BLOSC_TRACE_ERROR("Backward function is NULL");
1442
0
              return BLOSC2_ERROR_FILTER_PIPELINE;
1443
0
            }
1444
0
            if (rc != BLOSC2_ERROR_SUCCESS) {
1445
0
              BLOSC_TRACE_ERROR("User-defined filter %d failed during decompression.", filters[i]);
1446
0
              return rc;
1447
0
            }
1448
0
            goto urfiltersuccess;
1449
0
          }
1450
0
        }
1451
0
      BLOSC_TRACE_ERROR("User-defined filter %d not found during decompression.", filters[i]);
1452
0
      return BLOSC2_ERROR_FILTER_PIPELINE;
1453
0
      urfiltersuccess:;
1454
0
    }
1455
1456
    // Cycle buffers when required
1457
22.2k
    if ((filters[i] != BLOSC_NOFILTER) && (filters[i] != BLOSC_TRUNC_PREC)) {
1458
22.2k
      _cycle_buffers(&_src, &_dest, &_tmp);
1459
22.2k
    }
1460
22.2k
    if (last_filter_index == i) {
1461
22.2k
      break;
1462
22.2k
    }
1463
22.2k
  }
1464
1465
  /* Postfilter function */
1466
22.2k
  if (context->postfilter != NULL) {
1467
    // Create new postfilter parameters for this block (must be private for each thread)
1468
0
    blosc2_postfilter_params postparams;
1469
0
    memcpy(&postparams, context->postparams, sizeof(postparams));
1470
0
    postparams.input = _src;
1471
0
    postparams.output = dest + offset;
1472
0
    postparams.size = bsize;
1473
0
    postparams.typesize = typesize;
1474
0
    postparams.offset = nblock * context->blocksize;
1475
0
    postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1476
0
    postparams.nblock = nblock;
1477
0
    postparams.tid = thread_context->tid;
1478
0
    postparams.ttmp = thread_context->tmp;
1479
0
    postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1480
0
    postparams.ctx = context;
1481
1482
0
    if (context->postfilter(&postparams) != 0) {
1483
0
      BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1484
0
      return BLOSC2_ERROR_POSTFILTER;
1485
0
    }
1486
0
  }
1487
1488
22.2k
  return errcode;
1489
22.2k
}
1490
1491
1492
0
static int32_t set_nans(int32_t typesize, uint8_t* dest, int32_t destsize) {
1493
0
  if (destsize % typesize != 0) {
1494
0
    BLOSC_TRACE_ERROR("destsize can only be a multiple of typesize");
1495
0
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1496
0
  }
1497
0
  int32_t nitems = destsize / typesize;
1498
0
  if (nitems == 0) {
1499
0
    return 0;
1500
0
  }
1501
1502
0
  if (typesize == 4) {
1503
0
    float* dest_ = (float*)dest;
1504
0
    float val = nanf("");
1505
0
    for (int i = 0; i < nitems; i++) {
1506
0
      dest_[i] = val;
1507
0
    }
1508
0
    return nitems;
1509
0
  }
1510
0
  else if (typesize == 8) {
1511
0
    double* dest_ = (double*)dest;
1512
0
    double val = nan("");
1513
0
    for (int i = 0; i < nitems; i++) {
1514
0
      dest_[i] = val;
1515
0
    }
1516
0
    return nitems;
1517
0
  }
1518
1519
0
  BLOSC_TRACE_ERROR("Unsupported typesize for NaN");
1520
0
  return BLOSC2_ERROR_DATA;
1521
0
}
1522
1523
1524
0
static int32_t set_values(int32_t typesize, const uint8_t* src, uint8_t* dest, int32_t destsize) {
1525
#if defined(BLOSC_STRICT_ALIGN)
1526
  if (destsize % typesize != 0) {
1527
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1528
  }
1529
  int32_t nitems = destsize / typesize;
1530
  if (nitems == 0) {
1531
    return 0;
1532
  }
1533
  for (int i = 0; i < nitems; i++) {
1534
    memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1535
  }
1536
#else
1537
  // destsize can only be a multiple of typesize
1538
0
  int64_t val8;
1539
0
  int64_t* dest8;
1540
0
  int32_t val4;
1541
0
  int32_t* dest4;
1542
0
  int16_t val2;
1543
0
  int16_t* dest2;
1544
0
  int8_t val1;
1545
0
  int8_t* dest1;
1546
1547
0
  if (destsize % typesize != 0) {
1548
0
    BLOSC_ERROR(BLOSC2_ERROR_FAILURE);
1549
0
  }
1550
0
  int32_t nitems = destsize / typesize;
1551
0
  if (nitems == 0) {
1552
0
    return 0;
1553
0
  }
1554
1555
0
  switch (typesize) {
1556
0
    case 8:
1557
0
      val8 = ((int64_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1558
0
      dest8 = (int64_t*)dest;
1559
0
      for (int i = 0; i < nitems; i++) {
1560
0
        dest8[i] = val8;
1561
0
      }
1562
0
      break;
1563
0
    case 4:
1564
0
      val4 = ((int32_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1565
0
      dest4 = (int32_t*)dest;
1566
0
      for (int i = 0; i < nitems; i++) {
1567
0
        dest4[i] = val4;
1568
0
      }
1569
0
      break;
1570
0
    case 2:
1571
0
      val2 = ((int16_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1572
0
      dest2 = (int16_t*)dest;
1573
0
      for (int i = 0; i < nitems; i++) {
1574
0
        dest2[i] = val2;
1575
0
      }
1576
0
      break;
1577
0
    case 1:
1578
0
      val1 = ((int8_t*)(src + BLOSC_EXTENDED_HEADER_LENGTH))[0];
1579
0
      dest1 = (int8_t*)dest;
1580
0
      for (int i = 0; i < nitems; i++) {
1581
0
        dest1[i] = val1;
1582
0
      }
1583
0
      break;
1584
0
    default:
1585
0
      for (int i = 0; i < nitems; i++) {
1586
0
        memcpy(dest + i * typesize, src + BLOSC_EXTENDED_HEADER_LENGTH, typesize);
1587
0
      }
1588
0
  }
1589
0
#endif
1590
1591
0
  return nitems;
1592
0
}
1593
1594
1595
/* Decompress & unshuffle a single block */
1596
static int blosc_d(
1597
    struct thread_context* thread_context, int32_t bsize,
1598
    int32_t leftoverblock, bool memcpyed, const uint8_t* src, int32_t srcsize, int32_t src_offset,
1599
31.6k
    int32_t nblock, uint8_t* dest, int32_t dest_offset, uint8_t* tmp, uint8_t* tmp2) {
1600
31.6k
  blosc2_context* context = thread_context->parent_context;
1601
31.6k
  uint8_t* filters = context->filters;
1602
31.6k
  uint8_t *tmp3 = thread_context->tmp4;
1603
31.6k
  int32_t compformat = (context->header_flags & (uint8_t)0xe0) >> 5u;
1604
31.6k
  int dont_split = (context->header_flags & 0x10) >> 4;
1605
31.6k
  int32_t chunk_nbytes;
1606
31.6k
  int32_t chunk_cbytes;
1607
31.6k
  int nstreams;
1608
31.6k
  int32_t neblock;
1609
31.6k
  int32_t nbytes;                /* number of decompressed bytes in split */
1610
31.6k
  int32_t cbytes;                /* number of compressed bytes in split */
1611
  // int32_t ctbytes = 0;           /* number of compressed bytes in block */
1612
31.6k
  int32_t ntbytes = 0;           /* number of uncompressed bytes in block */
1613
31.6k
  uint8_t* _dest;
1614
31.6k
  int32_t typesize = context->typesize;
1615
31.6k
  bool instr_codec = context->blosc2_flags & BLOSC2_INSTR_CODEC;
1616
31.6k
  const char* compname;
1617
31.6k
  int rc;
1618
1619
31.6k
  if (context->block_maskout != NULL && context->block_maskout[nblock]) {
1620
    // Do not decompress, but act as if we successfully decompressed everything
1621
0
    return bsize;
1622
0
  }
1623
1624
31.6k
  rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, NULL);
1625
31.6k
  if (rc < 0) {
1626
0
    return rc;
1627
0
  }
1628
31.6k
  if (context->special_type == BLOSC2_SPECIAL_VALUE) {
1629
    // We need the actual typesize in this case, but it cannot be encoded in the header, so derive it from cbytes
1630
0
    typesize = chunk_cbytes - context->header_overhead;
1631
0
  }
1632
1633
  // In some situations (lazychunks) the context can arrive uninitialized
1634
  // (but BITSHUFFLE needs it for accessing the format of the chunk)
1635
31.6k
  if (context->src == NULL) {
1636
0
    context->src = src;
1637
0
  }
1638
1639
  // Chunks with special values cannot be lazy
1640
31.6k
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
1641
31.6k
          (context->blosc2_flags & 0x08u) && !context->special_type);
1642
31.6k
  if (is_lazy) {
1643
    // The chunk is on disk, so just lazily load the block
1644
0
    if (context->schunk == NULL) {
1645
0
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated super-chunk.");
1646
0
      return BLOSC2_ERROR_INVALID_PARAM;
1647
0
    }
1648
0
    if (context->schunk->frame == NULL) {
1649
0
      BLOSC_TRACE_ERROR("Lazy chunk needs an associated frame.");
1650
0
      return BLOSC2_ERROR_INVALID_PARAM;
1651
0
    }
1652
0
    blosc2_frame_s* frame = (blosc2_frame_s*)context->schunk->frame;
1653
0
    char* urlpath = frame->urlpath;
1654
0
    size_t trailer_offset = BLOSC_EXTENDED_HEADER_LENGTH + context->nblocks * sizeof(int32_t);
1655
0
    int32_t nchunk;
1656
0
    int64_t chunk_offset;
1657
    // The nchunk and the offset of the current chunk are in the trailer
1658
0
    nchunk = *(int32_t*)(src + trailer_offset);
1659
0
    chunk_offset = *(int64_t*)(src + trailer_offset + sizeof(int32_t));
1660
    // Get the csize of the nblock
1661
0
    int32_t *block_csizes = (int32_t *)(src + trailer_offset + sizeof(int32_t) + sizeof(int64_t));
1662
0
    int32_t block_csize = block_csizes[nblock];
1663
    // Read the lazy block on disk
1664
0
    void* fp = NULL;
1665
0
    blosc2_io_cb *io_cb = blosc2_get_io_cb(context->schunk->storage->io->id);
1666
0
    if (io_cb == NULL) {
1667
0
      BLOSC_TRACE_ERROR("Error getting the input/output API");
1668
0
      return BLOSC2_ERROR_PLUGIN_IO;
1669
0
    }
1670
1671
0
    int64_t io_pos = 0;
1672
0
    if (frame->sframe) {
1673
      // The chunk is not in the frame
1674
0
      char* chunkpath = malloc(strlen(frame->urlpath) + 1 + 8 + strlen(".chunk") + 1);
1675
0
      BLOSC_ERROR_NULL(chunkpath, BLOSC2_ERROR_MEMORY_ALLOC);
1676
0
      sprintf(chunkpath, "%s/%08X.chunk", frame->urlpath, nchunk);
1677
0
      fp = io_cb->open(chunkpath, "rb", context->schunk->storage->io->params);
1678
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1679
0
      free(chunkpath);
1680
      // The offset of the block is src_offset
1681
0
      io_pos = src_offset;
1682
0
    }
1683
0
    else {
1684
0
      fp = io_cb->open(urlpath, "rb", context->schunk->storage->io->params);
1685
0
      BLOSC_ERROR_NULL(fp, BLOSC2_ERROR_FILE_OPEN);
1686
      // The offset of the block is src_offset
1687
0
      io_pos = frame->file_offset + chunk_offset + src_offset;
1688
0
    }
1689
    // We can make use of tmp3 because it will be used after src is not needed anymore
1690
0
    int64_t rbytes = io_cb->read((void**)&tmp3, 1, block_csize, io_pos, fp);
1691
0
    io_cb->close(fp);
1692
0
    if ((int32_t)rbytes != block_csize) {
1693
0
      BLOSC_TRACE_ERROR("Cannot read the (lazy) block out of the fileframe.");
1694
0
      return BLOSC2_ERROR_READ_BUFFER;
1695
0
    }
1696
0
    src = tmp3;
1697
0
    src_offset = 0;
1698
0
    srcsize = block_csize;
1699
0
  }
1700
1701
  // If the chunk is memcpyed, we just have to copy the block to dest and return
1702
31.6k
  if (memcpyed) {
1703
0
    int bsize_ = leftoverblock ? chunk_nbytes % context->blocksize : bsize;
1704
0
    if (!context->special_type) {
1705
0
      if (chunk_nbytes + context->header_overhead != chunk_cbytes) {
1706
0
        return BLOSC2_ERROR_WRITE_BUFFER;
1707
0
      }
1708
0
      if (chunk_cbytes < context->header_overhead + (nblock * context->blocksize) + bsize_) {
1709
        /* Not enough input to copy block */
1710
0
        return BLOSC2_ERROR_READ_BUFFER;
1711
0
      }
1712
0
    }
1713
0
    if (!is_lazy) {
1714
0
      src += context->header_overhead + nblock * context->blocksize;
1715
0
    }
1716
0
    _dest = dest + dest_offset;
1717
0
    if (context->postfilter != NULL) {
1718
      // We are making use of a postfilter, so use a temp for destination
1719
0
      _dest = tmp;
1720
0
    }
1721
0
    rc = 0;
1722
0
    switch (context->special_type) {
1723
0
      case BLOSC2_SPECIAL_VALUE:
1724
        // All repeated values
1725
0
        rc = set_values(typesize, context->src, _dest, bsize_);
1726
0
        if (rc < 0) {
1727
0
          BLOSC_TRACE_ERROR("set_values failed");
1728
0
          return BLOSC2_ERROR_DATA;
1729
0
        }
1730
0
        break;
1731
0
      case BLOSC2_SPECIAL_NAN:
1732
0
        rc = set_nans(context->typesize, _dest, bsize_);
1733
0
        if (rc < 0) {
1734
0
          BLOSC_TRACE_ERROR("set_nans failed");
1735
0
          return BLOSC2_ERROR_DATA;
1736
0
        }
1737
0
        break;
1738
0
      case BLOSC2_SPECIAL_ZERO:
1739
0
        memset(_dest, 0, bsize_);
1740
0
        break;
1741
0
      case BLOSC2_SPECIAL_UNINIT:
1742
        // We do nothing here
1743
0
        break;
1744
0
      default:
1745
0
        memcpy(_dest, src, bsize_);
1746
0
    }
1747
0
    if (context->postfilter != NULL) {
1748
      // Create new postfilter parameters for this block (must be private for each thread)
1749
0
      blosc2_postfilter_params postparams;
1750
0
      memcpy(&postparams, context->postparams, sizeof(postparams));
1751
0
      postparams.input = tmp;
1752
0
      postparams.output = dest + dest_offset;
1753
0
      postparams.size = bsize;
1754
0
      postparams.typesize = typesize;
1755
0
      postparams.offset = nblock * context->blocksize;
1756
0
      postparams.nchunk = context->schunk != NULL ? context->schunk->current_nchunk : -1;
1757
0
      postparams.nblock = nblock;
1758
0
      postparams.tid = thread_context->tid;
1759
0
      postparams.ttmp = thread_context->tmp;
1760
0
      postparams.ttmp_nbytes = thread_context->tmp_nbytes;
1761
0
      postparams.ctx = context;
1762
1763
      // Execute the postfilter (the processed block will be copied to dest)
1764
0
      if (context->postfilter(&postparams) != 0) {
1765
0
        BLOSC_TRACE_ERROR("Execution of postfilter function failed");
1766
0
        return BLOSC2_ERROR_POSTFILTER;
1767
0
      }
1768
0
    }
1769
0
    thread_context->zfp_cell_nitems = 0;
1770
1771
0
    return bsize_;
1772
0
  }
1773
1774
31.6k
  if (!is_lazy && (src_offset <= 0 || src_offset >= srcsize)) {
1775
    /* Invalid block src offset encountered */
1776
0
    return BLOSC2_ERROR_DATA;
1777
0
  }
1778
1779
31.6k
  src += src_offset;
1780
31.6k
  srcsize -= src_offset;
1781
1782
31.6k
  int last_filter_index = last_filter(filters, 'd');
1783
31.6k
  if (instr_codec) {
1784
    // If instrumented, we don't want to run the filters
1785
0
    _dest = dest + dest_offset;
1786
0
  }
1787
31.6k
  else if (((last_filter_index >= 0) &&
1788
22.2k
       (next_filter(filters, BLOSC2_MAX_FILTERS, 'd') != BLOSC_DELTA)) ||
1789
22.2k
    context->postfilter != NULL) {
1790
    // We are making use of some filter, so use a temp for destination
1791
22.2k
    _dest = tmp;
1792
22.2k
  }
1793
9.33k
  else {
1794
    // If no filters, or only DELTA in pipeline
1795
9.33k
    _dest = dest + dest_offset;
1796
9.33k
  }
1797
1798
  /* The number of compressed data streams for this block */
1799
31.6k
  if (!dont_split && !leftoverblock) {
1800
0
    nstreams = context->typesize;
1801
0
  }
1802
31.6k
  else {
1803
31.6k
    nstreams = 1;
1804
31.6k
  }
1805
1806
31.6k
  neblock = bsize / nstreams;
1807
31.6k
  if (neblock == 0) {
1808
    /* Not enough space to output bytes */
1809
0
    BLOSC_ERROR(BLOSC2_ERROR_WRITE_BUFFER);
1810
0
  }
1811
63.2k
  for (int j = 0; j < nstreams; j++) {
1812
31.6k
    if (srcsize < (signed)sizeof(int32_t)) {
1813
      /* Not enough input to read compressed size */
1814
0
      return BLOSC2_ERROR_READ_BUFFER;
1815
0
    }
1816
31.6k
    srcsize -= sizeof(int32_t);
1817
31.6k
    cbytes = sw32_(src);      /* amount of compressed bytes */
1818
31.6k
    if (cbytes > 0) {
1819
31.5k
      if (srcsize < cbytes) {
1820
        /* Not enough input to read compressed bytes */
1821
0
        return BLOSC2_ERROR_READ_BUFFER;
1822
0
      }
1823
31.5k
      srcsize -= cbytes;
1824
31.5k
    }
1825
31.6k
    src += sizeof(int32_t);
1826
    // ctbytes += (signed)sizeof(int32_t);
1827
1828
    /* Uncompress */
1829
31.6k
    if (cbytes == 0) {
1830
      // A run of 0's
1831
6
      memset(_dest, 0, (unsigned int)neblock);
1832
6
      nbytes = neblock;
1833
6
    }
1834
31.6k
    else if (cbytes < 0) {
1835
      // A negative number means some encoding depending on the token that comes next
1836
23
      uint8_t token;
1837
1838
23
      if (srcsize < (signed)sizeof(uint8_t)) {
1839
        // Not enough input to read token */
1840
0
        return BLOSC2_ERROR_READ_BUFFER;
1841
0
      }
1842
23
      srcsize -= sizeof(uint8_t);
1843
1844
23
      token = src[0];
1845
23
      src += 1;
1846
      // ctbytes += 1;
1847
1848
23
      if (token & 0x1) {
1849
        // A run of bytes that are different than 0
1850
23
        if (cbytes < -255) {
1851
          // Runs can only encode a byte
1852
0
          return BLOSC2_ERROR_RUN_LENGTH;
1853
0
        }
1854
23
        uint8_t value = -cbytes;
1855
23
        memset(_dest, value, (unsigned int)neblock);
1856
23
      } else {
1857
0
        BLOSC_TRACE_ERROR("Invalid or unsupported compressed stream token value - %d", token);
1858
0
        return BLOSC2_ERROR_RUN_LENGTH;
1859
0
      }
1860
23
      nbytes = neblock;
1861
23
      cbytes = 0;  // everything is encoded in the cbytes token
1862
23
    }
1863
31.5k
    else if (cbytes == neblock) {
1864
17.6k
      memcpy(_dest, src, (unsigned int)neblock);
1865
17.6k
      nbytes = (int32_t)neblock;
1866
17.6k
    }
1867
13.9k
    else {
1868
13.9k
      if (compformat == BLOSC_BLOSCLZ_FORMAT) {
1869
92
        nbytes = blosclz_decompress(src, cbytes, _dest, (int)neblock);
1870
92
      }
1871
13.8k
      else if (compformat == BLOSC_LZ4_FORMAT) {
1872
3.52k
        nbytes = lz4_wrap_decompress((char*)src, (size_t)cbytes,
1873
3.52k
                                     (char*)_dest, (size_t)neblock);
1874
3.52k
      }
1875
10.3k
  #if defined(HAVE_ZLIB)
1876
10.3k
      else if (compformat == BLOSC_ZLIB_FORMAT) {
1877
4.40k
        nbytes = zlib_wrap_decompress((char*)src, (size_t)cbytes,
1878
4.40k
                                      (char*)_dest, (size_t)neblock);
1879
4.40k
      }
1880
5.92k
  #endif /*  HAVE_ZLIB */
1881
5.92k
  #if defined(HAVE_ZSTD)
1882
5.92k
      else if (compformat == BLOSC_ZSTD_FORMAT) {
1883
5.92k
        nbytes = zstd_wrap_decompress(thread_context,
1884
5.92k
                                      (char*)src, (size_t)cbytes,
1885
5.92k
                                      (char*)_dest, (size_t)neblock);
1886
5.92k
      }
1887
0
  #endif /*  HAVE_ZSTD */
1888
0
      else if (compformat == BLOSC_UDCODEC_FORMAT) {
1889
0
        bool getcell = false;
1890
1891
0
#if defined(HAVE_PLUGINS)
1892
0
        if ((context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) &&
1893
0
            (thread_context->zfp_cell_nitems > 0)) {
1894
0
          nbytes = zfp_getcell(thread_context, src, cbytes, _dest, neblock);
1895
0
          if (nbytes < 0) {
1896
0
            return BLOSC2_ERROR_DATA;
1897
0
          }
1898
0
          if (nbytes == thread_context->zfp_cell_nitems * typesize) {
1899
0
            getcell = true;
1900
0
          }
1901
0
        }
1902
0
#endif /* HAVE_PLUGINS */
1903
0
        if (!getcell) {
1904
0
          thread_context->zfp_cell_nitems = 0;
1905
0
          for (int i = 0; i < g_ncodecs; ++i) {
1906
0
            if (g_codecs[i].compcode == context->compcode) {
1907
0
              if (g_codecs[i].decoder == NULL) {
1908
                // Dynamically load codec plugin
1909
0
                if (fill_codec(&g_codecs[i]) < 0) {
1910
0
                  BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
1911
0
                  return BLOSC2_ERROR_CODEC_SUPPORT;
1912
0
                }
1913
0
              }
1914
0
              blosc2_dparams dparams;
1915
0
              blosc2_ctx_get_dparams(context, &dparams);
1916
0
              nbytes = g_codecs[i].decoder(src,
1917
0
                                           cbytes,
1918
0
                                           _dest,
1919
0
                                           neblock,
1920
0
                                           context->compcode_meta,
1921
0
                                           &dparams,
1922
0
                                           context->src);
1923
0
              goto urcodecsuccess;
1924
0
            }
1925
0
          }
1926
0
          BLOSC_TRACE_ERROR("User-defined compressor codec %d not found during decompression", context->compcode);
1927
0
          return BLOSC2_ERROR_CODEC_SUPPORT;
1928
0
        }
1929
0
      urcodecsuccess:
1930
0
        ;
1931
0
      }
1932
0
      else {
1933
0
        compname = clibcode_to_clibname(compformat);
1934
0
        BLOSC_TRACE_ERROR(
1935
0
                "Blosc has not been compiled with decompression "
1936
0
                "support for '%s' format.  "
1937
0
                "Please recompile for adding this support.", compname);
1938
0
        return BLOSC2_ERROR_CODEC_SUPPORT;
1939
0
      }
1940
1941
      /* Check that decompressed bytes number is correct */
1942
13.9k
      if ((nbytes != neblock) && (thread_context->zfp_cell_nitems == 0)) {
1943
0
        return BLOSC2_ERROR_DATA;
1944
0
      }
1945
1946
13.9k
    }
1947
31.6k
    src += cbytes;
1948
    // ctbytes += cbytes;
1949
31.6k
    _dest += nbytes;
1950
31.6k
    ntbytes += nbytes;
1951
31.6k
  } /* Closes j < nstreams */
1952
1953
31.6k
  if (!instr_codec) {
1954
31.6k
    if (last_filter_index >= 0 || context->postfilter != NULL) {
1955
      /* Apply regular filter pipeline */
1956
22.2k
      int errcode = pipeline_backward(thread_context, bsize, dest, dest_offset, tmp, tmp2, tmp3,
1957
22.2k
                                      last_filter_index, nblock);
1958
22.2k
      if (errcode < 0)
1959
0
        return errcode;
1960
22.2k
    }
1961
31.6k
  }
1962
1963
  /* Return the number of uncompressed bytes */
1964
31.6k
  return (int)ntbytes;
1965
31.6k
}
1966
1967
1968
/* Serial version for compression/decompression */
1969
16.4k
static int serial_blosc(struct thread_context* thread_context) {
1970
16.4k
  blosc2_context* context = thread_context->parent_context;
1971
16.4k
  int32_t j, bsize, leftoverblock;
1972
16.4k
  int32_t cbytes;
1973
16.4k
  int32_t ntbytes = context->output_bytes;
1974
16.4k
  int32_t* bstarts = context->bstarts;
1975
16.4k
  uint8_t* tmp = thread_context->tmp;
1976
16.4k
  uint8_t* tmp2 = thread_context->tmp2;
1977
16.4k
  int dict_training = context->use_dict && (context->dict_cdict == NULL);
1978
16.4k
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
1979
16.4k
  if (!context->do_compress && context->special_type) {
1980
    // Fake a runlen as if it was a memcpyed chunk
1981
0
    memcpyed = true;
1982
0
  }
1983
1984
296k
  for (j = 0; j < context->nblocks; j++) {
1985
285k
    if (context->do_compress && !memcpyed && !dict_training) {
1986
254k
      _sw32(bstarts + j, ntbytes);
1987
254k
    }
1988
285k
    bsize = context->blocksize;
1989
285k
    leftoverblock = 0;
1990
285k
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
1991
6.37k
      bsize = context->leftover;
1992
6.37k
      leftoverblock = 1;
1993
6.37k
    }
1994
285k
    if (context->do_compress) {
1995
254k
      if (memcpyed && !context->prefilter) {
1996
        /* We want to memcpy only */
1997
0
        memcpy(context->dest + context->header_overhead + j * context->blocksize,
1998
0
               context->src + j * context->blocksize, (unsigned int)bsize);
1999
0
        cbytes = (int32_t)bsize;
2000
0
      }
2001
254k
      else {
2002
        /* Regular compression */
2003
254k
        cbytes = blosc_c(thread_context, bsize, leftoverblock, ntbytes,
2004
254k
                         context->destsize, context->src, j * context->blocksize,
2005
254k
                         context->dest + ntbytes, tmp, tmp2);
2006
254k
        if (cbytes == 0) {
2007
5.97k
          ntbytes = 0;              /* incompressible data */
2008
5.97k
          break;
2009
5.97k
        }
2010
254k
      }
2011
254k
    }
2012
31.6k
    else {
2013
      /* Regular decompression */
2014
      // If memcpyed we don't have a bstarts section (because it is not needed)
2015
31.6k
      int32_t src_offset = memcpyed ?
2016
31.6k
          context->header_overhead + j * context->blocksize : sw32_(bstarts + j);
2017
31.6k
      cbytes = blosc_d(thread_context, bsize, leftoverblock, memcpyed,
2018
31.6k
                       context->src, context->srcsize, src_offset, j,
2019
31.6k
                       context->dest, j * context->blocksize, tmp, tmp2);
2020
31.6k
    }
2021
2022
279k
    if (cbytes < 0) {
2023
0
      ntbytes = cbytes;         /* error in blosc_c or blosc_d */
2024
0
      break;
2025
0
    }
2026
279k
    ntbytes += cbytes;
2027
279k
  }
2028
2029
16.4k
  return ntbytes;
2030
16.4k
}
2031
2032
static void t_blosc_do_job(void *ctxt);
2033
2034
/* Threaded version for compression/decompression */
2035
0
static int parallel_blosc(blosc2_context* context) {
2036
0
#ifdef BLOSC_POSIX_BARRIERS
2037
0
  int rc;
2038
0
#endif
2039
  /* Set sentinels */
2040
0
  context->thread_giveup_code = 1;
2041
0
  context->thread_nblock = -1;
2042
2043
0
  if (threads_callback) {
2044
0
    threads_callback(threads_callback_data, t_blosc_do_job,
2045
0
                     context->nthreads, sizeof(struct thread_context), (void*) context->thread_contexts);
2046
0
  }
2047
0
  else {
2048
    /* Synchronization point for all threads (wait for initialization) */
2049
0
    WAIT_INIT(-1, context);
2050
2051
    /* Synchronization point for all threads (wait for finalization) */
2052
0
    WAIT_FINISH(-1, context);
2053
0
  }
2054
2055
0
  if (context->thread_giveup_code <= 0) {
2056
    /* Compression/decompression gave up.  Return error code. */
2057
0
    return context->thread_giveup_code;
2058
0
  }
2059
2060
  /* Return the total bytes (de-)compressed in threads */
2061
0
  return (int)context->output_bytes;
2062
0
}
2063
2064
/* initialize a thread_context that has already been allocated */
2065
static int init_thread_context(struct thread_context* thread_context, blosc2_context* context, int32_t tid)
2066
1.46k
{
2067
1.46k
  int32_t ebsize;
2068
2069
1.46k
  thread_context->parent_context = context;
2070
1.46k
  thread_context->tid = tid;
2071
2072
1.46k
  ebsize = context->blocksize + context->typesize * (signed)sizeof(int32_t);
2073
1.46k
  thread_context->tmp_nbytes = (size_t)4 * ebsize;
2074
1.46k
  thread_context->tmp = my_malloc(thread_context->tmp_nbytes);
2075
1.46k
  BLOSC_ERROR_NULL(thread_context->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
2076
1.46k
  thread_context->tmp2 = thread_context->tmp + ebsize;
2077
1.46k
  thread_context->tmp3 = thread_context->tmp2 + ebsize;
2078
1.46k
  thread_context->tmp4 = thread_context->tmp3 + ebsize;
2079
1.46k
  thread_context->tmp_blocksize = context->blocksize;
2080
1.46k
  thread_context->zfp_cell_nitems = 0;
2081
1.46k
  thread_context->zfp_cell_start = 0;
2082
1.46k
  #if defined(HAVE_ZSTD)
2083
1.46k
  thread_context->zstd_cctx = NULL;
2084
1.46k
  thread_context->zstd_dctx = NULL;
2085
1.46k
  #endif
2086
2087
  /* Create the hash table for LZ4 in case we are using IPP */
2088
#ifdef HAVE_IPP
2089
  IppStatus status;
2090
  int inlen = thread_context->tmp_blocksize > 0 ? thread_context->tmp_blocksize : 1 << 16;
2091
  int hash_size = 0;
2092
  status = ippsEncodeLZ4HashTableGetSize_8u(&hash_size);
2093
  if (status != ippStsNoErr) {
2094
      BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableGetSize_8u.");
2095
  }
2096
  Ipp8u *hash_table = ippsMalloc_8u(hash_size);
2097
  status = ippsEncodeLZ4HashTableInit_8u(hash_table, inlen);
2098
  if (status != ippStsNoErr) {
2099
    BLOSC_TRACE_ERROR("Error in ippsEncodeLZ4HashTableInit_8u.");
2100
  }
2101
  thread_context->lz4_hash_table = hash_table;
2102
#endif
2103
1.46k
  return 0;
2104
1.46k
}
2105
2106
static struct thread_context*
2107
1.46k
create_thread_context(blosc2_context* context, int32_t tid) {
2108
1.46k
  struct thread_context* thread_context;
2109
1.46k
  thread_context = (struct thread_context*)my_malloc(sizeof(struct thread_context));
2110
1.46k
  BLOSC_ERROR_NULL(thread_context, NULL);
2111
1.46k
  int rc = init_thread_context(thread_context, context, tid);
2112
1.46k
  if (rc < 0) {
2113
0
    return NULL;
2114
0
  }
2115
1.46k
  return thread_context;
2116
1.46k
}
2117
2118
/* free members of thread_context, but not thread_context itself */
2119
1.46k
static void destroy_thread_context(struct thread_context* thread_context) {
2120
1.46k
  my_free(thread_context->tmp);
2121
1.46k
#if defined(HAVE_ZSTD)
2122
1.46k
  if (thread_context->zstd_cctx != NULL) {
2123
1.02k
    ZSTD_freeCCtx(thread_context->zstd_cctx);
2124
1.02k
  }
2125
1.46k
  if (thread_context->zstd_dctx != NULL) {
2126
0
    ZSTD_freeDCtx(thread_context->zstd_dctx);
2127
0
  }
2128
1.46k
#endif
2129
#ifdef HAVE_IPP
2130
  if (thread_context->lz4_hash_table != NULL) {
2131
    ippsFree(thread_context->lz4_hash_table);
2132
  }
2133
#endif
2134
1.46k
}
2135
2136
1.46k
/* Release a thread_context together with everything it owns.
   Passing NULL is a no-op, mirroring free(). */
void free_thread_context(struct thread_context* thread_context) {
  if (thread_context == NULL) {
    return;
  }
  destroy_thread_context(thread_context);
  my_free(thread_context);
}
2140
2141
2142
16.4k
/* Bring the thread pool in line with the requested number of threads.
 *
 * Returns the number of threads now in effect, or
 * BLOSC2_ERROR_INVALID_PARAM when the current thread count is not positive.
 */
int check_nthreads(blosc2_context* context) {
  if (context->nthreads <= 0) {
    BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  /* A different thread count was requested: drop the old pool (if any) first */
  if (context->nthreads != context->new_nthreads) {
    if (context->nthreads > 1) {
      release_threadpool(context);
    }
    context->nthreads = context->new_nthreads;
  }

  /* Start the pool when more than one thread is wanted and none is running */
  bool pool_needed = context->new_nthreads > 1;
  if (pool_needed && context->threads_started == 0) {
    init_threadpool(context);
  }

  return context->nthreads;
}
2160
2161
/* Do the compression or decompression of the buffer depending on the
2162
   global params. */
2163
16.4k
static int do_job(blosc2_context* context) {
2164
16.4k
  int32_t ntbytes;
2165
2166
  /* Set sentinels */
2167
16.4k
  context->dref_not_init = 1;
2168
2169
  /* Check whether we need to restart threads */
2170
16.4k
  check_nthreads(context);
2171
2172
  /* Run the serial version when nthreads is 1 or when the buffers are
2173
     not larger than blocksize */
2174
16.4k
  if (context->nthreads == 1 || (context->sourcesize / context->blocksize) <= 1) {
2175
    /* The context for this 'thread' has no been initialized yet */
2176
16.4k
    if (context->serial_context == NULL) {
2177
1
      context->serial_context = create_thread_context(context, 0);
2178
1
    }
2179
16.4k
    else if (context->blocksize != context->serial_context->tmp_blocksize) {
2180
1.46k
      free_thread_context(context->serial_context);
2181
1.46k
      context->serial_context = create_thread_context(context, 0);
2182
1.46k
    }
2183
16.4k
    BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);
2184
16.4k
    ntbytes = serial_blosc(context->serial_context);
2185
16.4k
  }
2186
0
  else {
2187
0
    ntbytes = parallel_blosc(context);
2188
0
  }
2189
2190
16.4k
  return ntbytes;
2191
16.4k
}
2192
2193
2194
static int initialize_context_compression(
2195
        blosc2_context* context, const void* src, int32_t srcsize, void* dest,
2196
        int32_t destsize, int clevel, uint8_t const *filters,
2197
        uint8_t const *filters_meta, int32_t typesize, int compressor,
2198
        int32_t blocksize, int16_t new_nthreads, int16_t nthreads,
2199
        int32_t splitmode,
2200
        int tuner_id, void *tuner_params,
2201
15.0k
        blosc2_schunk* schunk) {
2202
2203
  /* Set parameters */
2204
15.0k
  context->do_compress = 1;
2205
15.0k
  context->src = (const uint8_t*)src;
2206
15.0k
  context->srcsize = srcsize;
2207
15.0k
  context->dest = (uint8_t*)dest;
2208
15.0k
  context->output_bytes = 0;
2209
15.0k
  context->destsize = destsize;
2210
15.0k
  context->sourcesize = srcsize;
2211
15.0k
  context->typesize = typesize;
2212
15.0k
  context->filter_flags = filters_to_flags(filters);
2213
105k
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
2214
90.5k
    context->filters[i] = filters[i];
2215
90.5k
    context->filters_meta[i] = filters_meta[i];
2216
90.5k
  }
2217
15.0k
  context->compcode = compressor;
2218
15.0k
  context->nthreads = nthreads;
2219
15.0k
  context->new_nthreads = new_nthreads;
2220
15.0k
  context->end_threads = 0;
2221
15.0k
  context->clevel = clevel;
2222
15.0k
  context->schunk = schunk;
2223
15.0k
  context->tuner_params = tuner_params;
2224
15.0k
  context->tuner_id = tuner_id;
2225
15.0k
  context->splitmode = splitmode;
2226
  /* tuner some compression parameters */
2227
15.0k
  context->blocksize = (int32_t)blocksize;
2228
15.0k
  int rc = 0;
2229
15.0k
  if (context->tuner_params != NULL) {
2230
0
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2231
0
      if (blosc_stune_next_cparams(context) < 0) {
2232
0
        BLOSC_TRACE_ERROR("Error in stune next_cparams func\n");
2233
0
        return BLOSC2_ERROR_TUNER;
2234
0
      }
2235
0
    } else {
2236
0
      for (int i = 0; i < g_ntuners; ++i) {
2237
0
        if (g_tuners[i].id == context->tuner_id) {
2238
0
          if (g_tuners[i].next_cparams == NULL) {
2239
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2240
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2241
0
              return BLOSC2_ERROR_FAILURE;
2242
0
            }
2243
0
          }
2244
0
          if (g_tuners[i].next_cparams(context) < 0) {
2245
0
            BLOSC_TRACE_ERROR("Error in tuner %d next_cparams func\n", context->tuner_id);
2246
0
            return BLOSC2_ERROR_TUNER;
2247
0
          }
2248
0
          if (g_tuners[i].id == BLOSC_BTUNE && context->blocksize == 0) {
2249
            // Call stune for initializing blocksize
2250
0
            if (blosc_stune_next_blocksize(context) < 0) {
2251
0
              BLOSC_TRACE_ERROR("Error in stune next_blocksize func\n");
2252
0
              return BLOSC2_ERROR_TUNER;
2253
0
            }
2254
0
          }
2255
0
          goto urtunersuccess;
2256
0
        }
2257
0
      }
2258
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2259
0
      return BLOSC2_ERROR_INVALID_PARAM;
2260
0
    }
2261
15.0k
  } else {
2262
15.0k
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
2263
15.0k
      rc = blosc_stune_next_blocksize(context);
2264
15.0k
    } else {
2265
0
      for (int i = 0; i < g_ntuners; ++i) {
2266
0
        if (g_tuners[i].id == context->tuner_id) {
2267
0
          if (g_tuners[i].next_blocksize == NULL) {
2268
0
            if (fill_tuner(&g_tuners[i]) < 0) {
2269
0
              BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
2270
0
              return BLOSC2_ERROR_FAILURE;
2271
0
            }
2272
0
          }
2273
0
          rc = g_tuners[i].next_blocksize(context);
2274
0
          goto urtunersuccess;
2275
0
        }
2276
0
      }
2277
0
      BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
2278
0
      return BLOSC2_ERROR_INVALID_PARAM;
2279
0
    }
2280
15.0k
  }
2281
15.0k
  urtunersuccess:;
2282
15.0k
  if (rc < 0) {
2283
0
    BLOSC_TRACE_ERROR("Error in tuner next_blocksize func\n");
2284
0
    return BLOSC2_ERROR_TUNER;
2285
0
  }
2286
2287
2288
  /* Check buffer size limits */
2289
15.0k
  if (srcsize > BLOSC2_MAX_BUFFERSIZE) {
2290
0
    BLOSC_TRACE_ERROR("Input buffer size cannot exceed %d bytes.",
2291
0
                      BLOSC2_MAX_BUFFERSIZE);
2292
0
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2293
0
  }
2294
2295
15.0k
  if (destsize < BLOSC2_MAX_OVERHEAD) {
2296
31
    BLOSC_TRACE_ERROR("Output buffer size should be larger than %d bytes.",
2297
31
                      BLOSC2_MAX_OVERHEAD);
2298
31
    return BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED;
2299
31
  }
2300
2301
  /* Compression level */
2302
15.0k
  if (clevel < 0 || clevel > 9) {
2303
    /* If clevel not in 0..9, print an error */
2304
0
    BLOSC_TRACE_ERROR("`clevel` parameter must be between 0 and 9!.");
2305
0
    return BLOSC2_ERROR_CODEC_PARAM;
2306
0
  }
2307
2308
  /* Check typesize limits */
2309
15.0k
  if (context->typesize > BLOSC2_MAXTYPESIZE) {
2310
    // If typesize is too large for Blosc2, return an error
2311
0
    BLOSC_TRACE_ERROR("Typesize cannot exceed %d bytes.", BLOSC2_MAXTYPESIZE);
2312
0
    return BLOSC2_ERROR_INVALID_PARAM;
2313
0
  }
2314
  /* Now, cap typesize so that blosc2 split machinery can continue to work */
2315
15.0k
  if (context->typesize > BLOSC_MAX_TYPESIZE) {
2316
    /* If typesize is too large, treat buffer as an 1-byte stream. */
2317
0
    context->typesize = 1;
2318
0
  }
2319
2320
15.0k
  blosc2_calculate_blocks(context);
2321
2322
15.0k
  return 1;
2323
15.0k
}
2324
2325
2326
/* Prepare `context` for decompressing the chunk stored in `src`.
 *
 * Records the source/destination buffers in the context, loads the chunk
 * parameters from `header`, validates the sizes involved and locates the
 * `bstarts` section and (when the chunk carries one) the dictionary.
 *
 * Returns 0 on success or a negative BLOSC2_ERROR_* code on failure.
 */
static int initialize_context_decompression(blosc2_context* context, blosc_header* header, const void* src,
                                            int32_t srcsize, void* dest, int32_t destsize) {
  int32_t bstarts_end;

  context->do_compress = 0;
  context->src = (const uint8_t*)src;
  context->srcsize = srcsize;
  context->dest = (uint8_t*)dest;
  context->destsize = destsize;
  context->output_bytes = 0;
  context->end_threads = 0;

  int rc = blosc2_initialize_context_from_header(context, header);
  if (rc < 0) {
    return rc;
  }

  /* Check that we have enough space to decompress */
  if (context->sourcesize > (int32_t)context->destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  if (context->block_maskout != NULL && context->block_maskout_nitems != context->nblocks) {
    BLOSC_TRACE_ERROR("The number of items in block_maskout (%d) must match the number"
                      " of blocks in chunk (%d).",
                      context->block_maskout_nitems, context->nblocks);
    return BLOSC2_ERROR_DATA;
  }

  /* The special-value ID lives in the upper bits of the blosc2 flags */
  context->special_type = (header->blosc2_flags >> 4) & BLOSC2_SPECIAL_MASK;
  if (context->special_type > BLOSC2_SPECIAL_LASTID) {
    BLOSC_TRACE_ERROR("Unknown special values ID (%d) ",
                      context->special_type);
    return BLOSC2_ERROR_DATA;
  }

  /* A memcpyed chunk must be exactly header + uncompressed payload */
  int memcpyed = (context->header_flags & (uint8_t) BLOSC_MEMCPYED);
  if (memcpyed && (header->cbytes != header->nbytes + context->header_overhead)) {
    BLOSC_TRACE_ERROR("Wrong header info for this memcpyed chunk");
    return BLOSC2_ERROR_DATA;
  }

  if ((header->nbytes == 0) && (header->cbytes == context->header_overhead) &&
      !context->special_type) {
    // A compressed buffer with only a header can only contain a zero-length buffer
    return 0;
  }

  context->bstarts = (int32_t *) (context->src + context->header_overhead);
  bstarts_end = context->header_overhead;
  if (!context->special_type && !memcpyed) {
    /* If chunk is not special or a memcpyed, we do have a bstarts section */
    bstarts_end = (int32_t)(context->header_overhead + (context->nblocks * sizeof(int32_t)));
  }

  if (srcsize < bstarts_end) {
    BLOSC_TRACE_ERROR("`bstarts` exceeds length of source buffer.");
    return BLOSC2_ERROR_READ_BUFFER;
  }
  /* From here on `srcsize` tracks the bytes remaining after what was consumed */
  srcsize -= bstarts_end;

  /* Read optional dictionary if flag set */
  if (context->blosc2_flags & BLOSC2_USEDICT) {
#if defined(HAVE_ZSTD)
    context->use_dict = 1;
    if (context->dict_ddict != NULL) {
      // Free the existing dictionary (probably from another chunk)
      ZSTD_freeDDict(context->dict_ddict);
      // Clear the handle right away: the error returns below must not leave
      // a dangling pointer behind that could be freed a second time later.
      context->dict_ddict = NULL;
    }
    // The trained dictionary is after the bstarts block
    if (srcsize < (signed)sizeof(int32_t)) {
      BLOSC_TRACE_ERROR("Not enough space to read size of dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= sizeof(int32_t);
    // Read dictionary size
    context->dict_size = sw32_(context->src + bstarts_end);
    if (context->dict_size <= 0 || context->dict_size > BLOSC2_MAXDICTSIZE) {
      BLOSC_TRACE_ERROR("Dictionary size is smaller than minimum or larger than maximum allowed.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
    if (srcsize < (int32_t)context->dict_size) {
      BLOSC_TRACE_ERROR("Not enough space to read entire dictionary.");
      return BLOSC2_ERROR_READ_BUFFER;
    }
    srcsize -= context->dict_size;
    // Read dictionary (it is referenced in place inside the source buffer)
    context->dict_buffer = (void*)(context->src + bstarts_end + sizeof(int32_t));
    context->dict_ddict = ZSTD_createDDict(context->dict_buffer, context->dict_size);
    if (context->dict_ddict == NULL) {
      // Without a digested dictionary the blocks cannot be decoded correctly
      BLOSC_TRACE_ERROR("Cannot create the decompression dictionary.");
      return BLOSC2_ERROR_CODEC_DICT;
    }
#endif   // HAVE_ZSTD
  }

  return 0;
}
2420
2421
15.0k
/* Compute the header flags for the chunk about to be compressed, decide
 * where the compressed payload starts inside `context->dest`, and copy the
 * resulting header (extended or minimal, as selected by `extended_header`)
 * at the beginning of the destination buffer.
 *
 * Always returns 1.
 */
static int write_compression_header(blosc2_context* context, bool extended_header) {
  /* A dictionary is requested but not built yet: this pass only gathers samples */
  const int training_dict = context->use_dict && (context->dict_cdict == NULL);

  context->header_flags = 0;

  /* Level 0 asks for a plain copy; buffers that are too small get one as well */
  if (context->clevel == 0 || context->sourcesize < BLOSC_MIN_BUFFERSIZE) {
    context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
  }
  const bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  /* Pick the header length and decide whether a bstarts table is reserved */
  bool skip_bstarts;
  if (extended_header) {
    context->header_overhead = BLOSC_EXTENDED_HEADER_LENGTH;
    /* Both shuffle bits set together mark the header as an extended one */
    context->header_flags |= (BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE);
    skip_bstarts = training_dict || memcpyed;
  } else {
    context->header_overhead = BLOSC_MIN_HEADER_LENGTH;
    skip_bstarts = memcpyed;
  }

  if (skip_bstarts) {
    context->bstarts = NULL;
    context->output_bytes = context->header_overhead;
  } else {
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
  }

  /* The remaining flags are meaningless for a memcpyed chunk */
  if (!memcpyed) {
    /* Mirror the active byte-shuffle, bit-shuffle and delta filters */
    const int mirrored_flags[] = {BLOSC_DOSHUFFLE, BLOSC_DOBITSHUFFLE, BLOSC_DODELTA};
    for (size_t k = 0; k < sizeof(mirrored_flags) / sizeof(mirrored_flags[0]); k++) {
      if (context->filter_flags & mirrored_flags[k]) {
        context->header_flags |= mirrored_flags[k];
      }
    }

    /* Bit 4 records that blocks are NOT split */
    int dont_split = !split_block(context, context->typesize,
                                  context->blocksize);
    context->header_flags |= dont_split << 4;

    /* The codec format is stored from bit 5 upwards */
    uint8_t compformat = compcode_to_compformat(context->compcode);
    context->header_flags |= compformat << 5;
  }

  /* Build the header out of the context and store it in front of dest */
  blosc_header header;
  blosc2_intialize_header_from_context(context, &header, extended_header);

  const size_t header_len = extended_header ? BLOSC_EXTENDED_HEADER_LENGTH : BLOSC_MIN_HEADER_LENGTH;
  memcpy(context->dest, &header, header_len);

  return 1;
}
2497
2498
2499
15.0k
/* Run the compression job described by `context` (whose header has already
 * been written to `context->dest`) and patch the header with the outcome.
 *
 * If the codec pass reports 0 bytes, a plain copy is attempted instead.
 * When a tuner is configured, its update hook is fed with the elapsed time.
 *
 * Returns the number of bytes of the resulting chunk (0 when the data does
 * not fit in the destination) or a negative error code.
 */
static int blosc_compress_context(blosc2_context* context) {
  blosc_timestamp_t t_start, t_end;
  int ntbytes = 0;
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;

  blosc_set_timestamp(&t_start);

  if (!memcpyed) {
    /* Regular path: go through the codec */
    ntbytes = do_job(context);
    if (ntbytes < 0) {
      return ntbytes;
    }
    if (ntbytes == 0) {
      // Nothing produced: fall back to a memcpy below as a last chance
      // for fitting the source buffer in dest.
      context->header_flags |= (uint8_t)BLOSC_MEMCPYED;
      memcpyed = true;
    }
  }

  /* Work out how many compressed streams the chunk is made of */
  int dont_split = (context->header_flags & 0x10) >> 4;
  int nstreams = context->nblocks;
  if (!dont_split) {
    // Split blocks produce one stream per byte of the type; the leftover
    // block (if any) accounts for a single stream.
    nstreams = context->leftover ? (context->nblocks - 1) * context->typesize + 1
                                 : nstreams * context->typesize;
  }

  if (!memcpyed) {
    // A chunk holding nothing but the per-stream size words means that
    // every stream is a zero run (by construction).
    int start_csizes = context->header_overhead + 4 * context->nblocks;
    if (ntbytes == (int)(start_csizes + nstreams * sizeof(int32_t))) {
      // Flag it as a special all-zeros chunk...
      context->dest[BLOSC2_CHUNK_BLOSC2_FLAGS] |= BLOSC2_SPECIAL_ZERO << 4;
      // ...which needs nothing beyond the header
      ntbytes = context->header_overhead;
    }
  }
  else if (context->sourcesize + context->header_overhead > context->destsize) {
    /* Not even a plain copy fits in the output buffer */
    ntbytes = 0;
  }
  else {
    context->output_bytes = context->header_overhead;
    ntbytes = do_job(context);
    if (ntbytes < 0) {
      return ntbytes;
    }
    // The copy worked: publish the memcpy bit in the stored header...
    context->dest[BLOSC2_CHUNK_FLAGS] = context->header_flags;
    // ...and drop it from the context so that it can be reused
    context->header_flags &= ~(uint8_t)BLOSC_MEMCPYED;
  }

  /* Store the final compressed size in the header */
  _sw32(context->dest + BLOSC2_CHUNK_CBYTES, ntbytes);
  if (context->blosc2_flags & BLOSC2_INSTR_CODEC) {
    /* Instrumented codec: nbytes/blocksize are expressed in blosc2_instr records */
    dont_split = (context->header_flags & 0x10) >> 4;
    int32_t blocksize = dont_split ? (int32_t)sizeof(blosc2_instr) : (int32_t)sizeof(blosc2_instr) * context->typesize;
    _sw32(context->dest + BLOSC2_CHUNK_NBYTES, nstreams * (int32_t)sizeof(blosc2_instr));
    _sw32(context->dest + BLOSC2_CHUNK_BLOCKSIZE, blocksize);
  }

  /* Keep the size of the output in the context (might be useful for tuner) */
  context->destsize = ntbytes;

  if (context->tuner_params != NULL) {
    blosc_set_timestamp(&t_end);
    double ctime = blosc_elapsed_secs(t_start, t_end);
    int rc;
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
      rc = blosc_stune_update(context, ctime);
    } else {
      /* Look for the first registered tuner carrying the requested id */
      int slot = -1;
      for (int i = 0; i < g_ntuners; ++i) {
        if (g_tuners[i].id == context->tuner_id) {
          slot = i;
          break;
        }
      }
      if (slot < 0) {
        BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
        return BLOSC2_ERROR_INVALID_PARAM;
      }
      /* The update hook is missing: have fill_tuner() populate the entry */
      if (g_tuners[slot].update == NULL) {
        if (fill_tuner(&g_tuners[slot]) < 0) {
          BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[slot].id);
          return BLOSC2_ERROR_FAILURE;
        }
      }
      rc = g_tuners[slot].update(context, ctime);
    }
    if (rc < 0) {
      BLOSC_TRACE_ERROR("Error in tuner update func\n");
      return BLOSC2_ERROR_TUNER;
    }
  }

  return ntbytes;
}
2602
2603
2604
/* The public secure routine for compression with context. */
2605
/* Compress `srcsize` bytes from `src` into `dest` (capacity `destsize`)
 * using the parameters stored in the compression context `context`.
 *
 * When the context asks for a dictionary and none is built yet, a first
 * pass leaves the filtered data in `dest`; a zstd dictionary is then
 * trained from it, stored right after the bstarts section, and the data is
 * compressed again with that dictionary.
 *
 * Returns the size of the compressed chunk, 0 if the data does not fit in
 * `dest`, or a negative BLOSC2_ERROR_* code on failure.
 */
int blosc2_compress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
                        void* dest, int32_t destsize) {
  int error, cbytes;

  if (context->do_compress != 1) {
    BLOSC_TRACE_ERROR("Context is not meant for compression.  Giving up.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  error = initialize_context_compression(
          context, src, srcsize, dest, destsize,
          context->clevel, context->filters, context->filters_meta,
          context->typesize, context->compcode, context->blocksize,
          context->new_nthreads, context->nthreads, context->splitmode,
          context->tuner_id, context->tuner_params, context->schunk);
  if (error <= 0) {
    return error;
  }

  /* Write the extended header */
  error = write_compression_header(context, true);
  if (error < 0) {
    return error;
  }

  cbytes = blosc_compress_context(context);
  if (cbytes < 0) {
    return cbytes;
  }

  if (context->use_dict && context->dict_cdict == NULL) {

    if (context->compcode != BLOSC_ZSTD) {
      const char* compname;
      compname = clibcode_to_clibname(context->compcode);
      BLOSC_TRACE_ERROR("Codec %s does not support dicts.  Giving up.",
                        compname);
      return BLOSC2_ERROR_CODEC_DICT;
    }

#ifdef HAVE_ZSTD
    // Build the dictionary out of the filters outcome and compress with it
    int32_t dict_maxsize = BLOSC2_MAXDICTSIZE;
    // Do not make the dict more than 5% larger than uncompressed buffer
    if (dict_maxsize > srcsize / 20) {
      dict_maxsize = srcsize / 20;
    }
    void* samples_buffer = context->dest + context->header_overhead;
    unsigned nblocks = (unsigned)context->nblocks;
    int dont_split = (context->header_flags & 0x10) >> 4;
    if (!dont_split) {
      nblocks = nblocks * context->typesize;
    }
    if (nblocks < 8) {
      nblocks = 8;  // the minimum that accepts zstd as of 1.4.0
    }

    // 1 allows to use most of the chunk for training, but it is slower,
    // and it does not always seem to improve compression ratio.
    // Let's use 16, which is faster and still gives good results
    // on test_dict_schunk.c, but this seems very dependent on the data.
    unsigned sample_fraction = 16;
    size_t sample_size = context->sourcesize / nblocks / sample_fraction;

    // Populate the samples sizes for training the dictionary
    size_t* samples_sizes = malloc(nblocks * sizeof(*samples_sizes));
    BLOSC_ERROR_NULL(samples_sizes, BLOSC2_ERROR_MEMORY_ALLOC);
    for (size_t i = 0; i < nblocks; i++) {
      samples_sizes[i] = sample_size;
    }

    // Train from samples
    void* dict_buffer = malloc(dict_maxsize);
    if (dict_buffer == NULL) {
      BLOSC_TRACE_ERROR("Cannot allocate the buffer for the dictionary.");
      free(samples_sizes);  // do not leak the sizes table on this path
      return BLOSC2_ERROR_MEMORY_ALLOC;
    }
    // Keep the raw size_t result: ZDICT error codes are only meaningful
    // for ZDICT_isError()/ZDICT_getErrorName() before any narrowing.
    size_t train_result = ZDICT_trainFromBuffer(
        dict_buffer, dict_maxsize,
        samples_buffer, samples_sizes, nblocks);
    free(samples_sizes);  // only needed for the training call

    // TODO: experiment with parameters of low-level fast cover algorithm
    // Note that this API is still unstable.  See: https://github.com/facebook/zstd/issues/1599
    // ZDICT_fastCover_params_t fast_cover_params;
    // memset(&fast_cover_params, 0, sizeof(fast_cover_params));
    // fast_cover_params.d = nblocks;
    // fast_cover_params.steps = 4;
    // fast_cover_params.zParams.compressionLevel = context->clevel;
    // size_t dict_actual_size = ZDICT_optimizeTrainFromBuffer_fastCover(
    //   dict_buffer, dict_maxsize, samples_buffer, samples_sizes, nblocks,
    //   &fast_cover_params);

    if (ZDICT_isError(train_result) != ZSTD_error_no_error) {
      BLOSC_TRACE_ERROR("Error in ZDICT_trainFromBuffer(): '%s'."
                        "  Giving up.", ZDICT_getErrorName(train_result));
      free(dict_buffer);
      return BLOSC2_ERROR_CODEC_DICT;
    }
    int32_t dict_actual_size = (int32_t)train_result;
    assert(dict_actual_size > 0);

    // Update bytes counter and pointers to bstarts for the new compressed buffer
    context->bstarts = (int32_t*)(context->dest + context->header_overhead);
    context->output_bytes = context->header_overhead + (int32_t)sizeof(int32_t) * context->nblocks;
    /* Write the size of trained dict at the end of bstarts */
    _sw32(context->dest + context->output_bytes, (int32_t)dict_actual_size);
    context->output_bytes += sizeof(int32_t);
    /* Write the trained dict afterwards */
    context->dict_buffer = context->dest + context->output_bytes;
    memcpy(context->dict_buffer, dict_buffer, (unsigned int)dict_actual_size);
    context->dict_cdict = ZSTD_createCDict(dict_buffer, dict_actual_size, 1);  // TODO: use get_accel()
    free(dict_buffer);      // the dictionary is copied in the header now
    if (context->dict_cdict == NULL) {
      // Compressing with a missing dictionary handle is not possible
      BLOSC_TRACE_ERROR("Cannot create the compression dictionary.");
      context->dict_buffer = NULL;
      return BLOSC2_ERROR_CODEC_DICT;
    }
    context->output_bytes += (int32_t)dict_actual_size;
    context->dict_size = dict_actual_size;

    /* Compress with dict */
    cbytes = blosc_compress_context(context);

    // Invalidate the dictionary for compressing other chunks using the same context
    context->dict_buffer = NULL;
    ZSTD_freeCDict(context->dict_cdict);
    context->dict_cdict = NULL;
#endif  // HAVE_ZSTD
  }

  return cbytes;
}
2728
2729
2730
void build_filters(const int doshuffle, const int delta,
2731
15.0k
                   const int32_t typesize, uint8_t* filters) {
2732
2733
  /* Fill the end part of the filter pipeline */
2734
15.0k
  if ((doshuffle == BLOSC_SHUFFLE) && (typesize > 1))
2735
0
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_SHUFFLE;
2736
15.0k
  if (doshuffle == BLOSC_BITSHUFFLE)
2737
5.46k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_BITSHUFFLE;
2738
15.0k
  if (doshuffle == BLOSC_NOSHUFFLE)
2739
5.34k
    filters[BLOSC2_MAX_FILTERS - 1] = BLOSC_NOSHUFFLE;
2740
15.0k
  if (delta)
2741
0
    filters[BLOSC2_MAX_FILTERS - 2] = BLOSC_DELTA;
2742
15.0k
}
2743
2744
/* The public secure routine for compression. */
2745
int blosc2_compress(int clevel, int doshuffle, int32_t typesize,
2746
15.0k
                    const void* src, int32_t srcsize, void* dest, int32_t destsize) {
2747
15.0k
  int error;
2748
15.0k
  int result;
2749
15.0k
  char* envvar;
2750
2751
  /* Check whether the library should be initialized */
2752
15.0k
  if (!g_initlib) blosc2_init();
2753
2754
  /* Check for a BLOSC_CLEVEL environment variable */
2755
15.0k
  envvar = getenv("BLOSC_CLEVEL");
2756
15.0k
  if (envvar != NULL) {
2757
0
    long value;
2758
0
    errno = 0; /* To distinguish success/failure after call */
2759
0
    value = strtol(envvar, NULL, 10);
2760
0
    if ((errno != EINVAL) && (value >= 0)) {
2761
0
      clevel = (int)value;
2762
0
    }
2763
0
    else {
2764
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
2765
0
    }
2766
0
  }
2767
2768
  /* Check for a BLOSC_SHUFFLE environment variable */
2769
15.0k
  envvar = getenv("BLOSC_SHUFFLE");
2770
15.0k
  if (envvar != NULL) {
2771
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
2772
0
      doshuffle = BLOSC_NOSHUFFLE;
2773
0
    }
2774
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
2775
0
      doshuffle = BLOSC_SHUFFLE;
2776
0
    }
2777
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
2778
0
      doshuffle = BLOSC_BITSHUFFLE;
2779
0
    }
2780
0
    else {
2781
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
2782
0
    }
2783
0
  }
2784
2785
  /* Check for a BLOSC_DELTA environment variable */
2786
15.0k
  envvar = getenv("BLOSC_DELTA");
2787
15.0k
  if (envvar != NULL) {
2788
0
    if (strcmp(envvar, "1") == 0) {
2789
0
      blosc2_set_delta(1);
2790
0
    } else if (strcmp(envvar, "0") == 0) {
2791
0
      blosc2_set_delta(0);
2792
0
    }
2793
0
    else {
2794
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
2795
0
    }
2796
0
  }
2797
2798
  /* Check for a BLOSC_TYPESIZE environment variable */
2799
15.0k
  envvar = getenv("BLOSC_TYPESIZE");
2800
15.0k
  if (envvar != NULL) {
2801
0
    long value;
2802
0
    errno = 0; /* To distinguish success/failure after call */
2803
0
    value = strtol(envvar, NULL, 10);
2804
0
    if ((errno != EINVAL) && (value > 0)) {
2805
0
      typesize = (int32_t)value;
2806
0
    }
2807
0
    else {
2808
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
2809
0
    }
2810
0
  }
2811
2812
  /* Check for a BLOSC_COMPRESSOR environment variable */
2813
15.0k
  envvar = getenv("BLOSC_COMPRESSOR");
2814
15.0k
  if (envvar != NULL) {
2815
0
    result = blosc1_set_compressor(envvar);
2816
0
    if (result < 0) {
2817
0
      BLOSC_TRACE_WARNING("BLOSC_COMPRESSOR environment variable '%s' not recognized\n", envvar);
2818
0
    }
2819
0
  }
2820
2821
  /* Check for a BLOSC_BLOCKSIZE environment variable */
2822
15.0k
  envvar = getenv("BLOSC_BLOCKSIZE");
2823
15.0k
  if (envvar != NULL) {
2824
0
    long blocksize;
2825
0
    errno = 0; /* To distinguish success/failure after call */
2826
0
    blocksize = strtol(envvar, NULL, 10);
2827
0
    if ((errno != EINVAL) && (blocksize > 0)) {
2828
0
      blosc1_set_blocksize((size_t) blocksize);
2829
0
    }
2830
0
    else {
2831
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
2832
0
    }
2833
0
  }
2834
2835
  /* Check for a BLOSC_NTHREADS environment variable */
2836
15.0k
  envvar = getenv("BLOSC_NTHREADS");
2837
15.0k
  if (envvar != NULL) {
2838
0
    long nthreads;
2839
0
    errno = 0; /* To distinguish success/failure after call */
2840
0
    nthreads = strtol(envvar, NULL, 10);
2841
0
    if ((errno != EINVAL) && (nthreads > 0)) {
2842
0
      result = blosc2_set_nthreads((int16_t) nthreads);
2843
0
      if (result < 0) {
2844
0
        BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
2845
0
      }
2846
0
    }
2847
0
  }
2848
2849
  /* Check for a BLOSC_SPLITMODE environment variable */
2850
15.0k
  envvar = getenv("BLOSC_SPLITMODE");
2851
15.0k
  if (envvar != NULL) {
2852
0
    int32_t splitmode = -1;
2853
0
    if (strcmp(envvar, "ALWAYS") == 0) {
2854
0
      splitmode = BLOSC_ALWAYS_SPLIT;
2855
0
    }
2856
0
    else if (strcmp(envvar, "NEVER") == 0) {
2857
0
      splitmode = BLOSC_NEVER_SPLIT;
2858
0
    }
2859
0
    else if (strcmp(envvar, "AUTO") == 0) {
2860
0
      splitmode = BLOSC_AUTO_SPLIT;
2861
0
    }
2862
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
2863
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
2864
0
    }
2865
0
    else {
2866
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
2867
0
    }
2868
2869
0
    if (splitmode >= 0) {
2870
0
      blosc1_set_splitmode(splitmode);
2871
0
    }
2872
0
  }
2873
2874
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
2875
     that this should be the last env var so that it can take the
2876
     previous ones into account */
2877
15.0k
  envvar = getenv("BLOSC_NOLOCK");
2878
15.0k
  if (envvar != NULL) {
2879
    // TODO: here is the only place that returns an extended header from
2880
    //  a blosc1_compress() call.  This should probably be fixed.
2881
0
    const char *compname;
2882
0
    blosc2_context *cctx;
2883
0
    blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
2884
2885
0
    blosc2_compcode_to_compname(g_compressor, &compname);
2886
    /* Create a context for compression */
2887
0
    build_filters(doshuffle, g_delta, typesize, cparams.filters);
2888
    // TODO: cparams can be shared in a multithreaded environment.  do a copy!
2889
0
    cparams.typesize = (uint8_t)typesize;
2890
0
    cparams.compcode = (uint8_t)g_compressor;
2891
0
    cparams.clevel = (uint8_t)clevel;
2892
0
    cparams.nthreads = g_nthreads;
2893
0
    cparams.splitmode = g_splitmode;
2894
0
    cctx = blosc2_create_cctx(cparams);
2895
0
    if (cctx == NULL) {
2896
0
      BLOSC_TRACE_ERROR("Error while creating the compression context");
2897
0
      return BLOSC2_ERROR_NULL_POINTER;
2898
0
    }
2899
    /* Do the actual compression */
2900
0
    result = blosc2_compress_ctx(cctx, src, srcsize, dest, destsize);
2901
    /* Release context resources */
2902
0
    blosc2_free_ctx(cctx);
2903
0
    return result;
2904
0
  }
2905
2906
15.0k
  blosc2_pthread_mutex_lock(&global_comp_mutex);
2907
2908
  /* Initialize a context compression */
2909
15.0k
  uint8_t* filters = calloc(1, BLOSC2_MAX_FILTERS);
2910
15.0k
  BLOSC_ERROR_NULL(filters, BLOSC2_ERROR_MEMORY_ALLOC);
2911
15.0k
  uint8_t* filters_meta = calloc(1, BLOSC2_MAX_FILTERS);
2912
15.0k
  BLOSC_ERROR_NULL(filters_meta, BLOSC2_ERROR_MEMORY_ALLOC);
2913
15.0k
  build_filters(doshuffle, g_delta, typesize, filters);
2914
15.0k
  error = initialize_context_compression(
2915
15.0k
          g_global_context, src, srcsize, dest, destsize, clevel, filters,
2916
15.0k
          filters_meta, (int32_t)typesize, g_compressor, g_force_blocksize, g_nthreads, g_nthreads,
2917
15.0k
          g_splitmode, g_tuner, NULL, g_schunk);
2918
15.0k
  free(filters);
2919
15.0k
  free(filters_meta);
2920
15.0k
  if (error <= 0) {
2921
31
    blosc2_pthread_mutex_unlock(&global_comp_mutex);
2922
31
    return error;
2923
31
  }
2924
2925
15.0k
  envvar = getenv("BLOSC_BLOSC1_COMPAT");
2926
15.0k
  if (envvar != NULL) {
2927
    /* Write chunk header without extended header (Blosc1 compatibility mode) */
2928
0
    error = write_compression_header(g_global_context, false);
2929
0
  }
2930
15.0k
  else {
2931
15.0k
    error = write_compression_header(g_global_context, true);
2932
15.0k
  }
2933
15.0k
  if (error < 0) {
2934
0
    blosc2_pthread_mutex_unlock(&global_comp_mutex);
2935
0
    return error;
2936
0
  }
2937
2938
15.0k
  result = blosc_compress_context(g_global_context);
2939
2940
15.0k
  blosc2_pthread_mutex_unlock(&global_comp_mutex);
2941
2942
15.0k
  return result;
2943
15.0k
}
2944
2945
2946
/* The public routine for compression. */
2947
int blosc1_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes,
2948
0
                    const void* src, void* dest, size_t destsize) {
2949
0
  return blosc2_compress(clevel, doshuffle, (int32_t)typesize, src, (int32_t)nbytes, dest, (int32_t)destsize);
2950
0
}
2951
2952
2953
2954
/* Decompress the chunk in `src` (`srcsize` bytes) into `dest` (capacity
 * `destsize`) with the given context.
 *
 * The chunk header is read first; the call fails with
 * BLOSC2_ERROR_WRITE_BUFFER when the uncompressed size announced there
 * does not fit in `dest`.
 *
 * Returns the number of decompressed bytes or a negative error code.
 */
static int blosc_run_decompression_with_context(blosc2_context* context, const void* src, int32_t srcsize,
                                                void* dest, int32_t destsize) {
  blosc_header header;

  int status = read_chunk_header(src, srcsize, true, &header);
  if (status < 0) {
    return status;
  }

  /* Refuse to write past the end of the destination */
  if (header.nbytes > destsize) {
    return BLOSC2_ERROR_WRITE_BUFFER;
  }

  status = initialize_context_decompression(context, &header, src, srcsize, dest, destsize);
  if (status < 0) {
    return status;
  }

  /* The context is ready: run the decompression job */
  int32_t ntbytes = do_job(context);
  if (ntbytes < 0) {
    return ntbytes;
  }

  assert(ntbytes <= (int32_t)destsize);
  return ntbytes;
}
2984
2985
2986
/* The public secure routine for decompression with context. */
2987
/* Decompress the chunk in `src` into `dest` using a decompression context.
 *
 * Fails with BLOSC2_ERROR_INVALID_PARAM if `context` was not created for
 * decompression.  Any block mask attached to the context is released after
 * the call, whatever its outcome.
 *
 * Returns the number of decompressed bytes or a negative error code.
 */
int blosc2_decompress_ctx(blosc2_context* context, const void* src, int32_t srcsize,
                          void* dest, int32_t destsize) {
  if (context->do_compress != 0) {
    BLOSC_TRACE_ERROR("Context is not meant for decompression.  Giving up.");
    return BLOSC2_ERROR_INVALID_PARAM;
  }

  const int nbytes = blosc_run_decompression_with_context(context, src, srcsize, dest, destsize);

  // Drop a possible block_maskout (free() accepts NULL, so no guard is needed)
  free(context->block_maskout);
  context->block_maskout = NULL;
  context->block_maskout_nitems = 0;

  return nbytes;
}
3007
3008
3009
/* The public secure routine for decompression. */
3010
9.07k
int blosc2_decompress(const void* src, int32_t srcsize, void* dest, int32_t destsize) {
3011
9.07k
  int result;
3012
9.07k
  char* envvar;
3013
9.07k
  long nthreads;
3014
9.07k
  blosc2_context *dctx;
3015
9.07k
  blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
3016
3017
  /* Check whether the library should be initialized */
3018
9.07k
  if (!g_initlib) blosc2_init();
3019
3020
  /* Check for a BLOSC_NTHREADS environment variable */
3021
9.07k
  envvar = getenv("BLOSC_NTHREADS");
3022
9.07k
  if (envvar != NULL) {
3023
0
    errno = 0; /* To distinguish success/failure after call */
3024
0
    nthreads = strtol(envvar, NULL, 10);
3025
0
    if ((errno != EINVAL)) {
3026
0
      if ((nthreads <= 0) || (nthreads > INT16_MAX)) {
3027
0
        BLOSC_TRACE_ERROR("nthreads must be >= 1 and <= %d", INT16_MAX);
3028
0
        return BLOSC2_ERROR_INVALID_PARAM;
3029
0
      }
3030
0
      result = blosc2_set_nthreads((int16_t) nthreads);
3031
0
      if (result < 0) {
3032
0
        return result;
3033
0
      }
3034
0
    }
3035
0
  }
3036
3037
  /* Check for a BLOSC_NOLOCK environment variable.  It is important
3038
     that this should be the last env var so that it can take the
3039
     previous ones into account */
3040
9.07k
  envvar = getenv("BLOSC_NOLOCK");
3041
9.07k
  if (envvar != NULL) {
3042
0
    dparams.nthreads = g_nthreads;
3043
0
    dctx = blosc2_create_dctx(dparams);
3044
0
    if (dctx == NULL) {
3045
0
      BLOSC_TRACE_ERROR("Error while creating the decompression context");
3046
0
      return BLOSC2_ERROR_NULL_POINTER;
3047
0
    }
3048
0
    result = blosc2_decompress_ctx(dctx, src, srcsize, dest, destsize);
3049
0
    blosc2_free_ctx(dctx);
3050
0
    return result;
3051
0
  }
3052
3053
9.07k
  blosc2_pthread_mutex_lock(&global_comp_mutex);
3054
3055
9.07k
  result = blosc_run_decompression_with_context(
3056
9.07k
          g_global_context, src, srcsize, dest, destsize);
3057
3058
9.07k
  blosc2_pthread_mutex_unlock(&global_comp_mutex);
3059
3060
9.07k
  return result;
3061
9.07k
}
3062
3063
3064
/* The public routine for decompression. */
3065
9.07k
/* Blosc1-style entry point for decompression: forwards to
 * blosc2_decompress().
 *
 * The size of the compressed buffer is unknown to this API, so INT32_MAX
 * is passed as the source size.  A `destsize` above INT32_MAX is clamped
 * rather than cast, so a very large destination buffer is not turned into
 * a wrapped value and rejected as too small.
 *
 * Returns whatever blosc2_decompress() returns.
 */
int blosc1_decompress(const void* src, void* dest, size_t destsize) {
  const int32_t dest_capacity =
      (destsize > (size_t)INT32_MAX) ? INT32_MAX : (int32_t)destsize;
  return blosc2_decompress(src, INT32_MAX, dest, dest_capacity);
}
3068
3069
3070
/* Specific routine optimized for decompression a small number of
3071
   items out of a compressed chunk.  This does not use threads because
3072
   it would affect negatively to performance. */
3073
int _blosc_getitem(blosc2_context* context, blosc_header* header, const void* src, int32_t srcsize,
3074
0
                   int start, int nitems, void* dest, int32_t destsize) {
3075
0
  uint8_t* _src = (uint8_t*)(src);  /* current pos for source buffer */
3076
0
  uint8_t* _dest = (uint8_t*)(dest);
3077
0
  int32_t ntbytes = 0;              /* the number of uncompressed bytes */
3078
0
  int32_t bsize, bsize2, ebsize, leftoverblock;
3079
0
  int32_t startb, stopb;
3080
0
  int32_t stop = start + nitems;
3081
0
  int j, rc;
3082
3083
0
  if (nitems == 0) {
3084
    // We have nothing to do
3085
0
    return 0;
3086
0
  }
3087
0
  if (nitems * header->typesize > destsize) {
3088
0
    BLOSC_TRACE_ERROR("`nitems`*`typesize` out of dest bounds.");
3089
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3090
0
  }
3091
3092
0
  int32_t* bstarts = (int32_t*)(_src + context->header_overhead);
3093
3094
  /* Check region boundaries */
3095
0
  if ((start < 0) || (start * header->typesize > header->nbytes)) {
3096
0
    BLOSC_TRACE_ERROR("`start` out of bounds.");
3097
0
    return BLOSC2_ERROR_INVALID_PARAM;
3098
0
  }
3099
3100
0
  if ((stop < 0) || (stop * header->typesize > header->nbytes)) {
3101
0
    BLOSC_TRACE_ERROR("`start`+`nitems` out of bounds.");
3102
0
    return BLOSC2_ERROR_INVALID_PARAM;
3103
0
  }
3104
3105
0
  int chunk_memcpy = header->flags & 0x1;
3106
0
  if (!context->special_type && !chunk_memcpy &&
3107
0
      ((uint8_t *)(_src + srcsize) < (uint8_t *)(bstarts + context->nblocks))) {
3108
0
    BLOSC_TRACE_ERROR("`bstarts` out of bounds.");
3109
0
    return BLOSC2_ERROR_READ_BUFFER;
3110
0
  }
3111
3112
0
  bool memcpyed = header->flags & (uint8_t)BLOSC_MEMCPYED;
3113
0
  if (context->special_type) {
3114
    // Fake a runlen as if its a memcpyed chunk
3115
0
    memcpyed = true;
3116
0
  }
3117
3118
0
  bool is_lazy = ((context->header_overhead == BLOSC_EXTENDED_HEADER_LENGTH) &&
3119
0
                  (context->blosc2_flags & 0x08u) && !context->special_type);
3120
0
  if (memcpyed && !is_lazy && !context->postfilter) {
3121
    // Short-circuit for (non-lazy) memcpyed or special values
3122
0
    ntbytes = nitems * header->typesize;
3123
0
    switch (context->special_type) {
3124
0
      case BLOSC2_SPECIAL_VALUE:
3125
        // All repeated values
3126
0
        rc = set_values(context->typesize, _src, _dest, ntbytes);
3127
0
        if (rc < 0) {
3128
0
          BLOSC_TRACE_ERROR("set_values failed");
3129
0
          return BLOSC2_ERROR_DATA;
3130
0
        }
3131
0
        break;
3132
0
      case BLOSC2_SPECIAL_NAN:
3133
0
        rc = set_nans(context->typesize, _dest, ntbytes);
3134
0
        if (rc < 0) {
3135
0
          BLOSC_TRACE_ERROR("set_nans failed");
3136
0
          return BLOSC2_ERROR_DATA;
3137
0
        }
3138
0
        break;
3139
0
      case BLOSC2_SPECIAL_ZERO:
3140
0
        memset(_dest, 0, ntbytes);
3141
0
        break;
3142
0
      case BLOSC2_SPECIAL_UNINIT:
3143
        // We do nothing here
3144
0
        break;
3145
0
      case BLOSC2_NO_SPECIAL:
3146
0
        _src += context->header_overhead + start * context->typesize;
3147
0
        memcpy(_dest, _src, ntbytes);
3148
0
        break;
3149
0
      default:
3150
0
        BLOSC_TRACE_ERROR("Unhandled special value case");
3151
0
        BLOSC_ERROR(BLOSC2_ERROR_SCHUNK_SPECIAL);
3152
0
    }
3153
0
    return ntbytes;
3154
0
  }
3155
3156
0
  ebsize = header->blocksize + header->typesize * (signed)sizeof(int32_t);
3157
0
  struct thread_context* scontext = context->serial_context;
3158
  /* Resize the temporaries in serial context if needed */
3159
0
  if (header->blocksize > scontext->tmp_blocksize) {
3160
0
    my_free(scontext->tmp);
3161
0
    scontext->tmp_nbytes = (size_t)4 * ebsize;
3162
0
    scontext->tmp = my_malloc(scontext->tmp_nbytes);
3163
0
    BLOSC_ERROR_NULL(scontext->tmp, BLOSC2_ERROR_MEMORY_ALLOC);
3164
0
    scontext->tmp2 = scontext->tmp + ebsize;
3165
0
    scontext->tmp3 = scontext->tmp2 + ebsize;
3166
0
    scontext->tmp4 = scontext->tmp3 + ebsize;
3167
0
    scontext->tmp_blocksize = (int32_t)header->blocksize;
3168
0
  }
3169
3170
0
  for (j = 0; j < context->nblocks; j++) {
3171
0
    bsize = header->blocksize;
3172
0
    leftoverblock = 0;
3173
0
    if ((j == context->nblocks - 1) && (context->leftover > 0)) {
3174
0
      bsize = context->leftover;
3175
0
      leftoverblock = 1;
3176
0
    }
3177
3178
    /* Compute start & stop for each block */
3179
0
    startb = start * header->typesize - j * header->blocksize;
3180
0
    stopb = stop * header->typesize - j * header->blocksize;
3181
0
    if (stopb <= 0) {
3182
      // We can exit as soon as this block is beyond stop
3183
0
      break;
3184
0
    }
3185
0
    if (startb >= header->blocksize) {
3186
0
      continue;
3187
0
    }
3188
0
    if (startb < 0) {
3189
0
      startb = 0;
3190
0
    }
3191
0
    if (stopb > header->blocksize) {
3192
0
      stopb = header->blocksize;
3193
0
    }
3194
0
    bsize2 = stopb - startb;
3195
3196
0
#if defined(HAVE_PLUGINS)
3197
0
    if (context->compcode == BLOSC_CODEC_ZFP_FIXED_RATE) {
3198
0
      scontext->zfp_cell_start = startb / context->typesize;
3199
0
      scontext->zfp_cell_nitems = nitems;
3200
0
    }
3201
0
#endif /* HAVE_PLUGINS */
3202
3203
    /* Do the actual data copy */
3204
    // Regular decompression.  Put results in tmp2.
3205
    // If the block is aligned and the worst case fits in destination, let's avoid a copy
3206
0
    bool get_single_block = ((startb == 0) && (bsize == nitems * header->typesize));
3207
0
    uint8_t* tmp2 = get_single_block ? dest : scontext->tmp2;
3208
3209
    // If memcpyed we don't have a bstarts section (because it is not needed)
3210
0
    int32_t src_offset = memcpyed ?
3211
0
      context->header_overhead + j * header->blocksize : sw32_(bstarts + j);
3212
3213
0
    int32_t cbytes = blosc_d(context->serial_context, bsize, leftoverblock, memcpyed,
3214
0
                             src, srcsize, src_offset, j,
3215
0
                             tmp2, 0, scontext->tmp, scontext->tmp3);
3216
0
    if (cbytes < 0) {
3217
0
      ntbytes = cbytes;
3218
0
      break;
3219
0
    }
3220
0
    if (scontext->zfp_cell_nitems > 0) {
3221
0
      if (cbytes == bsize2) {
3222
0
        memcpy((uint8_t *) dest, tmp2, (unsigned int) bsize2);
3223
0
      } else if (cbytes == context->blocksize) {
3224
0
        memcpy((uint8_t *) dest, tmp2 + scontext->zfp_cell_start * context->typesize, (unsigned int) bsize2);
3225
0
        cbytes = bsize2;
3226
0
      }
3227
0
    } else if (!get_single_block) {
3228
      /* Copy to destination */
3229
0
      memcpy((uint8_t *) dest + ntbytes, tmp2 + startb, (unsigned int) bsize2);
3230
0
    }
3231
0
    ntbytes += bsize2;
3232
0
  }
3233
3234
0
  scontext->zfp_cell_nitems = 0;
3235
3236
0
  return ntbytes;
3237
0
}
3238
3239
0
int blosc2_getitem(const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) {
3240
0
  blosc2_context context;
3241
0
  int result;
3242
3243
  /* Minimally populate the context */
3244
0
  memset(&context, 0, sizeof(blosc2_context));
3245
3246
0
  context.schunk = g_schunk;
3247
0
  context.nthreads = 1;  // force a serial decompression; fixes #95
3248
3249
  /* Call the actual getitem function */
3250
0
  result = blosc2_getitem_ctx(&context, src, srcsize, start, nitems, dest, destsize);
3251
3252
  /* Release resources */
3253
0
  if (context.serial_context != NULL) {
3254
0
    free_thread_context(context.serial_context);
3255
0
  }
3256
0
  return result;
3257
0
}
3258
3259
/* Extract `nitems` items beginning at `start` from a compressed chunk.
   Public non-contextual (Blosc1-style) API: it takes no buffer sizes, so
   INT32_MAX is forwarded for both the source and the destination size. */
int blosc1_getitem(const void* src, int start, int nitems, void* dest) {
  const int32_t unbounded = INT32_MAX;
  return blosc2_getitem(src, unbounded, start, nitems, dest, unbounded);
}
3264
3265
int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize,
    int start, int nitems, void* dest, int32_t destsize) {
  blosc_header header;

  /* Parse the chunk header first; everything else depends on it */
  int ret = read_chunk_header((uint8_t *) src, srcsize, true, &header);
  if (ret < 0) {
    return ret;
  }

  /* Record the buffers in the context, then finish populating it from the header */
  context->src = src;
  context->srcsize = srcsize;
  context->dest = dest;
  context->destsize = destsize;

  ret = blosc2_initialize_context_from_header(context, &header);
  if (ret < 0) {
    return ret;
  }

  /* getitem always runs serially, so a serial thread context is required */
  if (context->serial_context == NULL) {
    context->serial_context = create_thread_context(context, 0);
  }
  BLOSC_ERROR_NULL(context->serial_context, BLOSC2_ERROR_THREAD_CREATE);

  /* Hand over to the actual getitem implementation */
  return _blosc_getitem(context, &header, src, srcsize, start, nitems, dest, destsize);
}
3295
3296
/* execute single compression/decompression job for a single thread_context */
3297
static void t_blosc_do_job(void *ctxt)
3298
0
{
3299
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3300
0
  blosc2_context* context = thcontext->parent_context;
3301
0
  int32_t cbytes;
3302
0
  int32_t ntdest;
3303
0
  int32_t tblocks;               /* number of blocks per thread */
3304
0
  int32_t tblock;                /* limit block on a thread */
3305
0
  int32_t nblock_;              /* private copy of nblock */
3306
0
  int32_t bsize;
3307
0
  int32_t leftoverblock;
3308
  /* Parameters for threads */
3309
0
  int32_t blocksize;
3310
0
  int32_t ebsize;
3311
0
  int32_t srcsize;
3312
0
  bool compress = context->do_compress != 0;
3313
0
  int32_t maxbytes;
3314
0
  int32_t nblocks;
3315
0
  int32_t leftover;
3316
0
  int32_t leftover2;
3317
0
  int32_t* bstarts;
3318
0
  const uint8_t* src;
3319
0
  uint8_t* dest;
3320
0
  uint8_t* tmp;
3321
0
  uint8_t* tmp2;
3322
0
  uint8_t* tmp3;
3323
3324
  /* Get parameters for this thread before entering the main loop */
3325
0
  blocksize = context->blocksize;
3326
0
  ebsize = blocksize + context->typesize * (int32_t)sizeof(int32_t);
3327
0
  maxbytes = context->destsize;
3328
0
  nblocks = context->nblocks;
3329
0
  leftover = context->leftover;
3330
0
  bstarts = context->bstarts;
3331
0
  src = context->src;
3332
0
  srcsize = context->srcsize;
3333
0
  dest = context->dest;
3334
3335
  /* Resize the temporaries if needed */
3336
0
  if (blocksize > thcontext->tmp_blocksize) {
3337
0
    my_free(thcontext->tmp);
3338
0
    thcontext->tmp_nbytes = (size_t) 4 * ebsize;
3339
0
    thcontext->tmp = my_malloc(thcontext->tmp_nbytes);
3340
0
    thcontext->tmp2 = thcontext->tmp + ebsize;
3341
0
    thcontext->tmp3 = thcontext->tmp2 + ebsize;
3342
0
    thcontext->tmp4 = thcontext->tmp3 + ebsize;
3343
0
    thcontext->tmp_blocksize = blocksize;
3344
0
  }
3345
3346
0
  tmp = thcontext->tmp;
3347
0
  tmp2 = thcontext->tmp2;
3348
0
  tmp3 = thcontext->tmp3;
3349
3350
  // Determine whether we can do a static distribution of workload among different threads
3351
0
  bool memcpyed = context->header_flags & (uint8_t)BLOSC_MEMCPYED;
3352
0
  if (!context->do_compress && context->special_type) {
3353
    // Fake a runlen as if its a memcpyed chunk
3354
0
    memcpyed = true;
3355
0
  }
3356
3357
0
  bool static_schedule = (!compress || memcpyed) && context->block_maskout == NULL;
3358
0
  if (static_schedule) {
3359
      /* Blocks per thread */
3360
0
      tblocks = nblocks / context->nthreads;
3361
0
      leftover2 = nblocks % context->nthreads;
3362
0
      tblocks = (leftover2 > 0) ? tblocks + 1 : tblocks;
3363
0
      nblock_ = thcontext->tid * tblocks;
3364
0
      tblock = nblock_ + tblocks;
3365
0
      if (tblock > nblocks) {
3366
0
          tblock = nblocks;
3367
0
      }
3368
0
  }
3369
0
  else {
3370
    // Use dynamic schedule via a queue.  Get the next block.
3371
0
    blosc2_pthread_mutex_lock(&context->count_mutex);
3372
0
    context->thread_nblock++;
3373
0
    nblock_ = context->thread_nblock;
3374
0
    blosc2_pthread_mutex_unlock(&context->count_mutex);
3375
0
    tblock = nblocks;
3376
0
  }
3377
3378
  /* Loop over blocks */
3379
0
  leftoverblock = 0;
3380
0
  while ((nblock_ < tblock) && (context->thread_giveup_code > 0)) {
3381
0
    bsize = blocksize;
3382
0
    if (nblock_ == (nblocks - 1) && (leftover > 0)) {
3383
0
      bsize = leftover;
3384
0
      leftoverblock = 1;
3385
0
    }
3386
0
    if (compress) {
3387
0
      if (memcpyed) {
3388
0
        if (!context->prefilter) {
3389
          /* We want to memcpy only */
3390
0
          memcpy(dest + context->header_overhead + nblock_ * blocksize,
3391
0
                 src + nblock_ * blocksize, (unsigned int) bsize);
3392
0
          cbytes = (int32_t) bsize;
3393
0
        }
3394
0
        else {
3395
          /* Only the prefilter has to be executed, and this is done in blosc_c().
3396
           * However, no further actions are needed, so we can put the result
3397
           * directly in dest. */
3398
0
          cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3399
0
                           ebsize, src, nblock_ * blocksize,
3400
0
                           dest + context->header_overhead + nblock_ * blocksize,
3401
0
                           tmp, tmp3);
3402
0
        }
3403
0
      }
3404
0
      else {
3405
        /* Regular compression */
3406
0
        cbytes = blosc_c(thcontext, bsize, leftoverblock, 0,
3407
0
                          ebsize, src, nblock_ * blocksize, tmp2, tmp, tmp3);
3408
0
      }
3409
0
    }
3410
0
    else {
3411
      /* Regular decompression */
3412
0
      if (context->special_type == BLOSC2_NO_SPECIAL && !memcpyed &&
3413
0
          (srcsize < (int32_t)(context->header_overhead + (sizeof(int32_t) * nblocks)))) {
3414
        /* Not enough input to read all `bstarts` */
3415
0
        cbytes = -1;
3416
0
      }
3417
0
      else {
3418
        // If memcpyed we don't have a bstarts section (because it is not needed)
3419
0
        int32_t src_offset = memcpyed ?
3420
0
            context->header_overhead + nblock_ * blocksize : sw32_(bstarts + nblock_);
3421
0
        cbytes = blosc_d(thcontext, bsize, leftoverblock, memcpyed,
3422
0
                          src, srcsize, src_offset, nblock_,
3423
0
                          dest, nblock_ * blocksize, tmp, tmp2);
3424
0
      }
3425
0
    }
3426
3427
    /* Check whether current thread has to giveup */
3428
0
    if (context->thread_giveup_code <= 0) {
3429
0
      break;
3430
0
    }
3431
3432
    /* Check results for the compressed/decompressed block */
3433
0
    if (cbytes < 0) {            /* compr/decompr failure */
3434
      /* Set giveup_code error */
3435
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3436
0
      context->thread_giveup_code = cbytes;
3437
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3438
0
      break;
3439
0
    }
3440
3441
0
    if (compress && !memcpyed) {
3442
      /* Start critical section */
3443
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3444
0
      ntdest = context->output_bytes;
3445
      // Note: do not use a typical local dict_training variable here
3446
      // because it is probably cached from previous calls if the number of
3447
      // threads does not change (the usual thing).
3448
0
      if (!(context->use_dict && context->dict_cdict == NULL)) {
3449
0
        _sw32(bstarts + nblock_, (int32_t) ntdest);
3450
0
      }
3451
3452
0
      if ((cbytes == 0) || (ntdest + cbytes > maxbytes)) {
3453
0
        context->thread_giveup_code = 0;  /* incompressible buf */
3454
0
        blosc2_pthread_mutex_unlock(&context->count_mutex);
3455
0
        break;
3456
0
      }
3457
0
      context->thread_nblock++;
3458
0
      nblock_ = context->thread_nblock;
3459
0
      context->output_bytes += cbytes;
3460
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3461
      /* End of critical section */
3462
3463
      /* Copy the compressed buffer to destination */
3464
0
      memcpy(dest + ntdest, tmp2, (unsigned int) cbytes);
3465
0
    }
3466
0
    else if (static_schedule) {
3467
0
      nblock_++;
3468
0
    }
3469
0
    else {
3470
0
      blosc2_pthread_mutex_lock(&context->count_mutex);
3471
0
      context->thread_nblock++;
3472
0
      nblock_ = context->thread_nblock;
3473
0
      context->output_bytes += cbytes;
3474
0
      blosc2_pthread_mutex_unlock(&context->count_mutex);
3475
0
    }
3476
3477
0
  } /* closes while (nblock_) */
3478
3479
0
  if (static_schedule) {
3480
0
    blosc2_pthread_mutex_lock(&context->count_mutex);
3481
0
    context->output_bytes = context->sourcesize;
3482
0
    if (compress) {
3483
0
      context->output_bytes += context->header_overhead;
3484
0
    }
3485
0
    blosc2_pthread_mutex_unlock(&context->count_mutex);
3486
0
  }
3487
3488
0
}
3489
3490
/* Decompress & unshuffle several blocks in a single thread */
3491
0
static void* t_blosc(void* ctxt) {
3492
0
  struct thread_context* thcontext = (struct thread_context*)ctxt;
3493
0
  blosc2_context* context = thcontext->parent_context;
3494
0
#ifdef BLOSC_POSIX_BARRIERS
3495
0
  int rc;
3496
0
#endif
3497
3498
0
  while (1) {
3499
    /* Synchronization point for all threads (wait for initialization) */
3500
0
    WAIT_INIT(NULL, context);
3501
3502
0
    if (context->end_threads) {
3503
0
      break;
3504
0
    }
3505
3506
0
    t_blosc_do_job(ctxt);
3507
3508
    /* Meeting point for all threads (wait for finalization) */
3509
0
    WAIT_FINISH(NULL, context);
3510
0
  }
3511
3512
  /* Cleanup our working space and context */
3513
0
  free_thread_context(thcontext);
3514
3515
0
  return (NULL);
3516
0
}
3517
3518
3519
0
/* Set up the synchronization primitives of `context` and start (or, when a
 * threads callback is installed, merely prepare) `context->nthreads` workers.
 * Returns 0 on success or a negative BLOSC2_ERROR_* code. */
int init_threadpool(blosc2_context *context) {
  int32_t tid;
  int rc2;

  /* Initialize mutex and condition variable objects */
  blosc2_pthread_mutex_init(&context->count_mutex, NULL);
  blosc2_pthread_mutex_init(&context->delta_mutex, NULL);
  blosc2_pthread_mutex_init(&context->nchunk_mutex, NULL);
  blosc2_pthread_cond_init(&context->delta_cv, NULL);

  /* Set context thread sentinels */
  context->thread_giveup_code = 1;
  context->thread_nblock = -1;

  /* Barrier initialization */
#ifdef BLOSC_POSIX_BARRIERS
  pthread_barrier_init(&context->barr_init, NULL, context->nthreads + 1);
  pthread_barrier_init(&context->barr_finish, NULL, context->nthreads + 1);
#else
  blosc2_pthread_mutex_init(&context->count_threads_mutex, NULL);
  blosc2_pthread_cond_init(&context->count_threads_cv, NULL);
  context->count_threads = 0;      /* Reset threads counter */
#endif

  if (threads_callback) {
      /* Create thread contexts to store data for callback threads */
    context->thread_contexts = (struct thread_context *)my_malloc(
            context->nthreads * sizeof(struct thread_context));
    BLOSC_ERROR_NULL(context->thread_contexts, BLOSC2_ERROR_MEMORY_ALLOC);
    for (tid = 0; tid < context->nthreads; tid++)
      init_thread_context(context->thread_contexts + tid, context, tid);
  }
  else {
    #if !defined(_WIN32)
      /* Initialize and set thread detached attribute */
      pthread_attr_init(&context->ct_attr);
      pthread_attr_setdetachstate(&context->ct_attr, PTHREAD_CREATE_JOINABLE);
    #endif

    /* Make space for thread handlers */
    context->threads = (blosc2_pthread_t*)my_malloc(
            context->nthreads * sizeof(blosc2_pthread_t));
    BLOSC_ERROR_NULL(context->threads, BLOSC2_ERROR_MEMORY_ALLOC);
    /* Finally, create the threads */
    for (tid = 0; tid < context->nthreads; tid++) {
      /* Create a thread context (the thread destroys it when it finishes) */
      struct thread_context *thread_context = create_thread_context(context, tid);
      BLOSC_ERROR_NULL(thread_context, BLOSC2_ERROR_THREAD_CREATE);
      #if !defined(_WIN32)
        rc2 = blosc2_pthread_create(&context->threads[tid], &context->ct_attr, t_blosc,
                            (void*)thread_context);
      #else
        rc2 = blosc2_pthread_create(&context->threads[tid], NULL, t_blosc,
                            (void *)thread_context);
      #endif
      if (rc2) {
        BLOSC_TRACE_ERROR("Return code from blosc2_pthread_create() is %d.\n"
                          "\tError detail: %s\n", rc2, strerror(rc2));
        /* No thread was started, so nobody else will ever free this context */
        free_thread_context(thread_context);
        return BLOSC2_ERROR_THREAD_CREATE;
      }
    }
  }

  /* We have now started/initialized the threads */
  context->threads_started = context->nthreads;
  context->new_nthreads = context->nthreads;

  return 0;
}
3588
3589
int16_t blosc2_get_nthreads(void)
3590
0
{
3591
0
  return g_nthreads;
3592
0
}
3593
3594
15.0k
int16_t blosc2_set_nthreads(int16_t nthreads) {
3595
15.0k
  int16_t ret = g_nthreads;          /* the previous number of threads */
3596
3597
  /* Check whether the library should be initialized */
3598
15.0k
  if (!g_initlib) blosc2_init();
3599
3600
15.0k
 if (nthreads != ret) {
3601
0
   g_nthreads = nthreads;
3602
0
   g_global_context->new_nthreads = nthreads;
3603
0
   int16_t ret2 = check_nthreads(g_global_context);
3604
0
   if (ret2 < 0) {
3605
0
     return ret2;
3606
0
   }
3607
0
 }
3608
3609
15.0k
  return ret;
3610
15.0k
}
3611
3612
3613
const char* blosc1_get_compressor(void)
3614
0
{
3615
0
  const char* compname;
3616
0
  blosc2_compcode_to_compname(g_compressor, &compname);
3617
3618
0
  return compname;
3619
0
}
3620
3621
15.0k
int blosc1_set_compressor(const char* compname) {
3622
15.0k
  int code = blosc2_compname_to_compcode(compname);
3623
15.0k
  if (code >= BLOSC_LAST_CODEC) {
3624
0
    BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
3625
0
    BLOSC_ERROR(BLOSC2_ERROR_CODEC_SUPPORT);
3626
0
  }
3627
15.0k
  g_compressor = code;
3628
3629
  /* Check whether the library should be initialized */
3630
15.0k
  if (!g_initlib) blosc2_init();
3631
3632
15.0k
  return code;
3633
15.0k
}
3634
3635
0
void blosc2_set_delta(int dodelta) {
3636
3637
0
  g_delta = dodelta;
3638
3639
  /* Check whether the library should be initialized */
3640
0
  if (!g_initlib) blosc2_init();
3641
3642
0
}
3643
3644
0
const char* blosc2_list_compressors(void) {
3645
0
  static int compressors_list_done = 0;
3646
0
  static char ret[256];
3647
3648
0
  if (compressors_list_done) return ret;
3649
0
  ret[0] = '\0';
3650
0
  strcat(ret, BLOSC_BLOSCLZ_COMPNAME);
3651
0
  strcat(ret, ",");
3652
0
  strcat(ret, BLOSC_LZ4_COMPNAME);
3653
0
  strcat(ret, ",");
3654
0
  strcat(ret, BLOSC_LZ4HC_COMPNAME);
3655
0
#if defined(HAVE_ZLIB)
3656
0
  strcat(ret, ",");
3657
0
  strcat(ret, BLOSC_ZLIB_COMPNAME);
3658
0
#endif /* HAVE_ZLIB */
3659
0
#if defined(HAVE_ZSTD)
3660
0
  strcat(ret, ",");
3661
0
  strcat(ret, BLOSC_ZSTD_COMPNAME);
3662
0
#endif /* HAVE_ZSTD */
3663
0
  compressors_list_done = 1;
3664
0
  return ret;
3665
0
}
3666
3667
3668
0
const char* blosc2_get_version_string(void) {
3669
0
  return BLOSC2_VERSION_STRING;
3670
0
}
3671
3672
3673
0
int blosc2_get_complib_info(const char* compname, char** complib, char** version) {
3674
0
  int clibcode;
3675
0
  const char* clibname;
3676
0
  const char* clibversion = "unknown";
3677
0
  char sbuffer[256];
3678
3679
0
  clibcode = compname_to_clibcode(compname);
3680
0
  clibname = clibcode_to_clibname(clibcode);
3681
3682
  /* complib version */
3683
0
  if (clibcode == BLOSC_BLOSCLZ_LIB) {
3684
0
    clibversion = BLOSCLZ_VERSION_STRING;
3685
0
  }
3686
0
  else if (clibcode == BLOSC_LZ4_LIB) {
3687
0
    sprintf(sbuffer, "%d.%d.%d",
3688
0
            LZ4_VERSION_MAJOR, LZ4_VERSION_MINOR, LZ4_VERSION_RELEASE);
3689
0
    clibversion = sbuffer;
3690
0
  }
3691
0
#if defined(HAVE_ZLIB)
3692
0
  else if (clibcode == BLOSC_ZLIB_LIB) {
3693
0
#ifdef ZLIB_COMPAT
3694
0
    clibversion = ZLIB_VERSION;
3695
#elif defined(HAVE_ZLIB_NG)
3696
    clibversion = ZLIBNG_VERSION;
3697
#else
3698
    clibversion = ZLIB_VERSION;
3699
#endif
3700
0
  }
3701
0
#endif /* HAVE_ZLIB */
3702
0
#if defined(HAVE_ZSTD)
3703
0
  else if (clibcode == BLOSC_ZSTD_LIB) {
3704
0
    sprintf(sbuffer, "%d.%d.%d",
3705
0
            ZSTD_VERSION_MAJOR, ZSTD_VERSION_MINOR, ZSTD_VERSION_RELEASE);
3706
0
    clibversion = sbuffer;
3707
0
  }
3708
0
#endif /* HAVE_ZSTD */
3709
3710
#ifdef _MSC_VER
3711
  *complib = _strdup(clibname);
3712
  *version = _strdup(clibversion);
3713
#else
3714
0
  *complib = strdup(clibname);
3715
0
  *version = strdup(clibversion);
3716
0
#endif
3717
0
  return clibcode;
3718
0
}
3719
3720
/* Blosc1-style accessor: report `nbytes`, `cbytes` and `blocksize` of a
   compressed buffer as size_t values. */
void blosc1_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, size_t* blocksize) {
  int32_t raw_nbytes;
  int32_t raw_cbytes;
  int32_t raw_blocksize;

  /* The 32-bit variant does the header parsing; widen its results here */
  blosc2_cbuffer_sizes(cbuffer, &raw_nbytes, &raw_cbytes, &raw_blocksize);

  *nbytes = (size_t)raw_nbytes;
  *cbytes = (size_t)raw_cbytes;
  *blocksize = (size_t)raw_blocksize;
}
3728
3729
40.6k
int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, int32_t* cbytes, int32_t* blocksize) {
3730
40.6k
  blosc_header header;
3731
40.6k
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3732
40.6k
  if (rc < 0) {
3733
    /* Return zeros if error reading header */
3734
0
    memset(&header, 0, sizeof(header));
3735
0
  }
3736
3737
  /* Read the interesting values */
3738
40.6k
  if (nbytes != NULL)
3739
40.6k
    *nbytes = header.nbytes;
3740
40.6k
  if (cbytes != NULL)
3741
40.6k
    *cbytes = header.cbytes;
3742
40.6k
  if (blocksize != NULL)
3743
9.07k
    *blocksize = header.blocksize;
3744
40.6k
  return rc;
3745
40.6k
}
3746
3747
0
int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) {
3748
0
  int32_t header_cbytes;
3749
0
  int32_t header_nbytes;
3750
0
  if (cbytes < BLOSC_MIN_HEADER_LENGTH) {
3751
    /* Compressed data should contain enough space for header */
3752
0
    *nbytes = 0;
3753
0
    return BLOSC2_ERROR_WRITE_BUFFER;
3754
0
  }
3755
0
  int rc = blosc2_cbuffer_sizes(cbuffer, &header_nbytes, &header_cbytes, NULL);
3756
0
  if (rc < 0) {
3757
0
    *nbytes = 0;
3758
0
    return rc;
3759
0
  }
3760
0
  *nbytes = header_nbytes;
3761
0
  if (header_cbytes != (int32_t)cbytes) {
3762
    /* Compressed size from header does not match `cbytes` */
3763
0
    *nbytes = 0;
3764
0
    return BLOSC2_ERROR_INVALID_HEADER;
3765
0
  }
3766
0
  if (*nbytes > BLOSC2_MAX_BUFFERSIZE) {
3767
    /* Uncompressed size is larger than allowed */
3768
0
    *nbytes = 0;
3769
0
    return BLOSC2_ERROR_MEMORY_ALLOC;
3770
0
  }
3771
0
  return 0;
3772
0
}
3773
3774
/* Return `typesize` and `flags` from a compressed buffer. */
3775
0
void blosc1_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) {
3776
0
  blosc_header header;
3777
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3778
0
  if (rc < 0) {
3779
0
    *typesize = *flags = 0;
3780
0
    return;
3781
0
  }
3782
3783
  /* Read the interesting values */
3784
0
  *flags = header.flags;
3785
0
  *typesize = header.typesize;
3786
0
}
3787
3788
3789
/* Return version information from a compressed buffer. */
3790
0
void blosc2_cbuffer_versions(const void* cbuffer, int* version, int* versionlz) {
3791
0
  blosc_header header;
3792
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3793
0
  if (rc < 0) {
3794
0
    *version = *versionlz = 0;
3795
0
    return;
3796
0
  }
3797
3798
  /* Read the version info */
3799
0
  *version = header.version;
3800
0
  *versionlz = header.versionlz;
3801
0
}
3802
3803
3804
/* Return the compressor library/format used in a compressed buffer. */
3805
0
const char* blosc2_cbuffer_complib(const void* cbuffer) {
3806
0
  blosc_header header;
3807
0
  int clibcode;
3808
0
  const char* complib;
3809
0
  int rc = read_chunk_header((uint8_t *) cbuffer, BLOSC_MIN_HEADER_LENGTH, false, &header);
3810
0
  if (rc < 0) {
3811
0
    return NULL;
3812
0
  }
3813
3814
  /* Read the compressor format/library info */
3815
0
  clibcode = (header.flags & 0xe0) >> 5;
3816
0
  complib = clibcode_to_clibname(clibcode);
3817
0
  return complib;
3818
0
}
3819
3820
3821
/* Get the internal blocksize to be used during compression.  0 means
3822
   that an automatic blocksize is computed internally. */
3823
int blosc1_get_blocksize(void)
3824
0
{
3825
0
  return (int)g_force_blocksize;
3826
0
}
3827
3828
3829
/* Force the use of a specific blocksize.  If 0, an automatic
3830
   blocksize will be used (the default). */
3831
3.43k
void blosc1_set_blocksize(size_t blocksize) {
3832
3.43k
  g_force_blocksize = (int32_t)blocksize;
3833
3.43k
}
3834
3835
3836
/* Force the use of a specific split mode. */
3837
void blosc1_set_splitmode(int mode)
3838
0
{
3839
0
  g_splitmode = mode;
3840
0
}
3841
3842
3843
/* Set pointer to super-chunk.  If NULL, no super-chunk will be
3844
   reachable (the default). */
3845
0
void blosc_set_schunk(blosc2_schunk* schunk) {
3846
0
  g_schunk = schunk;
3847
0
  g_global_context->schunk = schunk;
3848
0
}
3849
3850
blosc2_io *blosc2_io_global = NULL;
3851
blosc2_io_cb BLOSC2_IO_CB_DEFAULTS;
3852
blosc2_io_cb BLOSC2_IO_CB_MMAP;
3853
3854
int _blosc2_register_io_cb(const blosc2_io_cb *io);
3855
3856
1
void blosc2_init(void) {
3857
  /* Return if Blosc is already initialized */
3858
1
  if (g_initlib) return;
3859
3860
1
  BLOSC2_IO_CB_DEFAULTS.id = BLOSC2_IO_FILESYSTEM;
3861
1
  BLOSC2_IO_CB_DEFAULTS.name = "filesystem";
3862
1
  BLOSC2_IO_CB_DEFAULTS.is_allocation_necessary = true;
3863
1
  BLOSC2_IO_CB_DEFAULTS.open = (blosc2_open_cb) blosc2_stdio_open;
3864
1
  BLOSC2_IO_CB_DEFAULTS.close = (blosc2_close_cb) blosc2_stdio_close;
3865
1
  BLOSC2_IO_CB_DEFAULTS.size = (blosc2_size_cb) blosc2_stdio_size;
3866
1
  BLOSC2_IO_CB_DEFAULTS.write = (blosc2_write_cb) blosc2_stdio_write;
3867
1
  BLOSC2_IO_CB_DEFAULTS.read = (blosc2_read_cb) blosc2_stdio_read;
3868
1
  BLOSC2_IO_CB_DEFAULTS.truncate = (blosc2_truncate_cb) blosc2_stdio_truncate;
3869
1
  BLOSC2_IO_CB_DEFAULTS.destroy = (blosc2_destroy_cb) blosc2_stdio_destroy;
3870
3871
1
  _blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS);
3872
3873
1
  BLOSC2_IO_CB_MMAP.id = BLOSC2_IO_FILESYSTEM_MMAP;
3874
1
  BLOSC2_IO_CB_MMAP.name = "filesystem_mmap";
3875
1
  BLOSC2_IO_CB_MMAP.is_allocation_necessary = false;
3876
1
  BLOSC2_IO_CB_MMAP.open = (blosc2_open_cb) blosc2_stdio_mmap_open;
3877
1
  BLOSC2_IO_CB_MMAP.close = (blosc2_close_cb) blosc2_stdio_mmap_close;
3878
1
  BLOSC2_IO_CB_MMAP.read = (blosc2_read_cb) blosc2_stdio_mmap_read;
3879
1
  BLOSC2_IO_CB_MMAP.size = (blosc2_size_cb) blosc2_stdio_mmap_size;
3880
1
  BLOSC2_IO_CB_MMAP.write = (blosc2_write_cb) blosc2_stdio_mmap_write;
3881
1
  BLOSC2_IO_CB_MMAP.truncate = (blosc2_truncate_cb) blosc2_stdio_mmap_truncate;
3882
1
  BLOSC2_IO_CB_MMAP.destroy = (blosc2_destroy_cb) blosc2_stdio_mmap_destroy;
3883
3884
1
  _blosc2_register_io_cb(&BLOSC2_IO_CB_MMAP);
3885
3886
1
  g_ncodecs = 0;
3887
1
  g_nfilters = 0;
3888
1
  g_ntuners = 0;
3889
3890
1
#if defined(HAVE_PLUGINS)
3891
1
  #include "blosc2/blosc2-common.h"
3892
1
  #include "blosc2/blosc2-stdio.h"
3893
1
  register_codecs();
3894
1
  register_filters();
3895
1
  register_tuners();
3896
1
#endif
3897
1
  blosc2_pthread_mutex_init(&global_comp_mutex, NULL);
3898
  /* Create a global context */
3899
1
  g_global_context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3900
1
  memset(g_global_context, 0, sizeof(blosc2_context));
3901
1
  g_global_context->nthreads = g_nthreads;
3902
1
  g_global_context->new_nthreads = g_nthreads;
3903
1
  g_initlib = 1;
3904
1
}
3905
3906
3907
0
int blosc2_free_resources(void) {
3908
  /* Return if Blosc is not initialized */
3909
0
  if (!g_initlib) return BLOSC2_ERROR_FAILURE;
3910
3911
0
  return release_threadpool(g_global_context);
3912
0
}
3913
3914
3915
0
void blosc2_destroy(void) {
3916
  /* Return if Blosc is not initialized */
3917
0
  if (!g_initlib) return;
3918
3919
0
  blosc2_free_resources();
3920
0
  g_initlib = 0;
3921
0
  blosc2_free_ctx(g_global_context);
3922
3923
0
  blosc2_pthread_mutex_destroy(&global_comp_mutex);
3924
3925
0
}
3926
3927
3928
0
/* Stop and release the worker-thread machinery attached to `context`.
 *
 * Nothing is done when the context has no started threads.  Otherwise the
 * per-thread state is released (either the user-managed thread contexts or
 * the joined worker threads), the synchronization objects are destroyed and
 * the thread bookkeeping is reset.
 *
 * Returns 0 on every path visible in this function.
 * NOTE(review): WAIT_INIT is a macro defined elsewhere; the -1 passed to it
 * is presumably the value it returns on a barrier error -- confirm.
 */
int release_threadpool(blosc2_context *context) {
  if (context->threads_started <= 0) {
    return 0;
  }

  if (!threads_callback) {
    /* Blosc owns the threads: ask them to finish ... */
    context->end_threads = 1;
    WAIT_INIT(-1, context);

    /* ... and join each one as it exits */
    for (int32_t tid = 0; tid < context->threads_started; tid++) {
      void* retval;
      int join_rc = blosc2_pthread_join(context->threads[tid], &retval);
      if (join_rc) {
        BLOSC_TRACE_ERROR("Return code from blosc2_pthread_join() is %d\n"
                          "\tError detail: %s.", join_rc, strerror(join_rc));
      }
    }

    /* Thread attributes */
    #if !defined(_WIN32)
      pthread_attr_destroy(&context->ct_attr);
    #endif

    /* Thread handlers */
    my_free(context->threads);
  }
  else {
    /* Threads are managed by the user callback: only the per-thread
       context data belongs to us */
    for (int32_t tid = 0; tid < context->threads_started; tid++) {
      destroy_thread_context(&context->thread_contexts[tid]);
    }
    my_free(context->thread_contexts);
  }

  /* Mutexes and condition variables */
  blosc2_pthread_mutex_destroy(&context->count_mutex);
  blosc2_pthread_mutex_destroy(&context->delta_mutex);
  blosc2_pthread_mutex_destroy(&context->nchunk_mutex);
  blosc2_pthread_cond_destroy(&context->delta_cv);

  /* Barriers (native POSIX ones, or the mutex/condvar emulation) */
#ifdef BLOSC_POSIX_BARRIERS
  pthread_barrier_destroy(&context->barr_init);
  pthread_barrier_destroy(&context->barr_finish);
#else
  blosc2_pthread_mutex_destroy(&context->count_threads_mutex);
  blosc2_pthread_cond_destroy(&context->count_threads_cv);
  context->count_threads = 0;      /* Reset threads counter */
#endif

  /* Leave the context ready for a new pool */
  context->end_threads = 0;
  context->threads_started = 0;

  return 0;
}
3987
3988
3989
/* Contexts */
3990
3991
/* Create a context for compression */
3992
0
blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) {
3993
0
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
3994
0
  BLOSC_ERROR_NULL(context, NULL);
3995
3996
  /* Populate the context, using zeros as default values */
3997
0
  memset(context, 0, sizeof(blosc2_context));
3998
0
  context->do_compress = 1;   /* meant for compression */
3999
0
  context->use_dict = cparams.use_dict;
4000
0
  if (cparams.instr_codec) {
4001
0
    context->blosc2_flags = BLOSC2_INSTR_CODEC;
4002
0
  }
4003
4004
0
  for (int i = 0; i < BLOSC2_MAX_FILTERS; i++) {
4005
0
    context->filters[i] = cparams.filters[i];
4006
0
    context->filters_meta[i] = cparams.filters_meta[i];
4007
4008
0
    if (context->filters[i] >= BLOSC_LAST_FILTER && context->filters[i] <= BLOSC2_DEFINED_FILTERS_STOP) {
4009
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
4010
0
                        context->filters[i]);
4011
0
      free(context);
4012
0
      return NULL;
4013
0
    }
4014
0
    if (context->filters[i] > BLOSC_LAST_REGISTERED_FILTER && context->filters[i] <= BLOSC2_GLOBAL_REGISTERED_FILTERS_STOP) {
4015
0
      BLOSC_TRACE_ERROR("filter (%d) is not yet defined",
4016
0
                        context->filters[i]);
4017
0
      free(context);
4018
0
      return NULL;
4019
0
    }
4020
0
  }
4021
4022
0
#if defined(HAVE_PLUGINS)
4023
0
#include "blosc2/codecs-registry.h"
4024
0
  if ((context->compcode >= BLOSC_CODEC_ZFP_FIXED_ACCURACY) && (context->compcode <= BLOSC_CODEC_ZFP_FIXED_RATE)) {
4025
0
    for (int i = 0; i < BLOSC2_MAX_FILTERS; ++i) {
4026
0
      if ((context->filters[i] == BLOSC_SHUFFLE) || (context->filters[i] == BLOSC_BITSHUFFLE)) {
4027
0
        BLOSC_TRACE_ERROR("ZFP cannot be run in presence of SHUFFLE / BITSHUFFLE");
4028
0
        return NULL;
4029
0
      }
4030
0
    }
4031
0
  }
4032
0
#endif /* HAVE_PLUGINS */
4033
4034
  /* Check for a BLOSC_SHUFFLE environment variable */
4035
0
  int doshuffle = -1;
4036
0
  char* envvar = getenv("BLOSC_SHUFFLE");
4037
0
  if (envvar != NULL) {
4038
0
    if (strcmp(envvar, "NOSHUFFLE") == 0) {
4039
0
      doshuffle = BLOSC_NOSHUFFLE;
4040
0
    }
4041
0
    else if (strcmp(envvar, "SHUFFLE") == 0) {
4042
0
      doshuffle = BLOSC_SHUFFLE;
4043
0
    }
4044
0
    else if (strcmp(envvar, "BITSHUFFLE") == 0) {
4045
0
      doshuffle = BLOSC_BITSHUFFLE;
4046
0
    }
4047
0
    else {
4048
0
      BLOSC_TRACE_WARNING("BLOSC_SHUFFLE environment variable '%s' not recognized\n", envvar);
4049
0
    }
4050
0
  }
4051
  /* Check for a BLOSC_DELTA environment variable */
4052
0
  int dodelta = BLOSC_NOFILTER;
4053
0
  envvar = getenv("BLOSC_DELTA");
4054
0
  if (envvar != NULL) {
4055
0
    if (strcmp(envvar, "1") == 0) {
4056
0
      dodelta = BLOSC_DELTA;
4057
0
    } else if (strcmp(envvar, "0") == 0){
4058
0
      dodelta = BLOSC_NOFILTER;
4059
0
    }
4060
0
    else {
4061
0
      BLOSC_TRACE_WARNING("BLOSC_DELTA environment variable '%s' not recognized\n", envvar);
4062
0
    }
4063
0
  }
4064
  /* Check for a BLOSC_TYPESIZE environment variable */
4065
0
  context->typesize = cparams.typesize;
4066
0
  envvar = getenv("BLOSC_TYPESIZE");
4067
0
  if (envvar != NULL) {
4068
0
    int32_t value;
4069
0
    errno = 0; /* To distinguish success/failure after call */
4070
0
    value = (int32_t) strtol(envvar, NULL, 10);
4071
0
    if ((errno != EINVAL) && (value > 0)) {
4072
0
      context->typesize = value;
4073
0
    }
4074
0
    else {
4075
0
      BLOSC_TRACE_WARNING("BLOSC_TYPESIZE environment variable '%s' not recognized\n", envvar);
4076
0
    }
4077
0
  }
4078
0
  build_filters(doshuffle, dodelta, context->typesize, context->filters);
4079
4080
0
  context->clevel = cparams.clevel;
4081
  /* Check for a BLOSC_CLEVEL environment variable */
4082
0
  envvar = getenv("BLOSC_CLEVEL");
4083
0
  if (envvar != NULL) {
4084
0
    int value;
4085
0
    errno = 0; /* To distinguish success/failure after call */
4086
0
    value = (int)strtol(envvar, NULL, 10);
4087
0
    if ((errno != EINVAL) && (value >= 0)) {
4088
0
      context->clevel = value;
4089
0
    }
4090
0
    else {
4091
0
      BLOSC_TRACE_WARNING("BLOSC_CLEVEL environment variable '%s' not recognized\n", envvar);
4092
0
    }
4093
0
  }
4094
4095
0
  context->compcode = cparams.compcode;
4096
  /* Check for a BLOSC_COMPRESSOR environment variable */
4097
0
  envvar = getenv("BLOSC_COMPRESSOR");
4098
0
  if (envvar != NULL) {
4099
0
    int codec = blosc2_compname_to_compcode(envvar);
4100
0
    if (codec >= BLOSC_LAST_CODEC) {
4101
0
      BLOSC_TRACE_ERROR("User defined codecs cannot be set here. Use Blosc2 mechanism instead.");
4102
0
      return NULL;
4103
0
    }
4104
0
    context->compcode = codec;
4105
0
  }
4106
0
  context->compcode_meta = cparams.compcode_meta;
4107
4108
0
  context->blocksize = cparams.blocksize;
4109
  /* Check for a BLOSC_BLOCKSIZE environment variable */
4110
0
  envvar = getenv("BLOSC_BLOCKSIZE");
4111
0
  if (envvar != NULL) {
4112
0
    int32_t blocksize;
4113
0
    errno = 0; /* To distinguish success/failure after call */
4114
0
    blocksize = (int32_t) strtol(envvar, NULL, 10);
4115
0
    if ((errno != EINVAL) && (blocksize > 0)) {
4116
0
      context->blocksize = blocksize;
4117
0
    }
4118
0
    else {
4119
0
      BLOSC_TRACE_WARNING("BLOSC_BLOCKSIZE environment variable '%s' not recognized\n", envvar);
4120
0
    }
4121
0
  }
4122
4123
0
  context->nthreads = cparams.nthreads;
4124
  /* Check for a BLOSC_NTHREADS environment variable */
4125
0
  envvar = getenv("BLOSC_NTHREADS");
4126
0
  if (envvar != NULL) {
4127
0
    errno = 0; /* To distinguish success/failure after call */
4128
0
    int16_t nthreads = (int16_t) strtol(envvar, NULL, 10);
4129
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4130
0
      context->nthreads = nthreads;
4131
0
    }
4132
0
    else {
4133
0
      BLOSC_TRACE_WARNING("BLOSC_NTHREADS environment variable '%s' not recognized\n", envvar);
4134
0
    }
4135
0
  }
4136
0
  context->new_nthreads = context->nthreads;
4137
4138
0
  context->splitmode = cparams.splitmode;
4139
  /* Check for a BLOSC_SPLITMODE environment variable */
4140
0
  envvar = getenv("BLOSC_SPLITMODE");
4141
0
  if (envvar != NULL) {
4142
0
    int32_t splitmode = -1;
4143
0
    if (strcmp(envvar, "ALWAYS") == 0) {
4144
0
      splitmode = BLOSC_ALWAYS_SPLIT;
4145
0
    }
4146
0
    else if (strcmp(envvar, "NEVER") == 0) {
4147
0
      splitmode = BLOSC_NEVER_SPLIT;
4148
0
    }
4149
0
    else if (strcmp(envvar, "AUTO") == 0) {
4150
0
      splitmode = BLOSC_AUTO_SPLIT;
4151
0
    }
4152
0
    else if (strcmp(envvar, "FORWARD_COMPAT") == 0) {
4153
0
      splitmode = BLOSC_FORWARD_COMPAT_SPLIT;
4154
0
    }
4155
0
    else {
4156
0
      BLOSC_TRACE_WARNING("BLOSC_SPLITMODE environment variable '%s' not recognized\n", envvar);
4157
0
    }
4158
0
    if (splitmode >= 0) {
4159
0
      context->splitmode = splitmode;
4160
0
    }
4161
0
  }
4162
4163
0
  context->threads_started = 0;
4164
0
  context->schunk = cparams.schunk;
4165
4166
0
  if (cparams.prefilter != NULL) {
4167
0
    context->prefilter = cparams.prefilter;
4168
0
    context->preparams = (blosc2_prefilter_params*)my_malloc(sizeof(blosc2_prefilter_params));
4169
0
    BLOSC_ERROR_NULL(context->preparams, NULL);
4170
0
    memcpy(context->preparams, cparams.preparams, sizeof(blosc2_prefilter_params));
4171
0
  }
4172
4173
0
  if (cparams.tuner_id <= 0) {
4174
0
    cparams.tuner_id = g_tuner;
4175
0
  } else {
4176
0
    for (int i = 0; i < g_ntuners; ++i) {
4177
0
      if (g_tuners[i].id == cparams.tuner_id) {
4178
0
        if (g_tuners[i].init == NULL) {
4179
0
          if (fill_tuner(&g_tuners[i]) < 0) {
4180
0
            BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
4181
0
            return NULL;
4182
0
          }
4183
0
        }
4184
0
        if (g_tuners[i].init(cparams.tuner_params, context, NULL) < 0) {
4185
0
          BLOSC_TRACE_ERROR("Error in user-defined tuner %d init function\n", cparams.tuner_id);
4186
0
          return NULL;
4187
0
        }
4188
0
        goto urtunersuccess;
4189
0
      }
4190
0
    }
4191
0
    BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", cparams.tuner_id);
4192
0
    return NULL;
4193
0
  }
4194
0
  urtunersuccess:;
4195
4196
0
  context->tuner_id = cparams.tuner_id;
4197
4198
0
  context->codec_params = cparams.codec_params;
4199
0
  memcpy(context->filter_params, cparams.filter_params, BLOSC2_MAX_FILTERS * sizeof(void*));
4200
4201
0
  return context;
4202
0
}
4203
4204
/* Create a context for decompression */
4205
0
blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) {
4206
0
  blosc2_context* context = (blosc2_context*)my_malloc(sizeof(blosc2_context));
4207
0
  BLOSC_ERROR_NULL(context, NULL);
4208
4209
  /* Populate the context, using zeros as default values */
4210
0
  memset(context, 0, sizeof(blosc2_context));
4211
0
  context->do_compress = 0;   /* Meant for decompression */
4212
4213
0
  context->nthreads = dparams.nthreads;
4214
0
  char* envvar = getenv("BLOSC_NTHREADS");
4215
0
  if (envvar != NULL) {
4216
0
    errno = 0; /* To distinguish success/failure after call */
4217
0
    long nthreads = strtol(envvar, NULL, 10);
4218
0
    if ((errno != EINVAL) && (nthreads > 0)) {
4219
0
      context->nthreads = (int16_t) nthreads;
4220
0
    }
4221
0
  }
4222
0
  context->new_nthreads = context->nthreads;
4223
4224
0
  context->threads_started = 0;
4225
0
  context->block_maskout = NULL;
4226
0
  context->block_maskout_nitems = 0;
4227
0
  context->schunk = dparams.schunk;
4228
4229
0
  if (dparams.postfilter != NULL) {
4230
0
    context->postfilter = dparams.postfilter;
4231
0
    context->postparams = (blosc2_postfilter_params*)my_malloc(sizeof(blosc2_postfilter_params));
4232
0
    BLOSC_ERROR_NULL(context->postparams, NULL);
4233
0
    memcpy(context->postparams, dparams.postparams, sizeof(blosc2_postfilter_params));
4234
0
  }
4235
4236
0
  return context;
4237
0
}
4238
4239
4240
0
/* Release a context created by blosc2_create_cctx() / blosc2_create_dctx().
 *
 * Releases the thread pool, the serial thread context, the zstd
 * dictionaries, the tuner state, the prefilter/postfilter params copies,
 * the block maskout and finally the context itself.  A NULL context is
 * ignored.
 *
 * A failure while releasing the tuner state is reported but does not stop
 * the rest of the context from being freed.
 */
void blosc2_free_ctx(blosc2_context* context) {
  if (context == NULL) {
    return;
  }

  release_threadpool(context);
  if (context->serial_context != NULL) {
    free_thread_context(context->serial_context);
  }
  if (context->dict_cdict != NULL) {
#ifdef HAVE_ZSTD
    ZSTD_freeCDict(context->dict_cdict);
#endif
  }
  if (context->dict_ddict != NULL) {
#ifdef HAVE_ZSTD
    ZSTD_freeDDict(context->dict_ddict);
#endif
  }
  if (context->tuner_params != NULL) {
    int rc = 0;
    if (context->tuner_id < BLOSC_LAST_TUNER && context->tuner_id == BLOSC_STUNE) {
      rc = blosc_stune_free(context);
    } else {
      /* Look for the registered tuner and run its free hook */
      bool tuner_found = false;
      for (int i = 0; i < g_ntuners; ++i) {
        if (g_tuners[i].id != context->tuner_id) {
          continue;
        }
        tuner_found = true;
        if (g_tuners[i].free == NULL && fill_tuner(&g_tuners[i]) < 0) {
          BLOSC_TRACE_ERROR("Could not load tuner %d.", g_tuners[i].id);
        }
        else {
          rc = g_tuners[i].free(context);
        }
        break;
      }
      if (!tuner_found) {
        BLOSC_TRACE_ERROR("User-defined tuner %d not found\n", context->tuner_id);
      }
    }
    if (rc < 0) {
      BLOSC_TRACE_ERROR("Error in user-defined tuner free function\n");
    }
    /* Keep going: the remaining resources must be released regardless */
  }
  /* May be needed if codec_params ever contains nested objects
  if (context->codec_params != NULL) {
    int rc;
    for (int i = 0; i < g_ncodecs; ++i) {
      if (g_codecs[i].compcode == context->compcode) {
        if (g_codecs[i].free == NULL) {
          // Dynamically load codec plugin
          if (fill_codec(&g_codecs[i]) < 0) {
            BLOSC_TRACE_ERROR("Could not load codec %d.", g_codecs[i].compcode);
            return BLOSC2_ERROR_CODEC_SUPPORT;
          }
        }
        if (g_codecs[i].free == NULL){
          // no free func, codec_params is simple
          my_free(context->codec_params);
        }
        else{ // has free function for codec_params (e.g. openzl)
        rc = g_codecs[i].free(context->codec_params);
          goto urcodecsuccess;
        }
      }
    }
      BLOSC_TRACE_ERROR("User-defined compressor codec %d not found", context->compcode);
      return BLOSC2_ERROR_CODEC_SUPPORT;
    urcodecsuccess:;
    if (rc < 0) {
      BLOSC_TRACE_ERROR("Error in user-defined codec free function\n");
      return;
    }
  }
  */
  if (context->prefilter != NULL) {
    my_free(context->preparams);
  }
  if (context->postfilter != NULL) {
    my_free(context->postparams);
  }

  if (context->block_maskout != NULL) {
    free(context->block_maskout);
  }
  my_free(context);
}
4324
4325
4326
0
/* Copy the compression parameters currently stored in `ctx` into `cparams`.
 *
 * Pointer members (schunk, prefilter, preparams, codec_params) are copied
 * as pointers; the pointed-to data is not duplicated.
 *
 * Always returns BLOSC2_ERROR_SUCCESS.
 */
int blosc2_ctx_get_cparams(blosc2_context *ctx, blosc2_cparams *cparams) {
  /* Codec selection and tuning */
  cparams->compcode = ctx->compcode;
  cparams->compcode_meta = ctx->compcode_meta;
  cparams->clevel = ctx->clevel;
  cparams->use_dict = ctx->use_dict;
  cparams->instr_codec = ctx->blosc2_flags & BLOSC2_INSTR_CODEC;

  /* Data layout and threading */
  cparams->typesize = ctx->typesize;
  cparams->nthreads = ctx->nthreads;
  cparams->blocksize = ctx->blocksize;
  cparams->splitmode = ctx->splitmode;
  cparams->schunk = ctx->schunk;

  /* Filter pipeline */
  int slot = 0;
  while (slot < BLOSC2_MAX_FILTERS) {
    cparams->filters[slot] = ctx->filters[slot];
    cparams->filters_meta[slot] = ctx->filters_meta[slot];
    ++slot;
  }

  /* Prefilter, tuner and codec-specific params */
  cparams->prefilter = ctx->prefilter;
  cparams->preparams = ctx->preparams;
  cparams->tuner_id = ctx->tuner_id;
  cparams->codec_params = ctx->codec_params;

  return BLOSC2_ERROR_SUCCESS;
}
4348
4349
4350
0
/* Copy the decompression parameters currently stored in `ctx` into
 * `dparams`.  The schunk and postfilter members are copied as pointers.
 *
 * Always returns BLOSC2_ERROR_SUCCESS.
 */
int blosc2_ctx_get_dparams(blosc2_context *ctx, blosc2_dparams *dparams) {
  /* Postfilter hook and its params */
  dparams->postfilter = ctx->postfilter;
  dparams->postparams = ctx->postparams;

  /* Plain settings */
  dparams->nthreads = ctx->nthreads;
  dparams->typesize = ctx->typesize;
  dparams->schunk = ctx->schunk;

  return BLOSC2_ERROR_SUCCESS;
}
4359
4360
4361
/* Set a maskout in decompression context */
4362
0
int blosc2_set_maskout(blosc2_context *ctx, bool *maskout, int nblocks) {
4363
4364
0
  if (ctx->block_maskout != NULL) {
4365
    // Get rid of a possible mask here
4366
0
    free(ctx->block_maskout);
4367
0
  }
4368
4369
0
  bool *maskout_ = malloc(nblocks);
4370
0
  BLOSC_ERROR_NULL(maskout_, BLOSC2_ERROR_MEMORY_ALLOC);
4371
0
  memcpy(maskout_, maskout, nblocks);
4372
0
  ctx->block_maskout = maskout_;
4373
0
  ctx->block_maskout_nitems = nblocks;
4374
4375
0
  return 0;
4376
0
}
4377
4378
4379
/* Create a chunk made of zeros */
4380
0
int blosc2_chunk_zeros(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4381
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4382
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4383
0
    return BLOSC2_ERROR_DATA;
4384
0
  }
4385
4386
0
  if ((nbytes > 0) && (nbytes % cparams.typesize)) {
4387
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4388
0
    return BLOSC2_ERROR_DATA;
4389
0
  }
4390
4391
0
  blosc_header header;
4392
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4393
0
  if (context == NULL) {
4394
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4395
0
    return BLOSC2_ERROR_NULL_POINTER;
4396
0
  }
4397
4398
0
  int error = initialize_context_compression(
4399
0
          context, NULL, nbytes, dest, destsize,
4400
0
          context->clevel, context->filters, context->filters_meta,
4401
0
          context->typesize, context->compcode, context->blocksize,
4402
0
          context->new_nthreads, context->nthreads, context->splitmode,
4403
0
          context->tuner_id, context->tuner_params, context->schunk);
4404
0
  if (error <= 0) {
4405
0
    blosc2_free_ctx(context);
4406
0
    return error;
4407
0
  }
4408
4409
0
  memset(&header, 0, sizeof(header));
4410
0
  header.version = BLOSC2_VERSION_FORMAT;
4411
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4412
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4413
0
  header.typesize = context->typesize;
4414
0
  header.nbytes = (int32_t)nbytes;
4415
0
  header.blocksize = context->blocksize;
4416
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4417
0
  header.blosc2_flags = BLOSC2_SPECIAL_ZERO << 4;  // mark chunk as all zeros
4418
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4419
4420
0
  blosc2_free_ctx(context);
4421
4422
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4423
0
}
4424
4425
4426
/* Create a chunk made of uninitialized values */
4427
0
int blosc2_chunk_uninit(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4428
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4429
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4430
0
    return BLOSC2_ERROR_DATA;
4431
0
  }
4432
4433
0
  if (nbytes % cparams.typesize) {
4434
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4435
0
    return BLOSC2_ERROR_DATA;
4436
0
  }
4437
4438
0
  blosc_header header;
4439
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4440
0
  if (context == NULL) {
4441
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4442
0
    return BLOSC2_ERROR_NULL_POINTER;
4443
0
  }
4444
0
  int error = initialize_context_compression(
4445
0
          context, NULL, nbytes, dest, destsize,
4446
0
          context->clevel, context->filters, context->filters_meta,
4447
0
          context->typesize, context->compcode, context->blocksize,
4448
0
          context->new_nthreads, context->nthreads, context->splitmode,
4449
0
          context->tuner_id, context->tuner_params, context->schunk);
4450
0
  if (error <= 0) {
4451
0
    blosc2_free_ctx(context);
4452
0
    return error;
4453
0
  }
4454
4455
0
  memset(&header, 0, sizeof(header));
4456
0
  header.version = BLOSC2_VERSION_FORMAT;
4457
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4458
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4459
0
  header.typesize = context->typesize;
4460
0
  header.nbytes = (int32_t)nbytes;
4461
0
  header.blocksize = context->blocksize;
4462
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4463
0
  header.blosc2_flags = BLOSC2_SPECIAL_UNINIT << 4;  // mark chunk as uninitialized
4464
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4465
4466
0
  blosc2_free_ctx(context);
4467
4468
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4469
0
}
4470
4471
4472
/* Create a chunk made of nans */
4473
0
int blosc2_chunk_nans(blosc2_cparams cparams, const int32_t nbytes, void* dest, int32_t destsize) {
4474
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH) {
4475
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4476
0
    return BLOSC2_ERROR_DATA;
4477
0
  }
4478
4479
0
  if (nbytes % cparams.typesize) {
4480
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4481
0
    return BLOSC2_ERROR_DATA;
4482
0
  }
4483
4484
0
  blosc_header header;
4485
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4486
0
  if (context == NULL) {
4487
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4488
0
    return BLOSC2_ERROR_NULL_POINTER;
4489
0
  }
4490
4491
0
  int error = initialize_context_compression(
4492
0
          context, NULL, nbytes, dest, destsize,
4493
0
          context->clevel, context->filters, context->filters_meta,
4494
0
          context->typesize, context->compcode, context->blocksize,
4495
0
          context->new_nthreads, context->nthreads, context->splitmode,
4496
0
          context->tuner_id, context->tuner_params, context->schunk);
4497
0
  if (error <= 0) {
4498
0
    blosc2_free_ctx(context);
4499
0
    return error;
4500
0
  }
4501
4502
0
  memset(&header, 0, sizeof(header));
4503
0
  header.version = BLOSC2_VERSION_FORMAT;
4504
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4505
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4506
0
  header.typesize = context->typesize;
4507
0
  header.nbytes = (int32_t)nbytes;
4508
0
  header.blocksize = context->blocksize;
4509
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH;
4510
0
  header.blosc2_flags = BLOSC2_SPECIAL_NAN << 4;  // mark chunk as all NaNs
4511
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4512
4513
0
  blosc2_free_ctx(context);
4514
4515
0
  return BLOSC_EXTENDED_HEADER_LENGTH;
4516
0
}
4517
4518
4519
/* Create a chunk made of repeated values */
4520
int blosc2_chunk_repeatval(blosc2_cparams cparams, const int32_t nbytes,
4521
0
                           void* dest, int32_t destsize, const void* repeatval) {
4522
0
  if (destsize < BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize) {
4523
0
    BLOSC_TRACE_ERROR("dest buffer is not long enough");
4524
0
    return BLOSC2_ERROR_DATA;
4525
0
  }
4526
4527
0
  if (nbytes % cparams.typesize) {
4528
0
    BLOSC_TRACE_ERROR("nbytes must be a multiple of typesize");
4529
0
    return BLOSC2_ERROR_DATA;
4530
0
  }
4531
4532
0
  blosc_header header;
4533
0
  blosc2_context* context = blosc2_create_cctx(cparams);
4534
0
  if (context == NULL) {
4535
0
    BLOSC_TRACE_ERROR("Error while creating the compression context");
4536
0
    return BLOSC2_ERROR_NULL_POINTER;
4537
0
  }
4538
4539
0
  int error = initialize_context_compression(
4540
0
          context, NULL, nbytes, dest, destsize,
4541
0
          context->clevel, context->filters, context->filters_meta,
4542
0
          context->typesize, context->compcode, context->blocksize,
4543
0
          context->new_nthreads, context->nthreads, context->splitmode,
4544
0
          context->tuner_id, context->tuner_params, context->schunk);
4545
0
  if (error <= 0) {
4546
0
    blosc2_free_ctx(context);
4547
0
    return error;
4548
0
  }
4549
4550
0
  memset(&header, 0, sizeof(header));
4551
0
  header.version = BLOSC2_VERSION_FORMAT;
4552
0
  header.versionlz = BLOSC_BLOSCLZ_VERSION_FORMAT;
4553
0
  header.flags = BLOSC_DOSHUFFLE | BLOSC_DOBITSHUFFLE;  // extended header
4554
0
  header.typesize = context->typesize;
4555
0
  header.nbytes = (int32_t)nbytes;
4556
0
  header.blocksize = context->blocksize;
4557
0
  header.cbytes = BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize;
4558
0
  header.blosc2_flags = BLOSC2_SPECIAL_VALUE << 4;  // mark chunk as all repeated value
4559
0
  memcpy((uint8_t *)dest, &header, sizeof(header));
4560
0
  memcpy((uint8_t *)dest + sizeof(header), repeatval, cparams.typesize);
4561
4562
0
  blosc2_free_ctx(context);
4563
4564
0
  return BLOSC_EXTENDED_HEADER_LENGTH + cparams.typesize;
4565
0
}
4566
4567
4568
/* Register filters */
4569
4570
5
/* Add `filter` to the global filter registry (g_filters).
 *
 * Returns BLOSC2_ERROR_SUCCESS when the filter is stored, or when a filter
 * with the same id and name is already present.  Returns
 * BLOSC2_ERROR_INVALID_PARAM for a NULL filter (through BLOSC_ERROR_NULL),
 * BLOSC2_ERROR_CODEC_SUPPORT when the registry is full, and
 * BLOSC2_ERROR_FAILURE when the id is below
 * BLOSC2_GLOBAL_REGISTERED_FILTERS_START or is already taken by a filter
 * with a different name.
 *
 * No upper bound is enforced on the id: a check against
 * BLOSC2_USER_REGISTERED_FILTERS_STOP could never be fulfilled.
 */
int register_filter_private(blosc2_filter *filter) {
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);

  if (g_nfilters == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more filters");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }

  if (filter->id < BLOSC2_GLOBAL_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }

  /* Is this id already in use? */
  for (uint64_t slot = 0; slot < g_nfilters; ++slot) {
    if (g_filters[slot].id != filter->id) {
      continue;
    }
    if (strcmp(g_filters[slot].name, filter->name) == 0) {
      // Already registered, so no more actions needed
      return BLOSC2_ERROR_SUCCESS;
    }
    BLOSC_TRACE_ERROR("The filter (ID: %d) plugin is already registered with name: %s."
                      "  Choose another one !", filter->id, g_filters[slot].name);
    return BLOSC2_ERROR_FAILURE;
  }

  /* Store a copy in the next free slot */
  memcpy(&g_filters[g_nfilters], filter, sizeof(blosc2_filter));
  g_nfilters++;

  return BLOSC2_ERROR_SUCCESS;
}
4606
4607
4608
0
/* Register a user-defined filter.
 *
 * The filter id must be at least BLOSC2_USER_REGISTERED_FILTERS_START;
 * the remaining validation and the actual registration are delegated to
 * register_filter_private().
 *
 * Returns BLOSC2_ERROR_INVALID_PARAM for a NULL filter,
 * BLOSC2_ERROR_FAILURE for an id below the user range, otherwise the
 * result of register_filter_private().
 */
int blosc2_register_filter(blosc2_filter *filter) {
  /* Check for NULL before the id is read */
  BLOSC_ERROR_NULL(filter, BLOSC2_ERROR_INVALID_PARAM);

  if (filter->id < BLOSC2_USER_REGISTERED_FILTERS_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_FILTERS_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_filter_private(filter);
}
4616
4617
4618
/* Register codecs */
4619
4620
7
/* Add `codec` to the global codec registry (g_codecs).
 *
 * Returns BLOSC2_ERROR_SUCCESS when the codec is stored, or when a codec
 * with the same compcode and compname is already present.  Returns
 * BLOSC2_ERROR_INVALID_PARAM for a NULL codec (through BLOSC_ERROR_NULL),
 * BLOSC2_ERROR_CODEC_SUPPORT when the registry is full,
 * BLOSC2_ERROR_FAILURE when the compcode is below
 * BLOSC2_GLOBAL_REGISTERED_CODECS_START, and BLOSC2_ERROR_CODEC_PARAM when
 * the compcode is already taken by a codec with a different name.
 *
 * No upper bound is enforced on the compcode: a check against
 * BLOSC2_USER_REGISTERED_CODECS_STOP could never be fulfilled.
 */
int register_codec_private(blosc2_codec *codec) {
    BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);
    if (g_ncodecs == UINT8_MAX) {
      BLOSC_TRACE_ERROR("Can not register more codecs");
      return BLOSC2_ERROR_CODEC_SUPPORT;
    }
    if (codec->compcode < BLOSC2_GLOBAL_REGISTERED_CODECS_START) {
      BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_CODECS_START);
      return BLOSC2_ERROR_FAILURE;
    }

    for (int i = 0; i < g_ncodecs; ++i) {
      if (g_codecs[i].compcode == codec->compcode) {
        if (strcmp(g_codecs[i].compname, codec->compname) != 0) {
          /* Report the name that already owns this compcode, not the name
             of the codec being rejected */
          BLOSC_TRACE_ERROR("The codec (ID: %d) plugin is already registered with name: %s."
                            "  Choose another one !", codec->compcode, g_codecs[i].compname);
          return BLOSC2_ERROR_CODEC_PARAM;
        }
        else {
          // Already registered, so no more actions needed
          return BLOSC2_ERROR_SUCCESS;
        }
      }
    }

    /* Store a copy in the next free slot */
    blosc2_codec *codec_new = &g_codecs[g_ncodecs++];
    memcpy(codec_new, codec, sizeof(blosc2_codec));

    return BLOSC2_ERROR_SUCCESS;
}
4656
4657
4658
0
/* Register a user-defined codec.
 *
 * The compcode must be at least BLOSC2_USER_REGISTERED_CODECS_START; the
 * remaining validation and the actual registration are delegated to
 * register_codec_private().
 *
 * Returns BLOSC2_ERROR_INVALID_PARAM for a NULL codec,
 * BLOSC2_ERROR_CODEC_PARAM for a compcode below the user range, otherwise
 * the result of register_codec_private().
 */
int blosc2_register_codec(blosc2_codec *codec) {
  /* Check for NULL before the compcode is read */
  BLOSC_ERROR_NULL(codec, BLOSC2_ERROR_INVALID_PARAM);

  if (codec->compcode < BLOSC2_USER_REGISTERED_CODECS_START) {
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_USER_REGISTERED_CODECS_START);
    return BLOSC2_ERROR_CODEC_PARAM;
  }

  return register_codec_private(codec);
}
4666
4667
4668
/* Register tuners */
4669
4670
1
/* Add `tuner` to the global tuner registry (g_tuners).
 *
 * Returns BLOSC2_ERROR_SUCCESS when the tuner is stored, or when a tuner
 * with the same id and name is already present.  Returns
 * BLOSC2_ERROR_INVALID_PARAM for a NULL tuner (through BLOSC_ERROR_NULL),
 * BLOSC2_ERROR_CODEC_SUPPORT when the registry is full, and
 * BLOSC2_ERROR_FAILURE when the id is below
 * BLOSC2_GLOBAL_REGISTERED_TUNER_START or is already taken by a tuner with
 * a different name.
 */
int register_tuner_private(blosc2_tuner *tuner) {
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);

  if (g_ntuners == UINT8_MAX) {
    BLOSC_TRACE_ERROR("Can not register more tuners");
    return BLOSC2_ERROR_CODEC_SUPPORT;
  }

  if (tuner->id < BLOSC2_GLOBAL_REGISTERED_TUNER_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal than %d", BLOSC2_GLOBAL_REGISTERED_TUNER_START);
    return BLOSC2_ERROR_FAILURE;
  }

  /* Is this id already in use? */
  for (int slot = 0; slot < g_ntuners; ++slot) {
    if (g_tuners[slot].id != tuner->id) {
      continue;
    }
    if (strcmp(g_tuners[slot].name, tuner->name) == 0) {
      // Already registered, so no more actions needed
      return BLOSC2_ERROR_SUCCESS;
    }
    BLOSC_TRACE_ERROR("The tuner (ID: %d) plugin is already registered with name: %s."
                      "  Choose another one !", tuner->id, g_tuners[slot].name);
    return BLOSC2_ERROR_FAILURE;
  }

  /* Store a copy in the next free slot */
  memcpy(&g_tuners[g_ntuners], tuner, sizeof(blosc2_tuner));
  g_ntuners++;

  return BLOSC2_ERROR_SUCCESS;
}
4700
4701
4702
0
/* Register a user-defined tuner.
 *
 * The tuner id must be at least BLOSC2_USER_REGISTERED_TUNER_START; the
 * remaining validation and the actual registration are delegated to
 * register_tuner_private().
 *
 * Returns BLOSC2_ERROR_INVALID_PARAM for a NULL tuner,
 * BLOSC2_ERROR_FAILURE for an id below the user range, otherwise the
 * result of register_tuner_private().
 */
int blosc2_register_tuner(blosc2_tuner *tuner) {
  /* Check for NULL before the id is read */
  BLOSC_ERROR_NULL(tuner, BLOSC2_ERROR_INVALID_PARAM);

  if (tuner->id < BLOSC2_USER_REGISTERED_TUNER_START) {
    BLOSC_TRACE_ERROR("The id must be greater or equal to %d", BLOSC2_USER_REGISTERED_TUNER_START);
    return BLOSC2_ERROR_FAILURE;
  }

  return register_tuner_private(tuner);
}
4710
4711
4712
2
int _blosc2_register_io_cb(const blosc2_io_cb *io) {
4713
4714
3
  for (uint64_t i = 0; i < g_nio; ++i) {
4715
1
    if (g_ios[i].id == io->id) {
4716
0
      if (strcmp(g_ios[i].name, io->name) != 0) {
4717
0
        BLOSC_TRACE_ERROR("The IO (ID: %d) plugin is already registered with name: %s."
4718
0
                          "  Choose another one !", io->id, g_ios[i].name);
4719
0
        return BLOSC2_ERROR_PLUGIN_IO;
4720
0
      }
4721
0
      else {
4722
        // Already registered, so no more actions needed
4723
0
        return BLOSC2_ERROR_SUCCESS;
4724
0
      }
4725
0
    }
4726
1
  }
4727
4728
2
  blosc2_io_cb *io_new = &g_ios[g_nio++];
4729
2
  memcpy(io_new, io, sizeof(blosc2_io_cb));
4730
4731
2
  return BLOSC2_ERROR_SUCCESS;
4732
2
}
4733
4734
0
int blosc2_register_io_cb(const blosc2_io_cb *io) {
4735
0
  BLOSC_ERROR_NULL(io, BLOSC2_ERROR_INVALID_PARAM);
4736
0
  if (g_nio == UINT8_MAX) {
4737
0
    BLOSC_TRACE_ERROR("Can not register more codecs");
4738
0
    return BLOSC2_ERROR_PLUGIN_IO;
4739
0
  }
4740
4741
0
  if (io->id < BLOSC2_IO_REGISTERED) {
4742
0
    BLOSC_TRACE_ERROR("The compcode must be greater or equal than %d", BLOSC2_IO_REGISTERED);
4743
0
    return BLOSC2_ERROR_PLUGIN_IO;
4744
0
  }
4745
4746
0
  return _blosc2_register_io_cb(io);
4747
0
}
4748
4749
0
blosc2_io_cb *blosc2_get_io_cb(uint8_t id) {
4750
  // If g_initlib is not set by blosc2_init() this function will try to read
4751
  // uninitialized memory. We should therefore always return NULL in that case
4752
0
  if (!g_initlib) {
4753
0
    return NULL;
4754
0
  }
4755
0
  for (uint64_t i = 0; i < g_nio; ++i) {
4756
0
    if (g_ios[i].id == id) {
4757
0
      return &g_ios[i];
4758
0
    }
4759
0
  }
4760
0
  if (id == BLOSC2_IO_FILESYSTEM) {
4761
0
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_DEFAULTS) < 0) {
4762
0
      BLOSC_TRACE_ERROR("Error registering the default IO API");
4763
0
      return NULL;
4764
0
    }
4765
0
    return blosc2_get_io_cb(id);
4766
0
  }
4767
0
  else if (id == BLOSC2_IO_FILESYSTEM_MMAP) {
4768
0
    if (_blosc2_register_io_cb(&BLOSC2_IO_CB_MMAP) < 0) {
4769
0
      BLOSC_TRACE_ERROR("Error registering the mmap IO API");
4770
0
      return NULL;
4771
0
    }
4772
0
    return blosc2_get_io_cb(id);
4773
0
  }
4774
0
  return NULL;
4775
0
}
4776
4777
0
/*
 * Convert a flat offset into per-dimension indices, with the last dimension
 * varying fastest.
 *
 * ndim:  number of dimensions; when 0 the function returns without touching
 *        `index`.
 * shape: extents of each dimension (`ndim` entries). shape[0] is never read;
 *        the extents of dimensions 1..ndim-1 must be non-zero because they
 *        are used as divisors.
 * i:     flat offset to convert.
 * index: output array with room for `ndim` entries. index[0] receives the
 *        remaining quotient and is therefore not wrapped by shape[0].
 *
 * The indices are peeled off from the last dimension towards the first, so
 * no scratch array of strides is needed and `ndim` is not limited by a fixed
 * buffer size.
 */
void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) {
  if (ndim == 0) {
    return;
  }

  int64_t rest = i;
  for (int j = ndim - 1; j > 0; --j) {
    // Read the extent once, before writing index[j], so the result is the
    // same even if `index` and `shape` refer to the same storage.
    const int64_t extent = shape[j];
    index[j] = rest % extent;
    rest /= extent;
  }
  index[0] = rest;
}
4794
4795
0
/*
 * Collapse per-dimension indices into a flat offset.
 *
 * index:   `ndim` per-dimension indices.
 * ndim:    number of dimensions; zero or negative values produce 0.
 * strides: `ndim` strides, one per dimension.
 * i:       output; receives the sum of index[j] * strides[j].
 */
void blosc2_multidim_to_unidim(const int64_t *index, int8_t ndim, const int64_t *strides, int64_t *i) {
  const int64_t *coord = index;
  const int64_t *step = strides;

  *i = 0;
  for (int left = ndim; left > 0; --left) {
    *i += (*coord) * (*step);
    ++coord;
    ++step;
  }
}
4801
4802
0
/*
 * Get the chunk indexes needed to read a slice of a super-chunk.
 *
 * When the super-chunk has neither a "b2nd" nor a "caterva" metalayer the
 * request is served by schunk_get_slice_nchunks() using *start and *stop.
 * Otherwise a b2nd array view is built on top of the super-chunk and the
 * request is forwarded to b2nd_get_slice_nchunks().
 *
 * Returns BLOSC2_ERROR_NULL_POINTER (through BLOSC_ERROR_NULL) when `schunk`
 * is NULL, the negative code from b2nd_from_schunk() when the view cannot be
 * built, and otherwise the result of the function the request was forwarded to.
 */
int blosc2_get_slice_nchunks(blosc2_schunk* schunk, int64_t *start, int64_t *stop, int64_t **chunks_idx) {
  BLOSC_ERROR_NULL(schunk, BLOSC2_ERROR_NULL_POINTER);

  // "caterva" is only probed when "b2nd" is missing; we are meant to be
  // backward compatible with caterva metalayers.
  if (blosc2_meta_exists(schunk, "b2nd") < 0 &&
      blosc2_meta_exists(schunk, "caterva") < 0) {
    return schunk_get_slice_nchunks(schunk, *start, *stop, chunks_idx);
  }

  b2nd_array_t *nd_view;
  int status = b2nd_from_schunk(schunk, &nd_view);
  if (status < 0) {
    BLOSC_TRACE_ERROR("Could not get b2nd array from schunk.");
    return status;
  }

  status = b2nd_get_slice_nchunks(nd_view, start, stop, chunks_idx);

  // Detach the super-chunk first so that only the array struct is freed
  nd_view->sc = NULL;
  b2nd_free(nd_view);

  return status;
}
4823
4824
0
blosc2_cparams blosc2_get_blosc2_cparams_defaults(void) {
4825
0
  return BLOSC2_CPARAMS_DEFAULTS;
4826
0
};
4827
4828
0
blosc2_dparams blosc2_get_blosc2_dparams_defaults(void) {
4829
0
  return BLOSC2_DPARAMS_DEFAULTS;
4830
0
};
4831
4832
0
blosc2_storage blosc2_get_blosc2_storage_defaults(void) {
4833
0
  return BLOSC2_STORAGE_DEFAULTS;
4834
0
};
4835
4836
0
blosc2_io blosc2_get_blosc2_io_defaults(void) {
4837
0
  return BLOSC2_IO_DEFAULTS;
4838
0
};
4839
4840
0
blosc2_stdio_mmap blosc2_get_blosc2_stdio_mmap_defaults(void) {
4841
0
  return BLOSC2_STDIO_MMAP_DEFAULTS;
4842
0
};
4843
4844
0
const char *blosc2_error_string(int error_code) {
4845
0
  switch (error_code) {
4846
0
    case BLOSC2_ERROR_FAILURE:
4847
0
      return "Generic failure";
4848
0
    case BLOSC2_ERROR_STREAM:
4849
0
      return "Bad stream";
4850
0
    case BLOSC2_ERROR_DATA:
4851
0
      return "Invalid data";
4852
0
    case BLOSC2_ERROR_MEMORY_ALLOC:
4853
0
      return "Memory alloc/realloc failure";
4854
0
    case BLOSC2_ERROR_READ_BUFFER:
4855
0
      return "Not enough space to read";
4856
0
    case BLOSC2_ERROR_WRITE_BUFFER:
4857
0
      return "Not enough space to write";
4858
0
    case BLOSC2_ERROR_CODEC_SUPPORT:
4859
0
      return "Codec not supported";
4860
0
    case BLOSC2_ERROR_CODEC_PARAM:
4861
0
      return "Invalid parameter supplied to codec";
4862
0
    case BLOSC2_ERROR_CODEC_DICT:
4863
0
      return "Codec dictionary error";
4864
0
    case BLOSC2_ERROR_VERSION_SUPPORT:
4865
0
      return "Version not supported";
4866
0
    case BLOSC2_ERROR_INVALID_HEADER:
4867
0
      return "Invalid value in header";
4868
0
    case BLOSC2_ERROR_INVALID_PARAM:
4869
0
      return "Invalid parameter supplied to function";
4870
0
    case BLOSC2_ERROR_FILE_READ:
4871
0
      return "File read failure";
4872
0
    case BLOSC2_ERROR_FILE_WRITE:
4873
0
      return "File write failure";
4874
0
    case BLOSC2_ERROR_FILE_OPEN:
4875
0
      return "File open failure";
4876
0
    case BLOSC2_ERROR_NOT_FOUND:
4877
0
      return "Not found";
4878
0
    case BLOSC2_ERROR_RUN_LENGTH:
4879
0
      return "Bad run length encoding";
4880
0
    case BLOSC2_ERROR_FILTER_PIPELINE:
4881
0
      return "Filter pipeline error";
4882
0
    case BLOSC2_ERROR_CHUNK_INSERT:
4883
0
      return "Chunk insert failure";
4884
0
    case BLOSC2_ERROR_CHUNK_APPEND:
4885
0
      return "Chunk append failure";
4886
0
    case BLOSC2_ERROR_CHUNK_UPDATE:
4887
0
      return "Chunk update failure";
4888
0
    case BLOSC2_ERROR_2GB_LIMIT:
4889
0
      return "Sizes larger than 2gb not supported";
4890
0
    case BLOSC2_ERROR_SCHUNK_COPY:
4891
0
      return "Super-chunk copy failure";
4892
0
    case BLOSC2_ERROR_FRAME_TYPE:
4893
0
      return "Wrong type for frame";
4894
0
    case BLOSC2_ERROR_FILE_TRUNCATE:
4895
0
      return "File truncate failure";
4896
0
    case BLOSC2_ERROR_THREAD_CREATE:
4897
0
      return "Thread or thread context creation failure";
4898
0
    case BLOSC2_ERROR_POSTFILTER:
4899
0
      return "Postfilter failure";
4900
0
    case BLOSC2_ERROR_FRAME_SPECIAL:
4901
0
      return "Special frame failure";
4902
0
    case BLOSC2_ERROR_SCHUNK_SPECIAL:
4903
0
      return "Special super-chunk failure";
4904
0
    case BLOSC2_ERROR_PLUGIN_IO:
4905
0
      return "IO plugin error";
4906
0
    case BLOSC2_ERROR_FILE_REMOVE:
4907
0
      return "Remove file failure";
4908
0
    case BLOSC2_ERROR_NULL_POINTER:
4909
0
      return "Pointer is null";
4910
0
    case BLOSC2_ERROR_INVALID_INDEX:
4911
0
      return "Invalid index";
4912
0
    case BLOSC2_ERROR_METALAYER_NOT_FOUND:
4913
0
      return "Metalayer has not been found";
4914
0
    case BLOSC2_ERROR_MAX_BUFSIZE_EXCEEDED:
4915
0
      return "Maximum buffersize exceeded";
4916
0
    case BLOSC2_ERROR_TUNER:
4917
0
      return "Tuner failure";
4918
0
    default:
4919
0
      return "Unknown error";
4920
0
  }
4921
0
}